// agent_code_lib/query/mod.rs
//! Query engine: the core agent loop.
//!
//! Implements the agentic cycle:
//!
//! 1. Auto-compact if context nears the window limit
//! 2. Microcompact stale tool results
//! 3. Call LLM with streaming
//! 4. Accumulate response content blocks
//! 5. Handle errors (prompt-too-long, rate limits, max-output-tokens)
//! 6. Extract tool_use blocks
//! 7. Execute tools (concurrent/serial batching)
//! 8. Inject tool results into history
//! 9. Repeat from step 1 until no tool_use or max turns

pub mod source;

use std::path::PathBuf;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::hooks::{HookEvent, HookRegistry};
use crate::llm::message::*;
use crate::llm::provider::{Provider, ProviderError, ProviderRequest};
use crate::llm::stream::StreamEvent;
use crate::permissions::PermissionChecker;
use crate::services::compact::{self, CompactTracking, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT};
use crate::services::tokens;
use crate::state::AppState;
use crate::tools::ToolContext;
use crate::tools::executor::{execute_tool_calls, extract_tool_calls};
use crate::tools::registry::ToolRegistry;

/// Configuration for the query engine.
pub struct QueryEngineConfig {
    /// Maximum number of agent-loop turns per user input; `None` uses the
    /// engine's built-in default.
    pub max_turns: Option<usize>,
    /// Enables verbose tool output (forwarded to tools via their context).
    pub verbose: bool,
    /// Whether this is a non-interactive (one-shot) session.
    pub unattended: bool,
}

/// The query engine orchestrates the agent loop.
///
/// Central coordinator that drives the LLM → tools → LLM cycle.
/// Manages conversation history, context compaction, tool execution,
/// error recovery, and hook dispatch. Create via [`QueryEngine::new`].
pub struct QueryEngine {
    /// LLM backend used for streaming completions (also handed to
    /// compaction and memory extraction).
    llm: Arc<dyn Provider>,
    /// Registry of available tools (lookup, schemas).
    tools: ToolRegistry,
    /// Shared file cache handed to tools via `ToolContext`.
    file_cache: Arc<tokio::sync::Mutex<crate::services::file_cache::FileCache>>,
    /// Permission checker consulted by tool execution.
    permissions: Arc<PermissionChecker>,
    /// Session state: conversation history, config, usage totals, cwd, etc.
    state: AppState,
    /// Engine configuration (max turns, verbosity, unattended mode).
    config: QueryEngineConfig,
    /// Shared handle so the signal handler always cancels the current token.
    cancel_shared: Arc<std::sync::Mutex<CancellationToken>>,
    /// Per-turn cancellation token (cloned from cancel_shared at turn start).
    cancel: CancellationToken,
    /// Registered hooks dispatched around engine events.
    hooks: HookRegistry,
    /// Records per-call usage for cache tracking (fed each turn's `Usage`).
    cache_tracker: crate::services::cache_tracking::CacheTracker,
    /// Bounded record of permission denials, shared with tools.
    denial_tracker: Arc<tokio::sync::Mutex<crate::permissions::tracking::DenialTracker>>,
    /// State for background memory extraction between turns.
    extraction_state: Arc<tokio::sync::Mutex<crate::memory::extraction::ExtractionState>>,
    /// Identifiers allowed for this session, shared with tools.
    /// NOTE(review): presumably tool/permission keys — confirm against callers.
    session_allows: Arc<tokio::sync::Mutex<std::collections::HashSet<String>>>,
    /// Optional prompter handed to tools for interactive permission requests.
    permission_prompter: Option<Arc<dyn crate::tools::PermissionPrompter>>,
    /// Cached system prompt (rebuilt only when inputs change).
    cached_system_prompt: Option<(u64, String)>, // (hash, prompt)
}

/// Callback for streaming events to the UI.
///
/// Methods with empty default bodies are optional; implementors only
/// need `on_text`, `on_tool_start`, `on_tool_result`, and `on_error`.
pub trait StreamSink: Send + Sync {
    /// Called for each streamed text delta from the model.
    fn on_text(&self, text: &str);
    /// Called when a `tool_use` content block completes, before the tool runs.
    fn on_tool_start(&self, tool_name: &str, input: &serde_json::Value);
    /// Called with a tool's result after it finishes executing.
    fn on_tool_result(&self, tool_name: &str, result: &crate::tools::ToolResult);
    /// Called for each completed thinking block (optional).
    fn on_thinking(&self, _text: &str) {}
    /// Called when a turn finishes with no further tool calls (optional).
    fn on_turn_complete(&self, _turn: usize) {}
    /// Called on stream or provider errors.
    fn on_error(&self, error: &str);
    /// Called with token usage when the stream completes (optional).
    fn on_usage(&self, _usage: &Usage) {}
    /// Called when compaction frees tokens, with the estimated count (optional).
    fn on_compact(&self, _freed_tokens: u64) {}
    /// Called for non-fatal warnings (budget, context pressure, fallbacks) (optional).
    fn on_warning(&self, _msg: &str) {}
}

/// A no-op stream sink for non-interactive mode.
///
/// Implements only the required [`StreamSink`] methods, discarding every
/// event; the optional methods keep their default no-op bodies.
pub struct NullSink;
impl StreamSink for NullSink {
    fn on_text(&self, _: &str) {}
    fn on_tool_start(&self, _: &str, _: &serde_json::Value) {}
    fn on_tool_result(&self, _: &str, _: &crate::tools::ToolResult) {}
    fn on_error(&self, _: &str) {}
}

impl QueryEngine {
    /// Construct a new engine.
    ///
    /// Auxiliary state (hooks, trackers, caches, session allows) starts
    /// empty; the permission prompter and cached system prompt start unset.
    /// Register hooks afterwards via [`QueryEngine::load_hooks`].
    pub fn new(
        llm: Arc<dyn Provider>,
        tools: ToolRegistry,
        permissions: PermissionChecker,
        state: AppState,
        config: QueryEngineConfig,
    ) -> Self {
        // Helper for the `Arc<tokio::sync::Mutex<_>>` shape shared with tools.
        fn shared<T>(value: T) -> Arc<tokio::sync::Mutex<T>> {
            Arc::new(tokio::sync::Mutex::new(value))
        }

        let initial_token = CancellationToken::new();
        // The shared handle lets the signal handler reach whichever token
        // is current (it is swapped at the start of each turn).
        let shared_handle = Arc::new(std::sync::Mutex::new(initial_token.clone()));

        Self {
            llm,
            tools,
            permissions: Arc::new(permissions),
            state,
            config,
            cancel: initial_token,
            cancel_shared: shared_handle,
            hooks: HookRegistry::new(),
            file_cache: shared(crate::services::file_cache::FileCache::new()),
            cache_tracker: crate::services::cache_tracking::CacheTracker::new(),
            denial_tracker: shared(crate::permissions::tracking::DenialTracker::new(100)),
            extraction_state: shared(crate::memory::extraction::ExtractionState::new()),
            session_allows: shared(std::collections::HashSet::new()),
            permission_prompter: None,
            cached_system_prompt: None,
        }
    }

127    /// Load hooks from configuration into the registry.
128    pub fn load_hooks(&mut self, hook_defs: &[crate::hooks::HookDefinition]) {
129        for def in hook_defs {
130            self.hooks.register(def.clone());
131        }
132        if !hook_defs.is_empty() {
133            tracing::info!("Loaded {} hooks from config", hook_defs.len());
134        }
135    }
136
    /// Get a reference to the app state (conversation history, config,
    /// usage totals).
    pub fn state(&self) -> &AppState {
        &self.state
    }

    /// Get a mutable reference to the app state, e.g. for pre-seeding
    /// history or adjusting config between turns.
    pub fn state_mut(&mut self) -> &mut AppState {
        &mut self.state
    }

147    /// Install a Ctrl+C handler that triggers the cancellation token.
148    /// Call this once at startup. Subsequent Ctrl+C signals during a
149    /// turn will cancel the active operation instead of killing the process.
150    pub fn install_signal_handler(&self) {
151        let shared = self.cancel_shared.clone();
152        tokio::spawn(async move {
153            let mut pending = false;
154            loop {
155                if tokio::signal::ctrl_c().await.is_ok() {
156                    let token = shared.lock().unwrap().clone();
157                    if token.is_cancelled() && pending {
158                        // Second Ctrl+C after cancel — hard exit.
159                        std::process::exit(130);
160                    }
161                    token.cancel();
162                    pending = true;
163                }
164            }
165        });
166    }
167
    /// Run a single turn: process user input through the full agent loop.
    ///
    /// Convenience wrapper around [`QueryEngine::run_turn_with_sink`] using
    /// [`NullSink`], which discards all streaming events (non-interactive use).
    pub async fn run_turn(&mut self, user_input: &str) -> crate::error::Result<()> {
        self.run_turn_with_sink(user_input, &NullSink).await
    }

173    /// Run a turn with a stream sink for real-time UI updates.
174    pub async fn run_turn_with_sink(
175        &mut self,
176        user_input: &str,
177        sink: &dyn StreamSink,
178    ) -> crate::error::Result<()> {
179        // Reset cancellation token for this turn. The shared handle is
180        // updated so the signal handler always cancels the current token.
181        self.cancel = CancellationToken::new();
182        *self.cancel_shared.lock().unwrap() = self.cancel.clone();
183
184        // Add the user message to history.
185        let user_msg = user_message(user_input);
186        self.state.push_message(user_msg);
187
188        let max_turns = self.config.max_turns.unwrap_or(50);
189        let mut compact_tracking = CompactTracking::default();
190        let mut retry_state = crate::llm::retry::RetryState::default();
191        let retry_config = crate::llm::retry::RetryConfig::default();
192        let mut max_output_recovery_count = 0u32;
193
194        // Agent loop: budget check → normalize → compact → call LLM → execute tools → repeat.
195        for turn in 0..max_turns {
196            self.state.turn_count = turn + 1;
197            self.state.is_query_active = true;
198
199            // Budget check before each turn.
200            let budget_config = crate::services::budget::BudgetConfig::default();
201            match crate::services::budget::check_budget(
202                self.state.total_cost_usd,
203                self.state.total_usage.total(),
204                &budget_config,
205            ) {
206                crate::services::budget::BudgetDecision::Stop { message } => {
207                    sink.on_warning(&message);
208                    self.state.is_query_active = false;
209                    return Ok(());
210                }
211                crate::services::budget::BudgetDecision::ContinueWithWarning {
212                    message, ..
213                } => {
214                    sink.on_warning(&message);
215                }
216                crate::services::budget::BudgetDecision::Continue => {}
217            }
218
219            // Normalize messages for API compatibility.
220            crate::llm::normalize::ensure_tool_result_pairing(&mut self.state.messages);
221            crate::llm::normalize::strip_empty_blocks(&mut self.state.messages);
222            crate::llm::normalize::remove_empty_messages(&mut self.state.messages);
223            crate::llm::normalize::cap_document_blocks(&mut self.state.messages, 500_000);
224            crate::llm::normalize::merge_consecutive_user_messages(&mut self.state.messages);
225
226            debug!("Agent turn {}/{}", turn + 1, max_turns);
227
228            let mut model = self.state.config.api.model.clone();
229
230            // Step 1: Auto-compact if context is too large.
231            if compact::should_auto_compact(self.state.history(), &model, &compact_tracking) {
232                let token_count = tokens::estimate_context_tokens(self.state.history());
233                let threshold = compact::auto_compact_threshold(&model);
234                info!("Auto-compact triggered: {token_count} tokens >= {threshold} threshold");
235
236                // Microcompact first: clear stale tool results.
237                let freed = compact::microcompact(&mut self.state.messages, 5);
238                if freed > 0 {
239                    sink.on_compact(freed);
240                    info!("Microcompact freed ~{freed} tokens");
241                }
242
243                // Check if microcompact was enough.
244                let post_mc_tokens = tokens::estimate_context_tokens(self.state.history());
245                if post_mc_tokens >= threshold {
246                    // Full LLM-based compaction: summarize older messages.
247                    info!("Microcompact insufficient, attempting LLM compaction");
248                    match compact::compact_with_llm(&mut self.state.messages, &*self.llm, &model)
249                        .await
250                    {
251                        Some(removed) => {
252                            info!("LLM compaction removed {removed} messages");
253                            compact_tracking.was_compacted = true;
254                            compact_tracking.consecutive_failures = 0;
255                        }
256                        None => {
257                            compact_tracking.consecutive_failures += 1;
258                            warn!(
259                                "LLM compaction failed (attempt {})",
260                                compact_tracking.consecutive_failures
261                            );
262                            // Fallback: context collapse (snip middle messages).
263                            let effective = compact::effective_context_window(&model);
264                            if let Some(collapse) =
265                                crate::services::context_collapse::collapse_to_budget(
266                                    self.state.history(),
267                                    effective,
268                                )
269                            {
270                                info!(
271                                    "Context collapse snipped {} messages, freed ~{} tokens",
272                                    collapse.snipped_count, collapse.tokens_freed
273                                );
274                                self.state.messages = collapse.api_messages;
275                                sink.on_compact(collapse.tokens_freed);
276                            } else {
277                                // Last resort: aggressive microcompact.
278                                let freed2 = compact::microcompact(&mut self.state.messages, 2);
279                                if freed2 > 0 {
280                                    sink.on_compact(freed2);
281                                }
282                            }
283                        }
284                    }
285                }
286            }
287
288            // Inject compaction reminder if compacted and feature enabled.
289            if compact_tracking.was_compacted && self.state.config.features.compaction_reminders {
290                let reminder = user_message(
291                    "<system-reminder>Context was automatically compacted. \
292                     Earlier messages were summarized. If you need details from \
293                     before compaction, ask the user or re-read the relevant files.</system-reminder>",
294                );
295                self.state.push_message(reminder);
296                compact_tracking.was_compacted = false; // Only remind once per compaction.
297            }
298
299            // Step 2: Check token warning state.
300            let warning = compact::token_warning_state(self.state.history(), &model);
301            if warning.is_blocking {
302                sink.on_warning("Context window nearly full. Consider starting a new session.");
303            } else if warning.is_above_warning {
304                sink.on_warning(&format!("Context {}% remaining", warning.percent_left));
305            }
306
307            // Step 3: Build and send the API request.
308            // Memoize: only rebuild system prompt when inputs change.
309            let prompt_hash = {
310                use std::hash::{Hash, Hasher};
311                let mut h = std::collections::hash_map::DefaultHasher::new();
312                self.state.config.api.model.hash(&mut h);
313                self.state.cwd.hash(&mut h);
314                self.state.config.mcp_servers.len().hash(&mut h);
315                self.tools.all().len().hash(&mut h);
316                h.finish()
317            };
318            let system_prompt = if let Some((cached_hash, ref cached)) = self.cached_system_prompt
319                && cached_hash == prompt_hash
320            {
321                cached.clone()
322            } else {
323                let prompt = build_system_prompt(&self.tools, &self.state);
324                self.cached_system_prompt = Some((prompt_hash, prompt.clone()));
325                prompt
326            };
327            // Use core schemas (deferred tools loaded on demand via ToolSearch).
328            let tool_schemas = self.tools.core_schemas();
329
330            // Escalate max_tokens after a max_output recovery (8k → 64k).
331            let base_tokens = self.state.config.api.max_output_tokens.unwrap_or(16384);
332            let effective_tokens = if max_output_recovery_count > 0 {
333                base_tokens.max(65536) // Escalate to at least 64k after first recovery
334            } else {
335                base_tokens
336            };
337
338            let request = ProviderRequest {
339                messages: self.state.history().to_vec(),
340                system_prompt: system_prompt.clone(),
341                tools: tool_schemas.clone(),
342                model: model.clone(),
343                max_tokens: effective_tokens,
344                temperature: None,
345                enable_caching: self.state.config.features.prompt_caching,
346                tool_choice: Default::default(),
347                metadata: None,
348            };
349
350            let mut rx = match self.llm.stream(&request).await {
351                Ok(rx) => {
352                    retry_state.reset();
353                    rx
354                }
355                Err(e) => {
356                    let retryable = match &e {
357                        ProviderError::RateLimited { retry_after_ms } => {
358                            crate::llm::retry::RetryableError::RateLimited {
359                                retry_after: *retry_after_ms,
360                            }
361                        }
362                        ProviderError::Overloaded => crate::llm::retry::RetryableError::Overloaded,
363                        ProviderError::Network(_) => {
364                            crate::llm::retry::RetryableError::StreamInterrupted
365                        }
366                        other => crate::llm::retry::RetryableError::NonRetryable(other.to_string()),
367                    };
368
369                    match retry_state.next_action(&retryable, &retry_config) {
370                        crate::llm::retry::RetryAction::Retry { after } => {
371                            warn!("Retrying in {}ms", after.as_millis());
372                            tokio::time::sleep(after).await;
373                            continue;
374                        }
375                        crate::llm::retry::RetryAction::FallbackModel => {
376                            // Switch to a smaller/cheaper model for this turn.
377                            let fallback = get_fallback_model(&model);
378                            sink.on_warning(&format!("Falling back from {model} to {fallback}"));
379                            model = fallback;
380                            continue;
381                        }
382                        crate::llm::retry::RetryAction::Abort(reason) => {
383                            // Unattended retry: in non-interactive mode, retry
384                            // capacity errors with longer backoff instead of aborting.
385                            if self.config.unattended
386                                && self.state.config.features.unattended_retry
387                                && matches!(
388                                    &e,
389                                    ProviderError::Overloaded | ProviderError::RateLimited { .. }
390                                )
391                            {
392                                warn!("Unattended retry: waiting 30s for capacity");
393                                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
394                                continue;
395                            }
396                            // Before giving up, try reactive compact for size errors.
397                            // Two-stage recovery: context collapse first, then microcompact.
398                            if let ProviderError::RequestTooLarge(body) = &e {
399                                let gap = compact::parse_prompt_too_long_gap(body);
400
401                                // Stage 1: Context collapse (snip middle messages).
402                                let effective = compact::effective_context_window(&model);
403                                if let Some(collapse) =
404                                    crate::services::context_collapse::collapse_to_budget(
405                                        self.state.history(),
406                                        effective,
407                                    )
408                                {
409                                    info!(
410                                        "Reactive collapse: snipped {} messages, freed ~{} tokens",
411                                        collapse.snipped_count, collapse.tokens_freed
412                                    );
413                                    self.state.messages = collapse.api_messages;
414                                    sink.on_compact(collapse.tokens_freed);
415                                    continue;
416                                }
417
418                                // Stage 2: Aggressive microcompact.
419                                let freed = compact::microcompact(&mut self.state.messages, 1);
420                                if freed > 0 {
421                                    sink.on_compact(freed);
422                                    info!(
423                                        "Reactive microcompact freed ~{freed} tokens (gap: {gap:?})"
424                                    );
425                                    continue;
426                                }
427                            }
428                            sink.on_error(&reason);
429                            self.state.is_query_active = false;
430                            return Err(crate::error::Error::Other(e.to_string()));
431                        }
432                    }
433                }
434            };
435
436            // Step 4: Stream response. Start executing read-only tools
437            // as their input completes (streaming tool execution).
438            let mut content_blocks = Vec::new();
439            let mut usage = Usage::default();
440            let mut stop_reason: Option<StopReason> = None;
441            let mut got_error = false;
442            let mut error_text = String::new();
443
444            // Streaming tool handles: tools kicked off during streaming.
445            let mut streaming_tool_handles: Vec<(
446                String,
447                String,
448                tokio::task::JoinHandle<crate::tools::ToolResult>,
449            )> = Vec::new();
450
451            let mut cancelled = false;
452            loop {
453                tokio::select! {
454                    event = rx.recv() => {
455                        match event {
456                            Some(StreamEvent::TextDelta(text)) => {
457                                sink.on_text(&text);
458                            }
459                            Some(StreamEvent::ContentBlockComplete(block)) => {
460                                if let ContentBlock::ToolUse {
461                                    ref id,
462                                    ref name,
463                                    ref input,
464                                } = block
465                                {
466                                    sink.on_tool_start(name, input);
467
468                                    // Start read-only tools immediately during streaming.
469                                    if let Some(tool) = self.tools.get(name)
470                                        && tool.is_read_only()
471                                        && tool.is_concurrency_safe()
472                                    {
473                                        let tool = tool.clone();
474                                        let input = input.clone();
475                                        let cwd = std::path::PathBuf::from(&self.state.cwd);
476                                        let cancel = self.cancel.clone();
477                                        let perm = self.permissions.clone();
478                                        let tool_id = id.clone();
479                                        let tool_name = name.clone();
480
481                                        let handle = tokio::spawn(async move {
482                                            match tool
483                                                .call(
484                                                    input,
485                                                    &ToolContext {
486                                                        cwd,
487                                                        cancel,
488                                                        permission_checker: perm.clone(),
489                                                        verbose: false,
490                                                        plan_mode: false,
491                                                        file_cache: None,
492                                                        denial_tracker: None,
493                                                        task_manager: None,
494                                                        session_allows: None,
495                                                        permission_prompter: None,
496                                                    },
497                                                )
498                                                .await
499                                            {
500                                                Ok(r) => r,
501                                                Err(e) => crate::tools::ToolResult::error(e.to_string()),
502                                            }
503                                        });
504
505                                        streaming_tool_handles.push((tool_id, tool_name, handle));
506                                    }
507                                }
508                                if let ContentBlock::Thinking { ref thinking, .. } = block {
509                                    sink.on_thinking(thinking);
510                                }
511                                content_blocks.push(block);
512                            }
513                            Some(StreamEvent::Done {
514                                usage: u,
515                                stop_reason: sr,
516                            }) => {
517                                usage = u;
518                                stop_reason = sr;
519                                sink.on_usage(&usage);
520                            }
521                            Some(StreamEvent::Error(msg)) => {
522                                got_error = true;
523                                error_text = msg.clone();
524                                sink.on_error(&msg);
525                            }
526                            Some(_) => {}
527                            None => break,
528                        }
529                    }
530                    _ = self.cancel.cancelled() => {
531                        warn!("Turn cancelled by user");
532                        cancelled = true;
533                        // Abort any in-flight streaming tool handles.
534                        for (_, _, handle) in streaming_tool_handles.drain(..) {
535                            handle.abort();
536                        }
537                        break;
538                    }
539                }
540            }
541
542            if cancelled {
543                sink.on_warning("Cancelled");
544                self.state.is_query_active = false;
545                return Ok(());
546            }
547
548            // Step 5: Record the assistant message.
549            let assistant_msg = Message::Assistant(AssistantMessage {
550                uuid: Uuid::new_v4(),
551                timestamp: chrono::Utc::now().to_rfc3339(),
552                content: content_blocks.clone(),
553                model: Some(model.clone()),
554                usage: Some(usage.clone()),
555                stop_reason: stop_reason.clone(),
556                request_id: None,
557            });
558            self.state.push_message(assistant_msg);
559            self.state.record_usage(&usage, &model);
560
561            // Token budget tracking per turn.
562            if self.state.config.features.token_budget && usage.total() > 0 {
563                let turn_total = usage.input_tokens + usage.output_tokens;
564                if turn_total > 100_000 {
565                    sink.on_warning(&format!(
566                        "High token usage this turn: {} tokens ({}in + {}out)",
567                        turn_total, usage.input_tokens, usage.output_tokens
568                    ));
569                }
570            }
571
572            // Record cache and telemetry.
573            let _cache_event = self.cache_tracker.record(&usage);
574            {
575                let mut span = crate::services::telemetry::api_call_span(
576                    &model,
577                    turn + 1,
578                    &self.state.session_id,
579                );
580                crate::services::telemetry::record_usage(&mut span, &usage);
581                span.finish();
582                tracing::debug!(
583                    "API call: {}ms, {}in/{}out tokens",
584                    span.duration_ms().unwrap_or(0),
585                    usage.input_tokens,
586                    usage.output_tokens,
587                );
588            }
589
590            // Step 6: Handle stream errors.
591            if got_error {
592                // Check if it's a prompt-too-long error in the stream.
593                if error_text.contains("prompt is too long")
594                    || error_text.contains("Prompt is too long")
595                {
596                    let freed = compact::microcompact(&mut self.state.messages, 1);
597                    if freed > 0 {
598                        sink.on_compact(freed);
599                        continue;
600                    }
601                }
602
603                // Check for max-output-tokens hit (partial response).
604                if content_blocks
605                    .iter()
606                    .any(|b| matches!(b, ContentBlock::Text { .. }))
607                    && error_text.contains("max_tokens")
608                    && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
609                {
610                    max_output_recovery_count += 1;
611                    info!(
612                        "Max output tokens recovery attempt {}/{}",
613                        max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
614                    );
615                    let recovery_msg = compact::max_output_recovery_message();
616                    self.state.push_message(recovery_msg);
617                    continue;
618                }
619            }
620
621            // Step 6b: Handle max_tokens stop reason (escalate and continue).
622            if matches!(stop_reason, Some(StopReason::MaxTokens))
623                && !got_error
624                && content_blocks
625                    .iter()
626                    .any(|b| matches!(b, ContentBlock::Text { .. }))
627                && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
628            {
629                max_output_recovery_count += 1;
630                info!(
631                    "Max tokens stop reason — recovery attempt {}/{}",
632                    max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
633                );
634                let recovery_msg = compact::max_output_recovery_message();
635                self.state.push_message(recovery_msg);
636                continue;
637            }
638
639            // Step 7: Extract tool calls from the response.
640            let tool_calls = extract_tool_calls(&content_blocks);
641
642            if tool_calls.is_empty() {
643                // No tools requested — turn is complete.
644                info!("Turn complete (no tool calls)");
645                sink.on_turn_complete(turn + 1);
646                self.state.is_query_active = false;
647
648                // Fire background memory extraction (fire-and-forget).
649                // Only runs if feature enabled and memory directory exists.
650                if self.state.config.features.extract_memories
651                    && crate::memory::ensure_memory_dir().is_some()
652                {
653                    let extraction_messages = self.state.messages.clone();
654                    let extraction_state = self.extraction_state.clone();
655                    let extraction_llm = self.llm.clone();
656                    let extraction_model = model.clone();
657                    tokio::spawn(async move {
658                        crate::memory::extraction::extract_memories_background(
659                            extraction_messages,
660                            extraction_state,
661                            extraction_llm,
662                            extraction_model,
663                        )
664                        .await;
665                    });
666                }
667
668                return Ok(());
669            }
670
671            // Step 8: Execute tool calls with pre/post hooks.
672            info!("Executing {} tool call(s)", tool_calls.len());
673            let cwd = PathBuf::from(&self.state.cwd);
674            let tool_ctx = ToolContext {
675                cwd,
676                cancel: self.cancel.clone(),
677                permission_checker: self.permissions.clone(),
678                verbose: self.config.verbose,
679                plan_mode: self.state.plan_mode,
680                file_cache: Some(self.file_cache.clone()),
681                denial_tracker: Some(self.denial_tracker.clone()),
682                task_manager: Some(self.state.task_manager.clone()),
683                session_allows: Some(self.session_allows.clone()),
684                permission_prompter: self.permission_prompter.clone(),
685            };
686
687            // Collect streaming tool results first.
688            let streaming_ids: std::collections::HashSet<String> = streaming_tool_handles
689                .iter()
690                .map(|(id, _, _)| id.clone())
691                .collect();
692
693            let mut streaming_results = Vec::new();
694            for (id, name, handle) in streaming_tool_handles.drain(..) {
695                match handle.await {
696                    Ok(result) => streaming_results.push(crate::tools::executor::ToolCallResult {
697                        tool_use_id: id,
698                        tool_name: name,
699                        result,
700                    }),
701                    Err(e) => streaming_results.push(crate::tools::executor::ToolCallResult {
702                        tool_use_id: id,
703                        tool_name: name,
704                        result: crate::tools::ToolResult::error(format!("Task failed: {e}")),
705                    }),
706                }
707            }
708
709            // Fire pre-tool-use hooks.
710            for call in &tool_calls {
711                self.hooks
712                    .run_hooks(&HookEvent::PreToolUse, Some(&call.name), &call.input)
713                    .await;
714            }
715
716            // Execute remaining tools (ones not started during streaming).
717            let remaining_calls: Vec<_> = tool_calls
718                .iter()
719                .filter(|c| !streaming_ids.contains(&c.id))
720                .cloned()
721                .collect();
722
723            let mut results = streaming_results;
724            if !remaining_calls.is_empty() {
725                let batch_results = execute_tool_calls(
726                    &remaining_calls,
727                    self.tools.all(),
728                    &tool_ctx,
729                    &self.permissions,
730                )
731                .await;
732                results.extend(batch_results);
733            }
734
735            // Step 9: Inject tool results + fire post-tool-use hooks.
736            for result in &results {
737                // Handle plan mode state transitions.
738                if !result.result.is_error {
739                    match result.tool_name.as_str() {
740                        "EnterPlanMode" => {
741                            self.state.plan_mode = true;
742                            info!("Plan mode enabled");
743                        }
744                        "ExitPlanMode" => {
745                            self.state.plan_mode = false;
746                            info!("Plan mode disabled");
747                        }
748                        _ => {}
749                    }
750                }
751
752                sink.on_tool_result(&result.tool_name, &result.result);
753
754                // Fire post-tool-use hooks.
755                self.hooks
756                    .run_hooks(
757                        &HookEvent::PostToolUse,
758                        Some(&result.tool_name),
759                        &serde_json::json!({
760                            "tool": result.tool_name,
761                            "is_error": result.result.is_error,
762                        }),
763                    )
764                    .await;
765
766                let msg = tool_result_message(
767                    &result.tool_use_id,
768                    &result.result.content,
769                    result.result.is_error,
770                );
771                self.state.push_message(msg);
772            }
773
774            // Continue the loop — the model will see the tool results.
775        }
776
777        warn!("Max turns ({max_turns}) reached");
778        sink.on_warning(&format!("Agent stopped after {max_turns} turns"));
779        self.state.is_query_active = false;
780        Ok(())
781    }
782
783    /// Cancel the current operation.
784    pub fn cancel(&self) {
785        self.cancel.cancel();
786    }
787
788    /// Get a cloneable cancel token for use in background tasks.
789    pub fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
790        self.cancel.clone()
791    }
792}
793
/// Get a fallback model (smaller/cheaper) for retry on overload.
///
/// Maps known large model names to a smaller sibling:
/// - `opus` → `sonnet`
/// - `gpt-5.4` / `gpt-4.1` (when not already `mini`/`nano`) → `{model}-mini`
/// - `large` → `small`
///
/// Anything else is assumed to already be a small model and is returned
/// unchanged. Matching is ASCII-case-insensitive and the substitution
/// preserves the casing of the rest of the name (previously the match was
/// case-insensitive but `str::replace` ran on the original string, so a
/// mixed-case name like "Claude-Opus" matched the branch yet was returned
/// unmodified).
fn get_fallback_model(current: &str) -> String {
    let lower = current.to_ascii_lowercase();
    if lower.contains("opus") {
        // Opus → Sonnet
        replace_ascii_ci(current, "opus", "sonnet")
    } else if (lower.contains("gpt-5.4") || lower.contains("gpt-4.1"))
        && !lower.contains("mini")
        && !lower.contains("nano")
    {
        format!("{current}-mini")
    } else if lower.contains("large") {
        replace_ascii_ci(current, "large", "small")
    } else {
        // Already a small model, keep it.
        current.to_string()
    }
}

/// Replace every occurrence of `pat` (lowercase ASCII) in `s` with `with`,
/// matching ASCII-case-insensitively.
///
/// `to_ascii_lowercase` only rewrites ASCII bytes, so byte indices found in
/// the lowered copy are valid char boundaries in the original string.
fn replace_ascii_ci(s: &str, pat: &str, with: &str) -> String {
    debug_assert!(!pat.is_empty());
    let lower = s.to_ascii_lowercase();
    let mut out = String::with_capacity(s.len());
    let mut rest = 0;
    while let Some(i) = lower[rest..].find(pat) {
        let start = rest + i;
        out.push_str(&s[rest..start]);
        out.push_str(with);
        rest = start + pat.len();
    }
    out.push_str(&s[rest..]);
    out
}
812
/// Build the system prompt from tool definitions, app state, and memory.
///
/// Assembles the prompt as an ordered sequence of sections: identity
/// preamble, environment info, memory context, per-tool documentation,
/// user-invocable skills, behavioral/safety guidelines, tool-usage
/// patterns, MCP server listing, deferred tools, and task-management /
/// output-formatting guidance.
///
/// Not a pure function of its arguments: it reads the `SHELL` env var
/// and the filesystem (a `.git` probe plus memory and skill files under
/// `state.cwd`), so the result can vary between calls.
pub fn build_system_prompt(tools: &ToolRegistry, state: &AppState) -> String {
    let mut prompt = String::new();

    prompt.push_str(
        "You are an AI coding agent. You help users with software engineering tasks \
         by reading, writing, and searching code. Use the tools available to you to \
         accomplish tasks.\n\n",
    );

    // Environment context.
    // SHELL is typically unset on non-Unix platforms; default to "bash".
    let shell = std::env::var("SHELL").unwrap_or_else(|_| "bash".to_string());
    let is_git = std::path::Path::new(&state.cwd).join(".git").exists();
    prompt.push_str(&format!(
        "# Environment\n\
         - Working directory: {}\n\
         - Platform: {}\n\
         - Shell: {shell}\n\
         - Git repository: {}\n\n",
        state.cwd,
        std::env::consts::OS,
        if is_git { "yes" } else { "no" },
    ));

    // Inject memory context (project + user + on-demand relevant).
    let mut memory = crate::memory::MemoryContext::load(Some(std::path::Path::new(&state.cwd)));

    // On-demand: surface relevant memories based on recent conversation.
    // Note: this takes the 5 most recent messages of ANY kind and then keeps
    // only the user ones, so fewer than 5 user messages may contribute; the
    // joined text is ordered newest-first because of `rev()`.
    let recent_text: String = state
        .messages
        .iter()
        .rev()
        .take(5)
        .filter_map(|m| match m {
            crate::llm::message::Message::User(u) => Some(
                u.content
                    .iter()
                    .filter_map(|b| b.as_text())
                    .collect::<Vec<_>>()
                    .join(" "),
            ),
            _ => None,
        })
        .collect::<Vec<_>>()
        .join(" ");

    if !recent_text.is_empty() {
        memory.load_relevant(&recent_text);
    }

    // Empty section means no memory loaded; skip the header entirely.
    let memory_section = memory.to_system_prompt_section();
    if !memory_section.is_empty() {
        prompt.push_str(&memory_section);
    }

    // Tool documentation. Disabled tools are omitted from the prompt.
    prompt.push_str("# Available Tools\n\n");
    for tool in tools.all() {
        if tool.is_enabled() {
            prompt.push_str(&format!("## {}\n{}\n\n", tool.name(), tool.prompt()));
        }
    }

    // Available skills. Only user-invocable skills are advertised.
    let skills = crate::skills::SkillRegistry::load_all(Some(std::path::Path::new(&state.cwd)));
    let invocable = skills.user_invocable();
    if !invocable.is_empty() {
        prompt.push_str("# Available Skills\n\n");
        for skill in invocable {
            let desc = skill.metadata.description.as_deref().unwrap_or("");
            let when = skill.metadata.when_to_use.as_deref().unwrap_or("");
            prompt.push_str(&format!("- `/{}`", skill.name));
            if !desc.is_empty() {
                prompt.push_str(&format!(": {desc}"));
            }
            if !when.is_empty() {
                prompt.push_str(&format!(" (use when: {when})"));
            }
            prompt.push('\n');
        }
        prompt.push('\n');
    }

    // Guidelines and safety framework.
    prompt.push_str(
        "# Using tools\n\n\
         Use dedicated tools instead of shell commands when available:\n\
         - File search: Glob (not find or ls)\n\
         - Content search: Grep (not grep or rg)\n\
         - Read files: FileRead (not cat/head/tail)\n\
         - Edit files: FileEdit (not sed/awk)\n\
         - Write files: FileWrite (not echo/cat with redirect)\n\
         - Reserve Bash for system commands and operations that require shell execution.\n\
         - Break complex tasks into steps. Use multiple tool calls in parallel when independent.\n\
         - Use the Agent tool for complex multi-step research or tasks that benefit from isolation.\n\n\
         # Working with code\n\n\
         - Read files before editing them. Understand existing code before suggesting changes.\n\
         - Prefer editing existing files over creating new ones to avoid file bloat.\n\
         - Only make changes that were requested. Don't add features, refactor, add comments, \
           or make \"improvements\" beyond the ask.\n\
         - Don't add error handling for scenarios that can't happen. Don't design for \
           hypothetical future requirements.\n\
         - When referencing code, include file_path:line_number.\n\
         - Be careful not to introduce security vulnerabilities (command injection, XSS, SQL injection, \
           OWASP top 10). If you notice insecure code you wrote, fix it immediately.\n\
         - Don't add docstrings, comments, or type annotations to code you didn't change.\n\
         - Three similar lines of code is better than a premature abstraction.\n\n\
         # Git safety protocol\n\n\
         - NEVER update the git config.\n\
         - NEVER run destructive git commands (push --force, reset --hard, checkout ., restore ., \
           clean -f, branch -D) unless the user explicitly requests them.\n\
         - NEVER skip hooks (--no-verify, --no-gpg-sign) unless the user explicitly requests it.\n\
         - NEVER force push to main/master. Warn the user if they request it.\n\
         - Always create NEW commits rather than amending, unless the user explicitly requests amend. \
           After hook failure, the commit did NOT happen — amend would modify the PREVIOUS commit.\n\
         - When staging files, prefer adding specific files by name rather than git add -A or git add ., \
           which can accidentally include sensitive files.\n\
         - NEVER commit changes unless the user explicitly asks.\n\n\
         # Committing changes\n\n\
         When the user asks to commit:\n\
         1. Run git status and git diff to see all changes.\n\
         2. Run git log --oneline -5 to match the repository's commit message style.\n\
         3. Draft a concise (1-2 sentence) commit message focusing on \"why\" not \"what\".\n\
         4. Do not commit files that likely contain secrets (.env, credentials.json).\n\
         5. Stage specific files, create the commit.\n\
         6. If pre-commit hook fails, fix the issue and create a NEW commit.\n\
         7. When creating commits, include a co-author attribution line at the end of the message.\n\n\
         # Creating pull requests\n\n\
         When the user asks to create a PR:\n\
         1. Run git status, git diff, and git log to understand all changes on the branch.\n\
         2. Analyze ALL commits (not just the latest) that will be in the PR.\n\
         3. Draft a short title (under 70 chars) and detailed body with summary and test plan.\n\
         4. Push to remote with -u flag if needed, then create PR using gh pr create.\n\
         5. Return the PR URL when done.\n\n\
         # Executing actions safely\n\n\
         Consider the reversibility and blast radius of every action:\n\
         - Freely take local, reversible actions (editing files, running tests).\n\
         - For hard-to-reverse or shared-state actions, confirm with the user first:\n\
           - Destructive: deleting files/branches, dropping tables, rm -rf, overwriting uncommitted changes.\n\
           - Hard to reverse: force-pushing, git reset --hard, amending published commits.\n\
           - Visible to others: pushing code, creating/commenting on PRs/issues, sending messages.\n\
         - When you encounter an obstacle, do not use destructive actions as a shortcut. \
           Identify root causes and fix underlying issues.\n\
         - If you discover unexpected state (unfamiliar files, branches, config), investigate \
           before deleting or overwriting — it may be the user's in-progress work.\n\n\
         # Response style\n\n\
         - Be concise. Lead with the answer or action, not the reasoning.\n\
         - Skip filler, preamble, and unnecessary transitions.\n\
         - Don't restate what the user said.\n\
         - If you can say it in one sentence, don't use three.\n\
         - Focus output on: decisions that need input, status updates, and errors that change the plan.\n\
         - When referencing GitHub issues or PRs, use owner/repo#123 format.\n\
         - Only use emojis if the user explicitly requests it.\n\n\
         # Memory\n\n\
         You can save information across sessions by writing memory files.\n\
         - Save to: ~/.config/agent-code/memory/ (one .md file per topic)\n\
         - Each file needs YAML frontmatter: name, description, type (user/feedback/project/reference)\n\
         - After writing a file, update MEMORY.md with a one-line pointer\n\
         - Memory types: user (role, preferences), feedback (corrections, confirmations), \
           project (decisions, deadlines), reference (external resources)\n\
         - Do NOT store: code patterns, git history, debugging solutions, anything derivable from code\n\
         - Memory is a hint — always verify against current state before acting on it\n",
    );

    // Detailed tool usage examples and workflow patterns.
    prompt.push_str(
        "# Tool usage patterns\n\n\
         Common patterns for effective tool use:\n\n\
         **Read before edit**: Always read a file before editing it. This ensures you \
         understand the current state and can make targeted changes.\n\
         ```\n\
         1. FileRead file_path → understand structure\n\
         2. FileEdit old_string, new_string → targeted change\n\
         ```\n\n\
         **Search then act**: Use Glob to find files, Grep to find content, then read/edit.\n\
         ```\n\
         1. Glob **/*.rs → find Rust files\n\
         2. Grep pattern path → find specific code\n\
         3. FileRead → read the match\n\
         4. FileEdit → make the change\n\
         ```\n\n\
         **Parallel tool calls**: When you need to read multiple independent files or run \
         independent searches, make all the tool calls in one response. Don't serialize \
         independent operations.\n\n\
         **Test after change**: After editing code, run tests to verify the change works.\n\
         ```\n\
         1. FileEdit → make change\n\
         2. Bash cargo test / pytest / npm test → verify\n\
         3. If tests fail, read the error, fix, re-test\n\
         ```\n\n\
         # Error recovery\n\n\
         When something goes wrong:\n\
         - **Tool not found**: Use ToolSearch to find the right tool name.\n\
         - **Permission denied**: Explain why the action is needed, ask the user to approve.\n\
         - **File not found**: Use Glob to find the correct path. Check for typos.\n\
         - **Edit failed (not unique)**: Provide more surrounding context in old_string, \
           or use replace_all=true if renaming.\n\
         - **Command failed**: Read the full error message. Don't retry the same command. \
           Diagnose the root cause first.\n\
         - **Context too large**: The system will auto-compact. If you need specific \
           information from before compaction, re-read the relevant files.\n\
         - **Rate limited**: The system will auto-retry with backoff. Just wait.\n\n\
         # Common workflows\n\n\
         **Bug fix**: Read the failing test → read the source code it tests → \
         identify the bug → fix it → run the test → confirm it passes.\n\n\
         **New feature**: Read existing patterns in the codebase → create or edit files → \
         add tests → run tests → update docs if needed.\n\n\
         **Code review**: Read the diff → identify issues (bugs, security, style) → \
         report findings with file:line references.\n\n\
         **Refactor**: Search for all usages of the symbol → plan the changes → \
         edit each file → run tests to verify nothing broke.\n\n",
    );

    // MCP server instructions (dynamic, per-server).
    if !state.config.mcp_servers.is_empty() {
        prompt.push_str("# MCP Servers\n\n");
        prompt.push_str(
            "Connected MCP servers provide additional tools. MCP tools are prefixed \
             with `mcp__{server}__{tool}`. Use them like any other tool.\n\n",
        );
        for (name, entry) in &state.config.mcp_servers {
            // Transport is inferred from which config field is populated:
            // a command means stdio, a URL means SSE.
            let transport = if entry.command.is_some() {
                "stdio"
            } else if entry.url.is_some() {
                "sse"
            } else {
                "unknown"
            };
            prompt.push_str(&format!("- **{name}** ({transport})\n"));
        }
        prompt.push('\n');
    }

    // Deferred tools listing.
    let deferred = tools.deferred_names();
    if !deferred.is_empty() {
        prompt.push_str("# Deferred Tools\n\n");
        prompt.push_str(
            "These tools are available but not loaded by default. \
             Use ToolSearch to load them when needed:\n",
        );
        for name in &deferred {
            prompt.push_str(&format!("- {name}\n"));
        }
        prompt.push('\n');
    }

    // Task management guidance.
    prompt.push_str(
        "# Task management\n\n\
         - Use TaskCreate to break complex work into trackable steps.\n\
         - Mark tasks as in_progress when starting, completed when done.\n\
         - Use the Agent tool to spawn subagents for parallel independent work.\n\
         - Use EnterPlanMode/ExitPlanMode for read-only exploration before making changes.\n\
         - Use EnterWorktree/ExitWorktree for isolated changes in git worktrees.\n\n\
         # Output formatting\n\n\
         - All text output is displayed to the user. Use GitHub-flavored markdown.\n\
         - Use fenced code blocks with language hints for code: ```rust, ```python, etc.\n\
         - Use inline `code` for file names, function names, and short code references.\n\
         - Use tables for structured comparisons.\n\
         - Use bullet lists for multiple items.\n\
         - Keep paragraphs short (2-3 sentences).\n\
         - Never output raw HTML or complex formatting — stick to standard markdown.\n",
    );

    prompt
}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::*;
1084
1085    /// Verify that cancelling via the shared handle cancels the current
1086    /// turn's token (regression: the signal handler previously held a
1087    /// stale clone that couldn't cancel subsequent turns).
1088    #[test]
1089    fn cancel_shared_propagates_to_current_token() {
1090        let root = CancellationToken::new();
1091        let shared = Arc::new(std::sync::Mutex::new(root.clone()));
1092
1093        // Simulate turn reset: create a new token and update the shared handle.
1094        let turn1 = CancellationToken::new();
1095        *shared.lock().unwrap() = turn1.clone();
1096
1097        // Cancelling via the shared handle should cancel turn1.
1098        shared.lock().unwrap().cancel();
1099        assert!(turn1.is_cancelled());
1100
1101        // New turn: replace the token. The old cancellation shouldn't affect it.
1102        let turn2 = CancellationToken::new();
1103        *shared.lock().unwrap() = turn2.clone();
1104        assert!(!turn2.is_cancelled());
1105
1106        // Cancelling via shared should cancel turn2.
1107        shared.lock().unwrap().cancel();
1108        assert!(turn2.is_cancelled());
1109    }
1110
1111    /// Verify that the streaming loop breaks on cancellation by simulating
1112    /// the select pattern used in run_turn_with_sink.
1113    #[tokio::test]
1114    async fn stream_loop_responds_to_cancellation() {
1115        let cancel = CancellationToken::new();
1116        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamEvent>(10);
1117
1118        // Simulate a slow stream: send one event, then cancel before more arrive.
1119        tx.send(StreamEvent::TextDelta("hello".into()))
1120            .await
1121            .unwrap();
1122
1123        let cancel2 = cancel.clone();
1124        tokio::spawn(async move {
1125            // Small delay, then cancel.
1126            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1127            cancel2.cancel();
1128        });
1129
1130        let mut events_received = 0u32;
1131        let mut cancelled = false;
1132
1133        loop {
1134            tokio::select! {
1135                event = rx.recv() => {
1136                    match event {
1137                        Some(_) => events_received += 1,
1138                        None => break,
1139                    }
1140                }
1141                _ = cancel.cancelled() => {
1142                    cancelled = true;
1143                    break;
1144                }
1145            }
1146        }
1147
1148        assert!(cancelled, "Loop should have been cancelled");
1149        assert_eq!(
1150            events_received, 1,
1151            "Should have received exactly one event before cancel"
1152        );
1153    }
1154
1155    // ------------------------------------------------------------------
1156    // End-to-end regression tests for #103.
1157    //
1158    // These tests build a real QueryEngine with a mock Provider and
1159    // exercise run_turn_with_sink directly, verifying that cancellation
1160    // actually interrupts the streaming loop (not just the select!
1161    // pattern in isolation).
1162    // ------------------------------------------------------------------
1163
1164    use crate::llm::provider::{Provider, ProviderError, ProviderRequest};
1165
1166    /// A provider whose stream yields one TextDelta and then hangs forever.
1167    /// Simulates the real bug: a slow LLM response the user wants to interrupt.
1168    struct HangingProvider;
1169
1170    #[async_trait::async_trait]
1171    impl Provider for HangingProvider {
1172        fn name(&self) -> &str {
1173            "hanging-mock"
1174        }
1175
1176        async fn stream(
1177            &self,
1178            _request: &ProviderRequest,
1179        ) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, ProviderError> {
1180            let (tx, rx) = tokio::sync::mpsc::channel(4);
1181            tokio::spawn(async move {
1182                let _ = tx.send(StreamEvent::TextDelta("thinking...".into())).await;
1183                // Hang forever without closing the channel or sending Done.
1184                let _tx_holder = tx;
1185                std::future::pending::<()>().await;
1186            });
1187            Ok(rx)
1188        }
1189    }
1190
1191    /// A provider that completes a turn normally: emits text and a Done event.
1192    struct CompletingProvider;
1193
1194    #[async_trait::async_trait]
1195    impl Provider for CompletingProvider {
1196        fn name(&self) -> &str {
1197            "completing-mock"
1198        }
1199
1200        async fn stream(
1201            &self,
1202            _request: &ProviderRequest,
1203        ) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, ProviderError> {
1204            let (tx, rx) = tokio::sync::mpsc::channel(8);
1205            tokio::spawn(async move {
1206                let _ = tx.send(StreamEvent::TextDelta("hello".into())).await;
1207                let _ = tx
1208                    .send(StreamEvent::ContentBlockComplete(ContentBlock::Text {
1209                        text: "hello".into(),
1210                    }))
1211                    .await;
1212                let _ = tx
1213                    .send(StreamEvent::Done {
1214                        usage: Usage::default(),
1215                        stop_reason: Some(StopReason::EndTurn),
1216                    })
1217                    .await;
1218                // Channel closes when tx drops.
1219            });
1220            Ok(rx)
1221        }
1222    }
1223
1224    fn build_engine(llm: Arc<dyn Provider>) -> QueryEngine {
1225        use crate::config::Config;
1226        use crate::permissions::PermissionChecker;
1227        use crate::state::AppState;
1228        use crate::tools::registry::ToolRegistry;
1229
1230        let config = Config::default();
1231        let permissions = PermissionChecker::from_config(&config.permissions);
1232        let state = AppState::new(config);
1233
1234        QueryEngine::new(
1235            llm,
1236            ToolRegistry::default_tools(),
1237            permissions,
1238            state,
1239            QueryEngineConfig {
1240                max_turns: Some(1),
1241                verbose: false,
1242                unattended: true,
1243            },
1244        )
1245    }
1246
1247    /// Schedule a cancellation after `delay_ms` via the shared handle
1248    /// (same path the signal handler uses).
1249    fn schedule_cancel(engine: &QueryEngine, delay_ms: u64) {
1250        let shared = engine.cancel_shared.clone();
1251        tokio::spawn(async move {
1252            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1253            shared.lock().unwrap().cancel();
1254        });
1255    }
1256
1257    /// Builds a mock provider whose stream yields one TextDelta and then hangs.
1258    /// Verifies the turn returns promptly once cancel fires.
1259    #[tokio::test]
1260    async fn run_turn_with_sink_interrupts_on_cancel() {
1261        let mut engine = build_engine(Arc::new(HangingProvider));
1262        schedule_cancel(&engine, 100);
1263
1264        let result = tokio::time::timeout(
1265            std::time::Duration::from_secs(5),
1266            engine.run_turn_with_sink("test input", &NullSink),
1267        )
1268        .await;
1269
1270        assert!(
1271            result.is_ok(),
1272            "run_turn_with_sink should return promptly on cancel, not hang"
1273        );
1274        assert!(
1275            result.unwrap().is_ok(),
1276            "cancelled turn should return Ok(()), not an error"
1277        );
1278        assert!(
1279            !engine.state().is_query_active,
1280            "is_query_active should be reset after cancel"
1281        );
1282    }
1283
1284    /// Regression test for the original #103 bug: the signal handler held
1285    /// a stale clone of the cancellation token, so Ctrl+C only worked on
1286    /// the *first* turn. This test cancels turn 1, then runs turn 2 and
1287    /// verifies it is ALSO cancellable via the same shared handle.
1288    #[tokio::test]
1289    async fn cancel_works_across_multiple_turns() {
1290        let mut engine = build_engine(Arc::new(HangingProvider));
1291
1292        // Turn 1: cancel mid-stream.
1293        schedule_cancel(&engine, 80);
1294        let r1 = tokio::time::timeout(
1295            std::time::Duration::from_secs(5),
1296            engine.run_turn_with_sink("turn 1", &NullSink),
1297        )
1298        .await;
1299        assert!(r1.is_ok(), "turn 1 should cancel promptly");
1300        assert!(!engine.state().is_query_active);
1301
1302        // Turn 2: cancel again via the same shared handle.
1303        // With the pre-fix stale-token bug, the handle would be pointing
1304        // at turn 1's already-used token and turn 2 would hang forever.
1305        schedule_cancel(&engine, 80);
1306        let r2 = tokio::time::timeout(
1307            std::time::Duration::from_secs(5),
1308            engine.run_turn_with_sink("turn 2", &NullSink),
1309        )
1310        .await;
1311        assert!(
1312            r2.is_ok(),
1313            "turn 2 should also cancel promptly — regression would hang here"
1314        );
1315        assert!(!engine.state().is_query_active);
1316
1317        // Turn 3: one more for good measure.
1318        schedule_cancel(&engine, 80);
1319        let r3 = tokio::time::timeout(
1320            std::time::Duration::from_secs(5),
1321            engine.run_turn_with_sink("turn 3", &NullSink),
1322        )
1323        .await;
1324        assert!(r3.is_ok(), "turn 3 should still be cancellable");
1325        assert!(!engine.state().is_query_active);
1326    }
1327
1328    /// Verifies that a previously-cancelled token does not poison subsequent
1329    /// turns. A fresh run_turn_with_sink on the same engine should complete
1330    /// normally even after a prior cancel.
1331    #[tokio::test]
1332    async fn cancel_does_not_poison_next_turn() {
1333        // Turn 1: hangs and gets cancelled.
1334        let mut engine = build_engine(Arc::new(HangingProvider));
1335        schedule_cancel(&engine, 80);
1336        let _ = tokio::time::timeout(
1337            std::time::Duration::from_secs(5),
1338            engine.run_turn_with_sink("turn 1", &NullSink),
1339        )
1340        .await
1341        .expect("turn 1 should cancel");
1342
1343        // Swap the provider to one that completes normally by rebuilding
1344        // the engine (we can't swap llm on an existing engine, so this
1345        // simulates the isolated "fresh turn" behavior). The key property
1346        // being tested is that the per-turn cancel reset correctly
1347        // initializes a non-cancelled token.
1348        let mut engine2 = build_engine(Arc::new(CompletingProvider));
1349
1350        // Pre-cancel engine2 to simulate a leftover cancelled state, then
1351        // verify run_turn_with_sink still works because it resets the token.
1352        engine2.cancel_shared.lock().unwrap().cancel();
1353
1354        let result = tokio::time::timeout(
1355            std::time::Duration::from_secs(5),
1356            engine2.run_turn_with_sink("hello", &NullSink),
1357        )
1358        .await;
1359
1360        assert!(result.is_ok(), "completing turn should not hang");
1361        assert!(
1362            result.unwrap().is_ok(),
1363            "turn should succeed — the stale cancel flag must be cleared on turn start"
1364        );
1365        // Message history should contain the user + assistant messages.
1366        assert!(
1367            engine2.state().messages.len() >= 2,
1368            "normal turn should push both user and assistant messages"
1369        );
1370    }
1371
1372    /// Verifies that cancelling BEFORE any event arrives still interrupts
1373    /// the turn cleanly (edge case: cancellation races with the first recv).
1374    #[tokio::test]
1375    async fn cancel_before_first_event_interrupts_cleanly() {
1376        let mut engine = build_engine(Arc::new(HangingProvider));
1377        // Very short delay so cancel likely fires before or during the
1378        // first event. The test is tolerant of either ordering.
1379        schedule_cancel(&engine, 1);
1380
1381        let result = tokio::time::timeout(
1382            std::time::Duration::from_secs(5),
1383            engine.run_turn_with_sink("immediate", &NullSink),
1384        )
1385        .await;
1386
1387        assert!(result.is_ok(), "early cancel should not hang");
1388        assert!(result.unwrap().is_ok());
1389        assert!(!engine.state().is_query_active);
1390    }
1391
1392    /// Verifies the sink receives cancellation feedback via on_warning.
1393    #[tokio::test]
1394    async fn cancelled_turn_emits_warning_to_sink() {
1395        use std::sync::Mutex;
1396
1397        /// Captures sink events for assertion.
1398        struct CapturingSink {
1399            warnings: Mutex<Vec<String>>,
1400        }
1401
1402        impl StreamSink for CapturingSink {
1403            fn on_text(&self, _: &str) {}
1404            fn on_tool_start(&self, _: &str, _: &serde_json::Value) {}
1405            fn on_tool_result(&self, _: &str, _: &crate::tools::ToolResult) {}
1406            fn on_error(&self, _: &str) {}
1407            fn on_warning(&self, msg: &str) {
1408                self.warnings.lock().unwrap().push(msg.to_string());
1409            }
1410        }
1411
1412        let sink = CapturingSink {
1413            warnings: Mutex::new(Vec::new()),
1414        };
1415
1416        let mut engine = build_engine(Arc::new(HangingProvider));
1417        schedule_cancel(&engine, 100);
1418
1419        let _ = tokio::time::timeout(
1420            std::time::Duration::from_secs(5),
1421            engine.run_turn_with_sink("test", &sink),
1422        )
1423        .await
1424        .expect("should not hang");
1425
1426        let warnings = sink.warnings.lock().unwrap();
1427        assert!(
1428            warnings.iter().any(|w| w.contains("Cancelled")),
1429            "expected 'Cancelled' warning in sink, got: {:?}",
1430            *warnings
1431        );
1432    }
1433}