1pub mod source;
16
17use std::path::PathBuf;
18use std::sync::Arc;
19
20use tokio_util::sync::CancellationToken;
21use tracing::{debug, info, warn};
22use uuid::Uuid;
23
24use crate::hooks::{HookEvent, HookRegistry};
25use crate::llm::message::*;
26use crate::llm::provider::{Provider, ProviderError, ProviderRequest};
27use crate::llm::stream::StreamEvent;
28use crate::permissions::PermissionChecker;
29use crate::services::compact::{self, CompactTracking, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT};
30use crate::services::tokens;
31use crate::state::AppState;
32use crate::tools::ToolContext;
33use crate::tools::executor::{execute_tool_calls, extract_tool_calls};
34use crate::tools::registry::ToolRegistry;
35
/// Runtime knobs for a [`QueryEngine`], supplied by the caller at construction.
pub struct QueryEngineConfig {
    /// Maximum agent turns per query; `None` falls back to the built-in
    /// default of 50 (see `run_turn_with_sink`).
    pub max_turns: Option<usize>,
    /// Propagated into `ToolContext::verbose` for batch tool execution.
    pub verbose: bool,
    /// When true (together with the `unattended_retry` feature flag), an
    /// otherwise-aborting overload/rate-limit error triggers a 30s wait
    /// and retry instead of returning an error.
    pub unattended: bool,
}
43
/// The agent loop: drives LLM turns, streams events to a [`StreamSink`],
/// executes tool calls, and manages context compaction, retries, budgets,
/// hooks, and cancellation.
pub struct QueryEngine {
    /// LLM backend used for streaming completions and LLM-based compaction.
    llm: Arc<dyn Provider>,
    tools: ToolRegistry,
    /// Shared file cache handed to batch tool executions via `ToolContext`.
    file_cache: Arc<tokio::sync::Mutex<crate::services::file_cache::FileCache>>,
    permissions: Arc<PermissionChecker>,
    state: AppState,
    config: QueryEngineConfig,
    /// Handle the ctrl-c task reads; rewritten with each turn's fresh token
    /// so the signal handler always cancels the *current* turn.
    cancel_shared: Arc<std::sync::Mutex<CancellationToken>>,
    /// Token for the turn currently in flight; replaced at the start of
    /// every `run_turn_with_sink` call.
    cancel: CancellationToken,
    hooks: HookRegistry,
    cache_tracker: crate::services::cache_tracking::CacheTracker,
    denial_tracker: Arc<tokio::sync::Mutex<crate::permissions::tracking::DenialTracker>>,
    /// Background memory-extraction state shared with spawned tasks.
    extraction_state: Arc<tokio::sync::Mutex<crate::memory::extraction::ExtractionState>>,
    /// Permission grants remembered for the lifetime of the session.
    session_allows: Arc<tokio::sync::Mutex<std::collections::HashSet<String>>>,
    permission_prompter: Option<Arc<dyn crate::tools::PermissionPrompter>>,
    /// `(hash, prompt)` memo for `build_system_prompt`; the hash covers
    /// model, cwd, MCP-server count and tool count, so the prompt is only
    /// rebuilt when one of those changes.
    cached_system_prompt: Option<(u64, String)>,
}
69
/// Observer for turn progress. Methods with empty default bodies are
/// optional; implementors only override what they render.
pub trait StreamSink: Send + Sync {
    /// Called for each streamed assistant text delta.
    fn on_text(&self, text: &str);
    /// Called when a `ToolUse` content block completes (before execution).
    fn on_tool_start(&self, tool_name: &str, input: &serde_json::Value);
    /// Called once per executed tool call with its result.
    fn on_tool_result(&self, tool_name: &str, result: &crate::tools::ToolResult);
    /// Called when a completed `Thinking` content block arrives.
    fn on_thinking(&self, _text: &str) {}
    /// Called when a turn finishes with no further tool calls (1-based turn index).
    fn on_turn_complete(&self, _turn: usize) {}
    /// Called for stream errors and abort reasons.
    fn on_error(&self, error: &str);
    /// Called with usage reported by the provider's `Done` event.
    fn on_usage(&self, _usage: &Usage) {}
    /// Called after any compaction/collapse with the (estimated) tokens freed.
    fn on_compact(&self, _freed_tokens: u64) {}
    /// Called for non-fatal notices: budget warnings, context pressure,
    /// model fallback, cancellation, max-turn stop.
    fn on_warning(&self, _msg: &str) {}
}
82
/// A sink that discards every event; used by `run_turn` when the caller
/// does not need progress reporting.
pub struct NullSink;
impl StreamSink for NullSink {
    fn on_text(&self, _: &str) {}
    fn on_tool_start(&self, _: &str, _: &serde_json::Value) {}
    fn on_tool_result(&self, _: &str, _: &crate::tools::ToolResult) {}
    fn on_error(&self, _: &str) {}
}
91
impl QueryEngine {
    /// Build an engine with default collaborators: empty hook registry,
    /// fresh file cache, a 100-entry denial tracker, and no permission
    /// prompter. The initial cancellation token is also published to
    /// `cancel_shared` for the ctrl-c handler.
    pub fn new(
        llm: Arc<dyn Provider>,
        tools: ToolRegistry,
        permissions: PermissionChecker,
        state: AppState,
        config: QueryEngineConfig,
    ) -> Self {
        let cancel = CancellationToken::new();
        let cancel_shared = Arc::new(std::sync::Mutex::new(cancel.clone()));
        Self {
            llm,
            tools,
            file_cache: Arc::new(tokio::sync::Mutex::new(
                crate::services::file_cache::FileCache::new(),
            )),
            permissions: Arc::new(permissions),
            state,
            config,
            cancel,
            cancel_shared,
            hooks: HookRegistry::new(),
            cache_tracker: crate::services::cache_tracking::CacheTracker::new(),
            denial_tracker: Arc::new(tokio::sync::Mutex::new(
                crate::permissions::tracking::DenialTracker::new(100),
            )),
            extraction_state: Arc::new(tokio::sync::Mutex::new(
                crate::memory::extraction::ExtractionState::new(),
            )),
            session_allows: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())),
            permission_prompter: None,
            cached_system_prompt: None,
        }
    }

    /// Register each hook definition from config into the hook registry.
    pub fn load_hooks(&mut self, hook_defs: &[crate::hooks::HookDefinition]) {
        for def in hook_defs {
            self.hooks.register(def.clone());
        }
        if !hook_defs.is_empty() {
            tracing::info!("Loaded {} hooks from config", hook_defs.len());
        }
    }

    /// Read-only access to the engine's application state.
    pub fn state(&self) -> &AppState {
        &self.state
    }

    /// Mutable access to the engine's application state.
    pub fn state_mut(&mut self) -> &mut AppState {
        &mut self.state
    }

    /// Spawn a background task that turns ctrl-c into cancellation of the
    /// *current* turn (read via `cancel_shared`). A second ctrl-c while a
    /// cancellation is already pending exits the process with status 130.
    pub fn install_signal_handler(&self) {
        let shared = self.cancel_shared.clone();
        tokio::spawn(async move {
            // `pending` is never reset, so once a cancel has happened a
            // later ctrl-c on an already-cancelled token exits immediately.
            let mut pending = false;
            loop {
                if tokio::signal::ctrl_c().await.is_ok() {
                    let token = shared.lock().unwrap().clone();
                    if token.is_cancelled() && pending {
                        // 130 = 128 + SIGINT, the conventional exit code.
                        std::process::exit(130);
                    }
                    token.cancel();
                    pending = true;
                }
            }
        });
    }

    /// Run one user turn with no progress reporting (see `NullSink`).
    pub async fn run_turn(&mut self, user_input: &str) -> crate::error::Result<()> {
        self.run_turn_with_sink(user_input, &NullSink).await
    }

    /// Run the agent loop for one user input: stream the model, execute
    /// tool calls, and repeat until the model stops calling tools, the
    /// budget stops us, we are cancelled, or `max_turns` is reached.
    ///
    /// Progress is reported through `sink`. Returns `Err` only for
    /// non-retryable provider failures; cancellation, budget stops and
    /// max-turn exhaustion all return `Ok(())`.
    pub async fn run_turn_with_sink(
        &mut self,
        user_input: &str,
        sink: &dyn StreamSink,
    ) -> crate::error::Result<()> {
        // Fresh token per turn; publish it so the ctrl-c handler cancels us.
        self.cancel = CancellationToken::new();
        *self.cancel_shared.lock().unwrap() = self.cancel.clone();

        let user_msg = user_message(user_input);
        self.state.push_message(user_msg);

        let max_turns = self.config.max_turns.unwrap_or(50);
        let mut compact_tracking = CompactTracking::default();
        let mut retry_state = crate::llm::retry::RetryState::default();
        let retry_config = crate::llm::retry::RetryConfig::default();
        // Counts "model hit its output-token cap mid-answer" recoveries,
        // bounded by MAX_OUTPUT_TOKENS_RECOVERY_LIMIT.
        let mut max_output_recovery_count = 0u32;

        for turn in 0..max_turns {
            self.state.turn_count = turn + 1;
            self.state.is_query_active = true;

            // --- Budget gate: may stop the query or emit a warning. ---
            let budget_config = crate::services::budget::BudgetConfig::default();
            match crate::services::budget::check_budget(
                self.state.total_cost_usd,
                self.state.total_usage.total(),
                &budget_config,
            ) {
                crate::services::budget::BudgetDecision::Stop { message } => {
                    sink.on_warning(&message);
                    self.state.is_query_active = false;
                    return Ok(());
                }
                crate::services::budget::BudgetDecision::ContinueWithWarning {
                    message, ..
                } => {
                    sink.on_warning(&message);
                }
                crate::services::budget::BudgetDecision::Continue => {}
            }

            // --- Normalize history before every request so earlier
            // recovery paths (partial turns, compaction) can't leave the
            // transcript in a shape the provider rejects. ---
            crate::llm::normalize::ensure_tool_result_pairing(&mut self.state.messages);
            crate::llm::normalize::strip_empty_blocks(&mut self.state.messages);
            crate::llm::normalize::remove_empty_messages(&mut self.state.messages);
            crate::llm::normalize::cap_document_blocks(&mut self.state.messages, 500_000);
            crate::llm::normalize::merge_consecutive_user_messages(&mut self.state.messages);

            debug!("Agent turn {}/{}", turn + 1, max_turns);

            // Mutable: the retry path may swap in a fallback model.
            let mut model = self.state.config.api.model.clone();

            // --- Proactive compaction ladder: microcompact -> LLM
            // compaction -> context collapse -> smaller microcompact. ---
            if compact::should_auto_compact(self.state.history(), &model, &compact_tracking) {
                let token_count = tokens::estimate_context_tokens(self.state.history());
                let threshold = compact::auto_compact_threshold(&model);
                info!("Auto-compact triggered: {token_count} tokens >= {threshold} threshold");

                let freed = compact::microcompact(&mut self.state.messages, 5);
                if freed > 0 {
                    sink.on_compact(freed);
                    info!("Microcompact freed ~{freed} tokens");
                }

                let post_mc_tokens = tokens::estimate_context_tokens(self.state.history());
                if post_mc_tokens >= threshold {
                    info!("Microcompact insufficient, attempting LLM compaction");
                    match compact::compact_with_llm(&mut self.state.messages, &*self.llm, &model)
                        .await
                    {
                        Some(removed) => {
                            info!("LLM compaction removed {removed} messages");
                            compact_tracking.was_compacted = true;
                            compact_tracking.consecutive_failures = 0;
                        }
                        None => {
                            compact_tracking.consecutive_failures += 1;
                            warn!(
                                "LLM compaction failed (attempt {})",
                                compact_tracking.consecutive_failures
                            );
                            // Fall back to deterministic trimming.
                            let effective = compact::effective_context_window(&model);
                            if let Some(collapse) =
                                crate::services::context_collapse::collapse_to_budget(
                                    self.state.history(),
                                    effective,
                                )
                            {
                                info!(
                                    "Context collapse snipped {} messages, freed ~{} tokens",
                                    collapse.snipped_count, collapse.tokens_freed
                                );
                                self.state.messages = collapse.api_messages;
                                sink.on_compact(collapse.tokens_freed);
                            } else {
                                let freed2 = compact::microcompact(&mut self.state.messages, 2);
                                if freed2 > 0 {
                                    sink.on_compact(freed2);
                                }
                            }
                        }
                    }
                }
            }

            // One-shot reminder injected after an LLM compaction; the flag
            // is cleared so it is not repeated on later turns.
            if compact_tracking.was_compacted && self.state.config.features.compaction_reminders {
                let reminder = user_message(
                    "<system-reminder>Context was automatically compacted. \
                    Earlier messages were summarized. If you need details from \
                    before compaction, ask the user or re-read the relevant files.</system-reminder>",
                );
                self.state.push_message(reminder);
                compact_tracking.was_compacted = false;
            }

            // Surface context pressure to the user.
            let warning = compact::token_warning_state(self.state.history(), &model);
            if warning.is_blocking {
                sink.on_warning("Context window nearly full. Consider starting a new session.");
            } else if warning.is_above_warning {
                sink.on_warning(&format!("Context {}% remaining", warning.percent_left));
            }

            // --- System prompt, memoized on a cheap fingerprint of the
            // inputs that feed build_system_prompt. NOTE(review): the hash
            // does not cover message history, so memory sections loaded
            // from recent messages may be stale until a fingerprint input
            // changes — confirm this is acceptable. ---
            let prompt_hash = {
                use std::hash::{Hash, Hasher};
                let mut h = std::collections::hash_map::DefaultHasher::new();
                self.state.config.api.model.hash(&mut h);
                self.state.cwd.hash(&mut h);
                self.state.config.mcp_servers.len().hash(&mut h);
                self.tools.all().len().hash(&mut h);
                h.finish()
            };
            let system_prompt = if let Some((cached_hash, ref cached)) = self.cached_system_prompt
                && cached_hash == prompt_hash
            {
                cached.clone()
            } else {
                let prompt = build_system_prompt(&self.tools, &self.state);
                self.cached_system_prompt = Some((prompt_hash, prompt.clone()));
                prompt
            };
            let tool_schemas = self.tools.core_schemas();

            // After a max-output-token truncation, retry with a larger cap.
            let base_tokens = self.state.config.api.max_output_tokens.unwrap_or(16384);
            let effective_tokens = if max_output_recovery_count > 0 {
                base_tokens.max(65536)
            } else {
                base_tokens
            };

            let request = ProviderRequest {
                messages: self.state.history().to_vec(),
                system_prompt: system_prompt.clone(),
                tools: tool_schemas.clone(),
                model: model.clone(),
                max_tokens: effective_tokens,
                temperature: None,
                enable_caching: self.state.config.features.prompt_caching,
                tool_choice: Default::default(),
                metadata: None,
            };

            // --- Open the stream; on failure, classify and retry /
            // fall back / reactively compact / abort. ---
            let mut rx = match self.llm.stream(&request).await {
                Ok(rx) => {
                    retry_state.reset();
                    rx
                }
                Err(e) => {
                    let retryable = match &e {
                        ProviderError::RateLimited { retry_after_ms } => {
                            crate::llm::retry::RetryableError::RateLimited {
                                retry_after: *retry_after_ms,
                            }
                        }
                        ProviderError::Overloaded => crate::llm::retry::RetryableError::Overloaded,
                        ProviderError::Network(_) => {
                            crate::llm::retry::RetryableError::StreamInterrupted
                        }
                        other => crate::llm::retry::RetryableError::NonRetryable(other.to_string()),
                    };

                    match retry_state.next_action(&retryable, &retry_config) {
                        crate::llm::retry::RetryAction::Retry { after } => {
                            warn!("Retrying in {}ms", after.as_millis());
                            tokio::time::sleep(after).await;
                            continue;
                        }
                        crate::llm::retry::RetryAction::FallbackModel => {
                            let fallback = get_fallback_model(&model);
                            sink.on_warning(&format!("Falling back from {model} to {fallback}"));
                            model = fallback;
                            continue;
                        }
                        crate::llm::retry::RetryAction::Abort(reason) => {
                            // Unattended sessions wait out capacity errors
                            // instead of aborting.
                            if self.config.unattended
                                && self.state.config.features.unattended_retry
                                && matches!(
                                    &e,
                                    ProviderError::Overloaded | ProviderError::RateLimited { .. }
                                )
                            {
                                warn!("Unattended retry: waiting 30s for capacity");
                                tokio::time::sleep(std::time::Duration::from_secs(30)).await;
                                continue;
                            }
                            // Reactive shrink when the provider says the
                            // request is too large, then retry the turn.
                            if let ProviderError::RequestTooLarge(body) = &e {
                                let gap = compact::parse_prompt_too_long_gap(body);

                                let effective = compact::effective_context_window(&model);
                                if let Some(collapse) =
                                    crate::services::context_collapse::collapse_to_budget(
                                        self.state.history(),
                                        effective,
                                    )
                                {
                                    info!(
                                        "Reactive collapse: snipped {} messages, freed ~{} tokens",
                                        collapse.snipped_count, collapse.tokens_freed
                                    );
                                    self.state.messages = collapse.api_messages;
                                    sink.on_compact(collapse.tokens_freed);
                                    continue;
                                }

                                let freed = compact::microcompact(&mut self.state.messages, 1);
                                if freed > 0 {
                                    sink.on_compact(freed);
                                    info!(
                                        "Reactive microcompact freed ~{freed} tokens (gap: {gap:?})"
                                    );
                                    continue;
                                }
                            }
                            sink.on_error(&reason);
                            self.state.is_query_active = false;
                            return Err(crate::error::Error::Other(e.to_string()));
                        }
                    }
                }
            };

            // --- Consume the stream, racing against cancellation. ---
            let mut content_blocks = Vec::new();
            let mut usage = Usage::default();
            let mut stop_reason: Option<StopReason> = None;
            let mut got_error = false;
            let mut error_text = String::new();

            // (tool_use_id, tool_name, handle) for read-only tools started
            // eagerly while the rest of the response is still streaming.
            let mut streaming_tool_handles: Vec<(
                String,
                String,
                tokio::task::JoinHandle<crate::tools::ToolResult>,
            )> = Vec::new();

            let mut cancelled = false;
            loop {
                tokio::select! {
                    event = rx.recv() => {
                        match event {
                            Some(StreamEvent::TextDelta(text)) => {
                                sink.on_text(&text);
                            }
                            Some(StreamEvent::ContentBlockComplete(block)) => {
                                if let ContentBlock::ToolUse {
                                    ref id,
                                    ref name,
                                    ref input,
                                } = block
                                {
                                    sink.on_tool_start(name, input);

                                    // Eagerly start tools that are safe to
                                    // run concurrently (read-only) without
                                    // waiting for the full response.
                                    // NOTE(review): these start before the
                                    // PreToolUse hooks run below — confirm
                                    // hooks are advisory-only here.
                                    if let Some(tool) = self.tools.get(name)
                                        && tool.is_read_only()
                                        && tool.is_concurrency_safe()
                                    {
                                        let tool = tool.clone();
                                        let input = input.clone();
                                        let cwd = std::path::PathBuf::from(&self.state.cwd);
                                        let cancel = self.cancel.clone();
                                        let perm = self.permissions.clone();
                                        let tool_id = id.clone();
                                        let tool_name = name.clone();

                                        let handle = tokio::spawn(async move {
                                            // Minimal context: eager tools get
                                            // no caches/trackers/prompter.
                                            match tool
                                                .call(
                                                    input,
                                                    &ToolContext {
                                                        cwd,
                                                        cancel,
                                                        permission_checker: perm.clone(),
                                                        verbose: false,
                                                        plan_mode: false,
                                                        file_cache: None,
                                                        denial_tracker: None,
                                                        task_manager: None,
                                                        session_allows: None,
                                                        permission_prompter: None,
                                                    },
                                                )
                                                .await
                                            {
                                                Ok(r) => r,
                                                Err(e) => crate::tools::ToolResult::error(e.to_string()),
                                            }
                                        });

                                        streaming_tool_handles.push((tool_id, tool_name, handle));
                                    }
                                }
                                if let ContentBlock::Thinking { ref thinking, .. } = block {
                                    sink.on_thinking(thinking);
                                }
                                content_blocks.push(block);
                            }
                            Some(StreamEvent::Done {
                                usage: u,
                                stop_reason: sr,
                            }) => {
                                usage = u;
                                stop_reason = sr;
                                sink.on_usage(&usage);
                            }
                            Some(StreamEvent::Error(msg)) => {
                                got_error = true;
                                error_text = msg.clone();
                                sink.on_error(&msg);
                            }
                            Some(_) => {}
                            None => break,
                        }
                    }
                    _ = self.cancel.cancelled() => {
                        warn!("Turn cancelled by user");
                        cancelled = true;
                        // Abort in-flight eager tools; their results are
                        // discarded.
                        for (_, _, handle) in streaming_tool_handles.drain(..) {
                            handle.abort();
                        }
                        break;
                    }
                }
            }

            if cancelled {
                sink.on_warning("Cancelled");
                self.state.is_query_active = false;
                return Ok(());
            }

            // Record the assistant message even on error so recovery paths
            // below see it in history (normalization cleans up partials).
            let assistant_msg = Message::Assistant(AssistantMessage {
                uuid: Uuid::new_v4(),
                timestamp: chrono::Utc::now().to_rfc3339(),
                content: content_blocks.clone(),
                model: Some(model.clone()),
                usage: Some(usage.clone()),
                stop_reason: stop_reason.clone(),
                request_id: None,
            });
            self.state.push_message(assistant_msg);
            self.state.record_usage(&usage, &model);

            if self.state.config.features.token_budget && usage.total() > 0 {
                let turn_total = usage.input_tokens + usage.output_tokens;
                if turn_total > 100_000 {
                    sink.on_warning(&format!(
                        "High token usage this turn: {} tokens ({}in + {}out)",
                        turn_total, usage.input_tokens, usage.output_tokens
                    ));
                }
            }

            let _cache_event = self.cache_tracker.record(&usage);
            // Telemetry span for this API call.
            {
                let mut span = crate::services::telemetry::api_call_span(
                    &model,
                    turn + 1,
                    &self.state.session_id,
                );
                crate::services::telemetry::record_usage(&mut span, &usage);
                span.finish();
                tracing::debug!(
                    "API call: {}ms, {}in/{}out tokens",
                    span.duration_ms().unwrap_or(0),
                    usage.input_tokens,
                    usage.output_tokens,
                );
            }

            // --- Mid-stream error recovery. ---
            if got_error {
                // Oversized prompt reported mid-stream: shrink and retry.
                if error_text.contains("prompt is too long")
                    || error_text.contains("Prompt is too long")
                {
                    let freed = compact::microcompact(&mut self.state.messages, 1);
                    if freed > 0 {
                        sink.on_compact(freed);
                        continue;
                    }
                }

                // Truncated answer (we got some text, then max_tokens):
                // nudge the model to continue, bounded by the recovery limit.
                if content_blocks
                    .iter()
                    .any(|b| matches!(b, ContentBlock::Text { .. }))
                    && error_text.contains("max_tokens")
                    && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
                {
                    max_output_recovery_count += 1;
                    info!(
                        "Max output tokens recovery attempt {}/{}",
                        max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
                    );
                    let recovery_msg = compact::max_output_recovery_message();
                    self.state.push_message(recovery_msg);
                    continue;
                }
            }

            // Same truncation recovery when it arrives as a clean
            // MaxTokens stop reason instead of a stream error.
            if matches!(stop_reason, Some(StopReason::MaxTokens))
                && !got_error
                && content_blocks
                    .iter()
                    .any(|b| matches!(b, ContentBlock::Text { .. }))
                && max_output_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
            {
                max_output_recovery_count += 1;
                info!(
                    "Max tokens stop reason — recovery attempt {}/{}",
                    max_output_recovery_count, MAX_OUTPUT_TOKENS_RECOVERY_LIMIT
                );
                let recovery_msg = compact::max_output_recovery_message();
                self.state.push_message(recovery_msg);
                continue;
            }

            let tool_calls = extract_tool_calls(&content_blocks);

            // No tool calls => the model is done with this query.
            if tool_calls.is_empty() {
                info!("Turn complete (no tool calls)");
                sink.on_turn_complete(turn + 1);
                self.state.is_query_active = false;

                // Fire-and-forget memory extraction over the finished
                // conversation.
                if self.state.config.features.extract_memories
                    && crate::memory::ensure_memory_dir().is_some()
                {
                    let extraction_messages = self.state.messages.clone();
                    let extraction_state = self.extraction_state.clone();
                    let extraction_llm = self.llm.clone();
                    let extraction_model = model.clone();
                    tokio::spawn(async move {
                        crate::memory::extraction::extract_memories_background(
                            extraction_messages,
                            extraction_state,
                            extraction_llm,
                            extraction_model,
                        )
                        .await;
                    });
                }

                return Ok(());
            }

            // --- Execute tool calls: eager results first, then the rest
            // as a batch with the full tool context. ---
            info!("Executing {} tool call(s)", tool_calls.len());
            let cwd = PathBuf::from(&self.state.cwd);
            let tool_ctx = ToolContext {
                cwd,
                cancel: self.cancel.clone(),
                permission_checker: self.permissions.clone(),
                verbose: self.config.verbose,
                plan_mode: self.state.plan_mode,
                file_cache: Some(self.file_cache.clone()),
                denial_tracker: Some(self.denial_tracker.clone()),
                task_manager: Some(self.state.task_manager.clone()),
                session_allows: Some(self.session_allows.clone()),
                permission_prompter: self.permission_prompter.clone(),
            };

            // IDs already handled by the eager (streaming) path.
            let streaming_ids: std::collections::HashSet<String> = streaming_tool_handles
                .iter()
                .map(|(id, _, _)| id.clone())
                .collect();

            let mut streaming_results = Vec::new();
            for (id, name, handle) in streaming_tool_handles.drain(..) {
                match handle.await {
                    Ok(result) => streaming_results.push(crate::tools::executor::ToolCallResult {
                        tool_use_id: id,
                        tool_name: name,
                        result,
                    }),
                    // A panicked/aborted task becomes an error result so the
                    // tool_use still gets a paired tool_result.
                    Err(e) => streaming_results.push(crate::tools::executor::ToolCallResult {
                        tool_use_id: id,
                        tool_name: name,
                        result: crate::tools::ToolResult::error(format!("Task failed: {e}")),
                    }),
                }
            }

            // PreToolUse hooks run for every call in the response,
            // including the eagerly-executed ones.
            for call in &tool_calls {
                self.hooks
                    .run_hooks(&HookEvent::PreToolUse, Some(&call.name), &call.input)
                    .await;
            }

            let remaining_calls: Vec<_> = tool_calls
                .iter()
                .filter(|c| !streaming_ids.contains(&c.id))
                .cloned()
                .collect();

            let mut results = streaming_results;
            if !remaining_calls.is_empty() {
                let batch_results = execute_tool_calls(
                    &remaining_calls,
                    self.tools.all(),
                    &tool_ctx,
                    &self.permissions,
                )
                .await;
                results.extend(batch_results);
            }

            for result in &results {
                // Plan-mode transitions are driven by the marker tools.
                if !result.result.is_error {
                    match result.tool_name.as_str() {
                        "EnterPlanMode" => {
                            self.state.plan_mode = true;
                            info!("Plan mode enabled");
                        }
                        "ExitPlanMode" => {
                            self.state.plan_mode = false;
                            info!("Plan mode disabled");
                        }
                        _ => {}
                    }
                }

                sink.on_tool_result(&result.tool_name, &result.result);

                self.hooks
                    .run_hooks(
                        &HookEvent::PostToolUse,
                        Some(&result.tool_name),
                        &serde_json::json!({
                            "tool": result.tool_name,
                            "is_error": result.result.is_error,
                        }),
                    )
                    .await;

                // Feed each result back into history for the next turn.
                let msg = tool_result_message(
                    &result.tool_use_id,
                    &result.result.content,
                    result.result.is_error,
                );
                self.state.push_message(msg);
            }
        }

        // Loop exhausted without the model finishing.
        warn!("Max turns ({max_turns}) reached");
        sink.on_warning(&format!("Agent stopped after {max_turns} turns"));
        self.state.is_query_active = false;
        Ok(())
    }

    /// Cancel the turn currently in flight (if any).
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Clone of the current turn's cancellation token.
    pub fn cancel_token(&self) -> tokio_util::sync::CancellationToken {
        self.cancel.clone()
    }
}
793
/// Pick a cheaper fallback model id for `current` (first match wins):
/// - ids containing "opus"  -> "opus" replaced with "sonnet"
/// - gpt-5.4 / gpt-4.1 families (not already mini/nano) -> "{current}-mini"
/// - ids containing "large" -> "large" replaced with "small"
/// - anything else -> returned unchanged (no fallback known)
///
/// Matching is case-insensitive. The previous version matched
/// case-insensitively but substituted case-sensitively, so a mixed-case id
/// such as "Claude-Opus-4" took the opus branch yet came back unchanged;
/// the substitution is now case-insensitive as well.
fn get_fallback_model(current: &str) -> String {
    /// Replace the first case-insensitive occurrence of `needle` (lowercase
    /// ASCII) in `haystack`. Byte offsets into a lowercased copy are only
    /// guaranteed to line up for ASCII input, so non-ASCII ids fall back to
    /// the plain case-sensitive replace (the previous behavior).
    fn replace_ci(haystack: &str, needle: &str, replacement: &str) -> String {
        if haystack.is_ascii() {
            if let Some(pos) = haystack.to_ascii_lowercase().find(needle) {
                let mut out =
                    String::with_capacity(haystack.len() - needle.len() + replacement.len());
                out.push_str(&haystack[..pos]);
                out.push_str(replacement);
                out.push_str(&haystack[pos + needle.len()..]);
                return out;
            }
        }
        haystack.replace(needle, replacement)
    }

    let lower = current.to_lowercase();
    if lower.contains("opus") {
        replace_ci(current, "opus", "sonnet")
    } else if (lower.contains("gpt-5.4") || lower.contains("gpt-4.1"))
        && !lower.contains("mini")
        && !lower.contains("nano")
    {
        format!("{current}-mini")
    } else if lower.contains("large") {
        replace_ci(current, "large", "small")
    } else {
        current.to_string()
    }
}
812
/// Assemble the full system prompt: identity, environment facts, loaded
/// memory, enabled tool docs, user-invocable skills, the static policy
/// sections, MCP server listing, deferred tools, and output-format rules.
///
/// Rebuilt only when the caller's cache fingerprint changes (see
/// `QueryEngine::cached_system_prompt`).
pub fn build_system_prompt(tools: &ToolRegistry, state: &AppState) -> String {
    let mut prompt = String::new();

    // Identity / role.
    prompt.push_str(
        "You are an AI coding agent. You help users with software engineering tasks \
        by reading, writing, and searching code. Use the tools available to you to \
        accomplish tasks.\n\n",
    );

    // Environment facts the model can rely on.
    let shell = std::env::var("SHELL").unwrap_or_else(|_| "bash".to_string());
    let is_git = std::path::Path::new(&state.cwd).join(".git").exists();
    prompt.push_str(&format!(
        "# Environment\n\
        - Working directory: {}\n\
        - Platform: {}\n\
        - Shell: {shell}\n\
        - Git repository: {}\n\n",
        state.cwd,
        std::env::consts::OS,
        if is_git { "yes" } else { "no" },
    ));

    // Persisted memory, plus any entries relevant to the last few user
    // messages.
    let mut memory = crate::memory::MemoryContext::load(Some(std::path::Path::new(&state.cwd)));

    // Concatenate the text of the 5 most recent user messages (newest
    // first) to use as a relevance query.
    let recent_text: String = state
        .messages
        .iter()
        .rev()
        .take(5)
        .filter_map(|m| match m {
            crate::llm::message::Message::User(u) => Some(
                u.content
                    .iter()
                    .filter_map(|b| b.as_text())
                    .collect::<Vec<_>>()
                    .join(" "),
            ),
            _ => None,
        })
        .collect::<Vec<_>>()
        .join(" ");

    if !recent_text.is_empty() {
        memory.load_relevant(&recent_text);
    }

    let memory_section = memory.to_system_prompt_section();
    if !memory_section.is_empty() {
        prompt.push_str(&memory_section);
    }

    // Per-tool documentation for every enabled tool.
    prompt.push_str("# Available Tools\n\n");
    for tool in tools.all() {
        if tool.is_enabled() {
            prompt.push_str(&format!("## {}\n{}\n\n", tool.name(), tool.prompt()));
        }
    }

    // User-invocable skills discovered for this working directory.
    let skills = crate::skills::SkillRegistry::load_all(Some(std::path::Path::new(&state.cwd)));
    let invocable = skills.user_invocable();
    if !invocable.is_empty() {
        prompt.push_str("# Available Skills\n\n");
        for skill in invocable {
            let desc = skill.metadata.description.as_deref().unwrap_or("");
            let when = skill.metadata.when_to_use.as_deref().unwrap_or("");
            prompt.push_str(&format!("- `/{}`", skill.name));
            if !desc.is_empty() {
                prompt.push_str(&format!(": {desc}"));
            }
            if !when.is_empty() {
                prompt.push_str(&format!(" (use when: {when})"));
            }
            prompt.push('\n');
        }
        prompt.push('\n');
    }

    // Static policy sections (tool usage, code, git, safety, style, memory).
    prompt.push_str(
        "# Using tools\n\n\
        Use dedicated tools instead of shell commands when available:\n\
        - File search: Glob (not find or ls)\n\
        - Content search: Grep (not grep or rg)\n\
        - Read files: FileRead (not cat/head/tail)\n\
        - Edit files: FileEdit (not sed/awk)\n\
        - Write files: FileWrite (not echo/cat with redirect)\n\
        - Reserve Bash for system commands and operations that require shell execution.\n\
        - Break complex tasks into steps. Use multiple tool calls in parallel when independent.\n\
        - Use the Agent tool for complex multi-step research or tasks that benefit from isolation.\n\n\
        # Working with code\n\n\
        - Read files before editing them. Understand existing code before suggesting changes.\n\
        - Prefer editing existing files over creating new ones to avoid file bloat.\n\
        - Only make changes that were requested. Don't add features, refactor, add comments, \
        or make \"improvements\" beyond the ask.\n\
        - Don't add error handling for scenarios that can't happen. Don't design for \
        hypothetical future requirements.\n\
        - When referencing code, include file_path:line_number.\n\
        - Be careful not to introduce security vulnerabilities (command injection, XSS, SQL injection, \
        OWASP top 10). If you notice insecure code you wrote, fix it immediately.\n\
        - Don't add docstrings, comments, or type annotations to code you didn't change.\n\
        - Three similar lines of code is better than a premature abstraction.\n\n\
        # Git safety protocol\n\n\
        - NEVER update the git config.\n\
        - NEVER run destructive git commands (push --force, reset --hard, checkout ., restore ., \
        clean -f, branch -D) unless the user explicitly requests them.\n\
        - NEVER skip hooks (--no-verify, --no-gpg-sign) unless the user explicitly requests it.\n\
        - NEVER force push to main/master. Warn the user if they request it.\n\
        - Always create NEW commits rather than amending, unless the user explicitly requests amend. \
        After hook failure, the commit did NOT happen — amend would modify the PREVIOUS commit.\n\
        - When staging files, prefer adding specific files by name rather than git add -A or git add ., \
        which can accidentally include sensitive files.\n\
        - NEVER commit changes unless the user explicitly asks.\n\n\
        # Committing changes\n\n\
        When the user asks to commit:\n\
        1. Run git status and git diff to see all changes.\n\
        2. Run git log --oneline -5 to match the repository's commit message style.\n\
        3. Draft a concise (1-2 sentence) commit message focusing on \"why\" not \"what\".\n\
        4. Do not commit files that likely contain secrets (.env, credentials.json).\n\
        5. Stage specific files, create the commit.\n\
        6. If pre-commit hook fails, fix the issue and create a NEW commit.\n\
        7. When creating commits, include a co-author attribution line at the end of the message.\n\n\
        # Creating pull requests\n\n\
        When the user asks to create a PR:\n\
        1. Run git status, git diff, and git log to understand all changes on the branch.\n\
        2. Analyze ALL commits (not just the latest) that will be in the PR.\n\
        3. Draft a short title (under 70 chars) and detailed body with summary and test plan.\n\
        4. Push to remote with -u flag if needed, then create PR using gh pr create.\n\
        5. Return the PR URL when done.\n\n\
        # Executing actions safely\n\n\
        Consider the reversibility and blast radius of every action:\n\
        - Freely take local, reversible actions (editing files, running tests).\n\
        - For hard-to-reverse or shared-state actions, confirm with the user first:\n\
        - Destructive: deleting files/branches, dropping tables, rm -rf, overwriting uncommitted changes.\n\
        - Hard to reverse: force-pushing, git reset --hard, amending published commits.\n\
        - Visible to others: pushing code, creating/commenting on PRs/issues, sending messages.\n\
        - When you encounter an obstacle, do not use destructive actions as a shortcut. \
        Identify root causes and fix underlying issues.\n\
        - If you discover unexpected state (unfamiliar files, branches, config), investigate \
        before deleting or overwriting — it may be the user's in-progress work.\n\n\
        # Response style\n\n\
        - Be concise. Lead with the answer or action, not the reasoning.\n\
        - Skip filler, preamble, and unnecessary transitions.\n\
        - Don't restate what the user said.\n\
        - If you can say it in one sentence, don't use three.\n\
        - Focus output on: decisions that need input, status updates, and errors that change the plan.\n\
        - When referencing GitHub issues or PRs, use owner/repo#123 format.\n\
        - Only use emojis if the user explicitly requests it.\n\n\
        # Memory\n\n\
        You can save information across sessions by writing memory files.\n\
        - Save to: ~/.config/agent-code/memory/ (one .md file per topic)\n\
        - Each file needs YAML frontmatter: name, description, type (user/feedback/project/reference)\n\
        - After writing a file, update MEMORY.md with a one-line pointer\n\
        - Memory types: user (role, preferences), feedback (corrections, confirmations), \
        project (decisions, deadlines), reference (external resources)\n\
        - Do NOT store: code patterns, git history, debugging solutions, anything derivable from code\n\
        - Memory is a hint — always verify against current state before acting on it\n",
    );

    // Worked patterns and recovery guidance.
    prompt.push_str(
        "# Tool usage patterns\n\n\
        Common patterns for effective tool use:\n\n\
        **Read before edit**: Always read a file before editing it. This ensures you \
        understand the current state and can make targeted changes.\n\
        ```\n\
        1. FileRead file_path → understand structure\n\
        2. FileEdit old_string, new_string → targeted change\n\
        ```\n\n\
        **Search then act**: Use Glob to find files, Grep to find content, then read/edit.\n\
        ```\n\
        1. Glob **/*.rs → find Rust files\n\
        2. Grep pattern path → find specific code\n\
        3. FileRead → read the match\n\
        4. FileEdit → make the change\n\
        ```\n\n\
        **Parallel tool calls**: When you need to read multiple independent files or run \
        independent searches, make all the tool calls in one response. Don't serialize \
        independent operations.\n\n\
        **Test after change**: After editing code, run tests to verify the change works.\n\
        ```\n\
        1. FileEdit → make change\n\
        2. Bash cargo test / pytest / npm test → verify\n\
        3. If tests fail, read the error, fix, re-test\n\
        ```\n\n\
        # Error recovery\n\n\
        When something goes wrong:\n\
        - **Tool not found**: Use ToolSearch to find the right tool name.\n\
        - **Permission denied**: Explain why the action is needed, ask the user to approve.\n\
        - **File not found**: Use Glob to find the correct path. Check for typos.\n\
        - **Edit failed (not unique)**: Provide more surrounding context in old_string, \
        or use replace_all=true if renaming.\n\
        - **Command failed**: Read the full error message. Don't retry the same command. \
        Diagnose the root cause first.\n\
        - **Context too large**: The system will auto-compact. If you need specific \
        information from before compaction, re-read the relevant files.\n\
        - **Rate limited**: The system will auto-retry with backoff. Just wait.\n\n\
        # Common workflows\n\n\
        **Bug fix**: Read the failing test → read the source code it tests → \
        identify the bug → fix it → run the test → confirm it passes.\n\n\
        **New feature**: Read existing patterns in the codebase → create or edit files → \
        add tests → run tests → update docs if needed.\n\n\
        **Code review**: Read the diff → identify issues (bugs, security, style) → \
        report findings with file:line references.\n\n\
        **Refactor**: Search for all usages of the symbol → plan the changes → \
        edit each file → run tests to verify nothing broke.\n\n",
    );

    // Connected MCP servers, with their transport inferred from config
    // shape (command => stdio, url => sse).
    if !state.config.mcp_servers.is_empty() {
        prompt.push_str("# MCP Servers\n\n");
        prompt.push_str(
            "Connected MCP servers provide additional tools. MCP tools are prefixed \
            with `mcp__{server}__{tool}`. Use them like any other tool.\n\n",
        );
        for (name, entry) in &state.config.mcp_servers {
            let transport = if entry.command.is_some() {
                "stdio"
            } else if entry.url.is_some() {
                "sse"
            } else {
                "unknown"
            };
            prompt.push_str(&format!("- **{name}** ({transport})\n"));
        }
        prompt.push('\n');
    }

    // Tools registered but not loaded by default.
    let deferred = tools.deferred_names();
    if !deferred.is_empty() {
        prompt.push_str("# Deferred Tools\n\n");
        prompt.push_str(
            "These tools are available but not loaded by default. \
            Use ToolSearch to load them when needed:\n",
        );
        for name in &deferred {
            prompt.push_str(&format!("- {name}\n"));
        }
        prompt.push('\n');
    }

    // Task management and output-formatting rules.
    prompt.push_str(
        "# Task management\n\n\
        - Use TaskCreate to break complex work into trackable steps.\n\
        - Mark tasks as in_progress when starting, completed when done.\n\
        - Use the Agent tool to spawn subagents for parallel independent work.\n\
        - Use EnterPlanMode/ExitPlanMode for read-only exploration before making changes.\n\
        - Use EnterWorktree/ExitWorktree for isolated changes in git worktrees.\n\n\
        # Output formatting\n\n\
        - All text output is displayed to the user. Use GitHub-flavored markdown.\n\
        - Use fenced code blocks with language hints for code: ```rust, ```python, etc.\n\
        - Use inline `code` for file names, function names, and short code references.\n\
        - Use tables for structured comparisons.\n\
        - Use bullet lists for multiple items.\n\
        - Keep paragraphs short (2-3 sentences).\n\
        - Never output raw HTML or complex formatting — stick to standard markdown.\n",
    );

    prompt
}
1080
#[cfg(test)]
mod tests {
    // `use super::*` already re-binds the parent module's imports
    // (`Provider`, `ProviderError`, `ProviderRequest`, `StreamEvent`,
    // `CancellationToken`, ...), so no extra `use crate::llm::provider::...`
    // is needed inside this module.
    use super::*;

    /// Swapping a fresh token into the shared slot must route `cancel()` to
    /// whichever token is current: a cancel after a swap hits the new token,
    /// and a freshly swapped-in token starts un-cancelled.
    #[test]
    fn cancel_shared_propagates_to_current_token() {
        let root = CancellationToken::new();
        let shared = Arc::new(std::sync::Mutex::new(root.clone()));

        // Turn 1 installs its own token; cancelling through the shared slot
        // must reach it.
        let turn1 = CancellationToken::new();
        *shared.lock().unwrap() = turn1.clone();

        shared.lock().unwrap().cancel();
        assert!(turn1.is_cancelled());

        // Turn 2 swaps in a new token — the previous cancel must not leak
        // into it...
        let turn2 = CancellationToken::new();
        *shared.lock().unwrap() = turn2.clone();
        assert!(!turn2.is_cancelled());

        // ...but it must still be cancellable through the same shared slot.
        shared.lock().unwrap().cancel();
        assert!(turn2.is_cancelled());
    }

    /// A `select!`-based receive loop must exit through the cancellation
    /// branch even while the channel stays open (the sender is kept alive in
    /// `tx`, so `recv()` would otherwise pend forever).
    #[tokio::test]
    async fn stream_loop_responds_to_cancellation() {
        let cancel = CancellationToken::new();
        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamEvent>(10);

        // One event is queued before the loop starts, so the first recv()
        // completes immediately.
        tx.send(StreamEvent::TextDelta("hello".into()))
            .await
            .unwrap();

        // Cancel fires well after the queued event is consumed.
        let cancel2 = cancel.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            cancel2.cancel();
        });

        let mut events_received = 0u32;
        let mut cancelled = false;

        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Some(_) => events_received += 1,
                        None => break,
                    }
                }
                _ = cancel.cancelled() => {
                    cancelled = true;
                    break;
                }
            }
        }

        assert!(cancelled, "Loop should have been cancelled");
        assert_eq!(
            events_received, 1,
            "Should have received exactly one event before cancel"
        );
    }

    /// Provider that emits a single delta and then never completes the
    /// stream. Cancellation is the only way out of a turn driven by it.
    struct HangingProvider;

    #[async_trait::async_trait]
    impl Provider for HangingProvider {
        fn name(&self) -> &str {
            "hanging-mock"
        }

        async fn stream(
            &self,
            _request: &ProviderRequest,
        ) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, ProviderError> {
            let (tx, rx) = tokio::sync::mpsc::channel(4);
            tokio::spawn(async move {
                let _ = tx.send(StreamEvent::TextDelta("thinking...".into())).await;
                // Deliberately keep the sender alive so `rx.recv()` never
                // yields `None` — the stream hangs instead of closing.
                let _tx_holder = tx;
                std::future::pending::<()>().await;
            });
            Ok(rx)
        }
    }

    /// Provider that streams a short, well-formed response (delta, block
    /// complete, `Done` with `EndTurn`) and finishes normally.
    struct CompletingProvider;

    #[async_trait::async_trait]
    impl Provider for CompletingProvider {
        fn name(&self) -> &str {
            "completing-mock"
        }

        async fn stream(
            &self,
            _request: &ProviderRequest,
        ) -> Result<tokio::sync::mpsc::Receiver<StreamEvent>, ProviderError> {
            let (tx, rx) = tokio::sync::mpsc::channel(8);
            tokio::spawn(async move {
                let _ = tx.send(StreamEvent::TextDelta("hello".into())).await;
                let _ = tx
                    .send(StreamEvent::ContentBlockComplete(ContentBlock::Text {
                        text: "hello".into(),
                    }))
                    .await;
                let _ = tx
                    .send(StreamEvent::Done {
                        usage: Usage::default(),
                        stop_reason: Some(StopReason::EndTurn),
                    })
                    .await;
            });
            Ok(rx)
        }
    }

    /// Builds a single-turn (`max_turns: Some(1)`), unattended engine around
    /// the given provider, with default config/permissions/tools.
    fn build_engine(llm: Arc<dyn Provider>) -> QueryEngine {
        use crate::config::Config;
        use crate::permissions::PermissionChecker;
        use crate::state::AppState;
        use crate::tools::registry::ToolRegistry;

        let config = Config::default();
        let permissions = PermissionChecker::from_config(&config.permissions);
        let state = AppState::new(config);

        QueryEngine::new(
            llm,
            ToolRegistry::default_tools(),
            permissions,
            state,
            QueryEngineConfig {
                max_turns: Some(1),
                verbose: false,
                unattended: true,
            },
        )
    }

    /// Fires `cancel()` on whatever token currently occupies the engine's
    /// shared slot, after `delay_ms` milliseconds.
    fn schedule_cancel(engine: &QueryEngine, delay_ms: u64) {
        let shared = engine.cancel_shared.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            shared.lock().unwrap().cancel();
        });
    }

    /// A cancelled turn against a hanging provider must return promptly, with
    /// `Ok(())` (not an error), and reset `is_query_active`.
    #[tokio::test]
    async fn run_turn_with_sink_interrupts_on_cancel() {
        let mut engine = build_engine(Arc::new(HangingProvider));
        schedule_cancel(&engine, 100);

        let result = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("test input", &NullSink),
        )
        .await;

        assert!(
            result.is_ok(),
            "run_turn_with_sink should return promptly on cancel, not hang"
        );
        assert!(
            result.unwrap().is_ok(),
            "cancelled turn should return Ok(()), not an error"
        );
        assert!(
            !engine.state().is_query_active,
            "is_query_active should be reset after cancel"
        );
    }

    /// Regression test: cancellation must keep working on the second and
    /// third turns of the same engine (a consumed/stale token would make a
    /// later turn hang).
    #[tokio::test]
    async fn cancel_works_across_multiple_turns() {
        let mut engine = build_engine(Arc::new(HangingProvider));

        schedule_cancel(&engine, 80);
        let r1 = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("turn 1", &NullSink),
        )
        .await;
        assert!(r1.is_ok(), "turn 1 should cancel promptly");
        assert!(!engine.state().is_query_active);

        schedule_cancel(&engine, 80);
        let r2 = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("turn 2", &NullSink),
        )
        .await;
        assert!(
            r2.is_ok(),
            "turn 2 should also cancel promptly — regression would hang here"
        );
        assert!(!engine.state().is_query_active);

        schedule_cancel(&engine, 80);
        let r3 = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("turn 3", &NullSink),
        )
        .await;
        assert!(r3.is_ok(), "turn 3 should still be cancellable");
        assert!(!engine.state().is_query_active);
    }

    /// A cancel flag set *before* a turn starts must be cleared at turn
    /// start, so a normal (completing) turn still runs to completion and
    /// records its messages.
    #[tokio::test]
    async fn cancel_does_not_poison_next_turn() {
        let mut engine = build_engine(Arc::new(HangingProvider));
        schedule_cancel(&engine, 80);
        let _ = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("turn 1", &NullSink),
        )
        .await
        .expect("turn 1 should cancel");

        let mut engine2 = build_engine(Arc::new(CompletingProvider));

        // Pre-cancel the shared token to simulate a stale flag from an
        // earlier turn.
        engine2.cancel_shared.lock().unwrap().cancel();

        let result = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine2.run_turn_with_sink("hello", &NullSink),
        )
        .await;

        assert!(result.is_ok(), "completing turn should not hang");
        assert!(
            result.unwrap().is_ok(),
            "turn should succeed — the stale cancel flag must be cleared on turn start"
        );
        assert!(
            engine2.state().messages.len() >= 2,
            "normal turn should push both user and assistant messages"
        );
    }

    /// A cancel scheduled almost immediately (possibly before the provider
    /// emits anything) must still interrupt cleanly.
    #[tokio::test]
    async fn cancel_before_first_event_interrupts_cleanly() {
        let mut engine = build_engine(Arc::new(HangingProvider));
        schedule_cancel(&engine, 1);

        let result = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("immediate", &NullSink),
        )
        .await;

        assert!(result.is_ok(), "early cancel should not hang");
        assert!(result.unwrap().is_ok());
        assert!(!engine.state().is_query_active);
    }

    /// A cancelled turn must surface a "Cancelled" warning through the sink
    /// so the user sees why output stopped.
    #[tokio::test]
    async fn cancelled_turn_emits_warning_to_sink() {
        use std::sync::Mutex;

        // Sink that records every warning it receives; all other callbacks
        // are no-ops.
        struct CapturingSink {
            warnings: Mutex<Vec<String>>,
        }

        impl StreamSink for CapturingSink {
            fn on_text(&self, _: &str) {}
            fn on_tool_start(&self, _: &str, _: &serde_json::Value) {}
            fn on_tool_result(&self, _: &str, _: &crate::tools::ToolResult) {}
            fn on_error(&self, _: &str) {}
            fn on_warning(&self, msg: &str) {
                self.warnings.lock().unwrap().push(msg.to_string());
            }
        }

        let sink = CapturingSink {
            warnings: Mutex::new(Vec::new()),
        };

        let mut engine = build_engine(Arc::new(HangingProvider));
        schedule_cancel(&engine, 100);

        let _ = tokio::time::timeout(
            std::time::Duration::from_secs(5),
            engine.run_turn_with_sink("test", &sink),
        )
        .await
        .expect("should not hang");

        let warnings = sink.warnings.lock().unwrap();
        assert!(
            warnings.iter().any(|w| w.contains("Cancelled")),
            "expected 'Cancelled' warning in sink, got: {:?}",
            *warnings
        );
    }
}
1433}