Skip to main content

agent_code_lib/query/
mod.rs

//! Query engine: the core agent loop.
//!
//! Implements the agentic cycle:
//!
//! 1. Auto-compact if context nears the window limit
//! 2. Microcompact stale tool results
//! 3. Call LLM with streaming
//! 4. Accumulate response content blocks
//! 5. Handle errors (prompt-too-long, rate limits, max-output-tokens)
//! 6. Extract tool_use blocks
//! 7. Execute tools (concurrent/serial batching)
//! 8. Inject tool results into history
//! 9. Repeat from step 1 until no tool_use or max turns

15pub mod source;
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use tracing::{debug, info, warn};
22use uuid::Uuid;
23
24use crate::hooks::{HookEvent, HookRegistry};
25use crate::llm::message::*;
26use crate::llm::provider::{Provider, ProviderError, ProviderRequest};
27use crate::llm::stream::StreamEvent;
28use crate::permissions::PermissionChecker;
29use crate::services::compact::{self, CompactTracking, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT};
30use crate::services::tokens;
31use crate::state::AppState;
32use crate::tools::ToolContext;
33use crate::tools::executor::{execute_tool_calls, extract_tool_calls};
34use crate::tools::registry::ToolRegistry;
35
/// Configuration for the query engine.
pub struct QueryEngineConfig {
    /// Maximum number of agent-loop turns per query.
    /// `None` falls back to a default of 50 inside the run loop.
    pub max_turns: Option<usize>,
    /// Enable verbose tool output (propagated into `ToolContext`).
    pub verbose: bool,
    /// Whether this is a non-interactive (one-shot) session.
    /// Enables longer-backoff retries on capacity errors when the
    /// `unattended_retry` feature is also on.
    pub unattended: bool,
}
43
/// The query engine orchestrates the agent loop.
///
/// Central coordinator that drives the LLM → tools → LLM cycle.
/// Manages conversation history, context compaction, tool execution,
/// error recovery, and hook dispatch. Create via [`QueryEngine::new`].
pub struct QueryEngine {
    /// LLM provider used for streaming completions and LLM-based compaction.
    llm: Arc<dyn Provider>,
    /// Registry of available tools; core schemas are sent with each request.
    tools: ToolRegistry,
    /// Shared file-content cache, handed to tools via `ToolContext`.
    file_cache: Arc<tokio::sync::Mutex<crate::services::file_cache::FileCache>>,
    /// Permission checker consulted before tool execution.
    permissions: Arc<PermissionChecker>,
    /// Session state: message history, usage totals, configuration, cwd.
    state: AppState,
    /// Engine-level settings (max turns, verbosity, unattended mode).
    config: QueryEngineConfig,
    /// Cancellation token; reset at the start of each turn and tripped by Ctrl+C.
    cancel: CancellationToken,
    /// Registered pre/post tool-use hooks.
    hooks: HookRegistry,
    /// Records per-call token usage for prompt-cache tracking.
    cache_tracker: crate::services::cache_tracking::CacheTracker,
    /// Tracks permission denials (created with capacity 100); shared with tools.
    denial_tracker: Arc<tokio::sync::Mutex<crate::permissions::tracking::DenialTracker>>,
    /// State for background memory extraction fired after a turn completes.
    extraction_state: Arc<tokio::sync::Mutex<crate::memory::extraction::ExtractionState>>,
    /// Per-session allow set shared with tools via `ToolContext` — presumably
    /// permission rule keys granted for this session; confirm against consumers.
    session_allows: Arc<tokio::sync::Mutex<std::collections::HashSet<String>>>,
    /// Optional interactive prompter for permission requests (`None` after `new`).
    permission_prompter: Option<Arc<dyn crate::tools::PermissionPrompter>>,
    /// Cached system prompt (rebuilt only when inputs change).
    cached_system_prompt: Option<(u64, String)>, // (hash, prompt)
}
66
/// Callback for streaming events to the UI.
///
/// The required methods cover events every frontend must handle; the
/// remaining methods have no-op default bodies so sinks only override
/// what they actually display.
pub trait StreamSink: Send + Sync {
    /// Assistant text delta as it streams in.
    fn on_text(&self, text: &str);
    /// A tool_use block completed streaming; `input` is the tool's JSON arguments.
    fn on_tool_start(&self, tool_name: &str, input: &serde_json::Value);
    /// A tool finished executing with the given result.
    fn on_tool_result(&self, tool_name: &str, result: &crate::tools::ToolResult);
    /// Thinking/reasoning text from a completed thinking block (default: ignored).
    fn on_thinking(&self, _text: &str) {}
    /// A full agent turn finished; `_turn` is the 1-based turn index.
    fn on_turn_complete(&self, _turn: usize) {}
    /// Stream or provider error surfaced to the user.
    fn on_error(&self, error: &str);
    /// Token usage reported at the end of an API response.
    fn on_usage(&self, _usage: &Usage) {}
    /// Context compaction freed approximately `_freed_tokens` tokens.
    fn on_compact(&self, _freed_tokens: u64) {}
    /// Non-fatal warning (budget, context window, model fallback, …).
    fn on_warning(&self, _msg: &str) {}
}
79
/// A no-op stream sink for non-interactive mode.
pub struct NullSink;
impl StreamSink for NullSink {
    // Only the required methods need bodies; the trait's optional
    // methods already default to no-ops.
    fn on_text(&self, _: &str) {}
    fn on_tool_start(&self, _: &str, _: &serde_json::Value) {}
    fn on_tool_result(&self, _: &str, _: &crate::tools::ToolResult) {}
    fn on_error(&self, _: &str) {}
}
88
89impl QueryEngine {
90    pub fn new(
91        llm: Arc<dyn Provider>,
92        tools: ToolRegistry,
93        permissions: PermissionChecker,
94        state: AppState,
95        config: QueryEngineConfig,
96    ) -> Self {
97        Self {
98            llm,
99            tools,
100            file_cache: Arc::new(tokio::sync::Mutex::new(
101                crate::services::file_cache::FileCache::new(),
102            )),
103            permissions: Arc::new(permissions),
104            state,
105            config,
106            cancel: CancellationToken::new(),
107            hooks: HookRegistry::new(),
108            cache_tracker: crate::services::cache_tracking::CacheTracker::new(),
109            denial_tracker: Arc::new(tokio::sync::Mutex::new(
110                crate::permissions::tracking::DenialTracker::new(100),
111            )),
112            extraction_state: Arc::new(tokio::sync::Mutex::new(
113                crate::memory::extraction::ExtractionState::new(),
114            )),
115            session_allows: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())),
116            permission_prompter: None,
117            cached_system_prompt: None,
118        }
119    }
120
121    /// Load hooks from configuration into the registry.
122    pub fn load_hooks(&mut self, hook_defs: &[crate::hooks::HookDefinition]) {
123        for def in hook_defs {
124            self.hooks.register(def.clone());
125        }
126        if !hook_defs.is_empty() {
127            tracing::info!("Loaded {} hooks from config", hook_defs.len());
128        }
129    }
130
    /// Get a shared reference to the app state
    /// (message history, usage totals, configuration).
    pub fn state(&self) -> &AppState {
        &self.state
    }
135
    /// Get a mutable reference to the app state.
    /// Lets callers adjust configuration or history between turns.
    pub fn state_mut(&mut self) -> &mut AppState {
        &mut self.state
    }
140
141    /// Install a Ctrl+C handler that triggers the cancellation token.
142    /// Call this once at startup. Subsequent Ctrl+C signals during a
143    /// turn will cancel the active operation instead of killing the process.
144    pub fn install_signal_handler(&self) {
145        let cancel = self.cancel.clone();
146        tokio::spawn(async move {
147            loop {
148                if tokio::signal::ctrl_c().await.is_ok() {
149                    if cancel.is_cancelled() {
150                        // Second Ctrl+C — hard exit.
151                        std::process::exit(130);
152                    }
153                    cancel.cancel();
154                }
155            }
156        });
157    }
158
    /// Run a single turn: process user input through the full agent loop.
    ///
    /// Convenience wrapper around [`Self::run_turn_with_sink`] using
    /// [`NullSink`], which discards all streaming events.
    pub async fn run_turn(&mut self, user_input: &str) -> crate::error::Result<()> {
        self.run_turn_with_sink(user_input, &NullSink).await
    }
163
164    /// Run a turn with a stream sink for real-time UI updates.
165    pub async fn run_turn_with_sink(
166        &mut self,
167        user_input: &str,
168        sink: &dyn StreamSink,
169    ) -> crate::error::Result<()> {
170        // Reset cancellation token for this turn.
171        self.cancel = CancellationToken::new();
172
173        // Add the user message to history.
174        let user_msg = user_message(user_input);
175        self.state.push_message(user_msg);
176
177        let max_turns = self.config.max_turns.unwrap_or(50);
178        let mut compact_tracking = CompactTracking::default();
179        let mut retry_state = crate::llm::retry::RetryState::default();
180        let retry_config = crate::llm::retry::RetryConfig::default();
181        let mut max_output_recovery_count = 0u32;
182
183        // Agent loop: budget check → normalize → compact → call LLM → execute tools → repeat.
184        for turn in 0..max_turns {
185            self.state.turn_count = turn + 1;
186            self.state.is_query_active = true;
187
188            // Budget check before each turn.
189            let budget_config = crate::services::budget::BudgetConfig::default();
190            match crate::services::budget::check_budget(
191                self.state.total_cost_usd,
192                self.state.total_usage.total(),
193                &budget_config,
194            ) {
195                crate::services::budget::BudgetDecision::Stop { message } => {
196                    sink.on_warning(&message);
197                    self.state.is_query_active = false;
198                    return Ok(());
199                }
200                crate::services::budget::BudgetDecision::ContinueWithWarning {
201                    message, ..
202                } => {
203                    sink.on_warning(&message);
204                }
205                crate::services::budget::BudgetDecision::Continue => {}
206            }
207
208            // Normalize messages for API compatibility.
209            crate::llm::normalize::ensure_tool_result_pairing(&mut self.state.messages);
210            crate::llm::normalize::strip_empty_blocks(&mut self.state.messages);
211            crate::llm::normalize::remove_empty_messages(&mut self.state.messages);
212            crate::llm::normalize::cap_document_blocks(&mut self.state.messages, 500_000);
213            crate::llm::normalize::merge_consecutive_user_messages(&mut self.state.messages);
214
215            debug!("Agent turn {}/{}", turn + 1, max_turns);
216
217            let mut model = self.state.config.api.model.clone();
218
219            // Step 1: Auto-compact if context is too large.
220            if compact::should_auto_compact(self.state.history(), &model, &compact_tracking) {
221                let token_count = tokens::estimate_context_tokens(self.state.history());
222                let threshold = compact::auto_compact_threshold(&model);
223                info!("Auto-compact triggered: {token_count} tokens >= {threshold} threshold");
224
225                // Microcompact first: clear stale tool results.
226                let freed = compact::microcompact(&mut self.state.messages, 5);
227                if freed > 0 {
228                    sink.on_compact(freed);
229                    info!("Microcompact freed ~{freed} tokens");
230                }
231
232                // Check if microcompact was enough.
233                let post_mc_tokens = tokens::estimate_context_tokens(self.state.history());
234                if post_mc_tokens >= threshold {
235                    // Full LLM-based compaction: summarize older messages.
236                    info!("Microcompact insufficient, attempting LLM compaction");
237                    match compact::compact_with_llm(&mut self.state.messages, &*self.llm, &model)
238                        .await
239                    {
240                        Some(removed) => {
241                            info!("LLM compaction removed {removed} messages");
242                            compact_tracking.was_compacted = true;
243                            compact_tracking.consecutive_failures = 0;
244                        }
245                        None => {
246                            compact_tracking.consecutive_failures += 1;
247                            warn!(
248                                "LLM compaction failed (attempt {})",
249                                compact_tracking.consecutive_failures
250                            );
251                            // Fallback: context collapse (snip middle messages).
252                            let effective = compact::effective_context_window(&model);
253                            if let Some(collapse) =
254                                crate::services::context_collapse::collapse_to_budget(
255                                    self.state.history(),
256                                    effective,
257                                )
258                            {
259                                info!(
260                                    "Context collapse snipped {} messages, freed ~{} tokens",
261                                    collapse.snipped_count, collapse.tokens_freed
262                                );
263                                self.state.messages = collapse.api_messages;
264                                sink.on_compact(collapse.tokens_freed);
265                            } else {
266                                // Last resort: aggressive microcompact.
267                                let freed2 = compact::microcompact(&mut self.state.messages, 2);
268                                if freed2 > 0 {
269                                    sink.on_compact(freed2);
270                                }
271                            }
272                        }
273                    }
274                }
275            }
276
277            // Inject compaction reminder if compacted and feature enabled.
278            if compact_tracking.was_compacted && self.state.config.features.compaction_reminders {
279                let reminder = user_message(
280                    "<system-reminder>Context was automatically compacted. \
281                     Earlier messages were summarized. If you need details from \
282                     before compaction, ask the user or re-read the relevant files.</system-reminder>",
283                );
284                self.state.push_message(reminder);
285                compact_tracking.was_compacted = false; // Only remind once per compaction.
286            }
287
288            // Step 2: Check token warning state.
289            let warning = compact::token_warning_state(self.state.history(), &model);
290            if warning.is_blocking {
291                sink.on_warning("Context window nearly full. Consider starting a new session.");
292            } else if warning.is_above_warning {
293                sink.on_warning(&format!("Context {}% remaining", warning.percent_left));
294            }
295
296            // Step 3: Build and send the API request.
297            // Memoize: only rebuild system prompt when inputs change.
298            let prompt_hash = {
299                use std::hash::{Hash, Hasher};
300                let mut h = std::collections::hash_map::DefaultHasher::new();
301                self.state.config.api.model.hash(&mut h);
302                self.state.cwd.hash(&mut h);
303                self.state.config.mcp_servers.len().hash(&mut h);
304                self.tools.all().len().hash(&mut h);
305                h.finish()
306            };
307            let system_prompt = if let Some((cached_hash, ref cached)) = self.cached_system_prompt
308                && cached_hash == prompt_hash
309            {
310                cached.clone()
311            } else {
312                let prompt = build_system_prompt(&self.tools, &self.state);
313                self.cached_system_prompt = Some((prompt_hash, prompt.clone()));
314                prompt
315            };
316            // Use core schemas (deferred tools loaded on demand via ToolSearch).
317            let tool_schemas = self.tools.core_schemas();
318
319            // Escalate max_tokens after a max_output recovery (8k → 64k).
320            let base_tokens = self.state.config.api.max_output_tokens.unwrap_or(16384);
321            let effective_tokens = if max_output_recovery_count > 0 {
322                base_tokens.max(65536) // Escalate to at least 64k after first recovery
323            } else {
324                base_tokens
325            };
326
327            let request = ProviderRequest {
328                messages: self.state.history().to_vec(),
329                system_prompt: system_prompt.clone(),
330                tools: tool_schemas.clone(),
331                model: model.clone(),
332                max_tokens: effective_tokens,
333                temperature: None,
334                enable_caching: true,
335                tool_choice: Default::default(),
336                metadata: None,
337            };
338
339            let mut rx = match self.llm.stream(&request).await {
340                Ok(rx) => {
341                    retry_state.reset();
342                    rx
343                }
344                Err(e) => {
345                    let retryable = match &e {
346                        ProviderError::RateLimited { retry_after_ms } => {
347                            crate::llm::retry::RetryableError::RateLimited {
348                                retry_after: *retry_after_ms,
349                            }
350                        }
351                        ProviderError::Overloaded => crate::llm::retry::RetryableError::Overloaded,
352                        ProviderError::Network(_) => {
353                            crate::llm::retry::RetryableError::StreamInterrupted
354                        }
355                        other => crate::llm::retry::RetryableError::NonRetryable(other.to_string()),
356                    };
357
358                    match retry_state.next_action(&retryable, &retry_config) {
359                        crate::llm::retry::RetryAction::Retry { after } => {
360                            warn!("Retrying in {}ms", after.as_millis());
361                            tokio::time::sleep(after).await;
362                            continue;
363                        }
364                        crate::llm::retry::RetryAction::FallbackModel => {
365                            // Switch to a smaller/cheaper model for this turn.
366                            let fallback = get_fallback_model(&model);
367                            sink.on_warning(&format!("Falling back from {model} to {fallback}"));
368                            model = fallback;
369                            continue;
370                        }
371                        crate::llm::retry::RetryAction::Abort(reason) => {
372                            // Unattended retry: in non-interactive mode, retry
373                            // capacity errors with longer backoff instead of aborting.
374                            if self.config.unattended
375                                && self.state.config.features.unattended_retry
376                                && matches!(
377                                    &e,
378                                    ProviderError::Overloaded | ProviderError::RateLimited { .. }
379                                )
380                            {
381                                warn!("Unattended retry: waiting 30s for capacity");
382                                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
383                                continue;
384                            }
385                            // Before giving up, try reactive compact for size errors.
386                            // Two-stage recovery: context collapse first, then microcompact.
387                            if let ProviderError::RequestTooLarge(body) = &e {
388                                let gap = compact::parse_prompt_too_long_gap(body);
389
390                                // Stage 1: Context collapse (snip middle messages).
391                                let effective = compact::effective_context_window(&model);
392                                if let Some(collapse) =
393                                    crate::services::context_collapse::collapse_to_budget(
394                                        self.state.history(),
395                                        effective,
396                                    )
397                                {
398                                    info!(
399                                        "Reactive collapse: snipped {} messages, freed ~{} tokens",
400                                        collapse.snipped_count, collapse.tokens_freed
401                                    );
402                                    self.state.messages = collapse.api_messages;
403                                    sink.on_compact(collapse.tokens_freed);
404                                    continue;
405                                }
406
407                                // Stage 2: Aggressive microcompact.
408                                let freed = compact::microcompact(&mut self.state.messages, 1);
409                                if freed > 0 {
410                                    sink.on_compact(freed);
411                                    info!(
412                                        "Reactive microcompact freed ~{freed} tokens (gap: {gap:?})"
413                                    );
414                                    continue;
415                                }
416                            }
417                            sink.on_error(&reason);
418                            self.state.is_query_active = false;
419                            return Err(crate::error::Error::Other(e.to_string()));
420                        }
421                    }
422                }
423            };
424
425            // Step 4: Stream response. Start executing read-only tools
426            // as their input completes (streaming tool execution).
427            let mut content_blocks = Vec::new();
428            let mut usage = Usage::default();
429            let mut stop_reason: Option<StopReason> = None;
430            let mut got_error = false;
431            let mut error_text = String::new();
432
433            // Streaming tool handles: tools kicked off during streaming.
434            let mut streaming_tool_handles: Vec<(
435                String,
436                String,
437                tokio::task::JoinHandle<crate::tools::ToolResult>,
438            )> = Vec::new();
439
440            while let Some(event) = rx.recv().await {
441                match event {
442                    StreamEvent::TextDelta(text) => {
443                        sink.on_text(&text);
444                    }
445                    StreamEvent::ContentBlockComplete(block) => {
446                        if let ContentBlock::ToolUse {
447                            ref id,
448                            ref name,
449                            ref input,
450                        } = block
451                        {
452                            sink.on_tool_start(name, input);
453
454                            // Start read-only tools immediately during streaming.
455                            if let Some(tool) = self.tools.get(name)
456                                && tool.is_read_only()
457                                && tool.is_concurrency_safe()
458                            {
459                                let tool = tool.clone();
460                                let input = input.clone();
461                                let cwd = std::path::PathBuf::from(&self.state.cwd);
462                                let cancel = self.cancel.clone();
463                                let perm = self.permissions.clone();
464                                let tool_id = id.clone();
465                                let tool_name = name.clone();
466
467                                let handle = tokio::spawn(async move {
468                                    match tool
469                                        .call(
470                                            input,
471                                            &ToolContext {
472                                                cwd,
473                                                cancel,
474                                                permission_checker: perm.clone(),
475                                                verbose: false,
476                                                plan_mode: false,
477                                                file_cache: None,
478                                                denial_tracker: None,
479                                                task_manager: None,
480                                                session_allows: None,
481                                                permission_prompter: None,
482                                            },
483                                        )
484                                        .await
485                                    {
486                                        Ok(r) => r,
487                                        Err(e) => crate::tools::ToolResult::error(e.to_string()),
488                                    }
489                                });
490
491                                streaming_tool_handles.push((tool_id, tool_name, handle));
492                            }
493                        }
494                        if let ContentBlock::Thinking { ref thinking, .. } = block {
495                            sink.on_thinking(thinking);
496                        }
497                        content_blocks.push(block);
498                    }
499                    StreamEvent::Done {
500                        usage: u,
501                        stop_reason: sr,
502                    } => {
503                        usage = u;
504                        stop_reason = sr;
505                        sink.on_usage(&usage);
506                    }
507                    StreamEvent::Error(msg) => {
508                        got_error = true;
509                        error_text = msg.clone();
510                        sink.on_error(&msg);
511                    }
512                    _ => {}
513                }
514            }
515
516            // Step 5: Record the assistant message.
517            let assistant_msg = Message::Assistant(AssistantMessage {
518                uuid: Uuid::new_v4(),
519                timestamp: chrono::Utc::now().to_rfc3339(),
520                content: content_blocks.clone(),
521                model: Some(model.clone()),
522                usage: Some(usage.clone()),
523                stop_reason: stop_reason.clone(),
524                request_id: None,
525            });
526            self.state.push_message(assistant_msg);
527            self.state.record_usage(&usage, &model);
528
529            // Token budget tracking per turn.
530            if self.state.config.features.token_budget && usage.total() > 0 {
531                let turn_total = usage.input_tokens + usage.output_tokens;
532                if turn_total > 100_000 {
533                    sink.on_warning(&format!(
534                        "High token usage this turn: {} tokens ({}in + {}out)",
535                        turn_total, usage.input_tokens, usage.output_tokens
536                    ));
537                }
538            }
539
540            // Record cache and telemetry.
541            let _cache_event = self.cache_tracker.record(&usage);
542            {
543                let mut span = crate::services::telemetry::api_call_span(
544                    &model,
545                    turn + 1,
546                    &self.state.session_id,
547                );
548                crate::services::telemetry::record_usage(&mut span, &usage);
549                span.finish();
550                tracing::debug!(
551                    "API call: {}ms, {}in/{}out tokens",
552                    span.duration_ms().unwrap_or(0),
553                    usage.input_tokens,
554                    usage.output_tokens,
555                );
556            }
557
558            // Step 6: Handle stream errors.
559            if got_error {
560                // Check if it's a prompt-too-long error in the stream.
561                if error_text.contains("prompt is too long")
562                    || error_text.contains("Prompt is too long")
563                {
564                    let freed = compact::microcompact(&mut self.state.messages, 1);
565                    if freed > 0 {
566                        sink.on_compact(freed);
567                        continue;
568                    }
569                }
570
571                // Check for max-output-tokens hit (partial response).
572                if content_blocks
573                    .iter()
574                    .any(|b| matches!(b, ContentBlock::Text { .. }))
575                    && error_text.contains("max_tokens")
576                    && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
577                {
578                    max_output_recovery_count += 1;
579                    info!(
580                        "Max output tokens recovery attempt {}/{}",
581                        max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
582                    );
583                    let recovery_msg = compact::max_output_recovery_message();
584                    self.state.push_message(recovery_msg);
585                    continue;
586                }
587            }
588
589            // Step 6b: Handle max_tokens stop reason (escalate and continue).
590            if matches!(stop_reason, Some(StopReason::MaxTokens))
591                && !got_error
592                && content_blocks
593                    .iter()
594                    .any(|b| matches!(b, ContentBlock::Text { .. }))
595                && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
596            {
597                max_output_recovery_count += 1;
598                info!(
599                    "Max tokens stop reason — recovery attempt {}/{}",
600                    max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
601                );
602                let recovery_msg = compact::max_output_recovery_message();
603                self.state.push_message(recovery_msg);
604                continue;
605            }
606
607            // Step 7: Extract tool calls from the response.
608            let tool_calls = extract_tool_calls(&content_blocks);
609
610            if tool_calls.is_empty() {
611                // No tools requested — turn is complete.
612                info!("Turn complete (no tool calls)");
613                sink.on_turn_complete(turn + 1);
614                self.state.is_query_active = false;
615
616                // Fire background memory extraction (fire-and-forget).
617                // Only runs if feature enabled and memory directory exists.
618                if self.state.config.features.extract_memories
619                    && crate::memory::ensure_memory_dir().is_some()
620                {
621                    let extraction_messages = self.state.messages.clone();
622                    let extraction_state = self.extraction_state.clone();
623                    let extraction_llm = self.llm.clone();
624                    let extraction_model = model.clone();
625                    tokio::spawn(async move {
626                        crate::memory::extraction::extract_memories_background(
627                            extraction_messages,
628                            extraction_state,
629                            extraction_llm,
630                            extraction_model,
631                        )
632                        .await;
633                    });
634                }
635
636                return Ok(());
637            }
638
639            // Step 8: Execute tool calls with pre/post hooks.
640            info!("Executing {} tool call(s)", tool_calls.len());
641            let cwd = PathBuf::from(&self.state.cwd);
642            let tool_ctx = ToolContext {
643                cwd,
644                cancel: self.cancel.clone(),
645                permission_checker: self.permissions.clone(),
646                verbose: self.config.verbose,
647                plan_mode: self.state.plan_mode,
648                file_cache: Some(self.file_cache.clone()),
649                denial_tracker: Some(self.denial_tracker.clone()),
650                task_manager: Some(self.state.task_manager.clone()),
651                session_allows: Some(self.session_allows.clone()),
652                permission_prompter: self.permission_prompter.clone(),
653            };
654
655            // Collect streaming tool results first.
656            let streaming_ids: std::collections::HashSet<String> = streaming_tool_handles
657                .iter()
658                .map(|(id, _, _)| id.clone())
659                .collect();
660
661            let mut streaming_results = Vec::new();
662            for (id, name, handle) in streaming_tool_handles.drain(..) {
663                match handle.await {
664                    Ok(result) => streaming_results.push(crate::tools::executor::ToolCallResult {
665                        tool_use_id: id,
666                        tool_name: name,
667                        result,
668                    }),
669                    Err(e) => streaming_results.push(crate::tools::executor::ToolCallResult {
670                        tool_use_id: id,
671                        tool_name: name,
672                        result: crate::tools::ToolResult::error(format!("Task failed: {e}")),
673                    }),
674                }
675            }
676
677            // Fire pre-tool-use hooks.
678            for call in &tool_calls {
679                self.hooks
680                    .run_hooks(&HookEvent::PreToolUse, Some(&call.name), &call.input)
681                    .await;
682            }
683
684            // Execute remaining tools (ones not started during streaming).
685            let remaining_calls: Vec<_> = tool_calls
686                .iter()
687                .filter(|c| !streaming_ids.contains(&c.id))
688                .cloned()
689                .collect();
690
691            let mut results = streaming_results;
692            if !remaining_calls.is_empty() {
693                let batch_results = execute_tool_calls(
694                    &remaining_calls,
695                    self.tools.all(),
696                    &tool_ctx,
697                    &self.permissions,
698                )
699                .await;
700                results.extend(batch_results);
701            }
702
703            // Step 9: Inject tool results + fire post-tool-use hooks.
704            for result in &results {
705                // Handle plan mode state transitions.
706                if !result.result.is_error {
707                    match result.tool_name.as_str() {
708                        "EnterPlanMode" => {
709                            self.state.plan_mode = true;
710                            info!("Plan mode enabled");
711                        }
712                        "ExitPlanMode" => {
713                            self.state.plan_mode = false;
714                            info!("Plan mode disabled");
715                        }
716                        _ => {}
717                    }
718                }
719
720                sink.on_tool_result(&result.tool_name, &result.result);
721
722                // Fire post-tool-use hooks.
723                self.hooks
724                    .run_hooks(
725                        &HookEvent::PostToolUse,
726                        Some(&result.tool_name),
727                        &serde_json::json!({
728                            "tool": result.tool_name,
729                            "is_error": result.result.is_error,
730                        }),
731                    )
732                    .await;
733
734                let msg = tool_result_message(
735                    &result.tool_use_id,
736                    &result.result.content,
737                    result.result.is_error,
738                );
739                self.state.push_message(msg);
740            }
741
742            // Continue the loop — the model will see the tool results.
743        }
744
745        warn!("Max turns ({max_turns}) reached");
746        sink.on_warning(&format!("Agent stopped after {max_turns} turns"));
747        self.state.is_query_active = false;
748        Ok(())
749    }
750
751    /// Cancel the current operation.
752    pub fn cancel(&self) {
753        self.cancel.cancel();
754    }
755
756    /// Get a cloneable cancel token for use in background tasks.
757    pub fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
758        self.cancel.clone()
759    }
760}
761
/// Get a fallback model (smaller/cheaper) for retry on overload.
///
/// Downgrade rules, checked in order:
/// - contains `opus`  → swap to `sonnet` (Anthropic family)
/// - `gpt-5.4` / `gpt-4.1` without `mini`/`nano` → append `-mini`
/// - contains `large` → swap to `small` (e.g. Mistral family)
/// - anything else is returned unchanged (already a small model)
///
/// Detection is case-insensitive, and — unlike a plain `str::replace`,
/// which is case-sensitive and would silently return names like
/// `"Claude-Opus-4"` unchanged — the substitution matches
/// case-insensitively too, preserving the casing of the rest of the name.
fn get_fallback_model(current: &str) -> String {
    let lower = current.to_ascii_lowercase();
    if lower.contains("opus") {
        // Opus → Sonnet
        replace_ignore_ascii_case(current, "opus", "sonnet")
    } else if (lower.contains("gpt-5.4") || lower.contains("gpt-4.1"))
        && !lower.contains("mini")
        && !lower.contains("nano")
    {
        format!("{current}-mini")
    } else if lower.contains("large") {
        replace_ignore_ascii_case(current, "large", "small")
    } else {
        // Already a small model, keep it.
        current.to_string()
    }
}

/// Replace every occurrence of `needle` in `haystack`, matching
/// case-insensitively. `needle` must be ASCII lowercase.
///
/// Byte offsets found in the lowercased copy are valid in the original
/// because `to_ascii_lowercase` never changes byte lengths.
fn replace_ignore_ascii_case(haystack: &str, needle: &str, replacement: &str) -> String {
    let lower = haystack.to_ascii_lowercase();
    let mut out = String::with_capacity(haystack.len());
    let mut pos = 0;
    while let Some(found) = lower[pos..].find(needle) {
        let start = pos + found;
        out.push_str(&haystack[pos..start]);
        out.push_str(replacement);
        pos = start + needle.len();
    }
    out.push_str(&haystack[pos..]);
    out
}
780
/// Build the system prompt from tool definitions, app state, and memory.
///
/// Sections are appended in a fixed order: identity preamble, environment
/// facts, memory context, per-tool documentation, user-invocable skills,
/// static guidelines/safety rules, tool-usage patterns, MCP server listing,
/// deferred tools, and task-management/output-formatting guidance.
///
/// Pure string construction apart from: reading `$SHELL`, probing for a
/// `.git` directory under `state.cwd`, and whatever `MemoryContext::load` /
/// `SkillRegistry::load_all` do internally (presumably filesystem reads —
/// confirm in their modules).
pub fn build_system_prompt(tools: &ToolRegistry, state: &AppState) -> String {
    let mut prompt = String::new();

    prompt.push_str(
        "You are an AI coding agent. You help users with software engineering tasks \
         by reading, writing, and searching code. Use the tools available to you to \
         accomplish tasks.\n\n",
    );

    // Environment context.
    let shell = std::env::var("SHELL").unwrap_or_else(|_| "bash".to_string());
    let is_git = std::path::Path::new(&state.cwd).join(".git").exists();
    prompt.push_str(&format!(
        "# Environment\n\
         - Working directory: {}\n\
         - Platform: {}\n\
         - Shell: {shell}\n\
         - Git repository: {}\n\n",
        state.cwd,
        std::env::consts::OS,
        if is_git { "yes" } else { "no" },
    ));

    // Inject memory context (project + user + on-demand relevant).
    let mut memory = crate::memory::MemoryContext::load(Some(std::path::Path::new(&state.cwd)));

    // On-demand: surface relevant memories based on recent conversation.
    // Note: `.rev().take(5)` looks at the last 5 messages of ANY role and
    // only then keeps the user ones, so fewer than 5 user texts may be
    // included; the concatenation is newest-first.
    let recent_text: String = state
        .messages
        .iter()
        .rev()
        .take(5)
        .filter_map(|m| match m {
            crate::llm::message::Message::User(u) => Some(
                u.content
                    .iter()
                    .filter_map(|b| b.as_text())
                    .collect::<Vec<_>>()
                    .join(" "),
            ),
            _ => None,
        })
        .collect::<Vec<_>>()
        .join(" ");

    if !recent_text.is_empty() {
        memory.load_relevant(&recent_text);
    }

    // Skip the section entirely when there is no memory to surface.
    let memory_section = memory.to_system_prompt_section();
    if !memory_section.is_empty() {
        prompt.push_str(&memory_section);
    }

    // Tool documentation. Disabled tools are omitted from the prompt.
    prompt.push_str("# Available Tools\n\n");
    for tool in tools.all() {
        if tool.is_enabled() {
            prompt.push_str(&format!("## {}\n{}\n\n", tool.name(), tool.prompt()));
        }
    }

    // Available skills. Only user-invocable ones are listed, as `/name`
    // slash commands with optional description and when-to-use hints.
    let skills = crate::skills::SkillRegistry::load_all(Some(std::path::Path::new(&state.cwd)));
    let invocable = skills.user_invocable();
    if !invocable.is_empty() {
        prompt.push_str("# Available Skills\n\n");
        for skill in invocable {
            let desc = skill.metadata.description.as_deref().unwrap_or("");
            let when = skill.metadata.when_to_use.as_deref().unwrap_or("");
            prompt.push_str(&format!("- `/{}`", skill.name));
            if !desc.is_empty() {
                prompt.push_str(&format!(": {desc}"));
            }
            if !when.is_empty() {
                prompt.push_str(&format!(" (use when: {when})"));
            }
            prompt.push('\n');
        }
        prompt.push('\n');
    }

    // Guidelines and safety framework (static text, identical every call).
    prompt.push_str(
        "# Using tools\n\n\
         Use dedicated tools instead of shell commands when available:\n\
         - File search: Glob (not find or ls)\n\
         - Content search: Grep (not grep or rg)\n\
         - Read files: FileRead (not cat/head/tail)\n\
         - Edit files: FileEdit (not sed/awk)\n\
         - Write files: FileWrite (not echo/cat with redirect)\n\
         - Reserve Bash for system commands and operations that require shell execution.\n\
         - Break complex tasks into steps. Use multiple tool calls in parallel when independent.\n\
         - Use the Agent tool for complex multi-step research or tasks that benefit from isolation.\n\n\
         # Working with code\n\n\
         - Read files before editing them. Understand existing code before suggesting changes.\n\
         - Prefer editing existing files over creating new ones to avoid file bloat.\n\
         - Only make changes that were requested. Don't add features, refactor, add comments, \
           or make \"improvements\" beyond the ask.\n\
         - Don't add error handling for scenarios that can't happen. Don't design for \
           hypothetical future requirements.\n\
         - When referencing code, include file_path:line_number.\n\
         - Be careful not to introduce security vulnerabilities (command injection, XSS, SQL injection, \
           OWASP top 10). If you notice insecure code you wrote, fix it immediately.\n\
         - Don't add docstrings, comments, or type annotations to code you didn't change.\n\
         - Three similar lines of code is better than a premature abstraction.\n\n\
         # Git safety protocol\n\n\
         - NEVER update the git config.\n\
         - NEVER run destructive git commands (push --force, reset --hard, checkout ., restore ., \
           clean -f, branch -D) unless the user explicitly requests them.\n\
         - NEVER skip hooks (--no-verify, --no-gpg-sign) unless the user explicitly requests it.\n\
         - NEVER force push to main/master. Warn the user if they request it.\n\
         - Always create NEW commits rather than amending, unless the user explicitly requests amend. \
           After hook failure, the commit did NOT happen — amend would modify the PREVIOUS commit.\n\
         - When staging files, prefer adding specific files by name rather than git add -A or git add ., \
           which can accidentally include sensitive files.\n\
         - NEVER commit changes unless the user explicitly asks.\n\n\
         # Committing changes\n\n\
         When the user asks to commit:\n\
         1. Run git status and git diff to see all changes.\n\
         2. Run git log --oneline -5 to match the repository's commit message style.\n\
         3. Draft a concise (1-2 sentence) commit message focusing on \"why\" not \"what\".\n\
         4. Do not commit files that likely contain secrets (.env, credentials.json).\n\
         5. Stage specific files, create the commit.\n\
         6. If pre-commit hook fails, fix the issue and create a NEW commit.\n\
         7. When creating commits, include a co-author attribution line at the end of the message.\n\n\
         # Creating pull requests\n\n\
         When the user asks to create a PR:\n\
         1. Run git status, git diff, and git log to understand all changes on the branch.\n\
         2. Analyze ALL commits (not just the latest) that will be in the PR.\n\
         3. Draft a short title (under 70 chars) and detailed body with summary and test plan.\n\
         4. Push to remote with -u flag if needed, then create PR using gh pr create.\n\
         5. Return the PR URL when done.\n\n\
         # Executing actions safely\n\n\
         Consider the reversibility and blast radius of every action:\n\
         - Freely take local, reversible actions (editing files, running tests).\n\
         - For hard-to-reverse or shared-state actions, confirm with the user first:\n\
           - Destructive: deleting files/branches, dropping tables, rm -rf, overwriting uncommitted changes.\n\
           - Hard to reverse: force-pushing, git reset --hard, amending published commits.\n\
           - Visible to others: pushing code, creating/commenting on PRs/issues, sending messages.\n\
         - When you encounter an obstacle, do not use destructive actions as a shortcut. \
           Identify root causes and fix underlying issues.\n\
         - If you discover unexpected state (unfamiliar files, branches, config), investigate \
           before deleting or overwriting — it may be the user's in-progress work.\n\n\
         # Response style\n\n\
         - Be concise. Lead with the answer or action, not the reasoning.\n\
         - Skip filler, preamble, and unnecessary transitions.\n\
         - Don't restate what the user said.\n\
         - If you can say it in one sentence, don't use three.\n\
         - Focus output on: decisions that need input, status updates, and errors that change the plan.\n\
         - When referencing GitHub issues or PRs, use owner/repo#123 format.\n\
         - Only use emojis if the user explicitly requests it.\n\n\
         # Memory\n\n\
         You can save information across sessions by writing memory files.\n\
         - Save to: ~/.config/agent-code/memory/ (one .md file per topic)\n\
         - Each file needs YAML frontmatter: name, description, type (user/feedback/project/reference)\n\
         - After writing a file, update MEMORY.md with a one-line pointer\n\
         - Memory types: user (role, preferences), feedback (corrections, confirmations), \
           project (decisions, deadlines), reference (external resources)\n\
         - Do NOT store: code patterns, git history, debugging solutions, anything derivable from code\n\
         - Memory is a hint — always verify against current state before acting on it\n",
    );

    // Detailed tool usage examples and workflow patterns.
    prompt.push_str(
        "# Tool usage patterns\n\n\
         Common patterns for effective tool use:\n\n\
         **Read before edit**: Always read a file before editing it. This ensures you \
         understand the current state and can make targeted changes.\n\
         ```\n\
         1. FileRead file_path → understand structure\n\
         2. FileEdit old_string, new_string → targeted change\n\
         ```\n\n\
         **Search then act**: Use Glob to find files, Grep to find content, then read/edit.\n\
         ```\n\
         1. Glob **/*.rs → find Rust files\n\
         2. Grep pattern path → find specific code\n\
         3. FileRead → read the match\n\
         4. FileEdit → make the change\n\
         ```\n\n\
         **Parallel tool calls**: When you need to read multiple independent files or run \
         independent searches, make all the tool calls in one response. Don't serialize \
         independent operations.\n\n\
         **Test after change**: After editing code, run tests to verify the change works.\n\
         ```\n\
         1. FileEdit → make change\n\
         2. Bash cargo test / pytest / npm test → verify\n\
         3. If tests fail, read the error, fix, re-test\n\
         ```\n\n\
         # Error recovery\n\n\
         When something goes wrong:\n\
         - **Tool not found**: Use ToolSearch to find the right tool name.\n\
         - **Permission denied**: Explain why the action is needed, ask the user to approve.\n\
         - **File not found**: Use Glob to find the correct path. Check for typos.\n\
         - **Edit failed (not unique)**: Provide more surrounding context in old_string, \
           or use replace_all=true if renaming.\n\
         - **Command failed**: Read the full error message. Don't retry the same command. \
           Diagnose the root cause first.\n\
         - **Context too large**: The system will auto-compact. If you need specific \
           information from before compaction, re-read the relevant files.\n\
         - **Rate limited**: The system will auto-retry with backoff. Just wait.\n\n\
         # Common workflows\n\n\
         **Bug fix**: Read the failing test → read the source code it tests → \
         identify the bug → fix it → run the test → confirm it passes.\n\n\
         **New feature**: Read existing patterns in the codebase → create or edit files → \
         add tests → run tests → update docs if needed.\n\n\
         **Code review**: Read the diff → identify issues (bugs, security, style) → \
         report findings with file:line references.\n\n\
         **Refactor**: Search for all usages of the symbol → plan the changes → \
         edit each file → run tests to verify nothing broke.\n\n",
    );

    // MCP server instructions (dynamic, per-server).
    if !state.config.mcp_servers.is_empty() {
        prompt.push_str("# MCP Servers\n\n");
        prompt.push_str(
            "Connected MCP servers provide additional tools. MCP tools are prefixed \
             with `mcp__{server}__{tool}`. Use them like any other tool.\n\n",
        );
        for (name, entry) in &state.config.mcp_servers {
            // Transport is inferred from the config shape: a spawn command
            // means stdio, a URL means sse, neither is reported as unknown.
            let transport = if entry.command.is_some() {
                "stdio"
            } else if entry.url.is_some() {
                "sse"
            } else {
                "unknown"
            };
            prompt.push_str(&format!("- **{name}** ({transport})\n"));
        }
        prompt.push('\n');
    }

    // Deferred tools listing: registered but not loaded until requested
    // via ToolSearch.
    let deferred = tools.deferred_names();
    if !deferred.is_empty() {
        prompt.push_str("# Deferred Tools\n\n");
        prompt.push_str(
            "These tools are available but not loaded by default. \
             Use ToolSearch to load them when needed:\n",
        );
        for name in &deferred {
            prompt.push_str(&format!("- {name}\n"));
        }
        prompt.push('\n');
    }

    // Task management guidance.
    prompt.push_str(
        "# Task management\n\n\
         - Use TaskCreate to break complex work into trackable steps.\n\
         - Mark tasks as in_progress when starting, completed when done.\n\
         - Use the Agent tool to spawn subagents for parallel independent work.\n\
         - Use EnterPlanMode/ExitPlanMode for read-only exploration before making changes.\n\
         - Use EnterWorktree/ExitWorktree for isolated changes in git worktrees.\n\n\
         # Output formatting\n\n\
         - All text output is displayed to the user. Use GitHub-flavored markdown.\n\
         - Use fenced code blocks with language hints for code: ```rust, ```python, etc.\n\
         - Use inline `code` for file names, function names, and short code references.\n\
         - Use tables for structured comparisons.\n\
         - Use bullet lists for multiple items.\n\
         - Keep paragraphs short (2-3 sentences).\n\
         - Never output raw HTML or complex formatting — stick to standard markdown.\n",
    );

    prompt
}