Skip to main content

ai_agent/
query_engine.rs

1// Source: ~/claudecode/openclaudecode/src/QueryEngine.ts (query lifecycle + submitMessage)
2// Also translates: ~/claudecode/openclaudecode/src/query.ts (streaming, SSE, tool execution loop)
3// Note: The TypeScript QueryEngine delegates to query() for the actual API call loop.
4// This Rust port combines both into a single QueryEngine struct.
5#![allow(dead_code)]
6
7use crate::compact::{
8    self, get_auto_compact_threshold, get_compact_prompt, get_effective_context_window_size,
9};
10use crate::error::AgentError;
11use crate::hooks::{HookInput, HookRegistry};
12use crate::services::compact::microcompact::truncate_tool_result_content;
13use crate::services::api::errors::{sanitize_html_error, error_to_api_message, get_error_message_if_refusal, is_media_size_error};
14use crate::services::streaming::{
15    STALL_THRESHOLD_MS, StallStats, StreamWatchdog, StreamingResult, StreamingToolExecutor,
16    calculate_streaming_cost, cleanup_stream, get_nonstreaming_fallback_timeout_ms,
17    is_404_stream_creation_error, is_429_only_error, is_529_error, is_api_timeout_error,
18    is_auth_error, is_nonstreaming_fallback_disabled, is_stale_connection_error,
19    is_user_abort_error, parse_max_tokens_context_overflow, release_stream_resources,
20    validate_stream_completion, FallbackTriggeredError, MAX_529_RETRIES, FLOOR_OUTPUT_TOKENS,
21};
22use crate::tool::Tool as ToolTrait;
23use crate::tool::{ProgressMessage, ToolResultRenderOptions};
24use crate::tools::orchestration::{self, ToolMessageUpdate};
25use crate::types::*;
26use crate::utils::http::get_user_agent;
27use std::collections::{HashMap, HashSet};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::{Arc, Mutex};
30use tokio::time::sleep as sleep_tokio;
31
32/// Emit an ApiRetry event to notify callers of retry progress.
33/// Matches TypeScript's createSystemAPIErrorMessage → api_retry subtype.
34fn emit_api_retry_event(
35    on_event: Option<&(dyn Fn(AgentEvent) + Send + Sync)>,
36    attempt: u32,
37    max_retries: u32,
38    retry_delay_ms: u64,
39    error_status: Option<u16>,
40    error: &str,
41) {
42    if let Some(cb) = on_event {
43        cb(AgentEvent::ApiRetry {
44            attempt,
45            max_retries,
46            retry_delay_ms,
47            error_status,
48            error: error.to_string(),
49        });
50    }
51}
52
53/// Emit a Done event with a pre-result session storage flush.
54/// Matches TypeScript's flushSessionStorage() before each result yield in QueryEngine.ts.
55fn emit_done_event(
56    on_event: &Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
57    result: QueryResult,
58) {
59    let _ = crate::utils::session_storage::flush_session_storage();
60    if let Some(cb) = on_event {
61        cb(AgentEvent::Done { result });
62    }
63}
64
/// Format a token count for compact human-readable display.
///
/// Counts below 1,000 are printed as plain integers (`"42"`). Larger counts are scaled to
/// thousands (`"120.3k"`) or millions (`"1.2m"`) with one decimal place.
///
/// Counts just under one million, whose one-decimal "k" form would round up to `1000.0`,
/// are shown in the million unit instead. The output therefore never reads `"1000.0k"`.
fn format_tokens(tokens: u64) -> String {
    // Smallest count whose `{:.1}k` rendering rounds to "1000.0k" (999.95 -> 1000.0).
    const MILLION_DISPLAY_THRESHOLD: u64 = 999_950;

    if tokens >= MILLION_DISPLAY_THRESHOLD {
        format!("{:.1}m", tokens as f64 / 1_000_000.0)
    } else if tokens >= 1_000 {
        format!("{:.1}k", tokens as f64 / 1_000.0)
    } else {
        tokens.to_string()
    }
}
75
76/// Return an empty JSON object value to use as default for tool call arguments
77pub(crate) fn empty_json_value() -> serde_json::Value {
78    serde_json::Value::Object(serde_json::Map::new())
79}
80
/// Remove `<think>`…`</think>` sections from `content` and trim surrounding whitespace.
///
/// Text outside the tags is kept verbatim. Everything after an opening `<think>` is dropped
/// until the next `</think>`, or the end of input if there is none. Stray `</think>` tags
/// are removed too. The scan advances one `char` at a time, so multi-byte UTF-8 text is
/// never split. Matches the TypeScript thinking-removal logic.
pub(crate) fn strip_thinking(content: &str) -> String {
    const OPEN_TAG: &str = "<think>";
    const CLOSE_TAG: &str = "</think>";

    let mut kept = String::with_capacity(content.len());
    let mut suppressing = false;
    let mut remaining = content;

    while !remaining.is_empty() {
        if let Some(after_tag) = remaining.strip_prefix(OPEN_TAG) {
            suppressing = true;
            remaining = after_tag;
        } else if let Some(after_tag) = remaining.strip_prefix(CLOSE_TAG) {
            suppressing = false;
            remaining = after_tag;
        } else {
            // Step over exactly one char; `as_str` yields the rest on a char boundary.
            let mut chars = remaining.chars();
            if let Some(ch) = chars.next() {
                if !suppressing {
                    kept.push(ch);
                }
            }
            remaining = chars.as_str();
        }
    }

    kept.trim().to_string()
}
120
121/// Parse Anthropic API usage info
122fn parse_anthropic_usage(usage: &serde_json::Value) -> TokenUsage {
123    let iterations = usage.get("iterations").and_then(|v| v.as_array()).map(|arr| {
124        arr.iter().filter_map(|it| {
125            Some(IterationUsage {
126                input_tokens: it.get("input_tokens").and_then(|v| v.as_u64())?,
127                output_tokens: it.get("output_tokens").and_then(|v| v.as_u64())?,
128            })
129        }).collect()
130    });
131    TokenUsage {
132        input_tokens: usage
133            .get("input_tokens")
134            .and_then(|v| v.as_u64())
135            .unwrap_or(0),
136        output_tokens: usage
137            .get("output_tokens")
138            .and_then(|v| v.as_u64())
139            .unwrap_or(0),
140        cache_creation_input_tokens: usage
141            .get("cache_creation_input_tokens")
142            .and_then(|v| v.as_u64()),
143        cache_read_input_tokens: usage
144            .get("cache_read_input_tokens")
145            .and_then(|v| v.as_u64()),
146        iterations,
147    }
148}
149
/// Auto-compaction bookkeeping carried from one loop iteration to the next.
#[derive(Clone, Debug, Default)]
pub struct AutoCompactTracking {
    /// `true` when the previous turn performed a compaction.
    pub compacted: bool,
    /// Identifier unique to each turn (used for analytics).
    pub turn_id: String,
    /// Number of turns since the last compaction.
    pub turn_counter: u32,
    /// Consecutive auto-compact failures, used as a circuit breaker.
    pub consecutive_failures: u32,
}
162
/// Display information computed for one tool call from the tool's render closures.
#[derive(Clone, Debug)]
pub struct ToolRenderMetadata {
    /// Name shown to the user for this tool call.
    pub user_facing_name: String,
    /// Optional short summary of the call's input.
    pub tool_use_summary: Option<String>,
    /// Optional description of what the tool is doing.
    pub activity_description: Option<String>,
}
170
171/// Render function closures stored alongside a tool for display hooks
172type UserFacingNameFn = Arc<dyn Fn(Option<&serde_json::Value>) -> String + Send + Sync>;
173type GetToolUseSummaryFn = Arc<dyn Fn(Option<&serde_json::Value>) -> Option<String> + Send + Sync>;
174type GetActivityDescriptionFn =
175    Arc<dyn Fn(Option<&serde_json::Value>) -> Option<String> + Send + Sync>;
176type RenderToolResultFn = Arc<
177    dyn Fn(&serde_json::Value, &[ProgressMessage], &ToolResultRenderOptions) -> Option<String>
178        + Send
179        + Sync,
180>;
181
182#[derive(Clone)]
183pub struct ToolRenderFns {
184    pub user_facing_name: UserFacingNameFn,
185    pub get_tool_use_summary: Option<GetToolUseSummaryFn>,
186    pub get_activity_description: Option<GetActivityDescriptionFn>,
187    pub render_tool_result_message: Option<RenderToolResultFn>,
188}
189
190impl ToolRenderFns {
191    /// Render a tool's result using the stored render closure.
192    /// The caller provides the tools vector for the ToolResultRenderOptions.
193    pub fn render(&self, content: &str, tools: &[crate::types::ToolDefinition]) -> Option<String> {
194        let content_value: serde_json::Value = serde_json::from_str(content).ok()?;
195        let progress_messages: Vec<ProgressMessage> = vec![];
196        let options = ToolResultRenderOptions {
197            style: None,
198            theme: "dark".to_string(),
199            tools: tools.to_vec(),
200            verbose: false,
201            is_transcript_mode: false,
202            is_brief_only: false,
203            input: None,
204        };
205        let render_fn = self.render_tool_result_message.as_ref()?;
206        render_fn(&content_value, &progress_messages, &options)
207    }
208}
209
210#[allow(dead_code)]
211pub struct QueryEngine {
212    pub(crate) config: QueryEngineConfig,
213    pub(crate) messages: Vec<crate::types::Message>,
214    turn_count: u32,
215    total_usage: TokenUsage,
216    total_cost: f64,
217    http_client: reqwest::Client,
218    /// Tool executors: name -> async function
219    tool_executors: Mutex<HashMap<String, Arc<ToolExecutor>>>,
220    /// Tool render metadata: name -> closures for computing display info and rendering results
221    tool_render_fns: Mutex<HashMap<String, ToolRenderFns>>,
222    /// Tool backfill functions: name -> function that mutates input for observers
223    tool_backfill_fns: Mutex<HashMap<String, Arc<dyn Fn(&mut serde_json::Value) + Send + Sync>>>,
224    /// Hook registry for PreToolUse/PostToolUse hooks
225    hook_registry: Arc<Mutex<Option<HookRegistry>>>,
226    /// Auto-compaction tracking state
227    auto_compact_tracking: AutoCompactTracking,
228    /// Track permission denials for SDK reporting (matches TypeScript)
229    permission_denials: Vec<PermissionDenial>,
230    /// Last stop_reason from assistant messages
231    last_stop_reason: Option<String>,
232    /// Recovery state for max_output_tokens
233    max_output_tokens_recovery_count: u32,
234    /// Recovery state for reactive compaction
235    has_attempted_reactive_compact: bool,
236    /// Count of consecutive empty response retries (for transient API failures)
237    empty_response_retries: u32,
238    /// Override for max_tokens during recovery
239    max_output_tokens_override: Option<u32>,
240    /// Whether a stop hook is currently active (prevents re-triggering)
241    stop_hook_active: bool,
242    /// Transition reason - why the previous iteration continued (for testing/analytics)
243    transition: Option<String>,
244    /// Pending tool use summary from previous turn (Haiku-generated)
245    pending_tool_use_summary: Option<String>,
246    /// Abort controller for interrupting the query engine loop
247    abort_controller: crate::utils::AbortController,
248    /// Token budget tracker (TOKEN_BUDGET feature)
249    budget_tracker: crate::token_budget::BudgetTracker,
250    /// Output tokens consumed in the current turn (for TOKEN_BUDGET)
251    turn_tokens: u64,
252    /// Memory paths already loaded by parent agents
253    loaded_nested_memory_paths: std::collections::HashSet<String>,
254    /// Content replacement state for aggregate tool result budget enforcement
255    content_replacement_state: Option<crate::services::compact::ContentReplacementState>,
256    /// When the current query started (for duration_ms in AgentEvent::Done)
257    start_time: Option<std::time::Instant>,
258    /// task_budget.remaining tracking across compaction boundaries.
259    /// Decremented by pre-compact final context after each compaction.
260    task_budget_remaining: Option<u64>,
261    /// Structured output retry count (for MAX_STRUCTURED_OUTPUT_RETRIES limit)
262    structured_output_retries: u32,
263    /// Whether the orphaned permission has been handled this engine lifetime.
264    /// Matches TypeScript's hasHandledOrphanedPermission flag.
265    has_handled_orphaned_permission: bool,
266}
267
268type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>;
269type ToolExecutor = dyn Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
270    + Send
271    + Sync;
272
273/// Permission denial tracking for SDK reporting
274#[derive(Debug, Clone, Default)]
275pub struct PermissionDenial {
276    pub tool_name: String,
277    pub tool_use_id: String,
278    pub tool_input: serde_json::Value,
279}
280
281/// Orphaned permission state for session resume.
282/// When a session is resumed from a point where a tool-use was waiting
283/// on a permission decision, this struct carries the pre-stored decision
284/// so the query engine can inject the synthetic result.
285#[derive(Debug, Clone)]
286pub struct OrphanedPermission {
287    pub tool_use_id: String,
288    pub assistant_message: Message,
289    pub permission_result: crate::permission::PermissionResult,
290}
291
292pub struct QueryEngineConfig {
293    pub cwd: String,
294    pub model: String,
295    pub api_key: Option<String>,
296    pub base_url: Option<String>,
297    pub tools: Vec<ToolDefinition>,
298    pub system_prompt: Option<String>,
299    pub max_turns: u32,
300    pub max_budget_usd: Option<f64>,
301    pub max_tokens: u32,
302    /// Fallback model to use when primary model fails (e.g., rate limit)
303    pub fallback_model: Option<String>,
304    /// User context (additional context to prepend to user messages)
305    /// Matches TypeScript's prependUserContext
306    pub user_context: HashMap<String, String>,
307    /// System context (additional context to append to system prompt)
308    pub system_context: HashMap<String, String>,
309    /// Permission check function - called BEFORE tool execution
310    /// Returns PermissionResult::Allow, ::Deny, ::Ask, or ::Passthrough
311    pub can_use_tool:
312        Option<std::sync::Arc<dyn Fn(ToolDefinition, serde_json::Value) -> crate::permission::PermissionResult + Send + Sync>>,
313    /// Callback for agent events (tool start/complete/error, thinking, done)
314    pub on_event: Option<std::sync::Arc<dyn Fn(AgentEvent) + Send + Sync>>,
315    /// Thinking configuration for the API
316    /// Defaults to Adaptive if not specified
317    pub thinking: Option<crate::types::api_types::ThinkingConfig>,
318    /// External abort controller for interrupting the query engine loop.
319    /// If provided, this will be used instead of creating a new one.
320    pub abort_controller: Option<std::sync::Arc<crate::utils::AbortController>>,
321    /// Token budget target in tokens (TOKEN_BUDGET feature).
322    /// When set, the query loop continues until 90% of this budget is consumed,
323    /// or diminishing returns are detected.
324    pub token_budget: Option<f64>,
325    /// Optional agent ID for subagent identification. Token budget is skipped for subagents.
326    pub agent_id: Option<String>,
327    /// Optional session state manager for tracking agent lifecycle states
328    pub session_state: Option<std::sync::Arc<crate::session_state::SessionStateManager>>,
329    /// Memory paths already loaded by parent agents
330    pub loaded_nested_memory_paths: std::collections::HashSet<String>,
331    /// API task_budget (distinct from tokenBudget +500k auto-continue feature).
332    /// `total` is the budget for the whole agentic turn.
333    pub task_budget: Option<TaskBudget>,
334    /// Orphaned permission for session-resume scenarios.
335    /// When present, the query engine injects the assistant message and a
336    /// synthetic tool-result reflecting the stored permission decision before
337    /// the main tool-call loop begins.
338    pub orphaned_permission: Option<OrphanedPermission>,
339}
340
/// API task budget for a whole agentic turn.
#[derive(Clone, Debug)]
pub struct TaskBudget {
    /// Budget for the entire turn.
    pub total: u64,
}
345
346impl Default for QueryEngineConfig {
347    fn default() -> Self {
348        Self {
349            cwd: String::new(),
350            model: String::new(),
351            api_key: None,
352            base_url: None,
353            tools: vec![],
354            system_prompt: None,
355            max_turns: 10,
356            max_budget_usd: None,
357            max_tokens: 16384,
358            fallback_model: None,
359            user_context: HashMap::new(),
360            system_context: HashMap::new(),
361            can_use_tool: None,
362            on_event: None,
363            thinking: None,
364            abort_controller: None,
365            token_budget: None,
366            agent_id: None,
367            session_state: None,
368            loaded_nested_memory_paths: std::collections::HashSet::new(),
369            task_budget: None,
370            orphaned_permission: None,
371        }
372    }
373}
374
375impl QueryEngine {
376    pub fn new(mut config: QueryEngineConfig) -> Self {
377        let loaded_memory_paths = config.loaded_nested_memory_paths.clone();
378        let abort_controller = config.abort_controller.take().map_or_else(
379            || crate::utils::create_abort_controller_default(),
380            |arc| (*arc).clone(),
381        );
382        Self {
383            config,
384            messages: vec![],
385            turn_count: 0,
386            total_usage: TokenUsage::default(),
387            total_cost: 0.0,
388            http_client: reqwest::Client::new(),
389            tool_executors: Mutex::new(HashMap::new()),
390            tool_render_fns: Mutex::new(HashMap::new()),
391            tool_backfill_fns: Mutex::new(HashMap::new()),
392            hook_registry: Arc::new(Mutex::new(None)),
393            auto_compact_tracking: AutoCompactTracking::default(),
394            permission_denials: Vec::new(),
395            last_stop_reason: None,
396            max_output_tokens_recovery_count: 0,
397            has_attempted_reactive_compact: false,
398            max_output_tokens_override: None,
399            stop_hook_active: false,
400            transition: None,
401            pending_tool_use_summary: None,
402            empty_response_retries: 0,
403            abort_controller,
404            budget_tracker: crate::token_budget::BudgetTracker::new(),
405            turn_tokens: 0,
406            loaded_nested_memory_paths: loaded_memory_paths,
407            content_replacement_state: Some(
408                crate::services::compact::create_content_replacement_state(),
409            ),
410            start_time: None,
411            task_budget_remaining: None,
412            structured_output_retries: 0,
413            has_handled_orphaned_permission: false,
414        }
415    }
416
417    /// Register a tool executor (without metadata).
418    /// For tools with rendering metadata, use `register_tool_with_render` instead.
419    pub fn register_tool<F>(&mut self, name: String, executor: F)
420    where
421        F: Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
422            + Send
423            + Sync
424            + 'static,
425    {
426        self.tool_executors
427            .lock()
428            .unwrap()
429            .insert(name, Arc::new(executor));
430    }
431
432    /// Register a backfill function for a tool.
433    /// The function mutates a clone of the tool input before it's seen by hooks/events/transcripts.
434    /// The original input is still passed to the tool executor (preserves prompt cache).
435    pub fn register_tool_backfill<F>(&mut self, name: String, backfill_fn: F)
436    where
437        F: Fn(&mut serde_json::Value) + Send + Sync + 'static,
438    {
439        self.tool_backfill_fns
440            .lock()
441            .unwrap()
442            .insert(name, Arc::new(backfill_fn));
443    }
444
445    /// Register a tool executor with render metadata for display hooks.
446    /// This enables user_facing_name, get_tool_use_summary, and render_tool_result_message
447    /// to be called during event emission in execute_tool.
448    pub fn register_tool_with_render<F>(
449        &mut self,
450        name: String,
451        executor: F,
452        render_fns: ToolRenderFns,
453    ) where
454        F: Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
455            + Send
456            + Sync
457            + 'static,
458    {
459        self.tool_executors
460            .lock()
461            .unwrap()
462            .insert(name.clone(), Arc::new(executor));
463        self.tool_render_fns
464            .lock()
465            .unwrap()
466            .insert(name, render_fns);
467    }
468
469    /// Set initial messages (for continuing a conversation)
470    /// Interrupt the running query engine. This will abort the current
471    /// tool execution loop and stop any in-flight API requests.
472    pub fn interrupt(&self) {
473        self.abort_controller.abort(None);
474    }
475
476    pub fn set_messages(&mut self, messages: Vec<crate::types::Message>) {
477        self.messages = messages;
478    }
479
480    /// Separate tools into upfront (sent immediately) and deferred (loaded via ToolSearch).
481    /// Returns (upfront_tools, deferred_tools).
482    /// This matches the TypeScript's isDeferredTool() logic.
483    pub(crate) fn separate_tools_for_request(&self) -> (Vec<ToolDefinition>, Vec<ToolDefinition>) {
484        use crate::tools::deferred_tools::{extract_discovered_tool_names, is_deferred_tool};
485
486        let mut upfront = Vec::new();
487        let mut deferred = Vec::new();
488
489        for tool in &self.config.tools {
490            if is_deferred_tool(tool) {
491                deferred.push(tool.clone());
492            } else {
493                upfront.push(tool.clone());
494            }
495        }
496
497        // If tool search is disabled (standard mode), send all tools upfront
498        if !crate::tools::deferred_tools::is_tool_search_enabled_optimistic() {
499            upfront.extend(deferred.drain(..));
500            return (upfront, deferred);
501        }
502
503        // Check for already-discovered deferred tools from message history
504        // Build API message format from our internal messages
505        let api_messages: Vec<serde_json::Value> = self
506            .messages
507            .iter()
508            .map(|msg| {
509                let role = match msg.role {
510                    api_types::MessageRole::User => "user",
511                    api_types::MessageRole::Assistant => "assistant",
512                    api_types::MessageRole::System => "system",
513                    api_types::MessageRole::Tool => "tool",
514                };
515                serde_json::json!({
516                    "role": role,
517                    "content": msg.content
518                })
519            })
520            .collect();
521
522        let discovered = extract_discovered_tool_names(&api_messages);
523
524        // Move discovered deferred tools to upfront (they've been loaded via tool_reference)
525        deferred.retain(|t| {
526            if discovered.contains(&t.name) {
527                upfront.push(t.clone());
528                false
529            } else {
530                true
531            }
532        });
533
534        // Sort and deduplicate upfront tools for prompt cache stability
535        let upfront = crate::tools::assemble_tool_pool(
536            &upfront,
537            &[], // deferred tools handled separately
538        );
539
540        (upfront, deferred)
541    }
542
543    /// Inject <available-deferred-tools> block into messages if tool search is enabled.
544    /// This tells the model about deferred tool names so it can discover them via ToolSearch.
545    pub(crate) fn maybe_inject_deferred_tools_block(
546        &self,
547        api_messages: &mut Vec<serde_json::Value>,
548    ) {
549        use crate::tools::deferred_tools::{
550            extract_discovered_tool_names, get_deferred_tool_names, is_deferred_tool,
551            is_tool_search_enabled_optimistic,
552        };
553
554        // Only inject if tool search is enabled
555        if !is_tool_search_enabled_optimistic() {
556            return;
557        }
558
559        // Get deferred tool names
560        let all_deferred = get_deferred_tool_names(&self.config.tools);
561
562        // Find already-discovered tools
563        let discovered = extract_discovered_tool_names(api_messages);
564
565        // Only show tools that haven't been discovered yet
566        let undiscovered: Vec<&str> = all_deferred
567            .iter()
568            .filter(|name| !discovered.contains(*name))
569            .map(|s| s.as_str())
570            .collect();
571
572        if undiscovered.is_empty() {
573            return;
574        }
575
576        // Build the <available-deferred-tools> block
577        let block_content = format!(
578            "<available-deferred-tools>\n{}\n</available-deferred-tools>\n\n\
579             Deferred tools appear by name above. \
580             To use a deferred tool, call ToolSearchTool with query \"select:<tool_name>\" to fetch its schema. \
581             Once fetched, the tool will be available for use.",
582            undiscovered.join("\n")
583        );
584
585        // Inject as the first user message (after any existing system messages)
586        let inject_msg = serde_json::json!({
587            "role": "user",
588            "content": block_content,
589            "is_meta": true
590        });
591
592        // Find the position to inject (after any system messages, before first real user message)
593        let mut insert_pos = 0;
594        for (i, msg) in api_messages.iter().enumerate() {
595            if msg.get("role").and_then(|v| v.as_str()) == Some("user") {
596                insert_pos = i;
597                break;
598            }
599            insert_pos = i + 1;
600        }
601
602        api_messages.insert(insert_pos, inject_msg);
603    }
604
605    /// Execute a tool by name
606    pub async fn execute_tool(
607        &mut self,
608        name: &str,
609        input: serde_json::Value,
610        tool_call_id: String,
611    ) -> Result<ToolResult, AgentError> {
612        let context = ToolContext {
613            cwd: self.config.cwd.clone(),
614            abort_signal: Arc::clone(self.abort_controller.signal()),
615        };
616
617        // Clone the Arc out of the maps
618        let (executor, render_metadata) = {
619            let executors = self.tool_executors.lock().unwrap();
620            let render_fns = self.tool_render_fns.lock().unwrap();
621            (
622                executors.get(name).cloned(),
623                render_fns.get(name).map(|fns| ToolRenderMetadata {
624                    user_facing_name: (Arc::clone(&fns.user_facing_name))(Some(&input)),
625                    tool_use_summary: fns
626                        .get_tool_use_summary
627                        .as_ref()
628                        .and_then(|f| f(Some(&input))),
629                    activity_description: fns
630                        .get_activity_description
631                        .as_ref()
632                        .and_then(|f| f(Some(&input))),
633                }),
634            )
635        };
636
637        if let Some(executor) = executor {
638            // PRE-TOOL PERMISSION CHECK - matches TypeScript's wrappedCanUseTool
639            // Returns 3-way PermissionResult: Allow, Deny, Ask, Passthrough
640            if let Some(can_use_tool_fn) = &self.config.can_use_tool {
641                if let Some(tool_def) = self.config.tools.iter().find(|t| &t.name == name) {
642                    match can_use_tool_fn(tool_def.clone(), input.clone()) {
643                        crate::permission::PermissionResult::Allow(_)
644                        | crate::permission::PermissionResult::Passthrough { .. } => {
645                            // Allowed, continue
646                        }
647                        crate::permission::PermissionResult::Deny(d) => {
648                            self.permission_denials.push(PermissionDenial {
649                                tool_name: name.to_string(),
650                                tool_use_id: tool_call_id.clone(),
651                                tool_input: input.clone(),
652                            });
653                            return Err(AgentError::Tool(format!(
654                                "Tool '{}' permission denied: {}",
655                                name, d.message
656                            )));
657                        }
658                        crate::permission::PermissionResult::Ask(a) => {
659                            // In SDK mode, Ask defaults to deny with a message
660                            // (CLI would prompt the user interactively)
661                            self.permission_denials.push(PermissionDenial {
662                                tool_name: name.to_string(),
663                                tool_use_id: tool_call_id.clone(),
664                                tool_input: input.clone(),
665                            });
666                            return Err(AgentError::Tool(format!(
667                                "Tool '{}' requires user confirmation (Ask mode not supported in SDK): {}",
668                                name, a.message
669                            )));
670                        }
671                    }
672                }
673            }
674
675            // Emit ToolStart event with render metadata
676            if let Some(ref cb) = self.config.on_event {
677                if let Some(ref metadata) = render_metadata {
678                    let user_facing = &metadata.user_facing_name;
679                    cb(AgentEvent::ToolStart {
680                        tool_name: name.to_string(),
681                        tool_call_id: tool_call_id.clone(),
682                        input: input.clone(),
683                        display_name: Some(user_facing.clone()),
684                        summary: metadata.tool_use_summary.clone(),
685                        activity_description: metadata.activity_description.clone(),
686                    });
687                } else {
688                    cb(AgentEvent::ToolStart {
689                        tool_name: name.to_string(),
690                        tool_call_id: tool_call_id.clone(),
691                        input: input.clone(),
692                        display_name: None,
693                        summary: None,
694                        activity_description: None,
695                    });
696                }
697            }
698
699            self.run_pre_tool_use_hooks(name, &input, &tool_call_id)
700                .await?;
701
702            // Execute the tool with timing
703            let tool_start = std::time::Instant::now();
704            let result = executor(input.clone(), &context).await;
705            let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
706            crate::services::model_cost::record_turn_tool_duration(tool_duration_ms);
707
708            // Emit ToolComplete or ToolError event with render hooks
709            if let Some(ref cb) = self.config.on_event {
710                match &result {
711                    Ok(tool_result) => {
712                        // Try to render the result message
713                        let rendered_result = self.render_tool_result(name, &tool_result.content);
714                        if let Some(ref metadata) = render_metadata {
715                            let display = format!(
716                                "{}({})",
717                                metadata.user_facing_name,
718                                metadata.tool_use_summary.as_deref().unwrap_or("?")
719                            );
720                            cb(AgentEvent::ToolComplete {
721                                tool_name: name.to_string(),
722                                tool_call_id: tool_call_id.clone(),
723                                result: tool_result.clone(),
724                                display_name: Some(display),
725                                rendered_result: rendered_result.clone(),
726                            });
727                        } else {
728                            cb(AgentEvent::ToolComplete {
729                                tool_name: name.to_string(),
730                                tool_call_id: tool_call_id.clone(),
731                                result: tool_result.clone(),
732                                display_name: None,
733                                rendered_result: rendered_result,
734                            });
735                        }
736                    }
737                    Err(e) => {
738                        cb(AgentEvent::ToolError {
739                            tool_name: name.to_string(),
740                            tool_call_id: tool_call_id.clone(),
741                            error: e.to_string(),
742                        });
743                    }
744                }
745            }
746
747            // Run PostToolUse or PostToolUseFailure hooks
748            match &result {
749                Ok(tool_result) => {
750                    self.run_post_tool_use_hooks(name, tool_result, &tool_call_id)
751                        .await;
752                }
753                Err(e) => {
754                    self.run_post_tool_use_failure_hooks(name, e, &tool_call_id)
755                        .await;
756                }
757            }
758
759            result
760        } else {
761            Err(AgentError::Tool(format!("Tool '{}' not found", name)))
762        }
763    }
764
765    /// Render a tool's result using its stored render_tool_result_message closure.
766    /// Returns None if the tool has no render implementation or the content can't be parsed.
767    fn render_tool_result(&self, tool_name: &str, content: &str) -> Option<String> {
768        let content_value: serde_json::Value = serde_json::from_str(content).ok()?;
769        let progress_messages: Vec<ProgressMessage> = vec![];
770        let options = ToolResultRenderOptions {
771            style: None,
772            theme: "dark".to_string(),
773            tools: self.config.tools.clone(),
774            verbose: false,
775            is_transcript_mode: false,
776            is_brief_only: false,
777            input: None,
778        };
779        let fns = self.tool_render_fns.lock().unwrap();
780        let render_fn = fns.get(tool_name)?.render_tool_result_message.as_ref()?;
781        render_fn(&content_value, &progress_messages, &options)
782    }
783
784    /// Set the hook registry
785    pub fn set_hook_registry(&self, registry: HookRegistry) {
786        let mut guard = self.hook_registry.lock().unwrap();
787        *guard = Some(registry);
788    }
789
790    /// Run PreToolUse hooks
791    async fn run_pre_tool_use_hooks(
792        &self,
793        tool_name: &str,
794        tool_input: &serde_json::Value,
795        tool_use_id: &str,
796    ) -> Result<(), AgentError> {
797        // First check if we have hooks (outside of lock)
798        let has_hooks = {
799            let guard = self.hook_registry.lock().unwrap();
800            guard
801                .as_ref()
802                .map(|r| r.has_hooks("PreToolUse"))
803                .unwrap_or(false)
804        };
805
806        if !has_hooks {
807            return Ok(());
808        }
809
810        // Build input outside of lock
811        let input = HookInput {
812            event: "PreToolUse".to_string(),
813            tool_name: Some(tool_name.to_string()),
814            tool_input: Some(tool_input.clone()),
815            tool_output: None,
816            tool_use_id: Some(tool_use_id.to_string()),
817            session_id: None,
818            cwd: Some(self.config.cwd.clone()),
819            error: None,
820            ..HookInput::default()
821        };
822
823        // Execute hooks (registry is Clone and Arc-wrapped, so we can clone the reference)
824        let registry = {
825            let guard = self.hook_registry.lock().unwrap();
826            guard.as_ref().cloned()
827        };
828
829        if let Some(registry) = registry {
830            let results = registry.execute("PreToolUse", input).await;
831
832            // Check if any hook blocked the tool use
833            for output in results {
834                if let Some(block) = output.block {
835                    if block {
836                        return Err(AgentError::Tool(format!(
837                            "Tool '{}' blocked by PreToolUse hook",
838                            tool_name
839                        )));
840                    }
841                }
842            }
843        }
844        Ok(())
845    }
846
847    /// Run PostToolUse hooks
848    async fn run_post_tool_use_hooks(
849        &self,
850        tool_name: &str,
851        tool_output: &ToolResult,
852        tool_use_id: &str,
853    ) {
854        let has_hooks = {
855            let guard = self.hook_registry.lock().unwrap();
856            guard
857                .as_ref()
858                .map(|r| r.has_hooks("PostToolUse"))
859                .unwrap_or(false)
860        };
861
862        if !has_hooks {
863            return;
864        }
865
866        let input = HookInput {
867            event: "PostToolUse".to_string(),
868            tool_name: Some(tool_name.to_string()),
869            tool_input: None,
870            tool_output: Some(serde_json::json!({
871                "result_type": tool_output.result_type,
872                "content": tool_output.content,
873                "is_error": tool_output.is_error,
874            })),
875            tool_use_id: Some(tool_use_id.to_string()),
876            session_id: None,
877            cwd: Some(self.config.cwd.clone()),
878            error: None,
879            ..HookInput::default()
880        };
881
882        let registry = {
883            let guard = self.hook_registry.lock().unwrap();
884            guard.as_ref().cloned()
885        };
886
887        if let Some(registry) = registry {
888            let _ = registry.execute("PostToolUse", input).await;
889        }
890    }
891
892    /// Run PostToolUseFailure hooks
893    async fn run_post_tool_use_failure_hooks(
894        &self,
895        tool_name: &str,
896        error: &AgentError,
897        tool_use_id: &str,
898    ) {
899        let has_hooks = {
900            let guard = self.hook_registry.lock().unwrap();
901            guard
902                .as_ref()
903                .map(|r| r.has_hooks("PostToolUseFailure"))
904                .unwrap_or(false)
905        };
906
907        if !has_hooks {
908            return;
909        }
910
911        let input = HookInput {
912            event: "PostToolUseFailure".to_string(),
913            tool_name: Some(tool_name.to_string()),
914            tool_input: None,
915            tool_output: None,
916            tool_use_id: Some(tool_use_id.to_string()),
917            session_id: None,
918            cwd: Some(self.config.cwd.clone()),
919            error: Some(error.to_string()),
920            ..HookInput::default()
921        };
922
923        let registry = {
924            let guard = self.hook_registry.lock().unwrap();
925            guard.as_ref().cloned()
926        };
927
928        if let Some(registry) = registry {
929            let _ = registry.execute("PostToolUseFailure", input).await;
930        }
931    }
932
933    pub fn get_turn_count(&self) -> u32 {
934        self.turn_count
935    }
936
937    /// Get total token usage from all API calls
938    pub fn get_usage(&self) -> TokenUsage {
939        self.total_usage.clone()
940    }
941
942    pub fn get_messages(&self) -> Vec<crate::types::Message> {
943        self.messages.clone()
944    }
945
946    /// Milliseconds elapsed since the start of the current query.
947    /// Returns 0 if no query is active (start_time is None).
948    pub fn query_duration_ms(&self) -> u64 {
949        self.start_time
950            .map(|t| std::time::Instant::now().duration_since(t).as_millis() as u64)
951            .unwrap_or(0)
952    }
953
954    /// Reset conversation state — clear messages, usage, and turn count.
955    /// Preserves config, tool executors, and abort controller.
956    pub fn reset(&mut self) {
957        self.messages.clear();
958        self.reset_counters();
959    }
960
961    /// Reset only counters (turn count, usage, cost, recovery tracking).
962    /// Does NOT clear messages — preserves conversation state across errors
963    /// so the user can replay / continue (matches TypeScript behavior).
964    pub fn reset_counters(&mut self) {
965        self.turn_count = 0;
966        self.total_usage = TokenUsage::default();
967        self.total_cost = 0.0;
968        self.permission_denials.clear();
969        self.last_stop_reason = None;
970        self.max_output_tokens_recovery_count = 0;
971        self.has_attempted_reactive_compact = false;
972        self.empty_response_retries = 0;
973        self.max_output_tokens_override = None;
974        self.stop_hook_active = false;
975        self.transition = None;
976        self.pending_tool_use_summary = None;
977        self.structured_output_retries = 0;
978    }
979
980    /// Check if the last message represents a valid successful result.
981    /// Matches TypeScript's isResultSuccessful() at queryHelpers.ts:56:
982    /// - Assistant: has non-empty content and is not an API error message
983    /// - User: has tool_result blocks (valid terminal state after tool execution)
984    /// Does NOT check stop_reason — TS only validates message type/content.
985    fn is_result_successful(&self, _last_stop_reason: Option<&str>) -> bool {
986        let last = match self.messages.last() {
987            Some(m) => m,
988            None => return false,
989        };
990        match last.role {
991            crate::types::MessageRole::Assistant => {
992                !last.content.is_empty() && last.is_api_error_message != Some(true)
993            }
994            crate::types::MessageRole::User => {
995                // User message (tool results) is a valid terminal state
996                true
997            }
998            _ => false,
999        }
1000    }
1001
1002    /// Generate synthetic tool_result messages for orphaned tool_use blocks
1003    /// in the last assistant message.  Called before terminal error handling
1004    /// so that the conversation history remains well-formed (every tool_use
1005    /// has a matching tool_result).
1006    ///
1007    /// Matches TypeScript's yieldMissingToolResultBlocks /
1008    /// addOrphanedToolResults.
1009    fn add_orphaned_tool_results(&mut self, reason: &str) {
1010        // Collect the tool_call IDs from the last assistant message first
1011        // (to avoid borrow conflicts with self.messages.push below)
1012        let orphan_ids: Vec<(String, String)> = {
1013            let last = match self.messages.last() {
1014                Some(m) => m,
1015                None => return,
1016            };
1017            if last.role != crate::types::MessageRole::Assistant {
1018                return;
1019            }
1020            let tool_calls = match &last.tool_calls {
1021                Some(tc) => tc,
1022                None => return,
1023            };
1024            tool_calls.iter()
1025                .map(|tc| (tc.id.clone(), tc.name.clone()))
1026                .collect()
1027        };
1028        if orphan_ids.is_empty() {
1029            return;
1030        }
1031
1032        // Find tool_use IDs that already have a tool_result in the messages
1033        let mut has_result = std::collections::HashSet::new();
1034        for msg in &self.messages {
1035            if msg.role == crate::types::MessageRole::Tool {
1036                if let Some(id) = &msg.tool_call_id {
1037                    has_result.insert(id.clone());
1038                }
1039            }
1040        }
1041        // Add synthetic tool_result for each orphaned tool_use
1042        for (tc_id, tc_name) in orphan_ids {
1043            if !has_result.contains(&tc_id) {
1044                self.messages.push(crate::types::Message {
1045                    role: crate::types::MessageRole::Tool,
1046                    content: format!("Tool '{}' was not executed: {}", tc_name, reason),
1047                    tool_call_id: Some(tc_id),
1048                    is_error: Some(true),
1049                    ..Default::default()
1050                });
1051            }
1052        }
1053    }
1054
1055    /// Attempt to auto-compact the conversation when token count exceeds threshold
1056    /// Translated from: compactConversation in compact.ts
1057    /// Returns Ok(true) if compaction happened, Ok(false) if not needed, Err on failure
1058    /// Execute auto-compact.
1059    /// `snip_tokens_freed` is subtracted from the token count for the threshold
1060    /// check, matching TypeScript's autocompact threshold adjustment.
1061    async fn do_auto_compact(&mut self, snip_tokens_freed: u32) -> Result<bool, AgentError> {
1062        use crate::compact::{
1063            estimate_token_count, get_auto_compact_threshold, get_compact_prompt,
1064            strip_images_from_messages, strip_reinjected_attachments,
1065        };
1066        use crate::services::compact::{
1067            PartialCompactDirection, format_compact_summary,
1068            get_compact_prompt as get_compact_prompt_service, get_compact_user_summary_message,
1069        };
1070        use crate::tools::deferred_tools::{
1071            get_deferred_tool_names, is_tool_search_enabled_optimistic,
1072        };
1073
1074        let token_count = estimate_token_count(&self.messages, self.config.max_tokens);
1075        let threshold = get_auto_compact_threshold(&self.config.model);
1076
1077        // Adjust for snip: subtract tokens snip already freed (matches TypeScript)
1078        let effective_tokens = (token_count as i64).saturating_sub(snip_tokens_freed as i64) as u32;
1079
1080        // Check if we need to compact
1081        if effective_tokens <= threshold {
1082            return Ok(false);
1083        }
1084
1085        log::info!(
1086            "[compact] Starting auto-compact: {} effective tokens ({} raw - {} snip freed), threshold: {}",
1087            effective_tokens,
1088            token_count,
1089            snip_tokens_freed,
1090            threshold
1091        );
1092
1093        // Phase 1: Pre-compact hooks
1094        // Execute pre_compact hooks and merge any custom instructions
1095        let _hook_results = self.execute_pre_compact_hooks().await;
1096
1097        // Phase 2: Try session memory compaction first (faster, no API call)
1098        if let Some(sm_result) = crate::services::compact::try_session_memory_compaction(
1099            &self.messages,
1100            None,
1101            Some(threshold as usize),
1102        )
1103        .await
1104        {
1105            if sm_result.compacted {
1106                log::info!("[compact] Session memory compaction succeeded");
1107                self.apply_compaction_result(
1108                    sm_result.messages_to_keep,
1109                    sm_result.post_compact_token_count as u32,
1110                );
1111                return Ok(true);
1112            }
1113        }
1114
1115        // Phase 3: Strip images and reinjected attachments before compact API call
1116        let stripped_messages =
1117            strip_reinjected_attachments(&strip_images_from_messages(&self.messages));
1118
1119        // Phase 4: Build compact prompt
1120        let compact_prompt = get_compact_prompt();
1121
1122        // Phase 5: Generate summary using LLM with PTL retry logic
1123        let (summary, compaction_usage) = match self
1124            .generate_summary_with_ptl_retry(&stripped_messages, &compact_prompt)
1125            .await
1126        {
1127            Ok(result) => result,
1128            Err(e) => {
1129                log::warn!("[compact] Summary generation failed: {}", e);
1130                return Err(e);
1131            }
1132        };
1133        log::debug!(
1134            "[compact] compaction_usage: input={} output={}",
1135            compaction_usage.input_tokens,
1136            compaction_usage.output_tokens
1137        );
1138
1139        // Feed compaction API cost into session total
1140        let compact_cost = crate::services::model_cost::calculate_cost_for_tokens(
1141            &self.config.model,
1142            compaction_usage.input_tokens as u32,
1143            compaction_usage.output_tokens as u32,
1144            compaction_usage.cache_read_input_tokens.unwrap_or(0) as u32,
1145            compaction_usage.cache_creation_input_tokens.unwrap_or(0) as u32,
1146        );
1147        let _ = crate::services::model_cost::add_to_total_session_cost(
1148            compact_cost,
1149            compaction_usage.input_tokens as u32,
1150            compaction_usage.output_tokens as u32,
1151            compaction_usage.cache_read_input_tokens.unwrap_or(0) as u32,
1152            compaction_usage.cache_creation_input_tokens.unwrap_or(0) as u32,
1153            0,
1154            &self.config.model,
1155        );
1156
1157        // Parse and format the summary
1158        let formatted_summary = format_compact_summary(&summary);
1159
1160        // Phase 6: Build post-compact messages
1161        let messages_to_keep: Vec<Message> = if self.messages.len() > 4 {
1162            self.messages[self.messages.len() - 4..].to_vec()
1163        } else {
1164            self.messages.clone()
1165        };
1166
1167        // Create boundary marker with summary
1168        let discovered_tools = get_deferred_tool_names(&self.config.tools);
1169        let mut boundary_content = format!(
1170            "[Previous conversation summarized]\n\n{}",
1171            get_compact_user_summary_message(&formatted_summary, Some(true), None, None)
1172        );
1173        if !discovered_tools.is_empty() && is_tool_search_enabled_optimistic() {
1174            boundary_content.push_str("\n\n<available-deferred-tools>\n");
1175            boundary_content.push_str(&discovered_tools.join("\n"));
1176            boundary_content.push_str("\n</available-deferred-tools>");
1177        }
1178
1179        let boundary_msg = Message {
1180            role: MessageRole::System,
1181            content: boundary_content,
1182            is_meta: Some(true),
1183            ..Default::default()
1184        };
1185
1186        // Create new message list: boundary + recent messages
1187        let mut new_messages = vec![boundary_msg];
1188        new_messages.extend(messages_to_keep.clone());
1189
1190        let new_token_count = estimate_token_count(&new_messages, self.config.max_tokens);
1191
1192        // true_post_compact_token_count: rough estimation from compacted messages
1193        let true_post_compact_tokens = crate::compact::rough_token_count_estimation_for_content(
1194            &new_messages.iter().map(|m| m.content.clone()).collect::<String>(),
1195        ) as u64;
1196        log::debug!(
1197            "[compact] true_post_compact_token_count={} compaction_usage.input={} compaction_usage.output={}",
1198            true_post_compact_tokens,
1199            compaction_usage.input_tokens,
1200            compaction_usage.output_tokens,
1201        );
1202
1203        // Phase 7: Post-compact phase
1204        // Clear file read state and loaded memory paths
1205        // Re-add plan attachment, plan mode attachment, skill attachment if applicable
1206        // Execute session_start hooks
1207        // Execute post_compact hooks
1208        self.execute_post_compact_hooks(&formatted_summary).await;
1209
1210        // Phase 8: Post-compaction cleanup
1211        crate::services::compact::run_post_compact_cleanup(None);
1212
1213        // Apply the new messages
1214        self.messages = new_messages;
1215
1216        log::info!(
1217            "[compact] Complete: {} tokens -> {} tokens",
1218            token_count,
1219            new_token_count
1220        );
1221
1222        Ok(true)
1223    }
1224
1225    /// Generate summary with PTL (prompt-too-long) retry logic.
1226    /// If the compact API call fails with prompt-too-long, drops oldest
1227    /// message groups until the gap is covered.
1228    async fn generate_summary_with_ptl_retry(
1229        &self,
1230        messages: &[Message],
1231        compact_prompt: &str,
1232    ) -> Result<(String, TokenUsage), AgentError> {
1233        const MAX_PTL_RETRIES: usize = 3;
1234
1235        // Build messages for summary request
1236        let mut summary_messages = self.build_summary_messages(compact_prompt);
1237
1238        for attempt in 0..MAX_PTL_RETRIES {
1239            // Estimate tokens and check if truncation needed
1240            let max_summary_tokens = 2048u32;
1241            let (truncated_messages, estimated_tokens) = compact::truncate_messages_for_summary(
1242                &summary_messages,
1243                &self.config.model,
1244                max_summary_tokens,
1245            );
1246
1247            // Verify it's safe before proceeding
1248            if estimated_tokens > 150000 {
1249                if attempt < MAX_PTL_RETRIES - 1 {
1250                    // PTL retry: drop oldest message groups
1251                    log::warn!(
1252                        "[compact] PTL retry {}/{}: {} tokens, dropping oldest groups",
1253                        attempt + 1,
1254                        MAX_PTL_RETRIES,
1255                        estimated_tokens
1256                    );
1257                    summary_messages =
1258                        self.truncate_head_for_ptl_retry(&summary_messages, estimated_tokens);
1259                    continue;
1260                }
1261                return Err(AgentError::Api(format!(
1262                    "Cannot generate summary: estimated {} tokens exceeds safe limit after {} retries",
1263                    estimated_tokens, MAX_PTL_RETRIES
1264                )));
1265            }
1266
1267            // Attempt summary generation
1268            match self
1269                .generate_summary_from_messages(&truncated_messages)
1270                .await
1271            {
1272                Ok((summary, _usage)) => return Ok((summary, _usage)),
1273                Err(e) => {
1274                    if attempt < MAX_PTL_RETRIES - 1 {
1275                        log::warn!(
1276                            "[compact] Summary attempt {}/{} failed: {}, retrying",
1277                            attempt + 1,
1278                            MAX_PTL_RETRIES,
1279                            e
1280                        );
1281                        summary_messages =
1282                            self.truncate_head_for_ptl_retry(&summary_messages, estimated_tokens);
1283                    } else {
1284                        return Err(e);
1285                    }
1286                }
1287            }
1288        }
1289
1290        Err(AgentError::Api(
1291            "Summary generation failed after max retries".to_string(),
1292        ))
1293    }
1294
1295    /// Truncate the head of messages for PTL retry.
1296    /// Groups messages by API round and drops oldest groups until gap covered.
1297    /// If unparseable token gap: drops 20% of groups.
1298    /// Keeps at least one group to ensure there's something to summarize.
1299    fn truncate_head_for_ptl_retry(
1300        &self,
1301        messages: &[Message],
1302        estimated_tokens: u32,
1303    ) -> Vec<Message> {
1304        use crate::services::compact::grouping::group_messages_by_api_round;
1305
1306        let groups = group_messages_by_api_round(messages);
1307        if groups.is_empty() {
1308            return messages.to_vec();
1309        }
1310
1311        // Calculate how many groups to drop (20% fallback)
1312        let groups_to_drop = (groups.len() as f64 * 0.2).ceil() as usize;
1313        let groups_to_drop = groups_to_drop.min(groups.len() - 1); // Keep at least one group
1314
1315        log::debug!(
1316            "[compact] Dropping {} of {} groups for PTL retry",
1317            groups_to_drop,
1318            groups.len()
1319        );
1320
1321        // Flatten remaining groups
1322        groups.into_iter().skip(groups_to_drop).flatten().collect()
1323    }
1324
1325    /// Build messages for summary generation request
1326    fn build_summary_messages(&self, compact_prompt: &str) -> Vec<Message> {
1327        let mut summary_messages = vec![Message {
1328            role: MessageRole::User,
1329            content: compact_prompt.to_string(),
1330            ..Default::default()
1331        }];
1332
1333        // Add conversation messages, excluding existing system boundary messages
1334        for msg in &self.messages {
1335            if let MessageRole::System = msg.role {
1336                // Skip system boundary messages from previous compactions
1337                if msg.content.contains("compacted") || msg.content.contains("summarized") {
1338                    continue;
1339                }
1340            }
1341            summary_messages.push(msg.clone());
1342        }
1343
1344        summary_messages
1345    }
1346
1347    /// Generate summary from a set of messages
1348    async fn generate_summary_from_messages(
1349        &self,
1350        summary_messages: &[Message],
1351    ) -> Result<(String, TokenUsage), AgentError> {
1352        // Get API configuration
1353        let api_key = self
1354            .config
1355            .api_key
1356            .as_ref()
1357            .ok_or_else(|| AgentError::Api("API key not provided".to_string()))?;
1358
1359        let base_url = self
1360            .config
1361            .base_url
1362            .as_ref()
1363            .map(|s| s.as_str())
1364            .unwrap_or("https://api.anthropic.com");
1365
1366        let model = &self.config.model;
1367
1368        // Convert messages to API format
1369        let api_summary_messages: Vec<serde_json::Value> = summary_messages
1370            .iter()
1371            .map(|msg| {
1372                let role_str = match msg.role {
1373                    MessageRole::User => "user",
1374                    MessageRole::Assistant => "assistant",
1375                    MessageRole::Tool => "user",
1376                    MessageRole::System => "system",
1377                };
1378                let mut msg_json = serde_json::json!({
1379                    "role": role_str,
1380                    "content": msg.content
1381                });
1382                if let Some(tool_call_id) = &msg.tool_call_id {
1383                    msg_json["tool_call_id"] = serde_json::json!(tool_call_id);
1384                }
1385                msg_json
1386            })
1387            .collect();
1388
1389        // Build request with model-based compaction max_tokens (TS: compact.ts:1317-1320)
1390        let compact_max_tokens = crate::utils::context::COMPACT_MAX_OUTPUT_TOKENS
1391            .min(crate::utils::context::get_max_output_tokens_for_model(model)) as u32;
1392        let request_body = serde_json::json!({
1393            "model": model,
1394            "max_tokens": compact_max_tokens,
1395            "messages": api_summary_messages,
1396        });
1397
1398        let client = reqwest::Client::new();
1399        let url = format!("{}/v1/chat/completions", base_url);
1400        let response = client
1401            .post(&url)
1402            .header("Authorization", format!("Bearer {}", api_key))
1403            .header("Content-Type", "application/json")
1404            .header("User-Agent", get_user_agent())
1405            .json(&request_body)
1406            .send()
1407            .await
1408            .map_err(|e| AgentError::Api(format!("Failed to send summary request: {}", e)))?;
1409
1410        let response_text = response
1411            .text()
1412            .await
1413            .map_err(|e| AgentError::Api(format!("Failed to read summary response: {}", e)))?;
1414
1415        let response_json: serde_json::Value =
1416            serde_json::from_str(&response_text).map_err(|e| {
1417                AgentError::Api(format!(
1418                    "Failed to parse summary response: {} - {}",
1419                    e, response_text
1420                ))
1421            })?;
1422
1423        if let Some(error) = response_json.get("error") {
1424            return Err(AgentError::Api(format!("Summary API error: {}", error)));
1425        }
1426
1427        // Extract usage from the compaction API call
1428        let usage = response_json.get("usage").map(|u| TokenUsage {
1429            input_tokens: u.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
1430            output_tokens: u.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
1431            cache_creation_input_tokens: u.get("cache_creation_input_tokens").and_then(|v| v.as_u64()),
1432            cache_read_input_tokens: u.get("cache_read_input_tokens").and_then(|v| v.as_u64()),
1433            iterations: u.get("iterations").and_then(|v| v.as_array()).map(|arr| {
1434                arr.iter().filter_map(|it| {
1435                    Some(IterationUsage {
1436                        input_tokens: it.get("input_tokens").and_then(|v| v.as_u64())?,
1437                        output_tokens: it.get("output_tokens").and_then(|v| v.as_u64())?,
1438                    })
1439                }).collect()
1440            }),
1441        }).unwrap_or_default();
1442
1443        let summary = extract_text_from_response(&response_json);
1444
1445        if summary.is_empty() {
1446            return Err(AgentError::Api("Summary response was empty".to_string()));
1447        }
1448
1449        // Parse the summary to extract just the <summary> content
1450        let parsed_summary = parse_compact_summary(&summary);
1451
1452        Ok((parsed_summary, usage))
1453    }
1454
1455    /// Execute pre-compact hooks
1456    async fn execute_pre_compact_hooks(&self) -> Option<String> {
1457        let registry = {
1458            let guard = self.hook_registry.lock().unwrap();
1459            match guard.as_ref() {
1460                Some(r) => r.clone(),
1461                None => return None,
1462            }
1463        };
1464
1465        if !registry.has_hooks("PreCompact") {
1466            return None;
1467        }
1468
1469        // Emit hooks_start event
1470        if let Some(ref cb) = self.config.on_event {
1471            cb(AgentEvent::Compact {
1472                event: CompactProgressEvent::HooksStart {
1473                    hook_type: CompactHookType::PreCompact,
1474                },
1475            });
1476        }
1477
1478        let trigger = if self.auto_compact_tracking.compacted {
1479            "auto"
1480        } else {
1481            "manual"
1482        };
1483
1484        let input = HookInput {
1485            event: "PreCompact".to_string(),
1486            tool_name: None,
1487            tool_input: Some(serde_json::json!({
1488                "trigger": trigger,
1489                "custom_instructions": null
1490            })),
1491            tool_output: None,
1492            tool_use_id: None,
1493            session_id: None,
1494            cwd: Some(self.config.cwd.clone()),
1495            error: None,
1496            ..HookInput::default()
1497        };
1498
1499        let results = registry.execute("PreCompact", input).await;
1500
1501        // Extract custom instructions from successful hooks with non-empty output
1502        let successful_outputs: Vec<String> = results
1503            .iter()
1504            .filter_map(|r| r.message.as_ref())
1505            .map(|s| s.trim().to_string())
1506            .filter(|s| !s.is_empty())
1507            .collect();
1508
1509        if successful_outputs.is_empty() {
1510            None
1511        } else {
1512            Some(successful_outputs.join("\n\n"))
1513        }
1514    }
1515
1516    /// Execute post-compact hooks
1517    async fn execute_post_compact_hooks(&self, compact_summary: &str) {
1518        let registry = {
1519            let guard = self.hook_registry.lock().unwrap();
1520            match guard.as_ref() {
1521                Some(r) => r.clone(),
1522                None => return,
1523            }
1524        };
1525
1526        if !registry.has_hooks("PostCompact") {
1527            return;
1528        }
1529
1530        // Emit hooks_start event
1531        if let Some(ref cb) = self.config.on_event {
1532            cb(AgentEvent::Compact {
1533                event: CompactProgressEvent::HooksStart {
1534                    hook_type: CompactHookType::PostCompact,
1535                },
1536            });
1537        }
1538
1539        let trigger = if self.auto_compact_tracking.compacted {
1540            "auto"
1541        } else {
1542            "manual"
1543        };
1544
1545        let input = HookInput {
1546            event: "PostCompact".to_string(),
1547            tool_name: None,
1548            tool_input: Some(serde_json::json!({
1549                "trigger": trigger,
1550                "compact_summary": compact_summary
1551            })),
1552            tool_output: None,
1553            tool_use_id: None,
1554            session_id: None,
1555            cwd: Some(self.config.cwd.clone()),
1556            error: None,
1557            ..HookInput::default()
1558        };
1559
1560        let _results = registry.execute("PostCompact", input).await;
1561    }
1562
1563    /// Apply compaction result: replace messages with boundary + kept messages
1564    fn apply_compaction_result(
1565        &mut self,
1566        messages_to_keep: Vec<Message>,
1567        _post_compact_tokens: u32,
1568    ) {
1569        let boundary_msg = Message {
1570            role: MessageRole::System,
1571            content: "[Previous conversation summarized]".to_string(),
1572            is_meta: Some(true),
1573            ..Default::default()
1574        };
1575
1576        let mut new_messages = vec![boundary_msg];
1577        new_messages.extend(messages_to_keep);
1578        self.messages = new_messages;
1579    }
1580
1581    pub async fn submit_message(
1582        &mut self,
1583        prompt: &str,
1584    ) -> Result<(String, crate::types::ExitReason), AgentError> {
1585        self.start_time = Some(std::time::Instant::now());
1586        // Transition session state to running
1587        if let Some(ref state) = self.config.session_state {
1588            state.start_running();
1589        }
1590        // Add user message to history
1591        self.messages.push(crate::types::Message {
1592            role: crate::types::MessageRole::User,
1593            content: prompt.to_string(),
1594            ..Default::default()
1595        });
1596
1597        // Prefetch relevant memories and inject into context
1598        if let Some(memory_context) = build_memory_prefetch_context(prompt, &self.config, &self.loaded_nested_memory_paths).await {
1599            self.messages.push(crate::types::Message {
1600                role: crate::types::MessageRole::User,
1601                content: memory_context,
1602                ..Default::default()
1603            });
1604        }
1605
1606        // Handle orphaned permission (only once per engine lifetime).
1607        // Matches TypeScript QueryEngine.ts:397-408 where handleOrphanedPermission
1608        // runs before the main query loop guarded by hasHandledOrphanedPermission.
1609        if !self.has_handled_orphaned_permission {
1610            if let Some(ref orphaned) = self.config.orphaned_permission {
1611                self.has_handled_orphaned_permission = true;
1612
1613                // 1. Push the assistant message (containing the tool_use) to history
1614                // Check if it's already present to avoid duplicates on CCR resume
1615                let already_present = self.messages.iter().any(|m| {
1616                    m.role == crate::types::MessageRole::Assistant
1617                        && m.tool_calls.as_ref().is_some_and(|tc| {
1618                            tc.iter().any(|tc| tc.id == orphaned.tool_use_id)
1619                        })
1620                });
1621                if !already_present {
1622                    self.messages.push(orphaned.assistant_message.clone());
1623                }
1624
1625                // 2. Generate a synthetic tool result message with the permission decision
1626                let result_content = match &orphaned.permission_result {
1627                    crate::permission::PermissionResult::Allow(_) => {
1628                        format!("Tool call {} is allowed", orphaned.tool_use_id)
1629                    }
1630                    crate::permission::PermissionResult::Deny(deny) => {
1631                        format!("Tool call {} is denied: {}", orphaned.tool_use_id, deny.message)
1632                    }
1633                    crate::permission::PermissionResult::Ask(ask) => {
1634                        format!(
1635                            "Tool call {} requires confirmation: {}",
1636                            orphaned.tool_use_id, ask.message
1637                        )
1638                    }
1639                    crate::permission::PermissionResult::Passthrough { message, .. } => {
1640                        format!("Tool call {} passed through: {}", orphaned.tool_use_id, message)
1641                    }
1642                };
1643
1644                self.messages.push(crate::types::Message {
1645                    role: crate::types::MessageRole::Tool,
1646                    content: result_content,
1647                    tool_call_id: Some(orphaned.tool_use_id.clone()),
1648                    ..Default::default()
1649                });
1650
1651                log::debug!(
1652                    "Handled orphaned permission for tool_use_id={}",
1653                    orphaned.tool_use_id
1654                );
1655            }
1656        }
1657
1658        // Note: max_turns check is done AFTER turn completes (matching TypeScript)
1659        // See below after tool execution loop for the check
1660
1661        // Emit Thinking event for the first turn before the first API call
1662        if let Some(ref cb) = self.config.on_event {
1663            cb(AgentEvent::Thinking { turn: 1 });
1664        }
1665        self.turn_count = 1;
1666
1667        // Tool call loop - continue until no more tool calls
1668        // Use config.max_turns as the limit (0xffffffff = effectively unlimited)
1669        //
1670        // Matching TypeScript query.ts flow:
1671        // Each iteration runs: snip → microcompact → context collapse → auto-compact → API call
1672        let mut max_tool_turns = self.config.max_turns;
1673        while max_tool_turns > 0 {
1674            max_tool_turns -= 1;
1675
1676            // Reset compacted flag for this iteration (TypeScript: tracking.compacted = false
1677            // is implicit via state update at top of loop)
1678            self.auto_compact_tracking.compacted = false;
1679
1680            // --- Compaction pipeline (matches TypeScript query.ts order) ---
1681
1682            // 1. Snip compact (TypeScript: snipModule!.snipCompactIfNeeded)
1683            let snip_result = crate::services::compact::snip_compact_if_known(&self.messages);
1684            // Update messages reference if snip returned modified messages
1685            // (snip_compact_if_known currently returns &self.messages unchanged,
1686            // but we capture tokens_freed for the threshold adjustment)
1687            let snip_tokens_freed = snip_result.tokens_freed;
1688
1689            // 2. Microcompact (TypeScript: deps.microcompact)
1690            crate::services::compact::microcompact::microcompact_messages(&mut self.messages);
1691
1692            // 3. Context collapse (TypeScript: contextCollapse.applyCollapsesIfNeeded)
1693            // Runs BEFORE auto-compact so that if collapse gets us under the
1694            // auto-compact threshold, auto-compact is a no-op and we keep
1695            // granular context instead of a single summary.
1696            if crate::services::context_collapse::is_context_collapse_enabled() {
1697                let collapse_result = crate::services::context_collapse::apply_collapses_if_needed(
1698                    self.messages.clone(),
1699                );
1700                if collapse_result.changed {
1701                    self.messages = collapse_result.messages;
1702                }
1703            }
1704
1705            // 4. Auto-compact check (TypeScript: deps.autocompact)
1706            // Only attempt if:
1707            // 1. Not disabled by circuit breaker (max 3 consecutive failures)
1708            // 2. Token count exceeds auto-compact threshold
1709            //
1710            // do_auto_compact internally checks token count vs threshold
1711            // (adjusted by snip_tokens_freed), so it returns Ok(false) when
1712            // compaction is not needed.
1713            if self.auto_compact_tracking.consecutive_failures < 3 {
1714                let token_estimate = compact::estimate_token_count(&self.messages, self.config.max_tokens);
1715                let threshold = get_auto_compact_threshold(&self.config.model);
1716
1717                if token_estimate > threshold {
1718                    if let Some(ref cb) = self.config.on_event {
1719                        cb(AgentEvent::Compact {
1720                            event: CompactProgressEvent::CompactStart,
1721                        });
1722                    }
1723                    // Capture pre-compact token count for the summary message
1724                    let pre_compact_tokens = token_estimate;
1725                    match self.do_auto_compact(snip_tokens_freed).await {
1726                        Ok(true) => {
1727                            // Compaction succeeded — reset tracking state (matching TypeScript)
1728                            self.auto_compact_tracking.compacted = true;
1729                            self.auto_compact_tracking.turn_id = uuid::Uuid::new_v4().to_string();
1730                            self.auto_compact_tracking.turn_counter = 0;
1731                            self.auto_compact_tracking.consecutive_failures = 0;
1732
1733                            // task_budget: decrement remaining by pre-compact final context
1734                            if self.config.task_budget.is_some() {
1735                                let pre_ctx = pre_compact_tokens as u64;
1736                                let current = self.task_budget_remaining
1737                                    .or(self.config.task_budget.as_ref().map(|tb| tb.total));
1738                                self.task_budget_remaining = Some(current.unwrap_or(0).saturating_sub(pre_ctx));
1739                            }
1740
1741                            // Fall through to API call with compacted messages
1742                            // (TypeScript: messagesForQuery = postCompactMessages, then continues)
1743
1744                            // Emit "Conversation compacted" summary to TUI/CLI (matching TypeScript)
1745                            let post_compact_tokens = compact::estimate_token_count(
1746                                &self.messages,
1747                                self.config.max_tokens,
1748                            );
1749                            let pct_reduced = if pre_compact_tokens > 0 {
1750                                ((pre_compact_tokens as i64 - post_compact_tokens as i64) as f64
1751                                    / pre_compact_tokens as f64)
1752                                    * 100.0
1753                            } else {
1754                                0.0
1755                            };
1756                            let compact_summary = format!(
1757                                "Conversation compacted: {} → {} tokens ({:.0}% reduced)",
1758                                format_tokens(pre_compact_tokens as u64),
1759                                format_tokens(post_compact_tokens as u64),
1760                                pct_reduced
1761                            );
1762                            if let Some(ref cb) = self.config.on_event {
1763                                cb(AgentEvent::Compact {
1764                                    event: CompactProgressEvent::CompactEnd {
1765                                        message: Some(compact_summary),
1766                                    },
1767                                });
1768                            }
1769                        }
1770                        Ok(false) => {
1771                            // No compaction needed or possible
1772                        }
1773                        Err(e) => {
1774                            // Compaction failed — propagate failure count so the circuit breaker
1775                            // can stop retrying on the next iteration (matching TypeScript)
1776                            self.auto_compact_tracking.consecutive_failures += 1;
1777                            eprintln!("Auto-compact failed: {}", e);
1778                        }
1779                    }
1780                    if let Some(ref cb) = self.config.on_event {
1781                        cb(AgentEvent::Compact {
1782                            event: CompactProgressEvent::CompactEnd { message: None },
1783                        });
1784                    }
1785                }
1786            }
1787
1788            // Build messages for API
1789            let api_messages = self.build_api_messages()?;
1790
1791            // Get API configuration
1792            let api_key: String = self
1793                .config
1794                .api_key
1795                .clone()
1796                .ok_or_else(|| AgentError::Api("API key not provided".to_string()))?;
1797
1798            let base_url = self
1799                .config
1800                .base_url
1801                .as_ref()
1802                .map(|s| s.as_str())
1803                .unwrap_or("https://api.anthropic.com");
1804
1805            // Use current model, or fallback model if set
1806            let current_model = if let Some(ref fallback) = self.config.fallback_model {
1807                fallback.clone()
1808            } else {
1809                self.config.model.clone()
1810            };
1811            let model = &current_model;
1812
1813            // Build request with tools if available
1814            // Always use streaming for all backends (matching TypeScript behavior)
1815            // Non-streaming fallback will be used if streaming fails
1816            // Resolve max_tokens: retry override > config override > model-based default
1817            let effective_max_tokens = self
1818                .max_output_tokens_override
1819                .unwrap_or_else(|| {
1820                    crate::utils::context::get_max_output_tokens_for_model(model) as u32
1821                });
1822            let mut request_body = serde_json::json!({
1823                "model": model,
1824                "max_tokens": effective_max_tokens,
1825                "messages": api_messages,
1826                "stream": true
1827            });
1828
1829            // Add task_budget to output_config when configured (API beta: task-budgets-2026-03-13)
1830            if self.config.task_budget.is_some() {
1831                let tb = self.config.task_budget.as_ref().unwrap();
1832                let mut task_budget_obj = serde_json::json!({
1833                    "type": "tokens",
1834                    "total": tb.total,
1835                });
1836                if let Some(remaining) = self.task_budget_remaining {
1837                    task_budget_obj["remaining"] = serde_json::json!(remaining);
1838                }
1839                request_body["output_config"] = serde_json::json!({
1840                    "task_budget": task_budget_obj,
1841                });
1842            }
1843
1844            // Add system prompt to request body (Anthropic uses separate field)
1845            // Include system_context if configured (matching TypeScript appendSystemContext)
1846            let system_prompt_to_use = if !self.config.system_context.is_empty() {
1847                let context_parts: Vec<String> = self
1848                    .config
1849                    .system_context
1850                    .iter()
1851                    .map(|(key, value)| format!("{}: {}", key, value))
1852                    .collect();
1853                let context_str = context_parts.join("\n");
1854
1855                if let Some(ref system_prompt) = self.config.system_prompt {
1856                    Some(format!("{}\n\n{}", system_prompt, context_str))
1857                } else {
1858                    Some(context_str)
1859                }
1860            } else {
1861                self.config.system_prompt.clone()
1862            };
1863
1864            if let Some(ref sp) = system_prompt_to_use {
1865                request_body["system"] = serde_json::json!(sp);
1866            }
1867
1868            // Add thinking config to request (matching TypeScript behavior)
1869            // Only for Anthropic API and when thinking is not disabled
1870            if base_url.contains("anthropic.com") {
1871                if let Some(ref thinking_config) = self.config.thinking {
1872                    match thinking_config {
1873                        crate::types::api_types::ThinkingConfig::Adaptive => {
1874                            request_body["thinking"] = serde_json::json!({
1875                                "type": "adaptive"
1876                            });
1877                        }
1878                        crate::types::api_types::ThinkingConfig::Enabled { budget_tokens } => {
1879                            // Clamp thinking budget to max_output_tokens - 1 (TS: claude.ts:1624)
1880                            let clamped_budget = std::cmp::min(
1881                                effective_max_tokens.saturating_sub(1) as u32,
1882                                *budget_tokens,
1883                            );
1884                            request_body["thinking"] = serde_json::json!({
1885                                "type": "enabled",
1886                                "budget_tokens": clamped_budget
1887                            });
1888                        }
1889                        crate::types::api_types::ThinkingConfig::Disabled => {
1890                            // Don't add thinking - it's disabled
1891                        }
1892                    }
1893                } else {
1894                    // Default: use adaptive thinking (matches TypeScript shouldEnableThinkingByDefault)
1895                    request_body["thinking"] = serde_json::json!({
1896                        "type": "adaptive"
1897                    });
1898                }
1899            }
1900
1901            // Add tools to request if we have any
1902            // Handle deferred tool loading: separate upfront vs deferred tools
1903            if !self.config.tools.is_empty() {
1904                let use_anthropic_format = base_url.contains("anthropic.com");
1905
1906                // Determine which tools to send upfront vs defer
1907                let (upfront_tools, deferred_tools) = self.separate_tools_for_request();
1908
1909                // Send only upfront tools in the tools array
1910                // Deferred tools are discovered via ToolSearchTool
1911                let tools_to_send = if upfront_tools.is_empty() {
1912                    // If no upfront tools, still send ToolSearchTool if available
1913                    &upfront_tools
1914                } else {
1915                    &upfront_tools
1916                };
1917
1918                let tools: Vec<serde_json::Value> = tools_to_send
1919                    .iter()
1920                    .map(|t| {
1921                        if use_anthropic_format {
1922                            serde_json::json!({
1923                                "type": "function",
1924                                "name": t.name,
1925                                "description": t.description,
1926                                "input_schema": t.input_schema
1927                            })
1928                        } else {
1929                            serde_json::json!({
1930                                "type": "function",
1931                                "function": {
1932                                    "name": t.name,
1933                                    "description": t.description,
1934                                    "parameters": t.input_schema
1935                                }
1936                            })
1937                        }
1938                    })
1939                    .collect();
1940                request_body["tools"] = serde_json::json!(tools);
1941
1942                // Store deferred tools info for <available-deferred-tools> injection
1943                if !deferred_tools.is_empty()
1944                    && crate::tools::deferred_tools::is_tool_search_enabled_optimistic()
1945                {
1946                    // The <available-deferred-tools> block is injected as a synthetic user message
1947                    // This is handled in build_api_messages()
1948                    let _deferred_names: Vec<&str> =
1949                        deferred_tools.iter().map(|t| t.name.as_str()).collect();
1950                }
1951            }
1952
1953            // Determine API endpoint and auth format based on backend
1954            // Anthropic uses /v1/messages, OpenAI-compatible uses /v1/chat/completions
1955            let url = if base_url.contains("anthropic.com") {
1956                format!("{}/v1/messages", base_url)
1957            } else {
1958                format!("{}/v1/chat/completions", base_url)
1959            };
1960
1961            // Track if we need to fallback to alternate model
1962            // Matching TypeScript's attemptWithFallback logic
1963            let mut attempt_with_fallback = false;
1964            let mut streaming_result: StreamingResult;
1965
1966            // Model fallback loop - try primary model first, then fallback if rate limited
1967            loop {
1968                // Use fallback model if primary failed
1969                let model_in_loop = if attempt_with_fallback {
1970                    self.config
1971                        .fallback_model
1972                        .as_ref()
1973                        .unwrap_or(&self.config.model)
1974                        .clone()
1975                } else {
1976                    self.config.model.clone()
1977                };
1978
1979                // Update request body with current model
1980                request_body["model"] = serde_json::json!(model_in_loop);
1981
1982                // Check if non-streaming fallback is disabled (matching TypeScript)
1983                if is_nonstreaming_fallback_disabled() {
1984                    return Err(AgentError::Api(
1985                        "Non-streaming fallback disabled".to_string(),
1986                    ));
1987                }
1988
1989                // Make API request with 429/529 retry and exponential backoff.
1990                // Wraps the full streaming→non-streaming fallback flow.
1991                let retry_result = make_api_request_with_429_retry(
1992                    &self.http_client,
1993                    &url,
1994                    &api_key,
1995                    request_body.clone(),
1996                    self.config.on_event.clone(),
1997                    self.config.fallback_model.clone(),
1998                    &model_in_loop,
1999                    match self.config.thinking {
2000                        Some(crate::types::api_types::ThinkingConfig::Enabled { budget_tokens }) => Some(budget_tokens),
2001                        _ => None,
2002                    },
2003                )
2004                .await;
2005
2006                match retry_result {
2007                    RetryResult::Success(result) => {
2008                        streaming_result = result;
2009                        break;
2010                    }
2011                    RetryResult::FallbackTriggered(fb_error) => {
2012                        // Only trigger once (attempt_with_fallback guard)
2013                        if attempt_with_fallback {
2014                            // Already attempted fallback, treat as terminal
2015                            // Add orphaned tool results before terminal error
2016                            self.add_orphaned_tool_results(&fb_error.to_string());
2017
2018                            // Fire StopFailure hooks (fire-and-forget)
2019                            {
2020                                let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2021                                if let Some(registry) = registry_clone {
2022                                    let _ = crate::hooks::run_stop_failure_hooks(
2023                                        &registry,
2024                                        &fb_error.to_string(),
2025                                        &self.config.cwd,
2026                                    ).await;
2027                                }
2028                            }
2029                            return Err(AgentError::Api(fb_error.to_string()));
2030                        }
2031
2032                        attempt_with_fallback = true;
2033
2034                        // Yield missing tool result blocks for any orphaned tool_use
2035                        self.add_orphaned_tool_results("Model fallback triggered");
2036
2037                        // Clear assistant message state for retry
2038                        // (remove the last assistant message that had the failed tool calls)
2039                        if let Some(last) = self.messages.last() {
2040                            if last.role == crate::types::MessageRole::Assistant {
2041                                self.messages.pop();
2042                            }
2043                        }
2044
2045                        // Update config model to fallback
2046                        self.config.model = fb_error.fallback_model.clone();
2047
2048                        // Emit warning about model switch
2049                        eprintln!(
2050                            "Switched to {} due to high demand for {}",
2051                            fb_error.fallback_model, fb_error.original_model
2052                        );
2053
2054                        continue; // Retry with fallback model
2055                    }
2056                    RetryResult::RecreateClient(recreate_err) => {
2057                        // Rebuild HTTP client and retry
2058                        self.http_client = reqwest::Client::new();
2059                        emit_api_retry_event(
2060                            self.config.on_event.as_ref().map(|a| a.as_ref()),
2061                            1,
2062                            MAX_429_RETRIES,
2063                            500,
2064                            None,
2065                            &format!("Recreating client after: {}", recreate_err),
2066                        );
2067                        sleep_tokio(std::time::Duration::from_millis(500)).await;
2068                        continue; // Retry with fresh client
2069                    }
2070                    RetryResult::Terminal(e) => {
2071                        // Handle user abort
2072                        if is_user_abort_error(&e) {
2073                            return Err(AgentError::UserAborted);
2074                        }
2075
2076                        // Check for 404 stream creation error
2077                        if is_404_stream_creation_error(&e) {
2078                            eprintln!(
2079                                "Streaming endpoint returned 404, falling back to non-streaming mode"
2080                            );
2081                        }
2082
2083                        // Check if this is a prompt-too-long error
2084                        let error_str = e.to_string().to_lowercase();
2085                        let is_prompt_too_long = error_str.contains("413")
2086                            || error_str.contains("prompt_too_long")
2087                            || error_str.contains("prompt too long")
2088                            || error_str.contains("media too large");
2089
2090                        // --- Context collapse drain stage (before reactive compact) ---
2091                        // Matches TypeScript: before reactive compact, try context collapse
2092                        // recoverFromOverflow if transition is not collapse_drain_retry.
2093                        if is_prompt_too_long
2094                            && crate::services::context_collapse::is_context_collapse_enabled()
2095                            && self.transition.as_deref() != Some("collapse_drain_retry")
2096                        {
2097                            let original_len = self.messages.len();
2098                            let drained = crate::services::context_collapse::recover_from_overflow(
2099                                self.messages.clone(),
2100                            );
2101                            // If the collapse function changed anything, use the result
2102                            if drained.len() < original_len {
2103                                self.messages = drained;
2104                                self.transition = Some("collapse_drain_retry".to_string());
2105                                continue; // Retry after collapse drain
2106                            }
2107                        }
2108
2109                        if is_prompt_too_long {
2110                            eprintln!("Prompt too large (413), attempting reactive compact...");
2111                            let _pre_compact_instructions = self.execute_pre_compact_hooks().await;
2112                            match crate::services::compact::reactive_compact::run_reactive_compact(
2113                                &self.messages,
2114                                &self.config.model,
2115                            ) {
2116                                Ok(reactive_result) if reactive_result.compacted => {
2117                                    log::info!(
2118                                        "[reactive-compact] reduced {} messages after 413 error",
2119                                        reactive_result.messages.len()
2120                                    );
2121                                    // task_budget: decrement remaining by pre-compact final context
2122                                    if self.config.task_budget.is_some() {
2123                                        let pre_ctx = crate::compact::estimate_token_count(&self.messages, 0) as u64;
2124                                        let current = self.task_budget_remaining
2125                                            .or(self.config.task_budget.as_ref().map(|tb| tb.total));
2126                                        self.task_budget_remaining = Some(current.unwrap_or(0).saturating_sub(pre_ctx));
2127                                    }
2128                                    self.messages = reactive_result.messages;
2129                                    self.execute_post_compact_hooks("Reactive compact applied after 413 error").await;
2130                                    self.transition = Some("reactive_compact_retry".to_string());
2131                                    continue; // Retry with compacted context
2132                                }
2133                                _ => {
2134                                    log::warn!(
2135                                        "[reactive-compact] no improvement possible, falling through"
2136                                    );
2137                                }
2138                            }
2139                            // Reactive compact didn't help - this is terminal
2140                            // Add orphaned tool results before terminal error
2141                            self.add_orphaned_tool_results(&e.to_string());
2142
2143                            // Fire StopFailure hooks (fire-and-forget, matches TypeScript)
2144                            {
2145                                let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2146                                if let Some(registry) = registry_clone {
2147                                    let _ = crate::hooks::run_stop_failure_hooks(&registry, &e.to_string(), &self.config.cwd).await;
2148                                }
2149                            }
2150                            return Err(e);
2151                        }
2152
2153                        // Add orphaned tool results before terminal error
2154                        self.add_orphaned_tool_results(&e.to_string());
2155
2156                        // Fire StopFailure hooks (fire-and-forget, matches TypeScript)
2157                        {
2158                            let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2159                            if let Some(registry) = registry_clone {
2160                                let _ = crate::hooks::run_stop_failure_hooks(&registry, &e.to_string(), &self.config.cwd).await;
2161                            }
2162                        }
2163                        return Err(e);
2164                    }
2165                }
2166            }
2167
2168            // Emit StreamRequestEnd — TUI can use this to hide spinner after API response
2169            if let Some(ref cb) = self.config.on_event {
2170                cb(AgentEvent::StreamRequestEnd);
2171            }
2172
2173            // Check for refusal before max_output_tokens check (matches TypeScript)
2174            if let Some(refusal_msg) = get_error_message_if_refusal(
2175                streaming_result.stop_reason.as_deref(),
2176                &self.config.model,
2177                false, // is_non_interactive
2178            ) {
2179                // Add the refusal as an API error message
2180                self.messages.push(crate::types::Message {
2181                    role: crate::types::MessageRole::Assistant,
2182                    content: refusal_msg.content.clone().unwrap_or_default(),
2183                    is_api_error_message: Some(true),
2184                    error_details: refusal_msg.error_details.clone(),
2185                    ..Default::default()
2186                });
2187                // Fire StopFailure hooks
2188                {
2189                    let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2190                    if let Some(registry) = registry_clone {
2191                        let _ = crate::hooks::run_stop_failure_hooks(
2192                            &registry,
2193                            &refusal_msg.content.as_ref().map(|s| s.as_str()).unwrap_or("refusal"),
2194                            &self.config.cwd,
2195                        ).await;
2196                    }
2197                }
2198                return Err(AgentError::Api(
2199                    refusal_msg.content.unwrap_or_else(|| "Refusal".to_string()),
2200                ));
2201            }
2202
2203            // Execute post-sampling hooks after model response is complete
2204            // (matches TypeScript executePostSamplingHooks in query.ts:999-1008)
2205            if !streaming_result.content.is_empty() || !streaming_result.tool_calls.is_empty() {
2206                let hook_messages = self.messages.clone();
2207                let hook_system_prompt = self
2208                    .config
2209                    .system_prompt
2210                    .as_deref()
2211                    .unwrap_or("")
2212                    .lines()
2213                    .map(|s| s.to_string())
2214                    .collect::<Vec<_>>();
2215                let hook_user_context = self.config.user_context.clone();
2216                let hook_system_context = self.config.system_context.clone();
2217                let hook_tool_use_context = Arc::new(
2218                    crate::utils::hooks::can_use_tool::ToolUseContext {
2219                        session_id: self
2220                            .config
2221                            .session_state
2222                            .as_ref()
2223                            .map(|_| "query_engine".to_string())
2224                            .unwrap_or_else(|| "query_engine".to_string()),
2225                        cwd: Some(self.config.cwd.clone()),
2226                        is_non_interactive_session: false,
2227                        options: None,
2228                    },
2229                );
2230                let hook_query_source = self.config.agent_id.as_ref().map(|_| "agent".to_string());
2231                let has_hook_count = {
2232                    crate::utils::hooks::get_post_sampling_hook_count() > 0
2233                };
2234                if has_hook_count {
2235                    let messages_clone = hook_messages;
2236                    let system_prompt_clone = hook_system_prompt;
2237                    let user_context_clone = hook_user_context;
2238                    let system_context_clone = hook_system_context;
2239                    let tool_use_context_clone = hook_tool_use_context;
2240                    let query_source_clone = hook_query_source;
2241                    tokio::spawn(async move {
2242                        crate::utils::hooks::execute_post_sampling_hooks(
2243                            messages_clone,
2244                            system_prompt_clone,
2245                            user_context_clone,
2246                            system_context_clone,
2247                            tool_use_context_clone,
2248                            query_source_clone,
2249                        )
2250                        .await;
2251                    });
2252                }
2253            }
2254
2255            // Check for tool calls in the streaming result
2256            if streaming_result.tool_calls.is_empty() {
2257                // Check for max_output_tokens error and handle recovery
2258                // Two-phase recovery matching TypeScript query.ts:1188-1256
2259                if streaming_result.api_error.as_deref() == Some("max_output_tokens") {
2260                    const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT: u32 = 3;
2261                    const ESCALATED_MAX_TOKENS: u64 = 64_000;
2262
2263                    // Phase 1: Escalation (TS: query.ts:1188-1221)
2264                    // If no override set and no env var, escalate to 64k and retry same request
2265                    // Feature 'tengu_otk_slot_v1' always enabled per CLAUDE.md
2266                    if self.max_output_tokens_override.is_none()
2267                        && std::env::var(crate::constants::env::ai_code::MAX_OUTPUT_TOKENS).is_err()
2268                    {
2269                        self.max_output_tokens_override = Some(ESCALATED_MAX_TOKENS as u32);
2270                        if let Some(ref cb) = self.config.on_event {
2271                            cb(AgentEvent::Thinking {
2272                                turn: self.turn_count + 1,
2273                            });
2274                        }
2275                        continue;
2276                    }
2277
2278                    // Phase 2: Multi-turn recovery (TS: query.ts:1223-1252)
2279                    // Inject meta recovery message, reset override to default tokens
2280                    if self.max_output_tokens_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT {
2281                        let recovery_message = crate::types::Message {
2282                            role: crate::types::MessageRole::User,
2283                            content: "Output token limit hit. Resume directly — no apology, no recap of what you were doing. Pick up mid-thought if that is where the cut happened. Break remaining work into smaller pieces.".to_string(),
2284                            is_meta: Some(true),
2285                            ..Default::default()
2286                        };
2287                        self.messages.push(recovery_message);
2288                        // Reset override so we go back to model-based default
2289                        self.max_output_tokens_override = None;
2290                        self.max_output_tokens_recovery_count += 1;
2291
2292                        if let Some(ref cb) = self.config.on_event {
2293                            cb(AgentEvent::Thinking {
2294                                turn: self.turn_count + 1,
2295                            });
2296                        }
2297                        continue;
2298                    }
2299
2300                    // Recovery exhausted - return the error as final response
2301                    if let Some(ref cb) = self.config.on_event {
2302                        cb(AgentEvent::Done {
2303                            result: crate::types::QueryResult {
2304                                text: "Output token limit reached and recovery exhausted"
2305                                    .to_string(),
2306                                usage: self.total_usage.clone(),
2307                                num_turns: self.turn_count,
2308                                duration_ms: self.query_duration_ms(),
2309                                exit_reason: crate::types::ExitReason::MaxTokens,
2310                            },
2311                        });
2312                    }
2313                    return Ok((
2314                        "Output token limit reached and recovery exhausted".to_string(),
2315                        crate::types::ExitReason::MaxTokens,
2316                    ));
2317                }
2318
2319                // No tool calls - check for unfinished tasks before finalizing
2320                if self.config.max_turns == 0 || self.turn_count < self.config.max_turns {
2321                    if let Some(nudge) = crate::utils::inspector::check() {
2322                        log::debug!(
2323                            "[query_engine] unfinished tasks found, nudging LLM to continue (turn {})",
2324                            self.turn_count
2325                        );
2326                        self.messages.push(crate::types::Message {
2327                            role: crate::types::MessageRole::System,
2328                            content: nudge,
2329                            ..Default::default()
2330                        });
2331                        if let Some(ref cb) = self.config.on_event {
2332                            cb(AgentEvent::Thinking {
2333                                turn: self.turn_count + 1,
2334                            });
2335                        }
2336                        self.turn_count += 1;
2337                        continue;
2338                    }
2339                }
2340
2341                // No tool calls - this is the final response
2342                let response_text = streaming_result.content.clone();
2343
2344                // Don't strip thinking from result.text - preserve it for history
2345                // The thinking will still be shown during streaming via streaming_text
2346
2347                // If both content and tool_calls are empty, the API response was empty.
2348                // This can happen from rate limiting, network issues, or model errors
2349                // that slip past HTTP status checks (e.g., 200 OK with error body,
2350                // or stream dropped mid-response). Retry a couple of times.
2351                if response_text.is_empty()
2352                    && streaming_result.tool_calls.is_empty()
2353                    && self.config.max_turns > 0
2354                    && self.turn_count < self.config.max_turns
2355                {
2356                    self.empty_response_retries += 1;
2357                    if self.empty_response_retries <= 2 {
2358                        log::warn!(
2359                            "[query_engine] empty model response, retrying ({}) stop_reason={:?}",
2360                            self.empty_response_retries,
2361                            streaming_result.stop_reason,
2362                        );
2363                        // Brief backoff between retries
2364                        tokio::time::sleep(std::time::Duration::from_millis(
2365                            500 * self.empty_response_retries as u64,
2366                        ))
2367                        .await;
2368                        // Continue to rebuild and retry the API call
2369                        continue;
2370                    }
2371                    self.empty_response_retries = 0;
2372                } else {
2373                    self.empty_response_retries = 0;
2374                }
2375
2376                // If both content and tool_calls are empty, it's a genuine error
2377                if response_text.is_empty() && streaming_result.tool_calls.is_empty() {
2378                    log::error!(
2379                        "[query_engine] model returned empty response after retries: stop_reason={:?}",
2380                        streaming_result.stop_reason,
2381                    );
2382                    return Err(AgentError::Api(
2383                        "Model response contained no text and no tool calls".to_string(),
2384                    ));
2385                }
2386
2387                let final_text = response_text.clone();
2388
2389                // Update total usage (matching TypeScript usage tracking)
2390                self.total_usage.input_tokens += streaming_result.usage.input_tokens;
2391                self.total_usage.output_tokens += streaming_result.usage.output_tokens;
2392                self.total_usage.cache_creation_input_tokens = Some(
2393                    self.total_usage.cache_creation_input_tokens.unwrap_or(0)
2394                        + streaming_result.usage.cache_creation_input_tokens.unwrap_or(0),
2395                );
2396                self.total_usage.cache_read_input_tokens = Some(
2397                    self.total_usage.cache_read_input_tokens.unwrap_or(0)
2398                        + streaming_result.usage.cache_read_input_tokens.unwrap_or(0),
2399                );
2400                self.turn_tokens += streaming_result.usage.output_tokens as u64;
2401
2402                // Update total cost (matching TypeScript cost tracking)
2403                self.total_cost += streaming_result.cost;
2404
2405                // Check if USD budget has been exceeded
2406                if let Some(max_budget) = self.config.max_budget_usd {
2407                    if self.total_cost >= max_budget {
2408                        let final_text = self.messages.iter()
2409                            .rev()
2410                            .find(|m| matches!(m.role, crate::types::MessageRole::Assistant))
2411                            .map(|m| m.content.clone())
2412                            .unwrap_or_default();
2413                        if let Some(ref cb) = self.config.on_event {
2414                            cb(AgentEvent::Done {
2415                                result: crate::types::QueryResult {
2416                                    text: final_text.clone(),
2417                                    usage: self.total_usage.clone(),
2418                                    num_turns: self.turn_count,
2419                                    duration_ms: self.query_duration_ms(),
2420                                    exit_reason: crate::types::ExitReason::MaxBudgetExceeded {
2421                                        max_budget_usd: max_budget,
2422                                    },
2423                                },
2424                            });
2425                        }
2426                        return Ok((
2427                            final_text,
2428                            crate::types::ExitReason::MaxBudgetExceeded {
2429                                max_budget_usd: max_budget,
2430                            },
2431                        ));
2432
2433                    }
2434                }
2435
2436                // Update global cost state for session-level reporting
2437                let model = self.config.model.clone();
2438                let _ = crate::services::model_cost::add_to_total_session_cost(
2439                    streaming_result.cost,
2440                    streaming_result.usage.input_tokens as u32,
2441                    streaming_result.usage.output_tokens as u32,
2442                    streaming_result.usage.cache_read_input_tokens.unwrap_or(0) as u32,
2443                    streaming_result.usage.cache_creation_input_tokens.unwrap_or(0) as u32,
2444                    0,
2445                    &model,
2446                );
2447
2448                // Add assistant response to message history
2449                self.messages.push(crate::types::Message {
2450                    role: crate::types::MessageRole::Assistant,
2451                    content: response_text.clone(),
2452                    ..Default::default()
2453                });
2454
2455                // Reset recovery count on successful completion
2456                self.max_output_tokens_recovery_count = 0;
2457                self.max_output_tokens_override = None;
2458
2459                // Check max_turns limit BEFORE incrementing (TypeScript checks nextTurnCount before increment)
2460                let next_turn_count = self.turn_count + 1;
2461                if self.config.max_turns > 0 && next_turn_count > self.config.max_turns {
2462                    // Emit max_turns_reached event (matches TypeScript behavior)
2463                    // Emit Done event (matches TypeScript yielding { type: 'result' })
2464                    if let Some(ref cb) = self.config.on_event {
2465                        cb(AgentEvent::MaxTurnsReached {
2466                            max_turns: self.config.max_turns,
2467                            turn_count: next_turn_count,
2468                        });
2469                        cb(AgentEvent::Done {
2470                            result: crate::types::QueryResult {
2471                                text: final_text.clone(),
2472                                usage: self.total_usage.clone(),
2473                                num_turns: self.turn_count,
2474                                duration_ms: self.query_duration_ms(),
2475                                exit_reason: crate::types::ExitReason::MaxTurns {
2476                                    max_turns: self.config.max_turns,
2477                                    turn_count: next_turn_count,
2478                                },
2479                            },
2480                        });
2481                    }
2482                    // Return what we have, don't exceed max turns
2483                    return Ok((
2484                        final_text,
2485                        crate::types::ExitReason::MaxTurns {
2486                            max_turns: self.config.max_turns,
2487                            turn_count: next_turn_count,
2488                        },
2489                    ));
2490                }
2491
2492                // Increment turn_count AFTER tool execution (matches TypeScript behavior)
2493                self.turn_count = next_turn_count;
2494
2495                // Fire Stop hooks before finalizing (matches TypeScript handleStopHooks)
2496                // Short-circuit: if the last assistant message is an API error message,
2497                // skip stop hooks to avoid the death spiral:
2498                // error → hook blocking → retry → error → …
2499                let last_is_api_error = self.messages.iter().rev().find_map(|m| {
2500                    if m.role == crate::types::MessageRole::Assistant {
2501                        Some(m.is_api_error_message == Some(true))
2502                    } else {
2503                        None
2504                    }
2505                }).unwrap_or(false);
2506
2507                if !self.stop_hook_active && !last_is_api_error {
2508                    self.stop_hook_active = true;
2509                    let stop_result = {
2510                        let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2511                        if let Some(registry) = registry_clone {
2512                            crate::hooks::run_stop_hooks(&registry, &self.config.cwd, &final_text).await
2513                        } else {
2514                            crate::hooks::StopHookResult::default()
2515                        }
2516                    };
2517
2518                    // Memory extraction (fire-and-forget, matches TypeScript EXTRACT_MEMORIES feature)
2519                    // Only for main agent (no agent_id), not subagents
2520                    if self.config.agent_id.is_none() {
2521                        let messages: Vec<crate::types::message::Message> = self.messages
2522                            .iter()
2523                            .filter_map(|m| match serde_json::to_value(m) {
2524                                Ok(v) => serde_json::from_value(v).ok(),
2525                                Err(_) => None,
2526                            })
2527                            .collect();
2528                        let extract_ctx = crate::services::extract_memories::ExtractMemoryContext {
2529                            messages,
2530                            system_prompt: self.config
2531                                .system_prompt
2532                                .as_deref()
2533                                .unwrap_or("")
2534                                .to_string(),
2535                            user_context: self.config.user_context.clone(),
2536                            system_context: self.config.system_context.clone(),
2537                            tool_use_context: None,
2538                            agent_id: self.config.agent_id.clone(),
2539                        };
2540                        let ctx_clone = extract_ctx.clone();
2541                        tokio::spawn(async move {
2542                            crate::services::extract_memories::execute_extract_memories(
2543                                ctx_clone,
2544                                None,
2545                            )
2546                            .await;
2547                        });
2548                    }
2549
2550                    if !stop_result.blocking_errors.is_empty() {
2551                        // Inject blocking errors as system messages and re-query
2552                        for err_msg in stop_result.blocking_errors {
2553                            self.messages.push(crate::types::Message {
2554                                role: crate::types::MessageRole::System,
2555                                content: err_msg,
2556                                ..Default::default()
2557                            });
2558                        }
2559                        if let Some(ref cb) = self.config.on_event {
2560                            cb(AgentEvent::Thinking {
2561                                turn: self.turn_count + 1,
2562                            });
2563                        }
2564                        continue;
2565                    }
2566                    if stop_result.prevent_continuation {
2567                        if let Some(ref cb) = self.config.on_event {
2568                            cb(AgentEvent::Done {
2569                                result: crate::types::QueryResult {
2570                                    text: final_text.clone(),
2571                                    usage: self.total_usage.clone(),
2572                                    num_turns: self.turn_count,
2573                                    duration_ms: self.query_duration_ms(),
2574                                    exit_reason: crate::types::ExitReason::Completed,
2575                                },
2576                            });
2577                        }
2578                        return Ok((final_text, crate::types::ExitReason::Completed));
2579                    }
2580                }
2581
2582                // Emit Thinking event for next turn
2583                if let Some(ref cb) = self.config.on_event {
2584                    cb(AgentEvent::Thinking {
2585                        turn: self.turn_count + 1,
2586                    });
2587                }
2588
2589                // Check token budget (TOKEN_BUDGET feature)
2590                // When a token budget is set, we continue the loop with a nudge message
2591                // until we reach 90% of the budget or hit diminishing returns.
2592                // Snapshot output tokens at turn start for per-turn budget tracking
2593                crate::bootstrap::state::snapshot_output_tokens_for_turn(self.config.token_budget);
2594                let token_budget = self.config.token_budget;
2595                let agent_id = self.config.agent_id.clone();
2596                match crate::token_budget::check_token_budget(
2597                    &mut self.budget_tracker,
2598                    agent_id.as_deref(),
2599                    token_budget,
2600                    self.turn_tokens,
2601                ) {
2602                    crate::token_budget::TokenBudgetDecision::Continue { nudge_message } => {
2603                        // Inject nudge as synthetic user message and re-query
2604                        self.messages.push(crate::types::Message {
2605                            role: crate::types::MessageRole::User,
2606                            content: nudge_message,
2607                            ..Default::default()
2608                        });
2609                        self.transition = Some("token_budget_continuation".to_string());
2610                        continue;
2611                    }
2612                    crate::token_budget::TokenBudgetDecision::Stop { .. } => {
2613                        // Normal exit path
2614                    }
2615                }
2616
2617                // Validate result before emitting (matches TypeScript isResultSuccessful check at QueryEngine.ts:1082)
2618                let last_stop_reason = streaming_result.stop_reason.as_deref();
2619                if !self.is_result_successful(last_stop_reason) {
2620                    let error_detail = format!(
2621                        "Invalid result state: last_message_type={:?}, stop_reason={:?}",
2622                        self.messages.last().map(|m| &m.role),
2623                        last_stop_reason
2624                    );
2625                    if let Some(ref cb) = self.config.on_event {
2626                        cb(AgentEvent::Done {
2627                            result: crate::types::QueryResult {
2628                                text: final_text.clone(),
2629                                usage: self.total_usage.clone(),
2630                                num_turns: self.turn_count,
2631                                duration_ms: self.query_duration_ms(),
2632                                exit_reason: crate::types::ExitReason::ModelError { error: error_detail.clone() },
2633                            },
2634                        });
2635                    }
2636                    return Ok((final_text, crate::types::ExitReason::ModelError { error: error_detail.clone() }));
2637                }
2638
2639                // Emit Done event (matches TypeScript yielding { type: 'result' })
2640                if let Some(ref cb) = self.config.on_event {
2641                    cb(AgentEvent::Done {
2642                        result: crate::types::QueryResult {
2643                            text: final_text.clone(),
2644                            usage: self.total_usage.clone(),
2645                            num_turns: self.turn_count,
2646                            duration_ms: self.query_duration_ms(),
2647                            exit_reason: crate::types::ExitReason::Completed,
2648                        },
2649                    });
2650                }
2651                // Return the final text (already processed above)
2652                return Ok((final_text, crate::types::ExitReason::Completed));
2653            }
2654
2655            // Process tool calls from streaming result
2656            let tool_calls = streaming_result.tool_calls;
2657
2658            // Convert JSON tool calls to ToolCall structs
2659            let mut tool_call_structs: Vec<crate::types::ToolCall> = Vec::new();
2660            for tc in &tool_calls {
2661                let name = tc
2662                    .get("name")
2663                    .and_then(|n| n.as_str())
2664                    .unwrap_or("")
2665                    .to_string();
2666                let id = tc
2667                    .get("id")
2668                    .and_then(|i| i.as_str())
2669                    .unwrap_or("")
2670                    .to_string();
2671                let arguments = tc
2672                    .get("arguments")
2673                    .cloned()
2674                    .unwrap_or_else(|| empty_json_value());
2675                tool_call_structs.push(crate::types::ToolCall {
2676                    id,
2677                    r#type: "function".to_string(),
2678                    name,
2679                    arguments,
2680                });
2681            }
2682
2683            // Use orchestration for concurrent/serial tool execution
2684            // This matches TypeScript's runTools() with partitioning
2685            let tool_context = crate::types::ToolContext {
2686                cwd: self.config.cwd.clone(),
2687                abort_signal: Arc::clone(self.abort_controller.signal()),
2688            };
2689
2690            // Create executor closure using the tool executors stored in QueryEngine
2691            // Wrap in Arc so it can be cloned for concurrent execution
2692            let tool_executors = Arc::new(self.tool_executors.lock().unwrap().clone());
2693            let tool_render_fns = Arc::new(self.tool_render_fns.lock().unwrap().clone());
2694            let tool_backfill_fns = Arc::new(self.tool_backfill_fns.lock().unwrap().clone());
2695            let tools = self.config.tools.clone();
2696            let can_use_tool = self.config.can_use_tool.clone();
2697            let cwd = self.config.cwd.clone();
2698            let on_event = self.config.on_event.clone();
2699            let abort_signal = self.abort_controller.signal().clone();
2700            let hook_registry = self.hook_registry.clone();
2701
2702            let executor = move |name: String, args: serde_json::Value, tool_call_id: String| {
2703                let tool_executors = tool_executors.clone();
2704                let tool_render_fns = tool_render_fns.clone();
2705                let tool_backfill_fns = tool_backfill_fns.clone();
2706                let tools = tools.clone();
2707                let can_use_tool = can_use_tool.clone();
2708                let cwd = cwd.clone();
2709                let on_event = on_event.clone();
2710                let abort_signal = abort_signal.clone();
2711                let hook_registry = hook_registry.clone();
2712                async move {
2713                    // The actual tool execution is now handled by QueryEngine::execute_tool
2714                    // but since we are in a closure passed to orchestration::run_tools,
2715                    // we have to implement the logic here or change orchestration.
2716                    // To keep it consistent with the new execute_tool, we'll mimic its logic.
2717
2718                    // Backfill observable input (TS: toolExecution.ts:783-792)
2719                    // Clone args, call backfill on clone, use backfilled for hooks/events
2720                    // Original args passed to executor_fn (preserves prompt cache)
2721                    let mut backfilled_args = args.clone();
2722                    if let Some(backfill_fn) = tool_backfill_fns.get(&name) {
2723                        backfill_fn(&mut backfilled_args);
2724                    }
2725
2726                    // Emit ToolStart event with render metadata (use backfilled input for observers)
2727                    if let Some(ref cb) = on_event {
2728                        let meta_input = Some(&backfilled_args);
2729                        let metadata = tool_render_fns.get(&name).map(|fns| ToolRenderMetadata {
2730                            user_facing_name: (Arc::clone(&fns.user_facing_name))(meta_input),
2731                            tool_use_summary: fns
2732                                .get_tool_use_summary
2733                                .as_ref()
2734                                .and_then(|f| f(meta_input)),
2735                            activity_description: fns
2736                                .get_activity_description
2737                                .as_ref()
2738                                .and_then(|f| f(meta_input)),
2739                        });
2740                        if let Some(ref meta) = metadata {
2741                            cb(AgentEvent::ToolStart {
2742                                tool_name: name.clone(),
2743                                tool_call_id: tool_call_id.clone(),
2744                                input: backfilled_args.clone(),
2745                                display_name: Some(meta.user_facing_name.clone()),
2746                                summary: meta.tool_use_summary.clone(),
2747                                activity_description: meta.activity_description.clone(),
2748                            });
2749                        } else {
2750                            cb(AgentEvent::ToolStart {
2751                                tool_name: name.clone(),
2752                                tool_call_id: tool_call_id.clone(),
2753                                input: backfilled_args.clone(),
2754                                display_name: None,
2755                                summary: None,
2756                                activity_description: None,
2757                            });
2758                        }
2759                    }
2760
2761                    // We don't have access to `self` here, so we can't call self.execute_tool.
2762                    // However, the hooks and permissions are part of the config/registry.
2763                    // For now, let's maintain the logic but ensure we use tool_call_id.
2764
2765                    let cwd_clone = cwd.clone();
2766
2767                    let context = crate::types::ToolContext {
2768                        cwd,
2769                        abort_signal: abort_signal.clone(),
2770                    };
2771
2772                    let executor_fn = tool_executors.get(&name).cloned();
2773
2774                    if let Some(executor_fn) = executor_fn {
2775                        // Compute render metadata
2776                        let meta_input = Some(&args);
2777                        let metadata = tool_render_fns.get(&name).map(|fns| ToolRenderMetadata {
2778                            user_facing_name: (Arc::clone(&fns.user_facing_name))(meta_input),
2779                            tool_use_summary: fns
2780                                .get_tool_use_summary
2781                                .as_ref()
2782                                .and_then(|f| f(meta_input)),
2783                            activity_description: fns
2784                                .get_activity_description
2785                                .as_ref()
2786                                .and_then(|f| f(meta_input)),
2787                        });
2788
2789                        // Pre-tool permission check (3-way: Allow/Deny/Ask) - use backfilled input
2790                        if let Some(can_use_fn) = can_use_tool {
2791                            if let Some(tool_def) = tools.iter().find(|t| &t.name == &name) {
2792                                match can_use_fn(tool_def.clone(), backfilled_args.clone()) {
2793                                    crate::permission::PermissionResult::Allow(_)
2794                                    | crate::permission::PermissionResult::Passthrough { .. } => {}
2795                                    crate::permission::PermissionResult::Deny(d) => {
2796                                        return Err(crate::error::AgentError::Tool(format!(
2797                                            "Tool '{}' permission denied: {}",
2798                                            name, d.message
2799                                        )));
2800                                    }
2801                                    crate::permission::PermissionResult::Ask(a) => {
2802                                        return Err(crate::error::AgentError::Tool(format!(
2803                                            "Tool '{}' requires user confirmation (Ask not supported in SDK): {}",
2804                                            name, a.message
2805                                        )));
2806                                    }
2807                                }
2808                            }
2809                        }
2810
2811                        // PreToolUse hooks (fire before execution, can block) - use backfilled input
2812                        {
2813                            let registry_clone = hook_registry.lock().unwrap().as_ref().cloned();
2814                            if let Some(registry) = registry_clone {
2815                                if let Err(e) =
2816                                    crate::hooks::run_pre_tool_use_hooks(&registry, &name, &backfilled_args, &tool_call_id, &cwd_clone)
2817                                        .await
2818                                {
2819                                    return Err(e);
2820                                }
2821                            }
2822                        }
2823
2824                        // Execute with original args (preserves prompt cache, TS: callInput)
2825                        let result = executor_fn(args, &context).await;
2826
2827                        // PostToolUse / PostToolUseFailure hooks
2828                        {
2829                            let registry_clone = hook_registry.lock().unwrap().as_ref().cloned();
2830                            if let Some(registry) = registry_clone {
2831                                match &result {
2832                                    Ok(tool_result) => {
2833                                        let _ = crate::hooks::run_post_tool_use_hooks(&registry, &name, tool_result, &tool_call_id, &cwd_clone).await;
2834                                    }
2835                                    Err(e) => {
2836                                        let _ = crate::hooks::run_post_tool_use_failure_hooks(&registry, &name, &e.to_string(), &tool_call_id, &cwd_clone).await;
2837                                    }
2838                                }
2839                            }
2840                        }
2841
2842                        // Emit ToolComplete or ToolError event with render hooks
2843                        if let Some(ref cb) = on_event {
2844                            match &result {
2845                                Ok(tool_result) => {
2846                                    let rendered_result = tool_render_fns
2847                                        .get(&name)
2848                                        .and_then(|fns| fns.render(&tool_result.content, &tools));
2849                                    if let Some(ref meta) = metadata {
2850                                        let display = format!(
2851                                            "{}({})",
2852                                            meta.user_facing_name,
2853                                            meta.tool_use_summary.as_deref().unwrap_or("?")
2854                                        );
2855                                        cb(AgentEvent::ToolComplete {
2856                                            tool_name: name.clone(),
2857                                            tool_call_id: tool_call_id.clone(),
2858                                            result: tool_result.clone(),
2859                                            display_name: Some(display),
2860                                            rendered_result: rendered_result.clone(),
2861                                        });
2862                                    } else {
2863                                        cb(AgentEvent::ToolComplete {
2864                                            tool_name: name.clone(),
2865                                            tool_call_id: tool_call_id.clone(),
2866                                            result: tool_result.clone(),
2867                                            display_name: None,
2868                                            rendered_result: rendered_result,
2869                                        });
2870                                    }
2871                                }
2872                                Err(e) => {
2873                                    cb(AgentEvent::ToolError {
2874                                        tool_name: name.clone(),
2875                                        tool_call_id: tool_call_id.clone(),
2876                                        error: e.to_string(),
2877                                    });
2878                                }
2879                            }
2880                        }
2881
2882                        result
2883                    } else {
2884                        let err =
2885                            crate::error::AgentError::Tool(format!("Tool '{}' not found", name));
2886                        if let Some(ref cb) = on_event {
2887                            cb(AgentEvent::ToolError {
2888                                tool_name: name.clone(),
2889                                tool_call_id: tool_call_id.clone(),
2890                                error: err.to_string(),
2891                            });
2892                        }
2893                        Err(err)
2894                    }
2895                }
2896            };
2897
2898            // Add assistant message with tool_calls to history BEFORE execution
2899            // This matches TypeScript behavior - the assistant message contains tool call info
2900            let assistant_msg = crate::types::Message {
2901                role: crate::types::MessageRole::Assistant,
2902                content: format!(
2903                    "Calling tool(s): {:?}",
2904                    tool_calls
2905                        .iter()
2906                        .map(|tc| tc.get("name").and_then(|n| n.as_str()).unwrap_or(""))
2907                        .collect::<Vec<_>>()
2908                ),
2909                tool_calls: Some(tool_call_structs.clone()),
2910                ..Default::default()
2911            };
2912            self.messages.push(assistant_msg);
2913
2914            let updates = orchestration::run_tools(
2915                tool_call_structs,
2916                self.config.tools.clone(),
2917                tool_context,
2918                executor,
2919                Some(self.config.cwd.clone()),
2920                None,
2921            )
2922            .await;
2923
2924            // Process tool results - matches TypeScript's normalizeMessagesForAPI
2925            for update in updates {
2926                if let Some(message) = update.message {
2927                    // Add tool result message to history
2928                    // Truncate large tool results to prevent 413 Payload Too Large errors
2929                    let truncated_content = truncate_tool_result_content(&message.content, "");
2930                    let mut msg = message;
2931                    msg.content = truncated_content;
2932                    self.messages.push(msg);
2933                }
2934            }
2935
2936            // Enforce aggregate tool result budget after tool results are added
2937            if let Some(ref mut state) = self.content_replacement_state {
2938                crate::services::compact::apply_tool_result_budget(&mut self.messages, Some(state));
2939            }
2940
2941            // After tool execution, check max_turns BEFORE incrementing
2942            let next_turn_count = self.turn_count + 1;
2943            if self.config.max_turns > 0 && next_turn_count > self.config.max_turns {
2944                // Emit max_turns_reached event
2945                if let Some(ref cb) = self.config.on_event {
2946                    cb(AgentEvent::MaxTurnsReached {
2947                        max_turns: self.config.max_turns,
2948                        turn_count: next_turn_count,
2949                    });
2950                }
2951                // Return what we have, don't exceed max turns
2952                let final_text = self
2953                    .messages
2954                    .iter()
2955                    .filter(|m| m.role == crate::types::MessageRole::Assistant)
2956                    .last()
2957                    .map(|m| m.content.clone())
2958                    .unwrap_or_else(|| "Max turns reached".to_string());
2959                // Don't strip thinking - preserve for history
2960                let final_text = final_text;
2961                if let Some(ref cb) = self.config.on_event {
2962                    cb(AgentEvent::Done {
2963                        result: crate::types::QueryResult {
2964                            text: final_text.clone(),
2965                            usage: self.total_usage.clone(),
2966                            num_turns: self.turn_count,
2967                            duration_ms: self.query_duration_ms(),
2968                            exit_reason: crate::types::ExitReason::default(),
2969                        },
2970                    });
2971                }
2972                return Ok((final_text, crate::types::ExitReason::default()));
2973            }
2974
2975            // After tool execution, increment turn count
2976            // TypeScript increments once per full turn (user msg + assistant + tools)
2977            self.turn_count = next_turn_count;
2978
2979            // Post-compaction turn counter tracking (matches TypeScript's tracking.turnCounter++)
2980            // Only increment if we compacted in the previous turn
2981            if self.auto_compact_tracking.compacted {
2982                self.auto_compact_tracking.turn_counter += 1;
2983            }
2984
2985            // Emit Thinking event for next turn
2986            if let Some(ref cb) = self.config.on_event {
2987                cb(AgentEvent::Thinking {
2988                    turn: self.turn_count + 1,
2989                });
2990            }
2991
2992            // Continue the loop to get next response
2993            continue;
2994        }
2995
2996        // Max tool turns reached
2997        let final_text = self
2998            .messages
2999            .iter()
3000            .filter(|m| m.role == crate::types::MessageRole::Assistant)
3001            .last()
3002            .map(|m| m.content.clone())
3003            .unwrap_or_else(|| "Max tool execution turns reached".to_string());
3004
3005        // Don't strip thinking - preserve for history
3006        let final_text = final_text;
3007
3008        // Emit Done event
3009        if let Some(ref cb) = self.config.on_event {
3010            cb(AgentEvent::Done {
3011                result: crate::types::QueryResult {
3012                    text: final_text.clone(),
3013                    usage: self.total_usage.clone(),
3014                    num_turns: self.turn_count,
3015                    duration_ms: self.query_duration_ms(),
3016                    exit_reason: crate::types::ExitReason::Completed,
3017                },
3018            });
3019        }
3020
3021        Ok((final_text, crate::types::ExitReason::Completed))
3022    }
3023
3024    fn build_api_messages(&self) -> Result<Vec<serde_json::Value>, AgentError> {
3025        // Determine if this is Anthropic API or OpenAI-compatible
3026        let base_url = self
3027            .config
3028            .base_url
3029            .as_deref()
3030            .unwrap_or("https://api.anthropic.com");
3031        let is_anthropic = base_url.contains("anthropic.com");
3032
3033        // Prepend user context if configured (matching TypeScript prependUserContext)
3034        let mut all_messages = self.messages.clone();
3035        if !self.config.user_context.is_empty() {
3036            let context_parts: Vec<String> = self
3037                .config
3038                .user_context
3039                .iter()
3040                .map(|(key, value)| format!("# {}\n{}", key, value))
3041                .collect();
3042            let context_content = format!(
3043                "<system-reminder>\nAs you answer the user's questions, you can use the following context:\n{}\n\nIMPORTANT: this context may or may not be relevant to your tasks. You should not respond to this context unless it's highly relevant to the work you're doing.\n</system-reminder>\n",
3044                context_parts.join("\n")
3045            );
3046            let context_msg = crate::types::Message {
3047                role: crate::types::MessageRole::User,
3048                content: context_content,
3049                is_meta: Some(true),
3050                ..Default::default()
3051            };
3052            all_messages.insert(0, context_msg);
3053        }
3054
3055        let mut api_messages: Vec<serde_json::Value> = Vec::new();
3056
3057        // Note: System prompt is handled separately in the request body, not in messages array
3058
3059        for msg in &all_messages {
3060            match msg.role {
3061                crate::types::MessageRole::User => {
3062                    // User message - simple text content
3063                    api_messages.push(serde_json::json!({
3064                        "role": "user",
3065                        "content": msg.content
3066                    }));
3067                }
3068                crate::types::MessageRole::Assistant => {
3069                    // Assistant message - could have tool_use blocks
3070                    if let Some(tool_calls) = &msg.tool_calls {
3071                        if is_anthropic {
3072                            // Anthropic format: content array with text and tool_use blocks
3073                            let mut content_blocks: Vec<serde_json::Value> = Vec::new();
3074
3075                            // Add text content if present
3076                            if !msg.content.is_empty()
3077                                && msg.content
3078                                    != format!(
3079                                        "Calling tool: {} with args: ",
3080                                        tool_calls.first().map(|t| t.name.as_str()).unwrap_or("")
3081                                    )
3082                            {
3083                                content_blocks.push(serde_json::json!({
3084                                    "type": "text",
3085                                    "text": msg.content
3086                                }));
3087                            }
3088
3089                            // Add tool_use blocks
3090                            for tc in tool_calls {
3091                                content_blocks.push(serde_json::json!({
3092                                    "type": "tool_use",
3093                                    "id": tc.id,
3094                                    "name": tc.name,
3095                                    "input": tc.arguments
3096                                }));
3097                            }
3098
3099                            api_messages.push(serde_json::json!({
3100                                "role": "assistant",
3101                                "content": content_blocks
3102                            }));
3103                        } else {
3104                            // OpenAI format: separate content and tool_calls fields
3105                            // Build tool_calls array
3106                            let mut openai_tool_calls: Vec<serde_json::Value> = Vec::new();
3107                            for tc in tool_calls {
3108                                openai_tool_calls.push(serde_json::json!({
3109                                    "id": tc.id,
3110                                    "type": "function",
3111                                    "function": {
3112                                        "name": tc.name,
3113                                        "arguments": serde_json::to_string(&tc.arguments).unwrap_or_default()
3114                                    }
3115                                }));
3116                            }
3117
3118                            api_messages.push(serde_json::json!({
3119                                "role": "assistant",
3120                                "content": msg.content,
3121                                "tool_calls": openai_tool_calls
3122                            }));
3123                        }
3124                    } else {
3125                        // Simple assistant message with text only
3126                        api_messages.push(serde_json::json!({
3127                            "role": "assistant",
3128                            "content": msg.content
3129                        }));
3130                    }
3131                }
3132                crate::types::MessageRole::Tool => {
3133                    // Tool result message
3134                    let tool_use_id = msg.tool_call_id.clone().unwrap_or_default();
3135
3136                    // Build content for tool result
3137                    let content = if msg.is_error == Some(true) {
3138                        format!("<tool_use_error>{}</tool_use_error>", msg.content)
3139                    } else {
3140                        msg.content.clone()
3141                    };
3142
3143                    if is_anthropic {
3144                        // Anthropic API expects tool_result blocks in a 'user' role message
3145                        api_messages.push(serde_json::json!({
3146                            "role": "user",
3147                            "content": [
3148                                {
3149                                    "type": "tool_result",
3150                                    "tool_use_id": tool_use_id,
3151                                    "content": content
3152                                }
3153                            ]
3154                        }));
3155                    } else {
3156                        // OpenAI-compatible API expects plain text content for tool results
3157                        api_messages.push(serde_json::json!({
3158                            "role": "tool",
3159                            "content": content,
3160                            "tool_call_id": tool_use_id
3161                        }));
3162                    }
3163                }
3164                crate::types::MessageRole::System => {
3165                    // System messages - include as user message per Anthropic
3166                    api_messages.push(serde_json::json!({
3167                        "role": "user",
3168                        "content": msg.content
3169                    }));
3170                }
3171            }
3172        }
3173        // Inject <available-deferred-tools> block if tool search is enabled
3174        self.maybe_inject_deferred_tools_block(&mut api_messages);
3175
3176        Ok(api_messages)
3177    }
3178}
3179
3180/// Calculate which messages to keep during compaction
3181/// Keeps first few messages (system prompt context) and recent messages
3182/// This is a simplified version - TypeScript uses LLM to create a summary
3183fn calculate_compaction_messages(
3184    messages: &[crate::types::Message],
3185    target_tokens: u32,
3186) -> Vec<crate::types::Message> {
3187    if messages.len() <= 4 {
3188        // Not enough messages to need compaction
3189        return messages.to_vec();
3190    }
3191
3192    // Estimate tokens per message (rough average)
3193    let avg_tokens_per_msg = 500;
3194    let target_message_count = (target_tokens / avg_tokens_per_msg).max(10) as usize;
3195
3196    // Always keep at least first 2 messages (system + initial context)
3197    // Keep more recent messages to preserve conversation context
3198    let keep_first = 2;
3199    let keep_last = target_message_count.saturating_sub(keep_first);
3200
3201    if messages.len() <= keep_first + keep_last {
3202        return messages.to_vec();
3203    }
3204
3205    let first_part = &messages[..keep_first];
3206    let last_part = &messages[messages.len() - keep_last..];
3207
3208    let mut result = Vec::with_capacity(keep_first + keep_last);
3209    result.extend(first_part.iter().cloned());
3210    result.extend(last_part.iter().cloned());
3211    result
3212}
3213
3214/// Extract text from API response (supports both Anthropic and OpenAI formats)
3215fn extract_text_from_response(response: &serde_json::Value) -> String {
3216    // Try OpenAI format first: response.choices[].message.content
3217    if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3218        if let Some(first_choice) = choices.first() {
3219            if let Some(content) = first_choice.get("message").and_then(|m| m.get("content")) {
3220                if let Some(text) = content.as_str() {
3221                    return text.to_string();
3222                }
3223            }
3224        }
3225    }
3226
3227    // Try Anthropic format: response.content[].text
3228    if let Some(content) = response.get("content").and_then(|c| c.as_array()) {
3229        for block in content {
3230            if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
3231                return text.to_string();
3232            }
3233        }
3234    }
3235
3236    String::new()
3237}
3238
/// Parse the compaction summary produced by the model.
///
/// - If the text contains a `<summary>…</summary>` block, the function returns the
///   trimmed content between the tags. Trailing text after `</summary>` that does not
///   start with `<` (i.e. is not another tag block) is appended after a blank line.
///   If both parts are empty, the whole trimmed input is returned instead.
/// - Otherwise the first `<analysis>…</analysis>` block is removed (it is only a
///   drafting scratchpad), and the rest is returned trimmed.
///
/// A closing tag is only matched *after* its opening tag, and slicing uses the
/// tags' real byte lengths. Tag-terminated or malformed input therefore never
/// panics, and no character after a closing tag is skipped.
fn parse_compact_summary(raw_summary: &str) -> String {
    const SUMMARY_OPEN: &str = "<summary>";
    const SUMMARY_CLOSE: &str = "</summary>";
    const ANALYSIS_OPEN: &str = "<analysis>";
    const ANALYSIS_CLOSE: &str = "</analysis>";

    if let Some(open) = raw_summary.find(SUMMARY_OPEN) {
        let body_start = open + SUMMARY_OPEN.len();
        if let Some(close_offset) = raw_summary[body_start..].find(SUMMARY_CLOSE) {
            let body_end = body_start + close_offset;
            let body = raw_summary[body_start..body_end].trim();
            let trailing = raw_summary[body_end + SUMMARY_CLOSE.len()..].trim();
            // Text after </summary> that isn't another tag block is part of the summary.
            let trailing = if trailing.starts_with('<') { "" } else { trailing };

            let summary = [body, trailing]
                .iter()
                .filter(|part| !part.is_empty())
                .copied()
                .collect::<Vec<_>>()
                .join("\n\n");

            // Empty <summary> block with nothing after it: fall back to the whole response.
            return if summary.is_empty() {
                raw_summary.trim().to_string()
            } else {
                summary
            };
        }
    }

    // No complete <summary> block: drop the <analysis> scratchpad, if any.
    if let Some(open) = raw_summary.find(ANALYSIS_OPEN) {
        let search_from = open + ANALYSIS_OPEN.len();
        if let Some(close_offset) = raw_summary[search_from..].find(ANALYSIS_CLOSE) {
            let after = search_from + close_offset + ANALYSIS_CLOSE.len();
            let cleaned = format!("{}{}", &raw_summary[..open], raw_summary[after..].trim());
            return cleaned.trim().to_string();
        }
    }

    raw_summary.trim().to_string()
}
3279
3280fn extract_tool_calls(response: &serde_json::Value) -> Vec<serde_json::Value> {
3281    // First try OpenAI format: response.choices[].message.tool_calls
3282    if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3283        if let Some(first_choice) = choices.first() {
3284            if let Some(message) = first_choice.get("message") {
3285                if let Some(tool_calls) = message.get("tool_calls").and_then(|t| t.as_array()) {
3286                    if !tool_calls.is_empty() {
3287                        return tool_calls
3288                            .iter()
3289                            .map(|tc| {
3290                                let func = tc.get("function");
3291                                let name = func
3292                                    .and_then(|f| f.get("name"))
3293                                    .cloned()
3294                                    .unwrap_or_else(|| empty_json_value());
3295                                // Handle arguments - could be string or object
3296                                let args = func.and_then(|f| f.get("arguments"));
3297                                let arguments = if let Some(args_val) = args {
3298                                    if let Some(arg_str) = args_val.as_str() {
3299                                        // Parse JSON string to object
3300                                        serde_json::from_str(arg_str).unwrap_or(args_val.clone())
3301                                    } else {
3302                                        args_val.clone()
3303                                    }
3304                                } else {
3305                                    serde_json::Value::Null
3306                                };
3307                                // Get tool_call_id from the tc object directly
3308                                let id = tc.get("id").cloned();
3309                                let mut result = serde_json::json!({
3310                                    "name": name,
3311                                    "arguments": arguments,
3312                                });
3313                                if let Some(id_val) = id {
3314                                    result["id"] = id_val;
3315                                }
3316                                result
3317                            })
3318                            .collect();
3319                    }
3320                }
3321            }
3322        }
3323    }
3324
3325    vec![]
3326}
3327/// Format: \n<minimax:tool_call>\n<invoke name="tool-name">\n<parameter name="key">value
3328
3329fn extract_response_text(response: &serde_json::Value) -> String {
3330    // OpenAI chat completions format
3331    if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3332        if let Some(first_choice) = choices.first() {
3333            if let Some(message) = first_choice.get("message") {
3334                if let Some(content) = message.get("content").and_then(|c| c.as_str()) {
3335                    return content.to_string();
3336                }
3337            }
3338        }
3339    }
3340
3341    // Fallback: check for Anthropic format
3342    if let Some(content) = response.get("content").and_then(|c| c.as_array()) {
3343        for block in content {
3344            if let Some(block_type) = block.get("type").and_then(|t| t.as_str()) {
3345                match block_type {
3346                    "text" => {
3347                        if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
3348                            return t.to_string();
3349                        }
3350                    }
3351                    _ => {}
3352                }
3353            }
3354        }
3355    }
3356
3357    String::new()
3358}
3359
3360fn extract_usage(response: &serde_json::Value) -> TokenUsage {
3361    // OpenAI format: response.usage
3362    if let Some(usage) = response.get("usage") {
3363        return TokenUsage {
3364            input_tokens: usage
3365                .get("prompt_tokens")
3366                .and_then(|v| v.as_u64())
3367                .unwrap_or(0)
3368                + usage
3369                    .get("completion_tokens")
3370                    .and_then(|v| v.as_u64())
3371                    .unwrap_or(0),
3372            output_tokens: usage
3373                .get("completion_tokens")
3374                .and_then(|v| v.as_u64())
3375                .unwrap_or(0),
3376            cache_creation_input_tokens: None,
3377            cache_read_input_tokens: None,
3378            iterations: None,
3379        };
3380    }
3381
3382    // Fallback: Anthropic format
3383    let usage = response.get("usage");
3384    TokenUsage {
3385        input_tokens: usage
3386            .and_then(|u| u.get("input_tokens"))
3387            .and_then(|v| v.as_u64())
3388            .unwrap_or(0),
3389        output_tokens: usage
3390            .and_then(|u| u.get("output_tokens"))
3391            .and_then(|v| v.as_u64())
3392            .unwrap_or(0),
3393        cache_creation_input_tokens: usage
3394            .and_then(|u| u.get("cache_creation_input_tokens"))
3395            .and_then(|v| v.as_u64()),
3396        cache_read_input_tokens: usage
3397            .and_then(|u| u.get("cache_read_input_tokens"))
3398            .and_then(|v| v.as_u64()),
3399        iterations: None,
3400    }
3401}
3402
/// Retry budget for `make_api_request_with_429_retry`. Its loop runs over
/// `0..=MAX_429_RETRIES`, so at most `MAX_429_RETRIES + 1` attempts are made.
/// Despite the name, 429 backoff, 529 backoff and context-overflow retries all
/// draw from this same budget.
const MAX_429_RETRIES: u32 = 5;
/// Base delay in milliseconds for `calculate_retry_delay`; it is doubled per attempt.
/// (The leading underscore is needed because identifiers cannot start with a digit.)
const _429_RETRY_BASE_MS: u64 = 2000;
/// Cap in milliseconds on the exponential delay in `calculate_retry_delay`;
/// jitter is added after the cap is applied.
const _429_RETRY_MAX_MS: u64 = 30_000;
/// Maximum structured output retries
const MAX_STRUCTURED_OUTPUT_RETRIES: u32 = 5;
3411
/// Outcome of `make_api_request_with_429_retry`.
///
/// Distinguishes a successful API call, a model fallback the caller should
/// perform, client recreation (rebuild the HTTP client and retry), and terminal
/// errors.
///
/// Matches TypeScript's withRetry generator, which throws
/// FallbackTriggeredError or returns normally.
enum RetryResult {
    /// The request (streaming, or its non-streaming fallback) completed.
    Success(StreamingResult),
    /// `MAX_529_RETRIES` consecutive 529s were seen and a fallback model is set.
    FallbackTriggered(FallbackTriggeredError),
    /// Stale-connection or auth error; the caller should rebuild the HTTP client.
    RecreateClient(AgentError),
    /// Non-retryable error, or the retry budget was exhausted.
    Terminal(AgentError),
}
3424
3425fn error_to_message_for_retry(error: &AgentError) -> String {
3426    match error {
3427        AgentError::Api(msg) => msg.clone(),
3428        AgentError::Http(e) => format!("{}", e),
3429        other => other.to_string(),
3430    }
3431}
3432
3433/// Calculate delay with exponential backoff and jitter for retries.
3434fn calculate_retry_delay(attempt: u32) -> u64 {
3435    let base = _429_RETRY_BASE_MS * 2u64.saturating_pow(attempt.saturating_sub(1));
3436    let capped = base.min(_429_RETRY_MAX_MS);
3437    // Add up to 25% jitter
3438    let nanos = std::time::SystemTime::now()
3439        .duration_since(std::time::UNIX_EPOCH)
3440        .unwrap_or_default()
3441        .subsec_nanos();
3442    let jitter = (capped as f64 * 0.25 * (nanos as f64 / u32::MAX as f64)) as u64;
3443    capped + jitter
3444}
3445
3446/// Attempt streaming then non-streaming request.
3447/// Wraps the full streaming-to-fallback flow used by submit_message.
3448async fn async_make_api_request(
3449    client: &reqwest::Client,
3450    url: &str,
3451    api_key: &str,
3452    request_body: serde_json::Value,
3453    on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3454) -> Result<StreamingResult, AgentError> {
3455    // Try streaming first
3456    match make_anthropic_streaming_request(
3457        client,
3458        url,
3459        api_key,
3460        request_body.clone(),
3461        on_event.clone(),
3462        Arc::new(AtomicBool::new(false)),
3463    )
3464    .await
3465    {
3466        Ok(result) => return Ok(result),
3467        Err(_) => {} // Fall through to non-streaming
3468    }
3469
3470    // Streaming failed - fall back to non-streaming
3471    make_nonstreaming_request(client, url, api_key, request_body, on_event).await
3472}
3473
3474/// Make an API request with 429/529 retry and exponential backoff.
3475///
3476/// Tracks consecutive 529 errors separately.  After MAX_529_RETRIES (3)
3477/// consecutive 529s with a fallback model available, returns
3478/// RetryResult::FallbackTriggered so the caller can switch models.
3479///
3480/// On stale connection (ECONNRESET/EPIPE) or auth errors (401), returns
3481/// RetryResult::RecreateClient so the caller rebuilds the HTTP client.
3482///
3483/// On max-tokens-context-overflow, adjusts max_output_tokens and retries.
3484///
3485/// Returns RetryResult::Terminal for errors that cannot be retried.
3486///
3487/// Matches TypeScript's withRetry() generator in withRetry.ts.
3488async fn make_api_request_with_429_retry(
3489    client: &reqwest::Client,
3490    url: &str,
3491    api_key: &str,
3492    request_body: serde_json::Value,
3493    on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3494    fallback_model: Option<String>,
3495    current_model: &str,
3496    thinking_budget_tokens: Option<u32>,
3497) -> RetryResult {
3498    let mut consecutive_529s: u32 = 0;
3499    let mut last_error_str: Option<String> = None;
3500
3501    // We clone request_body so we can mutate max_tokens on overflow retries
3502    let mut mutable_request = request_body.clone();
3503
3504    for attempt in 0..=MAX_429_RETRIES {
3505        match async_make_api_request(
3506            client,
3507            url,
3508            api_key,
3509            mutable_request.clone(),
3510            on_event.clone(),
3511        )
3512        .await
3513        {
3514            Ok(result) => return RetryResult::Success(result),
3515            Err(e) => {
3516                last_error_str = Some(e.to_string());
3517
3518                // --- 529 tracking: consecutive 529 errors ---
3519                if is_529_error(&e) {
3520                    consecutive_529s += 1;
3521                    if consecutive_529s >= MAX_529_RETRIES {
3522                        if let Some(ref fb) = fallback_model {
3523                            return RetryResult::FallbackTriggered(FallbackTriggeredError {
3524                                original_model: current_model.to_string(),
3525                                fallback_model: fb.clone(),
3526                            });
3527                        }
3528                        // No fallback model -- treat as terminal after max 529s
3529                        if attempt >= MAX_429_RETRIES {
3530                            return RetryResult::Terminal(e);
3531                        }
3532                    }
3533                } else {
3534                    // Non-529 error resets the consecutive counter
3535                    consecutive_529s = 0;
3536                }
3537
3538                // --- Stale connection or auth error: recreate client ---
3539                if is_stale_connection_error(&e) || is_auth_error(&e) {
3540                    return RetryResult::RecreateClient(e);
3541                }
3542
3543                // --- Max tokens context overflow: adjust and retry ---
3544                if let Some((input_tokens, _max_tokens, context_limit)) =
3545                    parse_max_tokens_context_overflow(&e)
3546                {
3547                    let safety_buffer: u64 = 1000;
3548                    let available = context_limit.saturating_sub(input_tokens).saturating_sub(safety_buffer);
3549                    if available < FLOOR_OUTPUT_TOKENS {
3550                        return RetryResult::Terminal(e);
3551                    }
3552                    // Ensure enough tokens for thinking + at least 1 output token (TS: withRetry.ts:418-422)
3553                    let min_required = (thinking_budget_tokens.unwrap_or(0) as u64).saturating_add(1);
3554                    let adjusted = std::cmp::max(FLOOR_OUTPUT_TOKENS, std::cmp::max(available, min_required));
3555                    if let Some(max_t) = mutable_request.get_mut("max_tokens") {
3556                        *max_t = serde_json::json!(adjusted as u32);
3557                    }
3558                    // Retry immediately with adjusted max_tokens
3559                    continue;
3560                }
3561
3562                // --- Pure 429 (not 529): retry with backoff ---
3563                if is_429_only_error(&e) && attempt < MAX_429_RETRIES {
3564                    let delay = calculate_retry_delay(attempt + 1);
3565                    emit_api_retry_event(
3566                        on_event.as_ref().map(|a| a.as_ref()),
3567                        attempt + 1,
3568                        MAX_429_RETRIES,
3569                        delay,
3570                        None,
3571                        &e.to_string(),
3572                    );
3573                    sleep_tokio(std::time::Duration::from_millis(delay)).await;
3574                    continue;
3575                }
3576
3577                // --- 529 (below MAX_529_RETRIES): retry with backoff ---
3578                if is_529_error(&e) && attempt < MAX_429_RETRIES {
3579                    let delay = calculate_retry_delay(attempt + 1);
3580                    emit_api_retry_event(
3581                        on_event.as_ref().map(|a| a.as_ref()),
3582                        attempt + 1,
3583                        MAX_429_RETRIES,
3584                        delay,
3585                        None,
3586                        &e.to_string(),
3587                    );
3588                    sleep_tokio(std::time::Duration::from_millis(delay)).await;
3589                    continue;
3590                }
3591
3592                // --- Terminal error ---
3593                return RetryResult::Terminal(e);
3594            }
3595        }
3596    }
3597
3598    RetryResult::Terminal(AgentError::Api(last_error_str.unwrap_or_else(|| {
3599        "Retry exhausted".to_string()
3600    })))
3601}
3602
3603/// Make non-streaming API request (fallback when streaming fails)
3604/// Matches TypeScript's executeNonStreamingRequest behavior
3605async fn make_nonstreaming_request(
3606    client: &reqwest::Client,
3607    url: &str,
3608    api_key: &str,
3609    mut request_body: serde_json::Value,
3610    on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3611) -> Result<StreamingResult, AgentError> {
3612    // Disable streaming for non-streaming request
3613    request_body["stream"] = serde_json::json!(false);
3614
3615    // Get model name for cost tracking
3616    let model = request_body
3617        .get("model")
3618        .and_then(|v| v.as_str())
3619        .unwrap_or("unknown")
3620        .to_string();
3621
3622    // Determine if this is Anthropic API or a third-party API (OpenAI-compatible)
3623    let is_anthropic = url.contains("anthropic.com");
3624
3625    // Build the request and execute with retry (wraps .send() with exponential backoff)
3626    let request_builder = if is_anthropic {
3627        // Anthropic format
3628        client
3629            .post(url)
3630            .header("x-api-key", api_key)
3631            .header("anthropic-version", "2023-06-01")
3632            .header("Content-Type", "application/json")
3633            .header("User-Agent", get_user_agent())
3634            .json(&request_body)
3635    } else {
3636        // OpenAI-compatible format (vLLM, etc.) - use Bearer auth
3637        client
3638            .post(url)
3639            .header("Authorization", format!("Bearer {}", api_key))
3640            .header("Content-Type", "application/json")
3641            .header("User-Agent", get_user_agent())
3642            .json(&request_body)
3643    };
3644
3645    // Send request directly — no retry here since callers handle retry
3646    let response = request_builder.send().await.map_err(AgentError::from)?;
3647
3648    let status = response.status();
3649    if !status.is_success() {
3650        let error_text = response.text().await.unwrap_or_default();
3651        return Err(AgentError::Api(format!(
3652            "Non-streaming API error {}: {}",
3653            status,
3654            sanitize_html_error(&error_text)
3655        )));
3656    }
3657
3658    // Emit MessageStart event
3659    if let Some(ref cb) = on_event {
3660        cb(AgentEvent::MessageStart {
3661            message_id: uuid::Uuid::new_v4().to_string(),
3662        });
3663    }
3664
3665    // Get response body
3666    let response_text = response
3667        .text()
3668        .await
3669        .map_err(|e| AgentError::Api(format!("Failed to read non-streaming response: {}", e)))?;
3670
3671    // Parse JSON response
3672    let response_json: serde_json::Value = serde_json::from_str(&response_text).map_err(|e| {
3673        AgentError::Api(format!(
3674            "Failed to parse non-streaming response: {} - {}",
3675            e, response_text
3676        ))
3677    })?;
3678
3679    // Check for API error
3680    if let Some(error) = response_json.get("error") {
3681        // Check for max_output_tokens error type (matching TypeScript's isWithheldMaxOutputTokens)
3682        if let Some(error_type) = error.get("type").and_then(|t| t.as_str()) {
3683            if error_type == "max_tokens" || error_type == "max_output_tokens" {
3684                // Return result with api_error instead of failing - allows recovery flow
3685                let mut result = StreamingResult::default();
3686                result.api_error = Some("max_output_tokens".to_string());
3687                return Ok(result);
3688            }
3689        }
3690        // Check for prompt-too-long / 413 - trigger reactive compact
3691        let error_str = error.to_string().to_lowercase();
3692        if error_str.contains("413")
3693            || error_str.contains("prompt_too_long")
3694            || error_str.contains("prompt too long")
3695        {
3696            return Err(AgentError::Api("prompt_too_long: context size exceeded. The query engine will attempt reactive compact.".to_string()));
3697        }
3698        return Err(AgentError::Api(format!("API error: {}", error)));
3699    }
3700
3701    let mut result = StreamingResult::default();
3702
3703    // Handle Anthropic format: response.content[] with blocks
3704    if let Some(content) = response_json.get("content").and_then(|c| c.as_array()) {
3705        for block in content {
3706            let block_type = block.get("type").and_then(|t| t.as_str());
3707            match block_type {
3708                Some("text") => {
3709                    if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
3710                        result.content.push_str(text);
3711                    }
3712                }
3713                Some("thinking") | Some("redacted_thinking") => {
3714                    // Handle thinking blocks - extract thinking content
3715                    // In TypeScript, thinking is stored as structured block in content
3716                    // We need to extract it and store properly for display
3717                    if let Some(thinking) = block.get("thinking").and_then(|t| t.as_str()) {
3718                        // Store thinking with markers so TUI can extract it
3719                        result
3720                            .content
3721                            .push_str(&format!("【thinking:{}】", thinking));
3722                    }
3723                }
3724                Some("tool_use") => {
3725                    let tool_id = block.get("id").and_then(|i| i.as_str()).unwrap_or("");
3726                    let tool_name = block.get("name").and_then(|n| n.as_str()).unwrap_or("");
3727                    let tool_input = block
3728                        .get("input")
3729                        .cloned()
3730                        .unwrap_or_else(|| empty_json_value());
3731
3732                    result.tool_calls.push(serde_json::json!({
3733                        "id": tool_id,
3734                        "name": tool_name,
3735                        "arguments": tool_input,
3736                    }));
3737                }
3738                _ => {}
3739            }
3740        }
3741        // Extract usage
3742        if let Some(usage) = response_json.get("usage") {
3743            result.usage = parse_anthropic_usage(usage);
3744        }
3745        // Calculate cost (matching TypeScript cost tracking)
3746        result.cost = calculate_streaming_cost(&result.usage, &model);
3747    }
3748    // Handle OpenAI format: response.choices[].message
3749    else if let Some(choices) = response_json.get("choices").and_then(|c| c.as_array()) {
3750        if let Some(first_choice) = choices.first() {
3751            if let Some(message) = first_choice.get("message") {
3752                // Extract content
3753                if let Some(content) = message.get("content").and_then(|c| c.as_str()) {
3754                    result.content = content.to_string();
3755                }
3756                // Extract tool calls
3757                if let Some(tool_calls) = message.get("tool_calls").and_then(|t| t.as_array()) {
3758                    for tc in tool_calls {
3759                        let id = tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
3760                        let func = tc.get("function");
3761                        let name = func
3762                            .and_then(|f| f.get("name"))
3763                            .and_then(|n| n.as_str())
3764                            .unwrap_or("");
3765                        let args = func.and_then(|f| f.get("arguments"));
3766                        let args_val = if let Some(args_str) = args.and_then(|a| a.as_str()) {
3767                            serde_json::from_str(args_str).unwrap_or_else(|_| empty_json_value())
3768                        } else {
3769                            args.cloned().unwrap_or_else(|| empty_json_value())
3770                        };
3771                        result.tool_calls.push(serde_json::json!({
3772                            "id": id,
3773                            "name": name,
3774                            "arguments": args_val,
3775                        }));
3776                    }
3777                }
3778            }
3779        }
3780        // Extract usage (OpenAI format)
3781        if let Some(usage) = response_json.get("usage") {
3782            result.usage = TokenUsage {
3783                input_tokens: usage
3784                    .get("prompt_tokens")
3785                    .and_then(|v| v.as_u64())
3786                    .unwrap_or(0),
3787                output_tokens: usage
3788                    .get("completion_tokens")
3789                    .and_then(|v| v.as_u64())
3790                    .unwrap_or(0),
3791                cache_creation_input_tokens: None,
3792                cache_read_input_tokens: None,
3793                iterations: None,
3794            };
3795        }
3796        // Calculate cost (matching TypeScript cost tracking)
3797        result.cost = calculate_streaming_cost(&result.usage, &model);
3798    }
3799
3800    // Emit completion events
3801    if let Some(ref cb) = on_event {
3802        cb(AgentEvent::ContentBlockStart {
3803            index: 0,
3804            block_type: "text".to_string(),
3805        });
3806        if !result.content.is_empty() {
3807            cb(AgentEvent::ContentBlockDelta {
3808                index: 0,
3809                delta: ContentDelta::Text {
3810                    text: result.content.clone(),
3811                },
3812            });
3813        }
3814        cb(AgentEvent::ContentBlockStop { index: 0 });
3815        cb(AgentEvent::MessageStop);
3816    }
3817
3818    Ok(result)
3819}
3820
3821/// Make streaming API call and process SSE events
3822/// Matches TypeScript query.ts behavior for streaming
3823/// Includes: idle watchdog, stall detection, TTFT, cost tracking, abort handling,
3824/// stream completion validation, message_delta handling, resource cleanup.
3825async fn make_anthropic_streaming_request(
3826    client: &reqwest::Client,
3827    url: &str,
3828    api_key: &str,
3829    request_body: serde_json::Value,
3830    on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3831    abort_handle: Arc<AtomicBool>,
3832) -> Result<StreamingResult, AgentError> {
3833    use futures_util::stream::StreamExt;
3834
3835    // Determine if this is Anthropic API or a third-party API (OpenAI-compatible)
3836    let is_anthropic = url.contains("anthropic.com");
3837
3838    // Get model name from request body for cost tracking
3839    let model = request_body
3840        .get("model")
3841        .and_then(|v| v.as_str())
3842        .unwrap_or("unknown")
3843        .to_string();
3844
3845    // ─── Stream Watchdog Setup (matching TypeScript lines 1743-1793) ───
3846    let watchdog = StreamWatchdog::from_env();
3847    let watchdog_aborted = Arc::new(AtomicBool::new(false));
3848    let watchdog_aborted_clone = watchdog_aborted.clone();
3849
3850    // ─── Stall Detection Setup (matching TypeScript lines 1801-1821) ───
3851    let mut stall_stats = StallStats::default();
3852    let mut last_event_time: Option<std::time::Instant> = None;
3853    let mut is_first_chunk = true;
3854    let start_time = std::time::Instant::now();
3855
3856    // Record when the stream started (for TTFT calculation)
3857    let mut ttft_recorded = false;
3858
3859    // Build the request and execute with retry (wraps .send() with exponential backoff)
3860    // 404 stream creation errors are NOT retryable, so they bypass the retry layer
3861    let request_builder = if is_anthropic {
3862        // Anthropic format
3863        client
3864            .post(url)
3865            .header("x-api-key", api_key)
3866            .header("anthropic-version", "2023-06-01")
3867            .header("Content-Type", "application/json")
3868            .header("Accept", "text/event-stream")
3869            .header("User-Agent", get_user_agent())
3870            .json(&request_body)
3871    } else {
3872        // OpenAI-compatible format (vLLM, etc.) - use Bearer auth
3873        client
3874            .post(url)
3875            .header("Authorization", format!("Bearer {}", api_key))
3876            .header("Content-Type", "application/json")
3877            .header("Accept", "text/event-stream")
3878            .header("User-Agent", get_user_agent())
3879            .json(&request_body)
3880    };
3881
3882    // Send request directly — no retry here since callers handle retry
3883    let response = request_builder.send().await.map_err(AgentError::from)?;
3884
3885    // Check if user aborted before we even started reading
3886    if abort_handle.load(Ordering::SeqCst) {
3887        return Err(AgentError::UserAborted);
3888    }
3889
3890    let status = response.status();
3891    if !status.is_success() {
3892        let error_text = response.text().await.unwrap_or_default();
3893        let sanitized = sanitize_html_error(&error_text);
3894        // Check for 404 stream creation error (matching TypeScript)
3895        if status.as_u16() == 404 {
3896            return Err(AgentError::Stream404CreationError(format!(
3897                "Streaming endpoint returned 404: {}",
3898                sanitized
3899            )));
3900        }
3901        return Err(AgentError::Api(format!(
3902            "Streaming API error {}: {}",
3903            status, sanitized
3904        )));
3905    }
3906
3907    // Store response for cleanup
3908    let response_for_cleanup = Arc::new(Mutex::new(Some(response)));
3909    let response_for_cleanup_clone = response_for_cleanup.clone();
3910
3911    // ─── Reset stream idle timer (called at start and after each event) ───
3912    let reset_idle_timer = || {
3913        if watchdog.enabled {
3914            let watchdog_aborted_warning = watchdog_aborted_clone.clone();
3915            let watchdog_aborted_timeout = watchdog_aborted_clone.clone();
3916            let timeout_ms = watchdog.idle_timeout_ms;
3917            let warning_ms = watchdog.warning_threshold_ms;
3918            let response_for_cleanup_inner = response_for_cleanup.clone();
3919
3920            // Warning timer
3921            tokio::spawn(async move {
3922                tokio::time::sleep(std::time::Duration::from_millis(warning_ms)).await;
3923                if !watchdog_aborted_warning.load(Ordering::SeqCst) {
3924                    eprintln!(
3925                        "Streaming idle warning: no chunks received for {}s",
3926                        warning_ms / 1000
3927                    );
3928                }
3929            });
3930
3931            // Abort timer
3932            tokio::spawn(async move {
3933                tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
3934                if !watchdog_aborted_timeout.load(Ordering::SeqCst) {
3935                    watchdog_aborted_timeout.store(true, Ordering::SeqCst);
3936                    eprintln!(
3937                        "Streaming idle timeout: no chunks received for {}s, aborting stream",
3938                        timeout_ms / 1000
3939                    );
3940                    // Cancel the response body to release resources
3941                    if let Ok(mut guard) = response_for_cleanup_inner.lock() {
3942                        if let Some(resp) = guard.take() {
3943                            let _ = resp.error_for_status_ref();
3944                        }
3945                    }
3946                }
3947            });
3948        }
3949    };
3950    reset_idle_timer();
3951
3952    // Get the streaming body - take ownership from the Arc
3953    let response = response_for_cleanup.lock().unwrap().take().unwrap();
3954    let body = response.bytes_stream();
3955    let mut stream: futures_util::stream::BoxStream<'_, Result<bytes::Bytes, reqwest::Error>> =
3956        Box::pin(body);
3957
3958    let mut result = StreamingResult::default();
3959    let mut current_tool_use: Option<(String, String, String)> = None; // (id, name, args_str)
3960    // OpenAI tool_calls accumulator: index -> (id, name, accumulated args)
3961    let mut openai_tool_calls: HashMap<u32, (String, String, String)> = HashMap::new();
3962    let mut openai_tool_finalized: HashSet<u32> = HashSet::new();
3963    let mut content_index: u32 = 0;
3964    let mut tool_use_index: u32 = 0;
3965    let mut thinking_index: u32 = 0;
3966    let mut in_tool_use = false;
3967    let mut text_block_started = false;
3968    let mut in_thinking = false;
3969    let mut thinking_content = String::new();
3970
3971    // ─── Process stream chunks ───
3972    'stream_loop: while let Some(chunk_result) = stream.next().await {
3973        // Check if user aborted
3974        if abort_handle.load(Ordering::SeqCst) {
3975            // Release stream resources
3976            release_stream_resources(&Some(abort_handle.clone()), &None);
3977            return Err(AgentError::UserAborted);
3978        }
3979
3980        // Check if watchdog aborted the stream
3981        if watchdog_aborted.load(Ordering::SeqCst) {
3982            release_stream_resources(&Some(abort_handle.clone()), &None);
3983            return Err(AgentError::Api(format!(
3984                "Stream idle timeout - no chunks received for {}ms",
3985                watchdog.idle_timeout_ms
3986            )));
3987        }
3988
3989        let chunk =
3990            chunk_result.map_err(|e| AgentError::Api(format!("Stream read error: {}", e)))?;
3991
3992        // Reset idle timer on each chunk
3993        reset_idle_timer();
3994
3995        // Stall detection (matching TypeScript: only after first event)
3996        let now = std::time::Instant::now();
3997        if let Some(last) = last_event_time {
3998            let gap = now.duration_since(last).as_millis() as u64;
3999            if gap > STALL_THRESHOLD_MS {
4000                stall_stats.stall_count += 1;
4001                stall_stats.total_stall_time_ms += gap;
4002                stall_stats.stall_durations.push(gap);
4003                eprintln!(
4004                    "Streaming stall detected: {:.1}s gap between events (stall #{})",
4005                    gap as f64 / 1000.0,
4006                    stall_stats.stall_count
4007                );
4008            }
4009        }
4010        last_event_time = Some(now);
4011
4012        // TTFT recording (matching TypeScript)
4013        if is_first_chunk {
4014            let ttft = now.duration_since(start_time).as_millis() as u64;
4015            result.ttft_ms = Some(ttft);
4016            ttft_recorded = true;
4017            is_first_chunk = false;
4018        }
4019
4020        // Parse the chunk as text
4021        if let Ok(text) = String::from_utf8(chunk.to_vec()) {
4022            // Check if this is SSE format (starts with "data: ")
4023            if !text.starts_with("data: ") {
4024                // Not SSE format - might be complete JSON response or vLLM streaming
4025                if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
4026                    // Check for vLLM streaming format: has "content" at top level (not in choices)
4027                    if json.get("content").is_some() && json.get("choices").is_none() {
4028                        // Check if this is a complete non-streaming response (has content array)
4029                        if let Some(content_array) = json.get("content").and_then(|c| c.as_array())
4030                        {
4031                            for block in content_array {
4032                                let block_type = block.get("type").and_then(|t| t.as_str());
4033                                match block_type {
4034                                    Some("text") => {
4035                                        if let Some(text) =
4036                                            block.get("text").and_then(|t| t.as_str())
4037                                        {
4038                                            result.content.push_str(text);
4039                                        }
4040                                    }
4041                                    Some("tool_use") => {
4042                                        let tool_id =
4043                                            block.get("id").and_then(|i| i.as_str()).unwrap_or("");
4044                                        let tool_name = block
4045                                            .get("name")
4046                                            .and_then(|n| n.as_str())
4047                                            .unwrap_or("");
4048                                        let tool_input = block
4049                                            .get("input")
4050                                            .cloned()
4051                                            .unwrap_or_else(|| empty_json_value());
4052                                        result.tool_calls.push(serde_json::json!({
4053                                            "id": tool_id,
4054                                            "name": tool_name,
4055                                            "arguments": tool_input,
4056                                        }));
4057                                    }
4058                                    _ => {}
4059                                }
4060                            }
4061                            if let Some(usage) = json.get("usage") {
4062                                result.usage = parse_anthropic_usage(usage);
4063                            }
4064                            result.message_started = true;
4065                            result.content_blocks_started += 1;
4066                            result.content_blocks_completed += 1;
4067                            // Calculate cost
4068                            result.cost = calculate_streaming_cost(&result.usage, &model);
4069                            if let Some(ref cb) = on_event {
4070                                cb(AgentEvent::MessageStart {
4071                                    message_id: json
4072                                        .get("id")
4073                                        .and_then(|i| i.as_str())
4074                                        .unwrap_or("")
4075                                        .to_string(),
4076                                });
4077                                cb(AgentEvent::ContentBlockStart {
4078                                    index: 0,
4079                                    block_type: "text".to_string(),
4080                                });
4081                                if !result.content.is_empty() {
4082                                    cb(AgentEvent::ContentBlockDelta {
4083                                        index: 0,
4084                                        delta: ContentDelta::Text {
4085                                            text: result.content.clone(),
4086                                        },
4087                                    });
4088                                }
4089                                cb(AgentEvent::ContentBlockStop { index: 0 });
4090                                cb(AgentEvent::MessageStop);
4091                            }
4092                            return Ok(result);
4093                        }
4094                        // vLLM streaming chunk - accumulate content
4095                        if let Some(content) = json.get("content").and_then(|c| c.as_str()) {
4096                            result.content.push_str(content);
4097                        }
4098                        // Check for stop reason (not null) to know when to finish
4099                        if let Some(stop_reason) = json.get("stop_reason") {
4100                            if !stop_reason.is_null() {
4101                                result.stop_reason = stop_reason.as_str().map(|s| s.to_string());
4102                                if let Some(ref cb) = on_event {
4103                                    cb(AgentEvent::ContentBlockStart {
4104                                        index: 0,
4105                                        block_type: "text".to_string(),
4106                                    });
4107                                    if !result.content.is_empty() {
4108                                        cb(AgentEvent::ContentBlockDelta {
4109                                            index: 0,
4110                                            delta: ContentDelta::Text {
4111                                                text: result.content.clone(),
4112                                            },
4113                                        });
4114                                    }
4115                                    cb(AgentEvent::ContentBlockStop { index: 0 });
4116                                    cb(AgentEvent::MessageStop);
4117                                }
4118                                result.message_started = true;
4119                                result.content_blocks_started += 1;
4120                                result.content_blocks_completed += 1;
4121                                result.cost = calculate_streaming_cost(&result.usage, &model);
4122                                return Ok(result);
4123                            }
4124                        }
4125                        continue;
4126                    }
4127
4128                    // Standard OpenAI streaming format: choices[0].delta.content
4129                    if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4130                        if let Some(first) = choices.first() {
4131                            if let Some(delta) = first.get("delta") {
4132                                if let Some(content) = delta.get("content").and_then(|c| c.as_str())
4133                                {
4134                                    result.content.push_str(content);
4135                                }
4136                                // Extract tool calls from delta (streaming tool calls)
4137                                if let Some(tool_calls) =
4138                                    delta.get("tool_calls").and_then(|t| t.as_array())
4139                                {
4140                                    for tc in tool_calls {
4141                                        let idx =
4142                                            tc.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4143                                                as u32;
4144                                        let id =
4145                                            tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4146                                        let func = tc.get("function");
4147                                        let name = func
4148                                            .and_then(|f| f.get("name"))
4149                                            .and_then(|n| n.as_str())
4150                                            .unwrap_or("");
4151                                        let args_str = func
4152                                            .and_then(|f| f.get("arguments"))
4153                                            .and_then(|a| a.as_str())
4154                                            .unwrap_or("");
4155
4156                                        // Accumulate args into openai_tool_calls map
4157                                        if !openai_tool_finalized.contains(&idx) {
4158                                            let entry =
4159                                                openai_tool_calls.entry(idx).or_insert_with(|| {
4160                                                    (
4161                                                        id.to_string(),
4162                                                        name.to_string(),
4163                                                        String::new(),
4164                                                    )
4165                                                });
4166                                            if entry.0.is_empty() && !id.is_empty() {
4167                                                entry.0 = id.to_string();
4168                                            }
4169                                            if entry.1.is_empty() && !name.is_empty() {
4170                                                entry.1 = name.to_string();
4171                                            }
4172                                            entry.2.push_str(args_str);
4173                                        }
4174                                    }
4175                                }
4176                            }
4177                            // Check for finish_reason to know when to stop
4178                            if let Some(finish_reason) =
4179                                first.get("finish_reason").and_then(|f| f.as_str())
4180                            {
4181                                if !finish_reason.is_empty()
4182                                    && finish_reason != "null"
4183                                    && (!result.content.is_empty()
4184                                        || !result.tool_calls.is_empty()
4185                                        || !openai_tool_calls.is_empty())
4186                                {
4187                                    result.stop_reason = Some(finish_reason.to_string());
4188
4189                                    // Finalize accumulated OpenAI tool calls
4190                                    for (idx, (id, name, args)) in &openai_tool_calls {
4191                                        if !openai_tool_finalized.contains(idx) {
4192                                            let args_val: serde_json::Value =
4193                                                serde_json::from_str(args)
4194                                                    .unwrap_or_else(|_| empty_json_value());
4195                                            result.tool_calls.push(serde_json::json!({
4196                                                "id": id,
4197                                                "name": name,
4198                                                "arguments": args_val,
4199                                            }));
4200                                        }
4201                                    }
4202                                    openai_tool_finalized.extend(openai_tool_calls.keys().copied());
4203
4204                                    if let Some(ref cb) = on_event {
4205                                        cb(AgentEvent::ContentBlockStop { index: 0 });
4206                                        cb(AgentEvent::MessageStop);
4207                                    }
4208                                    result.message_started = true;
4209                                    result.content_blocks_started += 1;
4210                                    result.content_blocks_completed += 1;
4211                                    result.cost = calculate_streaming_cost(&result.usage, &model);
4212                                    return Ok(result);
4213                                }
4214                            }
4215                        }
4216                        continue;
4217                    }
4218
4219                    // Complete non-streaming response (standard OpenAI format)
4220                    if json.get("choices").is_some() {
4221                        if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4222                            if let Some(first) = choices.first() {
4223                                if let Some(msg) = first.get("message") {
4224                                    if let Some(content) =
4225                                        msg.get("content").and_then(|c| c.as_str())
4226                                    {
4227                                        result.content = content.to_string();
4228                                    }
4229                                    if let Some(tool_calls) =
4230                                        msg.get("tool_calls").and_then(|t| t.as_array())
4231                                    {
4232                                        for tc in tool_calls {
4233                                            let id =
4234                                                tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4235                                            let func = tc.get("function");
4236                                            let name = func
4237                                                .and_then(|f| f.get("name"))
4238                                                .and_then(|n| n.as_str())
4239                                                .unwrap_or("");
4240                                            let args = func.and_then(|f| f.get("arguments"));
4241                                            let args_val = if let Some(args_str) =
4242                                                args.and_then(|a| a.as_str())
4243                                            {
4244                                                serde_json::from_str(args_str)
4245                                                    .unwrap_or_else(|_| empty_json_value())
4246                                            } else {
4247                                                args.cloned().unwrap_or_else(|| empty_json_value())
4248                                            };
4249                                            result.tool_calls.push(serde_json::json!({
4250                                                "id": id,
4251                                                "name": name,
4252                                                "arguments": args_val,
4253                                            }));
4254                                        }
4255                                    }
4256                                    // Extract stop_reason from finish_reason
4257                                    if let Some(finish_reason) =
4258                                        first.get("finish_reason").and_then(|f| f.as_str())
4259                                    {
4260                                        result.stop_reason = Some(finish_reason.to_string());
4261                                    }
4262                                }
4263                            }
4264                        }
4265                        if let Some(usage) = json.get("usage") {
4266                            result.usage = TokenUsage {
4267                                input_tokens: usage
4268                                    .get("prompt_tokens")
4269                                    .and_then(|v| v.as_u64())
4270                                    .unwrap_or(0),
4271                                output_tokens: usage
4272                                    .get("completion_tokens")
4273                                    .and_then(|v| v.as_u64())
4274                                    .unwrap_or(0),
4275                                cache_creation_input_tokens: None,
4276                                cache_read_input_tokens: None,
4277                                iterations: None,
4278                            };
4279                        }
4280                        result.message_started = true;
4281                        result.content_blocks_started += 1;
4282                        result.content_blocks_completed += 1;
4283                        result.cost = calculate_streaming_cost(&result.usage, &model);
4284                        if let Some(ref cb) = on_event {
4285                            cb(AgentEvent::ContentBlockStart {
4286                                index: 0,
4287                                block_type: "text".to_string(),
4288                            });
4289                            if !result.content.is_empty() {
4290                                cb(AgentEvent::ContentBlockDelta {
4291                                    index: 0,
4292                                    delta: ContentDelta::Text {
4293                                        text: result.content.clone(),
4294                                    },
4295                                });
4296                            }
4297                            cb(AgentEvent::ContentBlockStop { index: 0 });
4298                            cb(AgentEvent::MessageStop);
4299                        }
4300                        return Ok(result);
4301                    }
4302                }
4303                continue;
4304            }
4305
4306            // ─── Parse SSE format: "data: {...}\n\n" ───
4307            for line in text.lines() {
4308                if line.starts_with("data: ") {
4309                    let data = &line[6..];
4310
4311                    // Skip [DONE] sentinel
4312                    if data == "[DONE]" {
4313                        continue;
4314                    }
4315
4316                    // Parse JSON
4317                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
4318                        // Handle Anthropic streaming format: check for event type
4319                        if let Some(event_type) = json.get("type").and_then(|t| t.as_str()) {
4320                            match event_type {
4321                                "message_start" => {
4322                                    // Message started - get usage if present
4323                                    // Matches TypeScript: partialMessage = part.message, usage = updateUsage()
4324                                    result.message_started = true;
4325                                    if let Some(usage) = json.get("usage") {
4326                                        result.usage = parse_anthropic_usage(usage);
4327                                    }
4328                                    // Extract research data (internal only, for ant userType)
4329                                    if json.get("research").is_some() {
4330                                        result.research = json.get("research").cloned();
4331                                    }
4332                                    // Emit MessageStart event (matches TypeScript stream_event flow)
4333                                    if let Some(ref cb) = on_event {
4334                                        cb(AgentEvent::MessageStart {
4335                                            message_id: json
4336                                                .get("message")
4337                                                .and_then(|m| m.get("id"))
4338                                                .and_then(|i| i.as_str())
4339                                                .unwrap_or("")
4340                                                .to_string(),
4341                                        });
4342                                    }
4343                                }
4344                                "content_block_start" => {
4345                                    let index =
4346                                        json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4347                                            as u32;
4348                                    let block_type = json
4349                                        .get("content_block")
4350                                        .and_then(|b| b.get("type"))
4351                                        .and_then(|t| t.as_str())
4352                                        .unwrap_or("text")
4353                                        .to_string();
4354
4355                                    result.content_blocks_started += 1;
4356
4357                                    if block_type == "tool_use" {
4358                                        tool_use_index = index;
4359                                        in_tool_use = true;
4360                                        let tool_name = json
4361                                            .get("content_block")
4362                                            .and_then(|b| b.get("name"))
4363                                            .and_then(|n| n.as_str())
4364                                            .unwrap_or("")
4365                                            .to_string();
4366                                        let tool_id = json
4367                                            .get("content_block")
4368                                            .and_then(|b| b.get("id"))
4369                                            .and_then(|i| i.as_str())
4370                                            .unwrap_or("")
4371                                            .to_string();
4372                                        current_tool_use =
4373                                            Some((tool_id, tool_name, String::new()));
4374                                    } else if block_type == "thinking"
4375                                        || block_type == "redacted_thinking"
4376                                    {
4377                                        in_thinking = true;
4378                                        thinking_index = index;
4379                                        thinking_content.clear();
4380                                    } else {
4381                                        content_index = index;
4382                                        text_block_started = true;
4383                                    }
4384
4385                                    if let Some(ref cb) = on_event {
4386                                        cb(AgentEvent::ContentBlockStart { index, block_type });
4387                                    }
4388                                }
4389                                "content_block_delta" => {
4390                                    let index =
4391                                        json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4392                                            as u32;
4393                                    if let Some(delta) = json.get("delta") {
4394                                        let delta_type = delta.get("type").and_then(|t| t.as_str());
4395
4396                                        match delta_type {
4397                                            Some("text_delta") => {
4398                                                if let Some(text) =
4399                                                    delta.get("text").and_then(|t| t.as_str())
4400                                                {
4401                                                    result.content.push_str(text);
4402                                                    if let Some(ref cb) = on_event {
4403                                                        cb(AgentEvent::ContentBlockDelta {
4404                                                            index,
4405                                                            delta: ContentDelta::Text {
4406                                                                text: text.to_string(),
4407                                                            },
4408                                                        });
4409                                                    }
4410                                                }
4411                                            }
4412                                            Some("thinking_delta") => {
4413                                                if let Some(thinking) =
4414                                                    delta.get("thinking").and_then(|t| t.as_str())
4415                                                {
4416                                                    thinking_content.push_str(thinking);
4417                                                    if let Some(ref cb) = on_event {
4418                                                        cb(AgentEvent::ContentBlockDelta {
4419                                                            index,
4420                                                            delta: ContentDelta::Thinking {
4421                                                                text: thinking.to_string(),
4422                                                            },
4423                                                        });
4424                                                    }
4425                                                }
4426                                            }
4427                                            Some("input_json_delta") => {
4428                                                let partial_json = delta
4429                                                    .get("partial_json")
4430                                                    .and_then(|p| p.as_str())
4431                                                    .unwrap_or("");
4432
4433                                                if let Some(ref mut current) = current_tool_use {
4434                                                    current.2.push_str(partial_json);
4435                                                }
4436
4437                                                if let Some(ref cb) = on_event {
4438                                                    let tool_name = current_tool_use
4439                                                        .as_ref()
4440                                                        .map(|(_, n, _)| n.clone())
4441                                                        .unwrap_or_default();
4442                                                    let tool_id = current_tool_use
4443                                                        .as_ref()
4444                                                        .map(|(i, _, _)| i.clone())
4445                                                        .unwrap_or_default();
4446                                                    cb(AgentEvent::ContentBlockDelta {
4447                                                        index,
4448                                                        delta: ContentDelta::ToolUse {
4449                                                            id: tool_id,
4450                                                            name: tool_name,
4451                                                            input: serde_json::json!({ "partial": partial_json }),
4452                                                            is_complete: false,
4453                                                        },
4454                                                    });
4455                                                }
4456                                            }
4457                                            Some("signature_delta") => {
4458                                                // Signature delta - tracking for thinking block signing
4459                                                // No content to accumulate, but event is emitted
4460                                            }
4461                                            _ => {}
4462                                        }
4463                                    }
4464                                }
4465                                "content_block_stop" => {
4466                                    let index =
4467                                        json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4468                                            as u32;
4469
4470                                    result.content_blocks_completed += 1;
4471
4472                                    // Check if this was a tool_use block
4473                                    if in_tool_use && index == tool_use_index {
4474                                        if let Some((id, name, args_str)) = current_tool_use.take()
4475                                        {
4476                                            let args: serde_json::Value =
4477                                                serde_json::from_str(&args_str)
4478                                                    .unwrap_or_else(|_| empty_json_value());
4479
4480                                            result.tool_calls.push(serde_json::json!({
4481                                                "id": id,
4482                                                "name": name,
4483                                                "arguments": args,
4484                                            }));
4485                                            result.any_tool_use_completed = true;
4486                                        }
4487                                        in_tool_use = false;
4488                                    }
4489
4490                                    // Check if this was a thinking block
4491                                    if in_thinking && index == thinking_index {
4492                                        if !thinking_content.is_empty() {
4493                                            result.content.push_str(&format!(
4494                                                "【thinking:{}】",
4495                                                thinking_content
4496                                            ));
4497                                        }
4498                                        in_thinking = false;
4499                                        thinking_content.clear();
4500                                    }
4501
4502                                    if let Some(ref cb) = on_event {
4503                                        cb(AgentEvent::ContentBlockStop { index });
4504                                    }
4505                                }
4506                                "message_delta" => {
4507                                    // Message delta - matches TypeScript's message_delta handling:
4508                                    // - Updates usage
4509                                    // - Extracts stop_reason
4510                                    // - Calculates cost
4511                                    if let Some(usage) = json.get("usage") {
4512                                        result.usage = parse_anthropic_usage(usage);
4513                                    }
4514                                    // Extract stop_reason from delta
4515                                    if let Some(delta) = json.get("delta") {
4516                                        if let Some(stop_reason) =
4517                                            delta.get("stop_reason").and_then(|s| s.as_str())
4518                                        {
4519                                            result.stop_reason = Some(stop_reason.to_string());
4520                                        }
4521                                    }
4522                                    // Calculate cost from current usage
4523                                    result.cost = calculate_streaming_cost(&result.usage, &model);
4524                                    if let Some(ref cb) = on_event {
4525                                        cb(AgentEvent::TokenUsage {
4526                                            usage: result.usage.clone(),
4527                                            cost: result.cost,
4528                                        });
4529                                    }
4530                                }
4531                                "message_stop" => {
4532                                    // Message complete — break from the stream loop so the
4533                                    // post-loop code can emit AgentEvent::MessageStop.
4534                                    // (The server may not close the HTTP connection
4535                                    // immediately, causing the loop to hang indefinitely.)
4536                                    break 'stream_loop;
4537                                }
4538                                _ => {}
4539                            }
4540                        }
4541
4542                        // Handle OpenAI streaming format in SSE: choices[0].delta.content
4543                        if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4544                            if let Some(first) = choices.first() {
4545                                if let Some(delta) = first.get("delta") {
4546                                    if let Some(content) =
4547                                        delta.get("content").and_then(|c| c.as_str())
4548                                    {
4549                                        if !content.is_empty() {
4550                                            result.content.push_str(content);
4551                                            // Emit MessageStart before first content block delta
4552                                            if !result.message_started {
4553                                                result.message_started = true;
4554                                                if let Some(ref cb) = on_event {
4555                                                    cb(AgentEvent::MessageStart {
4556                                                        message_id: uuid::Uuid::new_v4()
4557                                                            .to_string(),
4558                                                    });
4559                                                    cb(AgentEvent::ContentBlockStart {
4560                                                        index: 0,
4561                                                        block_type: "text".to_string(),
4562                                                    });
4563                                                }
4564                                            }
4565                                            if let Some(ref cb) = on_event {
4566                                                cb(AgentEvent::ContentBlockDelta {
4567                                                    index: 0,
4568                                                    delta: ContentDelta::Text {
4569                                                        text: content.to_string(),
4570                                                    },
4571                                                });
4572                                            }
4573                                        }
4574                                    }
4575                                    // Extract tool calls from delta (streaming tool calls)
4576                                    if let Some(tool_calls) =
4577                                        delta.get("tool_calls").and_then(|t| t.as_array())
4578                                    {
4579                                        // Emit MessageStart before first tool call
4580                                        if !result.message_started {
4581                                            result.message_started = true;
4582                                            if let Some(ref cb) = on_event {
4583                                                cb(AgentEvent::MessageStart {
4584                                                    message_id: uuid::Uuid::new_v4().to_string(),
4585                                                });
4586                                            }
4587                                        }
4588                                        for tc in tool_calls {
4589                                            let idx = tc
4590                                                .get("index")
4591                                                .and_then(|i| i.as_u64())
4592                                                .unwrap_or(0)
4593                                                as u32;
4594                                            let id =
4595                                                tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4596                                            let func = tc.get("function");
4597                                            let name = func
4598                                                .and_then(|f| f.get("name"))
4599                                                .and_then(|n| n.as_str())
4600                                                .unwrap_or("");
4601                                            let args_str = func
4602                                                .and_then(|f| f.get("arguments"))
4603                                                .and_then(|a| a.as_str())
4604                                                .unwrap_or("");
4605
4606                                            // Accumulate args into openai_tool_calls map
4607                                            if !openai_tool_finalized.contains(&idx) {
4608                                                let entry = openai_tool_calls
4609                                                    .entry(idx)
4610                                                    .or_insert_with(|| {
4611                                                        (
4612                                                            id.to_string(),
4613                                                            name.to_string(),
4614                                                            String::new(),
4615                                                        )
4616                                                    });
4617                                                // Update id/name on first chunk for this index
4618                                                if entry.0.is_empty() && !id.is_empty() {
4619                                                    entry.0 = id.to_string();
4620                                                }
4621                                                if entry.1.is_empty() && !name.is_empty() {
4622                                                    entry.1 = name.to_string();
4623                                                }
4624                                                entry.2.push_str(args_str);
4625                                            }
4626                                        }
4627                                    }
4628                                }
4629                                // Check for finish_reason
4630                                if let Some(finish_reason) =
4631                                    first.get("finish_reason").and_then(|f| f.as_str())
4632                                {
4633                                    if !finish_reason.is_empty() && finish_reason != "null" {
4634                                        result.stop_reason = Some(finish_reason.to_string());
4635                                        if let Some(ref cb) = on_event {
4636                                            cb(AgentEvent::ContentBlockStop { index: 0 });
4637                                            cb(AgentEvent::MessageStop);
4638                                        }
4639                                        result.message_started = true;
4640                                        result.content_blocks_started += 1;
4641                                        result.content_blocks_completed += 1;
4642                                        result.cost =
4643                                            calculate_streaming_cost(&result.usage, &model);
4644
4645                                        // Finalize accumulated OpenAI tool calls
4646                                        for (idx, (id, name, args)) in &openai_tool_calls {
4647                                            if !openai_tool_finalized.contains(idx) {
4648                                                let args_val: serde_json::Value =
4649                                                    serde_json::from_str(args)
4650                                                        .unwrap_or_else(|_| empty_json_value());
4651                                                result.tool_calls.push(serde_json::json!({
4652                                                    "id": id,
4653                                                    "name": name,
4654                                                    "arguments": args_val,
4655                                                }));
4656                                            }
4657                                        }
4658                                        openai_tool_finalized
4659                                            .extend(openai_tool_calls.keys().copied());
4660
4661                                        return Ok(result);
4662                                    }
4663                                }
4664                            }
4665                            continue;
4666                        }
4667
4668                        // Also check for non-streaming response format (Anthropic)
4669                        if json.get("content").is_some() || json.get("id").is_some() {
4670                            if let Some(content_array) =
4671                                json.get("content").and_then(|c| c.as_array())
4672                            {
4673                                for block in content_array {
4674                                    let block_type = block.get("type").and_then(|t| t.as_str());
4675                                    match block_type {
4676                                        Some("text") => {
4677                                            if let Some(text) =
4678                                                block.get("text").and_then(|t| t.as_str())
4679                                            {
4680                                                result.content.push_str(text);
4681                                            }
4682                                        }
4683                                        Some("tool_use") => {
4684                                            let tool_id = block
4685                                                .get("id")
4686                                                .and_then(|i| i.as_str())
4687                                                .unwrap_or("");
4688                                            let tool_name = block
4689                                                .get("name")
4690                                                .and_then(|n| n.as_str())
4691                                                .unwrap_or("");
4692                                            let tool_input = block
4693                                                .get("input")
4694                                                .cloned()
4695                                                .unwrap_or_else(|| empty_json_value());
4696
4697                                            result.tool_calls.push(serde_json::json!({
4698                                                "id": tool_id,
4699                                                "name": tool_name,
4700                                                "arguments": tool_input,
4701                                            }));
4702                                            result.any_tool_use_completed = true;
4703                                        }
4704                                        _ => {}
4705                                    }
4706                                }
4707                            }
4708
4709                            if let Some(usage) = json.get("usage") {
4710                                result.usage = parse_anthropic_usage(usage);
4711                            }
4712                            result.message_started = true;
4713                            result.content_blocks_started += 1;
4714                            result.content_blocks_completed += 1;
4715                            result.cost = calculate_streaming_cost(&result.usage, &model);
4716
4717                            if let Some(ref cb) = on_event {
4718                                cb(AgentEvent::ContentBlockStart {
4719                                    index: 0,
4720                                    block_type: "text".to_string(),
4721                                });
4722                                if !result.content.is_empty() {
4723                                    cb(AgentEvent::ContentBlockDelta {
4724                                        index: 0,
4725                                        delta: ContentDelta::Text {
4726                                            text: result.content.clone(),
4727                                        },
4728                                    });
4729                                }
4730                                cb(AgentEvent::ContentBlockStop { index: 0 });
4731                                cb(AgentEvent::MessageStop);
4732                            }
4733                            return Ok(result);
4734                        }
4735                    }
4736                }
4737            }
4738        }
4739    }
4740
4741    // ─── Stream ended - final processing ───
4742
4743    // Calculate final cost
4744    result.cost = calculate_streaming_cost(&result.usage, &model);
4745
4746    // Mark watchdog as no longer running (prevent timer from firing after stream ends)
4747    watchdog_aborted.store(true, Ordering::SeqCst);
4748
4749    // Emit MessageStop event
4750    if let Some(ref cb) = on_event {
4751        cb(AgentEvent::MessageStop);
4752    }
4753
4754    // Validate stream completion (matching TypeScript: throw if no events received)
4755    validate_stream_completion(&result)?;
4756
4757    Ok(result)
4758}
4759
4760/// Build memory prefetch context by finding relevant memories for the query.
4761async fn build_memory_prefetch_context(
4762    prompt: &str,
4763    config: &QueryEngineConfig,
4764    loaded_paths: &std::collections::HashSet<String>,
4765) -> Option<String> {
4766    use crate::memdir::{find_relevant_memories, get_memory_base_dir, is_auto_memory_enabled};
4767
4768    if !is_auto_memory_enabled() {
4769        return None;
4770    }
4771
4772    let memory_dir = get_memory_base_dir();
4773
4774    let relevant = find_relevant_memories(prompt, &memory_dir).await;
4775
4776    if relevant.is_empty() {
4777        return None;
4778    }
4779
4780    let new_paths: Vec<String> = relevant
4781        .into_iter()
4782        .filter(|p| !loaded_paths.contains(p.as_str()))
4783        .collect();
4784
4785    if new_paths.is_empty() {
4786        return None;
4787    }
4788
4789    let paths_display = new_paths.join("\n");
4790    Some(format!(
4791        "<relevant-memories>\nThe following memory files may be relevant to your query:\n{}\n</relevant-memories>",
4792        paths_display
4793    ))
4794}