1#![allow(dead_code)]
6
7use crate::compact::{
8 self, get_auto_compact_threshold, get_compact_prompt, get_effective_context_window_size,
9};
10use crate::error::AgentError;
11use crate::hooks::{HookInput, HookRegistry};
12use crate::services::compact::microcompact::truncate_tool_result_content;
13use crate::services::api::errors::{sanitize_html_error, error_to_api_message, get_error_message_if_refusal, is_media_size_error};
14use crate::services::streaming::{
15 STALL_THRESHOLD_MS, StallStats, StreamWatchdog, StreamingResult, StreamingToolExecutor,
16 calculate_streaming_cost, cleanup_stream, get_nonstreaming_fallback_timeout_ms,
17 is_404_stream_creation_error, is_429_only_error, is_529_error, is_api_timeout_error,
18 is_auth_error, is_nonstreaming_fallback_disabled, is_stale_connection_error,
19 is_user_abort_error, parse_max_tokens_context_overflow, release_stream_resources,
20 validate_stream_completion, FallbackTriggeredError, MAX_529_RETRIES, FLOOR_OUTPUT_TOKENS,
21};
22use crate::tool::Tool as ToolTrait;
23use crate::tool::{ProgressMessage, ToolResultRenderOptions};
24use crate::tools::orchestration::{self, ToolMessageUpdate};
25use crate::types::*;
26use crate::utils::http::get_user_agent;
27use std::collections::{HashMap, HashSet};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::{Arc, Mutex};
30use tokio::time::sleep as sleep_tokio;
31
32fn emit_api_retry_event(
35 on_event: Option<&(dyn Fn(AgentEvent) + Send + Sync)>,
36 attempt: u32,
37 max_retries: u32,
38 retry_delay_ms: u64,
39 error_status: Option<u16>,
40 error: &str,
41) {
42 if let Some(cb) = on_event {
43 cb(AgentEvent::ApiRetry {
44 attempt,
45 max_retries,
46 retry_delay_ms,
47 error_status,
48 error: error.to_string(),
49 });
50 }
51}
52
53fn emit_done_event(
56 on_event: &Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
57 result: QueryResult,
58) {
59 let _ = crate::utils::session_storage::flush_session_storage();
60 if let Some(cb) = on_event {
61 cb(AgentEvent::Done { result });
62 }
63}
64
/// Formats a token count for compact display.
///
/// * below 1_000 the exact number is printed (`"999"`);
/// * from 1_000 upward the count is shown in thousands with one decimal
///   place (`"1.5k"`);
/// * once the thousands form would round to `"1000.0k"` (from 999_950) the
///   count is shown in millions with one decimal place (`"1.0m"`).
fn format_tokens(tokens: u64) -> String {
    // Smallest count whose one-decimal thousands rendering could reach
    // "1000.0k"; from here on the millions form is the readable one.
    const MILLIONS_DISPLAY_FLOOR: u64 = 999_950;

    if tokens >= MILLIONS_DISPLAY_FLOOR {
        format!("{:.1}m", tokens as f64 / 1_000_000.0)
    } else if tokens >= 1_000 {
        format!("{:.1}k", tokens as f64 / 1_000.0)
    } else {
        tokens.to_string()
    }
}
75
76pub(crate) fn empty_json_value() -> serde_json::Value {
78 serde_json::Value::Object(serde_json::Map::new())
79}
80
/// Removes `<think>…</think>` spans from `content` and trims the result.
///
/// * Text between an opening `<think>` and the next `</think>` is dropped.
/// * An unterminated `<think>` drops everything after it.
/// * A stray `</think>` outside a span is removed and the text around it kept.
/// * Leading and trailing whitespace of the remaining text is trimmed.
pub(crate) fn strip_thinking(content: &str) -> String {
    const OPEN_TAG: &str = "<think>";
    const CLOSE_TAG: &str = "</think>";

    let mut kept = String::new();
    let mut inside_block = false;
    // Unconsumed remainder of the input; always starts on a char boundary.
    let mut rest = content;

    while !rest.is_empty() {
        if let Some(after_open) = rest.strip_prefix(OPEN_TAG) {
            inside_block = true;
            rest = after_open;
        } else if let Some(after_close) = rest.strip_prefix(CLOSE_TAG) {
            inside_block = false;
            rest = after_close;
        } else {
            // Consume exactly one character, keeping it only outside a block.
            let mut chars = rest.chars();
            match chars.next() {
                Some(ch) => {
                    if !inside_block {
                        kept.push(ch);
                    }
                }
                None => break,
            }
            rest = chars.as_str();
        }
    }

    kept.trim().to_string()
}
120
121fn parse_anthropic_usage(usage: &serde_json::Value) -> TokenUsage {
123 let iterations = usage.get("iterations").and_then(|v| v.as_array()).map(|arr| {
124 arr.iter().filter_map(|it| {
125 Some(IterationUsage {
126 input_tokens: it.get("input_tokens").and_then(|v| v.as_u64())?,
127 output_tokens: it.get("output_tokens").and_then(|v| v.as_u64())?,
128 })
129 }).collect()
130 });
131 TokenUsage {
132 input_tokens: usage
133 .get("input_tokens")
134 .and_then(|v| v.as_u64())
135 .unwrap_or(0),
136 output_tokens: usage
137 .get("output_tokens")
138 .and_then(|v| v.as_u64())
139 .unwrap_or(0),
140 cache_creation_input_tokens: usage
141 .get("cache_creation_input_tokens")
142 .and_then(|v| v.as_u64()),
143 cache_read_input_tokens: usage
144 .get("cache_read_input_tokens")
145 .and_then(|v| v.as_u64()),
146 iterations,
147 }
148}
149
/// Bookkeeping for automatic conversation compaction.
///
/// All fields start at their zero value via `Default`.
// NOTE(review): field meanings below are inferred from the names only; no
// reader or writer of these fields is visible in this part of the file.
#[derive(Clone, Debug, Default)]
pub struct AutoCompactTracking {
    /// Presumably set once a compaction has happened — confirm against callers.
    pub compacted: bool,
    /// Presumably identifies the turn the tracking data belongs to.
    pub turn_id: String,
    /// Presumably counts turns since tracking started.
    pub turn_counter: u32,
    /// Presumably counts compaction attempts that failed back to back.
    pub consecutive_failures: u32,
}
162
/// Display strings computed for one tool invocation.
///
/// `execute_tool` fills this from the tool's registered render callbacks and
/// copies the values into the `ToolStart` / `ToolComplete` events.
#[derive(Clone, Debug)]
pub struct ToolRenderMetadata {
    /// Result of the tool's `user_facing_name` callback.
    pub user_facing_name: String,
    /// Result of the optional `get_tool_use_summary` callback.
    pub tool_use_summary: Option<String>,
    /// Result of the optional `get_activity_description` callback.
    pub activity_description: Option<String>,
}
170
171type UserFacingNameFn = Arc<dyn Fn(Option<&serde_json::Value>) -> String + Send + Sync>;
173type GetToolUseSummaryFn = Arc<dyn Fn(Option<&serde_json::Value>) -> Option<String> + Send + Sync>;
174type GetActivityDescriptionFn =
175 Arc<dyn Fn(Option<&serde_json::Value>) -> Option<String> + Send + Sync>;
176type RenderToolResultFn = Arc<
177 dyn Fn(&serde_json::Value, &[ProgressMessage], &ToolResultRenderOptions) -> Option<String>
178 + Send
179 + Sync,
180>;
181
182#[derive(Clone)]
183pub struct ToolRenderFns {
184 pub user_facing_name: UserFacingNameFn,
185 pub get_tool_use_summary: Option<GetToolUseSummaryFn>,
186 pub get_activity_description: Option<GetActivityDescriptionFn>,
187 pub render_tool_result_message: Option<RenderToolResultFn>,
188}
189
190impl ToolRenderFns {
191 pub fn render(&self, content: &str, tools: &[crate::types::ToolDefinition]) -> Option<String> {
194 let content_value: serde_json::Value = serde_json::from_str(content).ok()?;
195 let progress_messages: Vec<ProgressMessage> = vec![];
196 let options = ToolResultRenderOptions {
197 style: None,
198 theme: "dark".to_string(),
199 tools: tools.to_vec(),
200 verbose: false,
201 is_transcript_mode: false,
202 is_brief_only: false,
203 input: None,
204 };
205 let render_fn = self.render_tool_result_message.as_ref()?;
206 render_fn(&content_value, &progress_messages, &options)
207 }
208}
209
210#[allow(dead_code)]
211pub struct QueryEngine {
212 pub(crate) config: QueryEngineConfig,
213 pub(crate) messages: Vec<crate::types::Message>,
214 turn_count: u32,
215 total_usage: TokenUsage,
216 total_cost: f64,
217 http_client: reqwest::Client,
218 tool_executors: Mutex<HashMap<String, Arc<ToolExecutor>>>,
220 tool_render_fns: Mutex<HashMap<String, ToolRenderFns>>,
222 tool_backfill_fns: Mutex<HashMap<String, Arc<dyn Fn(&mut serde_json::Value) + Send + Sync>>>,
224 hook_registry: Arc<Mutex<Option<HookRegistry>>>,
226 auto_compact_tracking: AutoCompactTracking,
228 permission_denials: Vec<PermissionDenial>,
230 last_stop_reason: Option<String>,
232 max_output_tokens_recovery_count: u32,
234 has_attempted_reactive_compact: bool,
236 empty_response_retries: u32,
238 max_output_tokens_override: Option<u32>,
240 stop_hook_active: bool,
242 transition: Option<String>,
244 pending_tool_use_summary: Option<String>,
246 abort_controller: crate::utils::AbortController,
248 budget_tracker: crate::token_budget::BudgetTracker,
250 turn_tokens: u64,
252 loaded_nested_memory_paths: std::collections::HashSet<String>,
254 content_replacement_state: Option<crate::services::compact::ContentReplacementState>,
256 start_time: Option<std::time::Instant>,
258 task_budget_remaining: Option<u64>,
261 structured_output_retries: u32,
263 has_handled_orphaned_permission: bool,
266}
267
268type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>;
269type ToolExecutor = dyn Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
270 + Send
271 + Sync;
272
273#[derive(Debug, Clone, Default)]
275pub struct PermissionDenial {
276 pub tool_name: String,
277 pub tool_use_id: String,
278 pub tool_input: serde_json::Value,
279}
280
281#[derive(Debug, Clone)]
286pub struct OrphanedPermission {
287 pub tool_use_id: String,
288 pub assistant_message: Message,
289 pub permission_result: crate::permission::PermissionResult,
290}
291
292pub struct QueryEngineConfig {
293 pub cwd: String,
294 pub model: String,
295 pub api_key: Option<String>,
296 pub base_url: Option<String>,
297 pub tools: Vec<ToolDefinition>,
298 pub system_prompt: Option<String>,
299 pub max_turns: u32,
300 pub max_budget_usd: Option<f64>,
301 pub max_tokens: u32,
302 pub fallback_model: Option<String>,
304 pub user_context: HashMap<String, String>,
307 pub system_context: HashMap<String, String>,
309 pub can_use_tool:
312 Option<std::sync::Arc<dyn Fn(ToolDefinition, serde_json::Value) -> crate::permission::PermissionResult + Send + Sync>>,
313 pub on_event: Option<std::sync::Arc<dyn Fn(AgentEvent) + Send + Sync>>,
315 pub thinking: Option<crate::types::api_types::ThinkingConfig>,
318 pub abort_controller: Option<std::sync::Arc<crate::utils::AbortController>>,
321 pub token_budget: Option<f64>,
325 pub agent_id: Option<String>,
327 pub session_state: Option<std::sync::Arc<crate::session_state::SessionStateManager>>,
329 pub loaded_nested_memory_paths: std::collections::HashSet<String>,
331 pub task_budget: Option<TaskBudget>,
334 pub orphaned_permission: Option<OrphanedPermission>,
339}
340
/// Budget attached to a task, expressed as a single total.
// NOTE(review): the unit of `total` is not visible here (presumably tokens,
// given `task_budget_remaining: Option<u64>` on the engine) — confirm.
#[derive(Clone, Debug)]
pub struct TaskBudget {
    /// Total budget available to the task.
    pub total: u64,
}
345
346impl Default for QueryEngineConfig {
347 fn default() -> Self {
348 Self {
349 cwd: String::new(),
350 model: String::new(),
351 api_key: None,
352 base_url: None,
353 tools: vec![],
354 system_prompt: None,
355 max_turns: 10,
356 max_budget_usd: None,
357 max_tokens: 16384,
358 fallback_model: None,
359 user_context: HashMap::new(),
360 system_context: HashMap::new(),
361 can_use_tool: None,
362 on_event: None,
363 thinking: None,
364 abort_controller: None,
365 token_budget: None,
366 agent_id: None,
367 session_state: None,
368 loaded_nested_memory_paths: std::collections::HashSet::new(),
369 task_budget: None,
370 orphaned_permission: None,
371 }
372 }
373}
374
375impl QueryEngine {
376 pub fn new(mut config: QueryEngineConfig) -> Self {
377 let loaded_memory_paths = config.loaded_nested_memory_paths.clone();
378 let abort_controller = config.abort_controller.take().map_or_else(
379 || crate::utils::create_abort_controller_default(),
380 |arc| (*arc).clone(),
381 );
382 Self {
383 config,
384 messages: vec![],
385 turn_count: 0,
386 total_usage: TokenUsage::default(),
387 total_cost: 0.0,
388 http_client: reqwest::Client::new(),
389 tool_executors: Mutex::new(HashMap::new()),
390 tool_render_fns: Mutex::new(HashMap::new()),
391 tool_backfill_fns: Mutex::new(HashMap::new()),
392 hook_registry: Arc::new(Mutex::new(None)),
393 auto_compact_tracking: AutoCompactTracking::default(),
394 permission_denials: Vec::new(),
395 last_stop_reason: None,
396 max_output_tokens_recovery_count: 0,
397 has_attempted_reactive_compact: false,
398 max_output_tokens_override: None,
399 stop_hook_active: false,
400 transition: None,
401 pending_tool_use_summary: None,
402 empty_response_retries: 0,
403 abort_controller,
404 budget_tracker: crate::token_budget::BudgetTracker::new(),
405 turn_tokens: 0,
406 loaded_nested_memory_paths: loaded_memory_paths,
407 content_replacement_state: Some(
408 crate::services::compact::create_content_replacement_state(),
409 ),
410 start_time: None,
411 task_budget_remaining: None,
412 structured_output_retries: 0,
413 has_handled_orphaned_permission: false,
414 }
415 }
416
417 pub fn register_tool<F>(&mut self, name: String, executor: F)
420 where
421 F: Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
422 + Send
423 + Sync
424 + 'static,
425 {
426 self.tool_executors
427 .lock()
428 .unwrap()
429 .insert(name, Arc::new(executor));
430 }
431
432 pub fn register_tool_backfill<F>(&mut self, name: String, backfill_fn: F)
436 where
437 F: Fn(&mut serde_json::Value) + Send + Sync + 'static,
438 {
439 self.tool_backfill_fns
440 .lock()
441 .unwrap()
442 .insert(name, Arc::new(backfill_fn));
443 }
444
445 pub fn register_tool_with_render<F>(
449 &mut self,
450 name: String,
451 executor: F,
452 render_fns: ToolRenderFns,
453 ) where
454 F: Fn(serde_json::Value, &ToolContext) -> BoxFuture<Result<ToolResult, AgentError>>
455 + Send
456 + Sync
457 + 'static,
458 {
459 self.tool_executors
460 .lock()
461 .unwrap()
462 .insert(name.clone(), Arc::new(executor));
463 self.tool_render_fns
464 .lock()
465 .unwrap()
466 .insert(name, render_fns);
467 }
468
469 pub fn interrupt(&self) {
473 self.abort_controller.abort(None);
474 }
475
476 pub fn set_messages(&mut self, messages: Vec<crate::types::Message>) {
477 self.messages = messages;
478 }
479
480 pub(crate) fn separate_tools_for_request(&self) -> (Vec<ToolDefinition>, Vec<ToolDefinition>) {
484 use crate::tools::deferred_tools::{extract_discovered_tool_names, is_deferred_tool};
485
486 let mut upfront = Vec::new();
487 let mut deferred = Vec::new();
488
489 for tool in &self.config.tools {
490 if is_deferred_tool(tool) {
491 deferred.push(tool.clone());
492 } else {
493 upfront.push(tool.clone());
494 }
495 }
496
497 if !crate::tools::deferred_tools::is_tool_search_enabled_optimistic() {
499 upfront.extend(deferred.drain(..));
500 return (upfront, deferred);
501 }
502
503 let api_messages: Vec<serde_json::Value> = self
506 .messages
507 .iter()
508 .map(|msg| {
509 let role = match msg.role {
510 api_types::MessageRole::User => "user",
511 api_types::MessageRole::Assistant => "assistant",
512 api_types::MessageRole::System => "system",
513 api_types::MessageRole::Tool => "tool",
514 };
515 serde_json::json!({
516 "role": role,
517 "content": msg.content
518 })
519 })
520 .collect();
521
522 let discovered = extract_discovered_tool_names(&api_messages);
523
524 deferred.retain(|t| {
526 if discovered.contains(&t.name) {
527 upfront.push(t.clone());
528 false
529 } else {
530 true
531 }
532 });
533
534 let upfront = crate::tools::assemble_tool_pool(
536 &upfront,
537 &[], );
539
540 (upfront, deferred)
541 }
542
543 pub(crate) fn maybe_inject_deferred_tools_block(
546 &self,
547 api_messages: &mut Vec<serde_json::Value>,
548 ) {
549 use crate::tools::deferred_tools::{
550 extract_discovered_tool_names, get_deferred_tool_names, is_deferred_tool,
551 is_tool_search_enabled_optimistic,
552 };
553
554 if !is_tool_search_enabled_optimistic() {
556 return;
557 }
558
559 let all_deferred = get_deferred_tool_names(&self.config.tools);
561
562 let discovered = extract_discovered_tool_names(api_messages);
564
565 let undiscovered: Vec<&str> = all_deferred
567 .iter()
568 .filter(|name| !discovered.contains(*name))
569 .map(|s| s.as_str())
570 .collect();
571
572 if undiscovered.is_empty() {
573 return;
574 }
575
576 let block_content = format!(
578 "<available-deferred-tools>\n{}\n</available-deferred-tools>\n\n\
579 Deferred tools appear by name above. \
580 To use a deferred tool, call ToolSearchTool with query \"select:<tool_name>\" to fetch its schema. \
581 Once fetched, the tool will be available for use.",
582 undiscovered.join("\n")
583 );
584
585 let inject_msg = serde_json::json!({
587 "role": "user",
588 "content": block_content,
589 "is_meta": true
590 });
591
592 let mut insert_pos = 0;
594 for (i, msg) in api_messages.iter().enumerate() {
595 if msg.get("role").and_then(|v| v.as_str()) == Some("user") {
596 insert_pos = i;
597 break;
598 }
599 insert_pos = i + 1;
600 }
601
602 api_messages.insert(insert_pos, inject_msg);
603 }
604
605 pub async fn execute_tool(
607 &mut self,
608 name: &str,
609 input: serde_json::Value,
610 tool_call_id: String,
611 ) -> Result<ToolResult, AgentError> {
612 let context = ToolContext {
613 cwd: self.config.cwd.clone(),
614 abort_signal: Arc::clone(self.abort_controller.signal()),
615 };
616
617 let (executor, render_metadata) = {
619 let executors = self.tool_executors.lock().unwrap();
620 let render_fns = self.tool_render_fns.lock().unwrap();
621 (
622 executors.get(name).cloned(),
623 render_fns.get(name).map(|fns| ToolRenderMetadata {
624 user_facing_name: (Arc::clone(&fns.user_facing_name))(Some(&input)),
625 tool_use_summary: fns
626 .get_tool_use_summary
627 .as_ref()
628 .and_then(|f| f(Some(&input))),
629 activity_description: fns
630 .get_activity_description
631 .as_ref()
632 .and_then(|f| f(Some(&input))),
633 }),
634 )
635 };
636
637 if let Some(executor) = executor {
638 if let Some(can_use_tool_fn) = &self.config.can_use_tool {
641 if let Some(tool_def) = self.config.tools.iter().find(|t| &t.name == name) {
642 match can_use_tool_fn(tool_def.clone(), input.clone()) {
643 crate::permission::PermissionResult::Allow(_)
644 | crate::permission::PermissionResult::Passthrough { .. } => {
645 }
647 crate::permission::PermissionResult::Deny(d) => {
648 self.permission_denials.push(PermissionDenial {
649 tool_name: name.to_string(),
650 tool_use_id: tool_call_id.clone(),
651 tool_input: input.clone(),
652 });
653 return Err(AgentError::Tool(format!(
654 "Tool '{}' permission denied: {}",
655 name, d.message
656 )));
657 }
658 crate::permission::PermissionResult::Ask(a) => {
659 self.permission_denials.push(PermissionDenial {
662 tool_name: name.to_string(),
663 tool_use_id: tool_call_id.clone(),
664 tool_input: input.clone(),
665 });
666 return Err(AgentError::Tool(format!(
667 "Tool '{}' requires user confirmation (Ask mode not supported in SDK): {}",
668 name, a.message
669 )));
670 }
671 }
672 }
673 }
674
675 if let Some(ref cb) = self.config.on_event {
677 if let Some(ref metadata) = render_metadata {
678 let user_facing = &metadata.user_facing_name;
679 cb(AgentEvent::ToolStart {
680 tool_name: name.to_string(),
681 tool_call_id: tool_call_id.clone(),
682 input: input.clone(),
683 display_name: Some(user_facing.clone()),
684 summary: metadata.tool_use_summary.clone(),
685 activity_description: metadata.activity_description.clone(),
686 });
687 } else {
688 cb(AgentEvent::ToolStart {
689 tool_name: name.to_string(),
690 tool_call_id: tool_call_id.clone(),
691 input: input.clone(),
692 display_name: None,
693 summary: None,
694 activity_description: None,
695 });
696 }
697 }
698
699 self.run_pre_tool_use_hooks(name, &input, &tool_call_id)
700 .await?;
701
702 let tool_start = std::time::Instant::now();
704 let result = executor(input.clone(), &context).await;
705 let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
706 crate::services::model_cost::record_turn_tool_duration(tool_duration_ms);
707
708 if let Some(ref cb) = self.config.on_event {
710 match &result {
711 Ok(tool_result) => {
712 let rendered_result = self.render_tool_result(name, &tool_result.content);
714 if let Some(ref metadata) = render_metadata {
715 let display = format!(
716 "{}({})",
717 metadata.user_facing_name,
718 metadata.tool_use_summary.as_deref().unwrap_or("?")
719 );
720 cb(AgentEvent::ToolComplete {
721 tool_name: name.to_string(),
722 tool_call_id: tool_call_id.clone(),
723 result: tool_result.clone(),
724 display_name: Some(display),
725 rendered_result: rendered_result.clone(),
726 });
727 } else {
728 cb(AgentEvent::ToolComplete {
729 tool_name: name.to_string(),
730 tool_call_id: tool_call_id.clone(),
731 result: tool_result.clone(),
732 display_name: None,
733 rendered_result: rendered_result,
734 });
735 }
736 }
737 Err(e) => {
738 cb(AgentEvent::ToolError {
739 tool_name: name.to_string(),
740 tool_call_id: tool_call_id.clone(),
741 error: e.to_string(),
742 });
743 }
744 }
745 }
746
747 match &result {
749 Ok(tool_result) => {
750 self.run_post_tool_use_hooks(name, tool_result, &tool_call_id)
751 .await;
752 }
753 Err(e) => {
754 self.run_post_tool_use_failure_hooks(name, e, &tool_call_id)
755 .await;
756 }
757 }
758
759 result
760 } else {
761 Err(AgentError::Tool(format!("Tool '{}' not found", name)))
762 }
763 }
764
765 fn render_tool_result(&self, tool_name: &str, content: &str) -> Option<String> {
768 let content_value: serde_json::Value = serde_json::from_str(content).ok()?;
769 let progress_messages: Vec<ProgressMessage> = vec![];
770 let options = ToolResultRenderOptions {
771 style: None,
772 theme: "dark".to_string(),
773 tools: self.config.tools.clone(),
774 verbose: false,
775 is_transcript_mode: false,
776 is_brief_only: false,
777 input: None,
778 };
779 let fns = self.tool_render_fns.lock().unwrap();
780 let render_fn = fns.get(tool_name)?.render_tool_result_message.as_ref()?;
781 render_fn(&content_value, &progress_messages, &options)
782 }
783
784 pub fn set_hook_registry(&self, registry: HookRegistry) {
786 let mut guard = self.hook_registry.lock().unwrap();
787 *guard = Some(registry);
788 }
789
790 async fn run_pre_tool_use_hooks(
792 &self,
793 tool_name: &str,
794 tool_input: &serde_json::Value,
795 tool_use_id: &str,
796 ) -> Result<(), AgentError> {
797 let has_hooks = {
799 let guard = self.hook_registry.lock().unwrap();
800 guard
801 .as_ref()
802 .map(|r| r.has_hooks("PreToolUse"))
803 .unwrap_or(false)
804 };
805
806 if !has_hooks {
807 return Ok(());
808 }
809
810 let input = HookInput {
812 event: "PreToolUse".to_string(),
813 tool_name: Some(tool_name.to_string()),
814 tool_input: Some(tool_input.clone()),
815 tool_output: None,
816 tool_use_id: Some(tool_use_id.to_string()),
817 session_id: None,
818 cwd: Some(self.config.cwd.clone()),
819 error: None,
820 ..HookInput::default()
821 };
822
823 let registry = {
825 let guard = self.hook_registry.lock().unwrap();
826 guard.as_ref().cloned()
827 };
828
829 if let Some(registry) = registry {
830 let results = registry.execute("PreToolUse", input).await;
831
832 for output in results {
834 if let Some(block) = output.block {
835 if block {
836 return Err(AgentError::Tool(format!(
837 "Tool '{}' blocked by PreToolUse hook",
838 tool_name
839 )));
840 }
841 }
842 }
843 }
844 Ok(())
845 }
846
847 async fn run_post_tool_use_hooks(
849 &self,
850 tool_name: &str,
851 tool_output: &ToolResult,
852 tool_use_id: &str,
853 ) {
854 let has_hooks = {
855 let guard = self.hook_registry.lock().unwrap();
856 guard
857 .as_ref()
858 .map(|r| r.has_hooks("PostToolUse"))
859 .unwrap_or(false)
860 };
861
862 if !has_hooks {
863 return;
864 }
865
866 let input = HookInput {
867 event: "PostToolUse".to_string(),
868 tool_name: Some(tool_name.to_string()),
869 tool_input: None,
870 tool_output: Some(serde_json::json!({
871 "result_type": tool_output.result_type,
872 "content": tool_output.content,
873 "is_error": tool_output.is_error,
874 })),
875 tool_use_id: Some(tool_use_id.to_string()),
876 session_id: None,
877 cwd: Some(self.config.cwd.clone()),
878 error: None,
879 ..HookInput::default()
880 };
881
882 let registry = {
883 let guard = self.hook_registry.lock().unwrap();
884 guard.as_ref().cloned()
885 };
886
887 if let Some(registry) = registry {
888 let _ = registry.execute("PostToolUse", input).await;
889 }
890 }
891
892 async fn run_post_tool_use_failure_hooks(
894 &self,
895 tool_name: &str,
896 error: &AgentError,
897 tool_use_id: &str,
898 ) {
899 let has_hooks = {
900 let guard = self.hook_registry.lock().unwrap();
901 guard
902 .as_ref()
903 .map(|r| r.has_hooks("PostToolUseFailure"))
904 .unwrap_or(false)
905 };
906
907 if !has_hooks {
908 return;
909 }
910
911 let input = HookInput {
912 event: "PostToolUseFailure".to_string(),
913 tool_name: Some(tool_name.to_string()),
914 tool_input: None,
915 tool_output: None,
916 tool_use_id: Some(tool_use_id.to_string()),
917 session_id: None,
918 cwd: Some(self.config.cwd.clone()),
919 error: Some(error.to_string()),
920 ..HookInput::default()
921 };
922
923 let registry = {
924 let guard = self.hook_registry.lock().unwrap();
925 guard.as_ref().cloned()
926 };
927
928 if let Some(registry) = registry {
929 let _ = registry.execute("PostToolUseFailure", input).await;
930 }
931 }
932
933 pub fn get_turn_count(&self) -> u32 {
934 self.turn_count
935 }
936
937 pub fn get_usage(&self) -> TokenUsage {
939 self.total_usage.clone()
940 }
941
942 pub fn get_messages(&self) -> Vec<crate::types::Message> {
943 self.messages.clone()
944 }
945
946 pub fn query_duration_ms(&self) -> u64 {
949 self.start_time
950 .map(|t| std::time::Instant::now().duration_since(t).as_millis() as u64)
951 .unwrap_or(0)
952 }
953
954 pub fn reset(&mut self) {
957 self.messages.clear();
958 self.reset_counters();
959 }
960
961 pub fn reset_counters(&mut self) {
965 self.turn_count = 0;
966 self.total_usage = TokenUsage::default();
967 self.total_cost = 0.0;
968 self.permission_denials.clear();
969 self.last_stop_reason = None;
970 self.max_output_tokens_recovery_count = 0;
971 self.has_attempted_reactive_compact = false;
972 self.empty_response_retries = 0;
973 self.max_output_tokens_override = None;
974 self.stop_hook_active = false;
975 self.transition = None;
976 self.pending_tool_use_summary = None;
977 self.structured_output_retries = 0;
978 }
979
980 fn is_result_successful(&self, _last_stop_reason: Option<&str>) -> bool {
986 let last = match self.messages.last() {
987 Some(m) => m,
988 None => return false,
989 };
990 match last.role {
991 crate::types::MessageRole::Assistant => {
992 !last.content.is_empty() && last.is_api_error_message != Some(true)
993 }
994 crate::types::MessageRole::User => {
995 true
997 }
998 _ => false,
999 }
1000 }
1001
1002 fn add_orphaned_tool_results(&mut self, reason: &str) {
1010 let orphan_ids: Vec<(String, String)> = {
1013 let last = match self.messages.last() {
1014 Some(m) => m,
1015 None => return,
1016 };
1017 if last.role != crate::types::MessageRole::Assistant {
1018 return;
1019 }
1020 let tool_calls = match &last.tool_calls {
1021 Some(tc) => tc,
1022 None => return,
1023 };
1024 tool_calls.iter()
1025 .map(|tc| (tc.id.clone(), tc.name.clone()))
1026 .collect()
1027 };
1028 if orphan_ids.is_empty() {
1029 return;
1030 }
1031
1032 let mut has_result = std::collections::HashSet::new();
1034 for msg in &self.messages {
1035 if msg.role == crate::types::MessageRole::Tool {
1036 if let Some(id) = &msg.tool_call_id {
1037 has_result.insert(id.clone());
1038 }
1039 }
1040 }
1041 for (tc_id, tc_name) in orphan_ids {
1043 if !has_result.contains(&tc_id) {
1044 self.messages.push(crate::types::Message {
1045 role: crate::types::MessageRole::Tool,
1046 content: format!("Tool '{}' was not executed: {}", tc_name, reason),
1047 tool_call_id: Some(tc_id),
1048 is_error: Some(true),
1049 ..Default::default()
1050 });
1051 }
1052 }
1053 }
1054
1055 async fn do_auto_compact(&mut self, snip_tokens_freed: u32) -> Result<bool, AgentError> {
1062 use crate::compact::{
1063 estimate_token_count, get_auto_compact_threshold, get_compact_prompt,
1064 strip_images_from_messages, strip_reinjected_attachments,
1065 };
1066 use crate::services::compact::{
1067 PartialCompactDirection, format_compact_summary,
1068 get_compact_prompt as get_compact_prompt_service, get_compact_user_summary_message,
1069 };
1070 use crate::tools::deferred_tools::{
1071 get_deferred_tool_names, is_tool_search_enabled_optimistic,
1072 };
1073
1074 let token_count = estimate_token_count(&self.messages, self.config.max_tokens);
1075 let threshold = get_auto_compact_threshold(&self.config.model);
1076
1077 let effective_tokens = (token_count as i64).saturating_sub(snip_tokens_freed as i64) as u32;
1079
1080 if effective_tokens <= threshold {
1082 return Ok(false);
1083 }
1084
1085 log::info!(
1086 "[compact] Starting auto-compact: {} effective tokens ({} raw - {} snip freed), threshold: {}",
1087 effective_tokens,
1088 token_count,
1089 snip_tokens_freed,
1090 threshold
1091 );
1092
1093 let _hook_results = self.execute_pre_compact_hooks().await;
1096
1097 if let Some(sm_result) = crate::services::compact::try_session_memory_compaction(
1099 &self.messages,
1100 None,
1101 Some(threshold as usize),
1102 )
1103 .await
1104 {
1105 if sm_result.compacted {
1106 log::info!("[compact] Session memory compaction succeeded");
1107 self.apply_compaction_result(
1108 sm_result.messages_to_keep,
1109 sm_result.post_compact_token_count as u32,
1110 );
1111 return Ok(true);
1112 }
1113 }
1114
1115 let stripped_messages =
1117 strip_reinjected_attachments(&strip_images_from_messages(&self.messages));
1118
1119 let compact_prompt = get_compact_prompt();
1121
1122 let (summary, compaction_usage) = match self
1124 .generate_summary_with_ptl_retry(&stripped_messages, &compact_prompt)
1125 .await
1126 {
1127 Ok(result) => result,
1128 Err(e) => {
1129 log::warn!("[compact] Summary generation failed: {}", e);
1130 return Err(e);
1131 }
1132 };
1133 log::debug!(
1134 "[compact] compaction_usage: input={} output={}",
1135 compaction_usage.input_tokens,
1136 compaction_usage.output_tokens
1137 );
1138
1139 let compact_cost = crate::services::model_cost::calculate_cost_for_tokens(
1141 &self.config.model,
1142 compaction_usage.input_tokens as u32,
1143 compaction_usage.output_tokens as u32,
1144 compaction_usage.cache_read_input_tokens.unwrap_or(0) as u32,
1145 compaction_usage.cache_creation_input_tokens.unwrap_or(0) as u32,
1146 );
1147 let _ = crate::services::model_cost::add_to_total_session_cost(
1148 compact_cost,
1149 compaction_usage.input_tokens as u32,
1150 compaction_usage.output_tokens as u32,
1151 compaction_usage.cache_read_input_tokens.unwrap_or(0) as u32,
1152 compaction_usage.cache_creation_input_tokens.unwrap_or(0) as u32,
1153 0,
1154 &self.config.model,
1155 );
1156
1157 let formatted_summary = format_compact_summary(&summary);
1159
1160 let messages_to_keep: Vec<Message> = if self.messages.len() > 4 {
1162 self.messages[self.messages.len() - 4..].to_vec()
1163 } else {
1164 self.messages.clone()
1165 };
1166
1167 let discovered_tools = get_deferred_tool_names(&self.config.tools);
1169 let mut boundary_content = format!(
1170 "[Previous conversation summarized]\n\n{}",
1171 get_compact_user_summary_message(&formatted_summary, Some(true), None, None)
1172 );
1173 if !discovered_tools.is_empty() && is_tool_search_enabled_optimistic() {
1174 boundary_content.push_str("\n\n<available-deferred-tools>\n");
1175 boundary_content.push_str(&discovered_tools.join("\n"));
1176 boundary_content.push_str("\n</available-deferred-tools>");
1177 }
1178
1179 let boundary_msg = Message {
1180 role: MessageRole::System,
1181 content: boundary_content,
1182 is_meta: Some(true),
1183 ..Default::default()
1184 };
1185
1186 let mut new_messages = vec![boundary_msg];
1188 new_messages.extend(messages_to_keep.clone());
1189
1190 let new_token_count = estimate_token_count(&new_messages, self.config.max_tokens);
1191
1192 let true_post_compact_tokens = crate::compact::rough_token_count_estimation_for_content(
1194 &new_messages.iter().map(|m| m.content.clone()).collect::<String>(),
1195 ) as u64;
1196 log::debug!(
1197 "[compact] true_post_compact_token_count={} compaction_usage.input={} compaction_usage.output={}",
1198 true_post_compact_tokens,
1199 compaction_usage.input_tokens,
1200 compaction_usage.output_tokens,
1201 );
1202
1203 self.execute_post_compact_hooks(&formatted_summary).await;
1209
1210 crate::services::compact::run_post_compact_cleanup(None);
1212
1213 self.messages = new_messages;
1215
1216 log::info!(
1217 "[compact] Complete: {} tokens -> {} tokens",
1218 token_count,
1219 new_token_count
1220 );
1221
1222 Ok(true)
1223 }
1224
1225 async fn generate_summary_with_ptl_retry(
1229 &self,
1230 messages: &[Message],
1231 compact_prompt: &str,
1232 ) -> Result<(String, TokenUsage), AgentError> {
1233 const MAX_PTL_RETRIES: usize = 3;
1234
1235 let mut summary_messages = self.build_summary_messages(compact_prompt);
1237
1238 for attempt in 0..MAX_PTL_RETRIES {
1239 let max_summary_tokens = 2048u32;
1241 let (truncated_messages, estimated_tokens) = compact::truncate_messages_for_summary(
1242 &summary_messages,
1243 &self.config.model,
1244 max_summary_tokens,
1245 );
1246
1247 if estimated_tokens > 150000 {
1249 if attempt < MAX_PTL_RETRIES - 1 {
1250 log::warn!(
1252 "[compact] PTL retry {}/{}: {} tokens, dropping oldest groups",
1253 attempt + 1,
1254 MAX_PTL_RETRIES,
1255 estimated_tokens
1256 );
1257 summary_messages =
1258 self.truncate_head_for_ptl_retry(&summary_messages, estimated_tokens);
1259 continue;
1260 }
1261 return Err(AgentError::Api(format!(
1262 "Cannot generate summary: estimated {} tokens exceeds safe limit after {} retries",
1263 estimated_tokens, MAX_PTL_RETRIES
1264 )));
1265 }
1266
1267 match self
1269 .generate_summary_from_messages(&truncated_messages)
1270 .await
1271 {
1272 Ok((summary, _usage)) => return Ok((summary, _usage)),
1273 Err(e) => {
1274 if attempt < MAX_PTL_RETRIES - 1 {
1275 log::warn!(
1276 "[compact] Summary attempt {}/{} failed: {}, retrying",
1277 attempt + 1,
1278 MAX_PTL_RETRIES,
1279 e
1280 );
1281 summary_messages =
1282 self.truncate_head_for_ptl_retry(&summary_messages, estimated_tokens);
1283 } else {
1284 return Err(e);
1285 }
1286 }
1287 }
1288 }
1289
1290 Err(AgentError::Api(
1291 "Summary generation failed after max retries".to_string(),
1292 ))
1293 }
1294
1295 fn truncate_head_for_ptl_retry(
1300 &self,
1301 messages: &[Message],
1302 estimated_tokens: u32,
1303 ) -> Vec<Message> {
1304 use crate::services::compact::grouping::group_messages_by_api_round;
1305
1306 let groups = group_messages_by_api_round(messages);
1307 if groups.is_empty() {
1308 return messages.to_vec();
1309 }
1310
1311 let groups_to_drop = (groups.len() as f64 * 0.2).ceil() as usize;
1313 let groups_to_drop = groups_to_drop.min(groups.len() - 1); log::debug!(
1316 "[compact] Dropping {} of {} groups for PTL retry",
1317 groups_to_drop,
1318 groups.len()
1319 );
1320
1321 groups.into_iter().skip(groups_to_drop).flatten().collect()
1323 }
1324
1325 fn build_summary_messages(&self, compact_prompt: &str) -> Vec<Message> {
1327 let mut summary_messages = vec![Message {
1328 role: MessageRole::User,
1329 content: compact_prompt.to_string(),
1330 ..Default::default()
1331 }];
1332
1333 for msg in &self.messages {
1335 if let MessageRole::System = msg.role {
1336 if msg.content.contains("compacted") || msg.content.contains("summarized") {
1338 continue;
1339 }
1340 }
1341 summary_messages.push(msg.clone());
1342 }
1343
1344 summary_messages
1345 }
1346
1347 async fn generate_summary_from_messages(
1349 &self,
1350 summary_messages: &[Message],
1351 ) -> Result<(String, TokenUsage), AgentError> {
1352 let api_key = self
1354 .config
1355 .api_key
1356 .as_ref()
1357 .ok_or_else(|| AgentError::Api("API key not provided".to_string()))?;
1358
1359 let base_url = self
1360 .config
1361 .base_url
1362 .as_ref()
1363 .map(|s| s.as_str())
1364 .unwrap_or("https://api.anthropic.com");
1365
1366 let model = &self.config.model;
1367
1368 let api_summary_messages: Vec<serde_json::Value> = summary_messages
1370 .iter()
1371 .map(|msg| {
1372 let role_str = match msg.role {
1373 MessageRole::User => "user",
1374 MessageRole::Assistant => "assistant",
1375 MessageRole::Tool => "user",
1376 MessageRole::System => "system",
1377 };
1378 let mut msg_json = serde_json::json!({
1379 "role": role_str,
1380 "content": msg.content
1381 });
1382 if let Some(tool_call_id) = &msg.tool_call_id {
1383 msg_json["tool_call_id"] = serde_json::json!(tool_call_id);
1384 }
1385 msg_json
1386 })
1387 .collect();
1388
1389 let compact_max_tokens = crate::utils::context::COMPACT_MAX_OUTPUT_TOKENS
1391 .min(crate::utils::context::get_max_output_tokens_for_model(model)) as u32;
1392 let request_body = serde_json::json!({
1393 "model": model,
1394 "max_tokens": compact_max_tokens,
1395 "messages": api_summary_messages,
1396 });
1397
1398 let client = reqwest::Client::new();
1399 let url = format!("{}/v1/chat/completions", base_url);
1400 let response = client
1401 .post(&url)
1402 .header("Authorization", format!("Bearer {}", api_key))
1403 .header("Content-Type", "application/json")
1404 .header("User-Agent", get_user_agent())
1405 .json(&request_body)
1406 .send()
1407 .await
1408 .map_err(|e| AgentError::Api(format!("Failed to send summary request: {}", e)))?;
1409
1410 let response_text = response
1411 .text()
1412 .await
1413 .map_err(|e| AgentError::Api(format!("Failed to read summary response: {}", e)))?;
1414
1415 let response_json: serde_json::Value =
1416 serde_json::from_str(&response_text).map_err(|e| {
1417 AgentError::Api(format!(
1418 "Failed to parse summary response: {} - {}",
1419 e, response_text
1420 ))
1421 })?;
1422
1423 if let Some(error) = response_json.get("error") {
1424 return Err(AgentError::Api(format!("Summary API error: {}", error)));
1425 }
1426
1427 let usage = response_json.get("usage").map(|u| TokenUsage {
1429 input_tokens: u.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
1430 output_tokens: u.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
1431 cache_creation_input_tokens: u.get("cache_creation_input_tokens").and_then(|v| v.as_u64()),
1432 cache_read_input_tokens: u.get("cache_read_input_tokens").and_then(|v| v.as_u64()),
1433 iterations: u.get("iterations").and_then(|v| v.as_array()).map(|arr| {
1434 arr.iter().filter_map(|it| {
1435 Some(IterationUsage {
1436 input_tokens: it.get("input_tokens").and_then(|v| v.as_u64())?,
1437 output_tokens: it.get("output_tokens").and_then(|v| v.as_u64())?,
1438 })
1439 }).collect()
1440 }),
1441 }).unwrap_or_default();
1442
1443 let summary = extract_text_from_response(&response_json);
1444
1445 if summary.is_empty() {
1446 return Err(AgentError::Api("Summary response was empty".to_string()));
1447 }
1448
1449 let parsed_summary = parse_compact_summary(&summary);
1451
1452 Ok((parsed_summary, usage))
1453 }
1454
1455 async fn execute_pre_compact_hooks(&self) -> Option<String> {
1457 let registry = {
1458 let guard = self.hook_registry.lock().unwrap();
1459 match guard.as_ref() {
1460 Some(r) => r.clone(),
1461 None => return None,
1462 }
1463 };
1464
1465 if !registry.has_hooks("PreCompact") {
1466 return None;
1467 }
1468
1469 if let Some(ref cb) = self.config.on_event {
1471 cb(AgentEvent::Compact {
1472 event: CompactProgressEvent::HooksStart {
1473 hook_type: CompactHookType::PreCompact,
1474 },
1475 });
1476 }
1477
1478 let trigger = if self.auto_compact_tracking.compacted {
1479 "auto"
1480 } else {
1481 "manual"
1482 };
1483
1484 let input = HookInput {
1485 event: "PreCompact".to_string(),
1486 tool_name: None,
1487 tool_input: Some(serde_json::json!({
1488 "trigger": trigger,
1489 "custom_instructions": null
1490 })),
1491 tool_output: None,
1492 tool_use_id: None,
1493 session_id: None,
1494 cwd: Some(self.config.cwd.clone()),
1495 error: None,
1496 ..HookInput::default()
1497 };
1498
1499 let results = registry.execute("PreCompact", input).await;
1500
1501 let successful_outputs: Vec<String> = results
1503 .iter()
1504 .filter_map(|r| r.message.as_ref())
1505 .map(|s| s.trim().to_string())
1506 .filter(|s| !s.is_empty())
1507 .collect();
1508
1509 if successful_outputs.is_empty() {
1510 None
1511 } else {
1512 Some(successful_outputs.join("\n\n"))
1513 }
1514 }
1515
1516 async fn execute_post_compact_hooks(&self, compact_summary: &str) {
1518 let registry = {
1519 let guard = self.hook_registry.lock().unwrap();
1520 match guard.as_ref() {
1521 Some(r) => r.clone(),
1522 None => return,
1523 }
1524 };
1525
1526 if !registry.has_hooks("PostCompact") {
1527 return;
1528 }
1529
1530 if let Some(ref cb) = self.config.on_event {
1532 cb(AgentEvent::Compact {
1533 event: CompactProgressEvent::HooksStart {
1534 hook_type: CompactHookType::PostCompact,
1535 },
1536 });
1537 }
1538
1539 let trigger = if self.auto_compact_tracking.compacted {
1540 "auto"
1541 } else {
1542 "manual"
1543 };
1544
1545 let input = HookInput {
1546 event: "PostCompact".to_string(),
1547 tool_name: None,
1548 tool_input: Some(serde_json::json!({
1549 "trigger": trigger,
1550 "compact_summary": compact_summary
1551 })),
1552 tool_output: None,
1553 tool_use_id: None,
1554 session_id: None,
1555 cwd: Some(self.config.cwd.clone()),
1556 error: None,
1557 ..HookInput::default()
1558 };
1559
1560 let _results = registry.execute("PostCompact", input).await;
1561 }
1562
1563 fn apply_compaction_result(
1565 &mut self,
1566 messages_to_keep: Vec<Message>,
1567 _post_compact_tokens: u32,
1568 ) {
1569 let boundary_msg = Message {
1570 role: MessageRole::System,
1571 content: "[Previous conversation summarized]".to_string(),
1572 is_meta: Some(true),
1573 ..Default::default()
1574 };
1575
1576 let mut new_messages = vec![boundary_msg];
1577 new_messages.extend(messages_to_keep);
1578 self.messages = new_messages;
1579 }
1580
1581 pub async fn submit_message(
1582 &mut self,
1583 prompt: &str,
1584 ) -> Result<(String, crate::types::ExitReason), AgentError> {
1585 self.start_time = Some(std::time::Instant::now());
1586 if let Some(ref state) = self.config.session_state {
1588 state.start_running();
1589 }
1590 self.messages.push(crate::types::Message {
1592 role: crate::types::MessageRole::User,
1593 content: prompt.to_string(),
1594 ..Default::default()
1595 });
1596
1597 if let Some(memory_context) = build_memory_prefetch_context(prompt, &self.config, &self.loaded_nested_memory_paths).await {
1599 self.messages.push(crate::types::Message {
1600 role: crate::types::MessageRole::User,
1601 content: memory_context,
1602 ..Default::default()
1603 });
1604 }
1605
1606 if !self.has_handled_orphaned_permission {
1610 if let Some(ref orphaned) = self.config.orphaned_permission {
1611 self.has_handled_orphaned_permission = true;
1612
1613 let already_present = self.messages.iter().any(|m| {
1616 m.role == crate::types::MessageRole::Assistant
1617 && m.tool_calls.as_ref().is_some_and(|tc| {
1618 tc.iter().any(|tc| tc.id == orphaned.tool_use_id)
1619 })
1620 });
1621 if !already_present {
1622 self.messages.push(orphaned.assistant_message.clone());
1623 }
1624
1625 let result_content = match &orphaned.permission_result {
1627 crate::permission::PermissionResult::Allow(_) => {
1628 format!("Tool call {} is allowed", orphaned.tool_use_id)
1629 }
1630 crate::permission::PermissionResult::Deny(deny) => {
1631 format!("Tool call {} is denied: {}", orphaned.tool_use_id, deny.message)
1632 }
1633 crate::permission::PermissionResult::Ask(ask) => {
1634 format!(
1635 "Tool call {} requires confirmation: {}",
1636 orphaned.tool_use_id, ask.message
1637 )
1638 }
1639 crate::permission::PermissionResult::Passthrough { message, .. } => {
1640 format!("Tool call {} passed through: {}", orphaned.tool_use_id, message)
1641 }
1642 };
1643
1644 self.messages.push(crate::types::Message {
1645 role: crate::types::MessageRole::Tool,
1646 content: result_content,
1647 tool_call_id: Some(orphaned.tool_use_id.clone()),
1648 ..Default::default()
1649 });
1650
1651 log::debug!(
1652 "Handled orphaned permission for tool_use_id={}",
1653 orphaned.tool_use_id
1654 );
1655 }
1656 }
1657
1658 if let Some(ref cb) = self.config.on_event {
1663 cb(AgentEvent::Thinking { turn: 1 });
1664 }
1665 self.turn_count = 1;
1666
1667 let mut max_tool_turns = self.config.max_turns;
1673 while max_tool_turns > 0 {
1674 max_tool_turns -= 1;
1675
1676 self.auto_compact_tracking.compacted = false;
1679
1680 let snip_result = crate::services::compact::snip_compact_if_known(&self.messages);
1684 let snip_tokens_freed = snip_result.tokens_freed;
1688
1689 crate::services::compact::microcompact::microcompact_messages(&mut self.messages);
1691
1692 if crate::services::context_collapse::is_context_collapse_enabled() {
1697 let collapse_result = crate::services::context_collapse::apply_collapses_if_needed(
1698 self.messages.clone(),
1699 );
1700 if collapse_result.changed {
1701 self.messages = collapse_result.messages;
1702 }
1703 }
1704
1705 if self.auto_compact_tracking.consecutive_failures < 3 {
1714 let token_estimate = compact::estimate_token_count(&self.messages, self.config.max_tokens);
1715 let threshold = get_auto_compact_threshold(&self.config.model);
1716
1717 if token_estimate > threshold {
1718 if let Some(ref cb) = self.config.on_event {
1719 cb(AgentEvent::Compact {
1720 event: CompactProgressEvent::CompactStart,
1721 });
1722 }
1723 let pre_compact_tokens = token_estimate;
1725 match self.do_auto_compact(snip_tokens_freed).await {
1726 Ok(true) => {
1727 self.auto_compact_tracking.compacted = true;
1729 self.auto_compact_tracking.turn_id = uuid::Uuid::new_v4().to_string();
1730 self.auto_compact_tracking.turn_counter = 0;
1731 self.auto_compact_tracking.consecutive_failures = 0;
1732
1733 if self.config.task_budget.is_some() {
1735 let pre_ctx = pre_compact_tokens as u64;
1736 let current = self.task_budget_remaining
1737 .or(self.config.task_budget.as_ref().map(|tb| tb.total));
1738 self.task_budget_remaining = Some(current.unwrap_or(0).saturating_sub(pre_ctx));
1739 }
1740
1741 let post_compact_tokens = compact::estimate_token_count(
1746 &self.messages,
1747 self.config.max_tokens,
1748 );
1749 let pct_reduced = if pre_compact_tokens > 0 {
1750 ((pre_compact_tokens as i64 - post_compact_tokens as i64) as f64
1751 / pre_compact_tokens as f64)
1752 * 100.0
1753 } else {
1754 0.0
1755 };
1756 let compact_summary = format!(
1757 "Conversation compacted: {} → {} tokens ({:.0}% reduced)",
1758 format_tokens(pre_compact_tokens as u64),
1759 format_tokens(post_compact_tokens as u64),
1760 pct_reduced
1761 );
1762 if let Some(ref cb) = self.config.on_event {
1763 cb(AgentEvent::Compact {
1764 event: CompactProgressEvent::CompactEnd {
1765 message: Some(compact_summary),
1766 },
1767 });
1768 }
1769 }
1770 Ok(false) => {
1771 }
1773 Err(e) => {
1774 self.auto_compact_tracking.consecutive_failures += 1;
1777 eprintln!("Auto-compact failed: {}", e);
1778 }
1779 }
1780 if let Some(ref cb) = self.config.on_event {
1781 cb(AgentEvent::Compact {
1782 event: CompactProgressEvent::CompactEnd { message: None },
1783 });
1784 }
1785 }
1786 }
1787
1788 let api_messages = self.build_api_messages()?;
1790
1791 let api_key: String = self
1793 .config
1794 .api_key
1795 .clone()
1796 .ok_or_else(|| AgentError::Api("API key not provided".to_string()))?;
1797
1798 let base_url = self
1799 .config
1800 .base_url
1801 .as_ref()
1802 .map(|s| s.as_str())
1803 .unwrap_or("https://api.anthropic.com");
1804
1805 let current_model = if let Some(ref fallback) = self.config.fallback_model {
1807 fallback.clone()
1808 } else {
1809 self.config.model.clone()
1810 };
1811 let model = ¤t_model;
1812
1813 let effective_max_tokens = self
1818 .max_output_tokens_override
1819 .unwrap_or_else(|| {
1820 crate::utils::context::get_max_output_tokens_for_model(model) as u32
1821 });
1822 let mut request_body = serde_json::json!({
1823 "model": model,
1824 "max_tokens": effective_max_tokens,
1825 "messages": api_messages,
1826 "stream": true
1827 });
1828
1829 if self.config.task_budget.is_some() {
1831 let tb = self.config.task_budget.as_ref().unwrap();
1832 let mut task_budget_obj = serde_json::json!({
1833 "type": "tokens",
1834 "total": tb.total,
1835 });
1836 if let Some(remaining) = self.task_budget_remaining {
1837 task_budget_obj["remaining"] = serde_json::json!(remaining);
1838 }
1839 request_body["output_config"] = serde_json::json!({
1840 "task_budget": task_budget_obj,
1841 });
1842 }
1843
1844 let system_prompt_to_use = if !self.config.system_context.is_empty() {
1847 let context_parts: Vec<String> = self
1848 .config
1849 .system_context
1850 .iter()
1851 .map(|(key, value)| format!("{}: {}", key, value))
1852 .collect();
1853 let context_str = context_parts.join("\n");
1854
1855 if let Some(ref system_prompt) = self.config.system_prompt {
1856 Some(format!("{}\n\n{}", system_prompt, context_str))
1857 } else {
1858 Some(context_str)
1859 }
1860 } else {
1861 self.config.system_prompt.clone()
1862 };
1863
1864 if let Some(ref sp) = system_prompt_to_use {
1865 request_body["system"] = serde_json::json!(sp);
1866 }
1867
1868 if base_url.contains("anthropic.com") {
1871 if let Some(ref thinking_config) = self.config.thinking {
1872 match thinking_config {
1873 crate::types::api_types::ThinkingConfig::Adaptive => {
1874 request_body["thinking"] = serde_json::json!({
1875 "type": "adaptive"
1876 });
1877 }
1878 crate::types::api_types::ThinkingConfig::Enabled { budget_tokens } => {
1879 let clamped_budget = std::cmp::min(
1881 effective_max_tokens.saturating_sub(1) as u32,
1882 *budget_tokens,
1883 );
1884 request_body["thinking"] = serde_json::json!({
1885 "type": "enabled",
1886 "budget_tokens": clamped_budget
1887 });
1888 }
1889 crate::types::api_types::ThinkingConfig::Disabled => {
1890 }
1892 }
1893 } else {
1894 request_body["thinking"] = serde_json::json!({
1896 "type": "adaptive"
1897 });
1898 }
1899 }
1900
1901 if !self.config.tools.is_empty() {
1904 let use_anthropic_format = base_url.contains("anthropic.com");
1905
1906 let (upfront_tools, deferred_tools) = self.separate_tools_for_request();
1908
1909 let tools_to_send = if upfront_tools.is_empty() {
1912 &upfront_tools
1914 } else {
1915 &upfront_tools
1916 };
1917
1918 let tools: Vec<serde_json::Value> = tools_to_send
1919 .iter()
1920 .map(|t| {
1921 if use_anthropic_format {
1922 serde_json::json!({
1923 "type": "function",
1924 "name": t.name,
1925 "description": t.description,
1926 "input_schema": t.input_schema
1927 })
1928 } else {
1929 serde_json::json!({
1930 "type": "function",
1931 "function": {
1932 "name": t.name,
1933 "description": t.description,
1934 "parameters": t.input_schema
1935 }
1936 })
1937 }
1938 })
1939 .collect();
1940 request_body["tools"] = serde_json::json!(tools);
1941
1942 if !deferred_tools.is_empty()
1944 && crate::tools::deferred_tools::is_tool_search_enabled_optimistic()
1945 {
1946 let _deferred_names: Vec<&str> =
1949 deferred_tools.iter().map(|t| t.name.as_str()).collect();
1950 }
1951 }
1952
1953 let url = if base_url.contains("anthropic.com") {
1956 format!("{}/v1/messages", base_url)
1957 } else {
1958 format!("{}/v1/chat/completions", base_url)
1959 };
1960
1961 let mut attempt_with_fallback = false;
1964 let mut streaming_result: StreamingResult;
1965
1966 loop {
1968 let model_in_loop = if attempt_with_fallback {
1970 self.config
1971 .fallback_model
1972 .as_ref()
1973 .unwrap_or(&self.config.model)
1974 .clone()
1975 } else {
1976 self.config.model.clone()
1977 };
1978
1979 request_body["model"] = serde_json::json!(model_in_loop);
1981
1982 if is_nonstreaming_fallback_disabled() {
1984 return Err(AgentError::Api(
1985 "Non-streaming fallback disabled".to_string(),
1986 ));
1987 }
1988
1989 let retry_result = make_api_request_with_429_retry(
1992 &self.http_client,
1993 &url,
1994 &api_key,
1995 request_body.clone(),
1996 self.config.on_event.clone(),
1997 self.config.fallback_model.clone(),
1998 &model_in_loop,
1999 match self.config.thinking {
2000 Some(crate::types::api_types::ThinkingConfig::Enabled { budget_tokens }) => Some(budget_tokens),
2001 _ => None,
2002 },
2003 )
2004 .await;
2005
2006 match retry_result {
2007 RetryResult::Success(result) => {
2008 streaming_result = result;
2009 break;
2010 }
2011 RetryResult::FallbackTriggered(fb_error) => {
2012 if attempt_with_fallback {
2014 self.add_orphaned_tool_results(&fb_error.to_string());
2017
2018 {
2020 let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2021 if let Some(registry) = registry_clone {
2022 let _ = crate::hooks::run_stop_failure_hooks(
2023 ®istry,
2024 &fb_error.to_string(),
2025 &self.config.cwd,
2026 ).await;
2027 }
2028 }
2029 return Err(AgentError::Api(fb_error.to_string()));
2030 }
2031
2032 attempt_with_fallback = true;
2033
2034 self.add_orphaned_tool_results("Model fallback triggered");
2036
2037 if let Some(last) = self.messages.last() {
2040 if last.role == crate::types::MessageRole::Assistant {
2041 self.messages.pop();
2042 }
2043 }
2044
2045 self.config.model = fb_error.fallback_model.clone();
2047
2048 eprintln!(
2050 "Switched to {} due to high demand for {}",
2051 fb_error.fallback_model, fb_error.original_model
2052 );
2053
2054 continue; }
2056 RetryResult::RecreateClient(recreate_err) => {
2057 self.http_client = reqwest::Client::new();
2059 emit_api_retry_event(
2060 self.config.on_event.as_ref().map(|a| a.as_ref()),
2061 1,
2062 MAX_429_RETRIES,
2063 500,
2064 None,
2065 &format!("Recreating client after: {}", recreate_err),
2066 );
2067 sleep_tokio(std::time::Duration::from_millis(500)).await;
2068 continue; }
2070 RetryResult::Terminal(e) => {
2071 if is_user_abort_error(&e) {
2073 return Err(AgentError::UserAborted);
2074 }
2075
2076 if is_404_stream_creation_error(&e) {
2078 eprintln!(
2079 "Streaming endpoint returned 404, falling back to non-streaming mode"
2080 );
2081 }
2082
2083 let error_str = e.to_string().to_lowercase();
2085 let is_prompt_too_long = error_str.contains("413")
2086 || error_str.contains("prompt_too_long")
2087 || error_str.contains("prompt too long")
2088 || error_str.contains("media too large");
2089
2090 if is_prompt_too_long
2094 && crate::services::context_collapse::is_context_collapse_enabled()
2095 && self.transition.as_deref() != Some("collapse_drain_retry")
2096 {
2097 let original_len = self.messages.len();
2098 let drained = crate::services::context_collapse::recover_from_overflow(
2099 self.messages.clone(),
2100 );
2101 if drained.len() < original_len {
2103 self.messages = drained;
2104 self.transition = Some("collapse_drain_retry".to_string());
2105 continue; }
2107 }
2108
2109 if is_prompt_too_long {
2110 eprintln!("Prompt too large (413), attempting reactive compact...");
2111 let _pre_compact_instructions = self.execute_pre_compact_hooks().await;
2112 match crate::services::compact::reactive_compact::run_reactive_compact(
2113 &self.messages,
2114 &self.config.model,
2115 ) {
2116 Ok(reactive_result) if reactive_result.compacted => {
2117 log::info!(
2118 "[reactive-compact] reduced {} messages after 413 error",
2119 reactive_result.messages.len()
2120 );
2121 if self.config.task_budget.is_some() {
2123 let pre_ctx = crate::compact::estimate_token_count(&self.messages, 0) as u64;
2124 let current = self.task_budget_remaining
2125 .or(self.config.task_budget.as_ref().map(|tb| tb.total));
2126 self.task_budget_remaining = Some(current.unwrap_or(0).saturating_sub(pre_ctx));
2127 }
2128 self.messages = reactive_result.messages;
2129 self.execute_post_compact_hooks("Reactive compact applied after 413 error").await;
2130 self.transition = Some("reactive_compact_retry".to_string());
2131 continue; }
2133 _ => {
2134 log::warn!(
2135 "[reactive-compact] no improvement possible, falling through"
2136 );
2137 }
2138 }
2139 self.add_orphaned_tool_results(&e.to_string());
2142
2143 {
2145 let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2146 if let Some(registry) = registry_clone {
2147 let _ = crate::hooks::run_stop_failure_hooks(®istry, &e.to_string(), &self.config.cwd).await;
2148 }
2149 }
2150 return Err(e);
2151 }
2152
2153 self.add_orphaned_tool_results(&e.to_string());
2155
2156 {
2158 let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2159 if let Some(registry) = registry_clone {
2160 let _ = crate::hooks::run_stop_failure_hooks(®istry, &e.to_string(), &self.config.cwd).await;
2161 }
2162 }
2163 return Err(e);
2164 }
2165 }
2166 }
2167
2168 if let Some(ref cb) = self.config.on_event {
2170 cb(AgentEvent::StreamRequestEnd);
2171 }
2172
2173 if let Some(refusal_msg) = get_error_message_if_refusal(
2175 streaming_result.stop_reason.as_deref(),
2176 &self.config.model,
2177 false, ) {
2179 self.messages.push(crate::types::Message {
2181 role: crate::types::MessageRole::Assistant,
2182 content: refusal_msg.content.clone().unwrap_or_default(),
2183 is_api_error_message: Some(true),
2184 error_details: refusal_msg.error_details.clone(),
2185 ..Default::default()
2186 });
2187 {
2189 let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2190 if let Some(registry) = registry_clone {
2191 let _ = crate::hooks::run_stop_failure_hooks(
2192 ®istry,
2193 &refusal_msg.content.as_ref().map(|s| s.as_str()).unwrap_or("refusal"),
2194 &self.config.cwd,
2195 ).await;
2196 }
2197 }
2198 return Err(AgentError::Api(
2199 refusal_msg.content.unwrap_or_else(|| "Refusal".to_string()),
2200 ));
2201 }
2202
2203 if !streaming_result.content.is_empty() || !streaming_result.tool_calls.is_empty() {
2206 let hook_messages = self.messages.clone();
2207 let hook_system_prompt = self
2208 .config
2209 .system_prompt
2210 .as_deref()
2211 .unwrap_or("")
2212 .lines()
2213 .map(|s| s.to_string())
2214 .collect::<Vec<_>>();
2215 let hook_user_context = self.config.user_context.clone();
2216 let hook_system_context = self.config.system_context.clone();
2217 let hook_tool_use_context = Arc::new(
2218 crate::utils::hooks::can_use_tool::ToolUseContext {
2219 session_id: self
2220 .config
2221 .session_state
2222 .as_ref()
2223 .map(|_| "query_engine".to_string())
2224 .unwrap_or_else(|| "query_engine".to_string()),
2225 cwd: Some(self.config.cwd.clone()),
2226 is_non_interactive_session: false,
2227 options: None,
2228 },
2229 );
2230 let hook_query_source = self.config.agent_id.as_ref().map(|_| "agent".to_string());
2231 let has_hook_count = {
2232 crate::utils::hooks::get_post_sampling_hook_count() > 0
2233 };
2234 if has_hook_count {
2235 let messages_clone = hook_messages;
2236 let system_prompt_clone = hook_system_prompt;
2237 let user_context_clone = hook_user_context;
2238 let system_context_clone = hook_system_context;
2239 let tool_use_context_clone = hook_tool_use_context;
2240 let query_source_clone = hook_query_source;
2241 tokio::spawn(async move {
2242 crate::utils::hooks::execute_post_sampling_hooks(
2243 messages_clone,
2244 system_prompt_clone,
2245 user_context_clone,
2246 system_context_clone,
2247 tool_use_context_clone,
2248 query_source_clone,
2249 )
2250 .await;
2251 });
2252 }
2253 }
2254
2255 if streaming_result.tool_calls.is_empty() {
2257 if streaming_result.api_error.as_deref() == Some("max_output_tokens") {
2260 const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT: u32 = 3;
2261 const ESCALATED_MAX_TOKENS: u64 = 64_000;
2262
2263 if self.max_output_tokens_override.is_none()
2267 && std::env::var(crate::constants::env::ai_code::MAX_OUTPUT_TOKENS).is_err()
2268 {
2269 self.max_output_tokens_override = Some(ESCALATED_MAX_TOKENS as u32);
2270 if let Some(ref cb) = self.config.on_event {
2271 cb(AgentEvent::Thinking {
2272 turn: self.turn_count + 1,
2273 });
2274 }
2275 continue;
2276 }
2277
2278 if self.max_output_tokens_recovery_count < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT {
2281 let recovery_message = crate::types::Message {
2282 role: crate::types::MessageRole::User,
2283 content: "Output token limit hit. Resume directly — no apology, no recap of what you were doing. Pick up mid-thought if that is where the cut happened. Break remaining work into smaller pieces.".to_string(),
2284 is_meta: Some(true),
2285 ..Default::default()
2286 };
2287 self.messages.push(recovery_message);
2288 self.max_output_tokens_override = None;
2290 self.max_output_tokens_recovery_count += 1;
2291
2292 if let Some(ref cb) = self.config.on_event {
2293 cb(AgentEvent::Thinking {
2294 turn: self.turn_count + 1,
2295 });
2296 }
2297 continue;
2298 }
2299
2300 if let Some(ref cb) = self.config.on_event {
2302 cb(AgentEvent::Done {
2303 result: crate::types::QueryResult {
2304 text: "Output token limit reached and recovery exhausted"
2305 .to_string(),
2306 usage: self.total_usage.clone(),
2307 num_turns: self.turn_count,
2308 duration_ms: self.query_duration_ms(),
2309 exit_reason: crate::types::ExitReason::MaxTokens,
2310 },
2311 });
2312 }
2313 return Ok((
2314 "Output token limit reached and recovery exhausted".to_string(),
2315 crate::types::ExitReason::MaxTokens,
2316 ));
2317 }
2318
2319 if self.config.max_turns == 0 || self.turn_count < self.config.max_turns {
2321 if let Some(nudge) = crate::utils::inspector::check() {
2322 log::debug!(
2323 "[query_engine] unfinished tasks found, nudging LLM to continue (turn {})",
2324 self.turn_count
2325 );
2326 self.messages.push(crate::types::Message {
2327 role: crate::types::MessageRole::System,
2328 content: nudge,
2329 ..Default::default()
2330 });
2331 if let Some(ref cb) = self.config.on_event {
2332 cb(AgentEvent::Thinking {
2333 turn: self.turn_count + 1,
2334 });
2335 }
2336 self.turn_count += 1;
2337 continue;
2338 }
2339 }
2340
2341 let response_text = streaming_result.content.clone();
2343
2344 if response_text.is_empty()
2352 && streaming_result.tool_calls.is_empty()
2353 && self.config.max_turns > 0
2354 && self.turn_count < self.config.max_turns
2355 {
2356 self.empty_response_retries += 1;
2357 if self.empty_response_retries <= 2 {
2358 log::warn!(
2359 "[query_engine] empty model response, retrying ({}) stop_reason={:?}",
2360 self.empty_response_retries,
2361 streaming_result.stop_reason,
2362 );
2363 tokio::time::sleep(std::time::Duration::from_millis(
2365 500 * self.empty_response_retries as u64,
2366 ))
2367 .await;
2368 continue;
2370 }
2371 self.empty_response_retries = 0;
2372 } else {
2373 self.empty_response_retries = 0;
2374 }
2375
2376 if response_text.is_empty() && streaming_result.tool_calls.is_empty() {
2378 log::error!(
2379 "[query_engine] model returned empty response after retries: stop_reason={:?}",
2380 streaming_result.stop_reason,
2381 );
2382 return Err(AgentError::Api(
2383 "Model response contained no text and no tool calls".to_string(),
2384 ));
2385 }
2386
2387 let final_text = response_text.clone();
2388
2389 self.total_usage.input_tokens += streaming_result.usage.input_tokens;
2391 self.total_usage.output_tokens += streaming_result.usage.output_tokens;
2392 self.total_usage.cache_creation_input_tokens = Some(
2393 self.total_usage.cache_creation_input_tokens.unwrap_or(0)
2394 + streaming_result.usage.cache_creation_input_tokens.unwrap_or(0),
2395 );
2396 self.total_usage.cache_read_input_tokens = Some(
2397 self.total_usage.cache_read_input_tokens.unwrap_or(0)
2398 + streaming_result.usage.cache_read_input_tokens.unwrap_or(0),
2399 );
2400 self.turn_tokens += streaming_result.usage.output_tokens as u64;
2401
2402 self.total_cost += streaming_result.cost;
2404
2405 if let Some(max_budget) = self.config.max_budget_usd {
2407 if self.total_cost >= max_budget {
2408 let final_text = self.messages.iter()
2409 .rev()
2410 .find(|m| matches!(m.role, crate::types::MessageRole::Assistant))
2411 .map(|m| m.content.clone())
2412 .unwrap_or_default();
2413 if let Some(ref cb) = self.config.on_event {
2414 cb(AgentEvent::Done {
2415 result: crate::types::QueryResult {
2416 text: final_text.clone(),
2417 usage: self.total_usage.clone(),
2418 num_turns: self.turn_count,
2419 duration_ms: self.query_duration_ms(),
2420 exit_reason: crate::types::ExitReason::MaxBudgetExceeded {
2421 max_budget_usd: max_budget,
2422 },
2423 },
2424 });
2425 }
2426 return Ok((
2427 final_text,
2428 crate::types::ExitReason::MaxBudgetExceeded {
2429 max_budget_usd: max_budget,
2430 },
2431 ));
2432
2433 }
2434 }
2435
2436 let model = self.config.model.clone();
2438 let _ = crate::services::model_cost::add_to_total_session_cost(
2439 streaming_result.cost,
2440 streaming_result.usage.input_tokens as u32,
2441 streaming_result.usage.output_tokens as u32,
2442 streaming_result.usage.cache_read_input_tokens.unwrap_or(0) as u32,
2443 streaming_result.usage.cache_creation_input_tokens.unwrap_or(0) as u32,
2444 0,
2445 &model,
2446 );
2447
2448 self.messages.push(crate::types::Message {
2450 role: crate::types::MessageRole::Assistant,
2451 content: response_text.clone(),
2452 ..Default::default()
2453 });
2454
2455 self.max_output_tokens_recovery_count = 0;
2457 self.max_output_tokens_override = None;
2458
2459 let next_turn_count = self.turn_count + 1;
2461 if self.config.max_turns > 0 && next_turn_count > self.config.max_turns {
2462 if let Some(ref cb) = self.config.on_event {
2465 cb(AgentEvent::MaxTurnsReached {
2466 max_turns: self.config.max_turns,
2467 turn_count: next_turn_count,
2468 });
2469 cb(AgentEvent::Done {
2470 result: crate::types::QueryResult {
2471 text: final_text.clone(),
2472 usage: self.total_usage.clone(),
2473 num_turns: self.turn_count,
2474 duration_ms: self.query_duration_ms(),
2475 exit_reason: crate::types::ExitReason::MaxTurns {
2476 max_turns: self.config.max_turns,
2477 turn_count: next_turn_count,
2478 },
2479 },
2480 });
2481 }
2482 return Ok((
2484 final_text,
2485 crate::types::ExitReason::MaxTurns {
2486 max_turns: self.config.max_turns,
2487 turn_count: next_turn_count,
2488 },
2489 ));
2490 }
2491
2492 self.turn_count = next_turn_count;
2494
2495 let last_is_api_error = self.messages.iter().rev().find_map(|m| {
2500 if m.role == crate::types::MessageRole::Assistant {
2501 Some(m.is_api_error_message == Some(true))
2502 } else {
2503 None
2504 }
2505 }).unwrap_or(false);
2506
2507 if !self.stop_hook_active && !last_is_api_error {
2508 self.stop_hook_active = true;
2509 let stop_result = {
2510 let registry_clone = self.hook_registry.lock().unwrap().as_ref().cloned();
2511 if let Some(registry) = registry_clone {
2512 crate::hooks::run_stop_hooks(®istry, &self.config.cwd, &final_text).await
2513 } else {
2514 crate::hooks::StopHookResult::default()
2515 }
2516 };
2517
2518 if self.config.agent_id.is_none() {
2521 let messages: Vec<crate::types::message::Message> = self.messages
2522 .iter()
2523 .filter_map(|m| match serde_json::to_value(m) {
2524 Ok(v) => serde_json::from_value(v).ok(),
2525 Err(_) => None,
2526 })
2527 .collect();
2528 let extract_ctx = crate::services::extract_memories::ExtractMemoryContext {
2529 messages,
2530 system_prompt: self.config
2531 .system_prompt
2532 .as_deref()
2533 .unwrap_or("")
2534 .to_string(),
2535 user_context: self.config.user_context.clone(),
2536 system_context: self.config.system_context.clone(),
2537 tool_use_context: None,
2538 agent_id: self.config.agent_id.clone(),
2539 };
2540 let ctx_clone = extract_ctx.clone();
2541 tokio::spawn(async move {
2542 crate::services::extract_memories::execute_extract_memories(
2543 ctx_clone,
2544 None,
2545 )
2546 .await;
2547 });
2548 }
2549
2550 if !stop_result.blocking_errors.is_empty() {
2551 for err_msg in stop_result.blocking_errors {
2553 self.messages.push(crate::types::Message {
2554 role: crate::types::MessageRole::System,
2555 content: err_msg,
2556 ..Default::default()
2557 });
2558 }
2559 if let Some(ref cb) = self.config.on_event {
2560 cb(AgentEvent::Thinking {
2561 turn: self.turn_count + 1,
2562 });
2563 }
2564 continue;
2565 }
2566 if stop_result.prevent_continuation {
2567 if let Some(ref cb) = self.config.on_event {
2568 cb(AgentEvent::Done {
2569 result: crate::types::QueryResult {
2570 text: final_text.clone(),
2571 usage: self.total_usage.clone(),
2572 num_turns: self.turn_count,
2573 duration_ms: self.query_duration_ms(),
2574 exit_reason: crate::types::ExitReason::Completed,
2575 },
2576 });
2577 }
2578 return Ok((final_text, crate::types::ExitReason::Completed));
2579 }
2580 }
2581
2582 if let Some(ref cb) = self.config.on_event {
2584 cb(AgentEvent::Thinking {
2585 turn: self.turn_count + 1,
2586 });
2587 }
2588
2589 crate::bootstrap::state::snapshot_output_tokens_for_turn(self.config.token_budget);
2594 let token_budget = self.config.token_budget;
2595 let agent_id = self.config.agent_id.clone();
2596 match crate::token_budget::check_token_budget(
2597 &mut self.budget_tracker,
2598 agent_id.as_deref(),
2599 token_budget,
2600 self.turn_tokens,
2601 ) {
2602 crate::token_budget::TokenBudgetDecision::Continue { nudge_message } => {
2603 self.messages.push(crate::types::Message {
2605 role: crate::types::MessageRole::User,
2606 content: nudge_message,
2607 ..Default::default()
2608 });
2609 self.transition = Some("token_budget_continuation".to_string());
2610 continue;
2611 }
2612 crate::token_budget::TokenBudgetDecision::Stop { .. } => {
2613 }
2615 }
2616
2617 let last_stop_reason = streaming_result.stop_reason.as_deref();
2619 if !self.is_result_successful(last_stop_reason) {
2620 let error_detail = format!(
2621 "Invalid result state: last_message_type={:?}, stop_reason={:?}",
2622 self.messages.last().map(|m| &m.role),
2623 last_stop_reason
2624 );
2625 if let Some(ref cb) = self.config.on_event {
2626 cb(AgentEvent::Done {
2627 result: crate::types::QueryResult {
2628 text: final_text.clone(),
2629 usage: self.total_usage.clone(),
2630 num_turns: self.turn_count,
2631 duration_ms: self.query_duration_ms(),
2632 exit_reason: crate::types::ExitReason::ModelError { error: error_detail.clone() },
2633 },
2634 });
2635 }
2636 return Ok((final_text, crate::types::ExitReason::ModelError { error: error_detail.clone() }));
2637 }
2638
2639 if let Some(ref cb) = self.config.on_event {
2641 cb(AgentEvent::Done {
2642 result: crate::types::QueryResult {
2643 text: final_text.clone(),
2644 usage: self.total_usage.clone(),
2645 num_turns: self.turn_count,
2646 duration_ms: self.query_duration_ms(),
2647 exit_reason: crate::types::ExitReason::Completed,
2648 },
2649 });
2650 }
2651 return Ok((final_text, crate::types::ExitReason::Completed));
2653 }
2654
2655 let tool_calls = streaming_result.tool_calls;
2657
2658 let mut tool_call_structs: Vec<crate::types::ToolCall> = Vec::new();
2660 for tc in &tool_calls {
2661 let name = tc
2662 .get("name")
2663 .and_then(|n| n.as_str())
2664 .unwrap_or("")
2665 .to_string();
2666 let id = tc
2667 .get("id")
2668 .and_then(|i| i.as_str())
2669 .unwrap_or("")
2670 .to_string();
2671 let arguments = tc
2672 .get("arguments")
2673 .cloned()
2674 .unwrap_or_else(|| empty_json_value());
2675 tool_call_structs.push(crate::types::ToolCall {
2676 id,
2677 r#type: "function".to_string(),
2678 name,
2679 arguments,
2680 });
2681 }
2682
2683 let tool_context = crate::types::ToolContext {
2686 cwd: self.config.cwd.clone(),
2687 abort_signal: Arc::clone(self.abort_controller.signal()),
2688 };
2689
2690 let tool_executors = Arc::new(self.tool_executors.lock().unwrap().clone());
2693 let tool_render_fns = Arc::new(self.tool_render_fns.lock().unwrap().clone());
2694 let tool_backfill_fns = Arc::new(self.tool_backfill_fns.lock().unwrap().clone());
2695 let tools = self.config.tools.clone();
2696 let can_use_tool = self.config.can_use_tool.clone();
2697 let cwd = self.config.cwd.clone();
2698 let on_event = self.config.on_event.clone();
2699 let abort_signal = self.abort_controller.signal().clone();
2700 let hook_registry = self.hook_registry.clone();
2701
2702 let executor = move |name: String, args: serde_json::Value, tool_call_id: String| {
2703 let tool_executors = tool_executors.clone();
2704 let tool_render_fns = tool_render_fns.clone();
2705 let tool_backfill_fns = tool_backfill_fns.clone();
2706 let tools = tools.clone();
2707 let can_use_tool = can_use_tool.clone();
2708 let cwd = cwd.clone();
2709 let on_event = on_event.clone();
2710 let abort_signal = abort_signal.clone();
2711 let hook_registry = hook_registry.clone();
2712 async move {
2713 let mut backfilled_args = args.clone();
2722 if let Some(backfill_fn) = tool_backfill_fns.get(&name) {
2723 backfill_fn(&mut backfilled_args);
2724 }
2725
2726 if let Some(ref cb) = on_event {
2728 let meta_input = Some(&backfilled_args);
2729 let metadata = tool_render_fns.get(&name).map(|fns| ToolRenderMetadata {
2730 user_facing_name: (Arc::clone(&fns.user_facing_name))(meta_input),
2731 tool_use_summary: fns
2732 .get_tool_use_summary
2733 .as_ref()
2734 .and_then(|f| f(meta_input)),
2735 activity_description: fns
2736 .get_activity_description
2737 .as_ref()
2738 .and_then(|f| f(meta_input)),
2739 });
2740 if let Some(ref meta) = metadata {
2741 cb(AgentEvent::ToolStart {
2742 tool_name: name.clone(),
2743 tool_call_id: tool_call_id.clone(),
2744 input: backfilled_args.clone(),
2745 display_name: Some(meta.user_facing_name.clone()),
2746 summary: meta.tool_use_summary.clone(),
2747 activity_description: meta.activity_description.clone(),
2748 });
2749 } else {
2750 cb(AgentEvent::ToolStart {
2751 tool_name: name.clone(),
2752 tool_call_id: tool_call_id.clone(),
2753 input: backfilled_args.clone(),
2754 display_name: None,
2755 summary: None,
2756 activity_description: None,
2757 });
2758 }
2759 }
2760
2761 let cwd_clone = cwd.clone();
2766
2767 let context = crate::types::ToolContext {
2768 cwd,
2769 abort_signal: abort_signal.clone(),
2770 };
2771
2772 let executor_fn = tool_executors.get(&name).cloned();
2773
2774 if let Some(executor_fn) = executor_fn {
2775 let meta_input = Some(&args);
2777 let metadata = tool_render_fns.get(&name).map(|fns| ToolRenderMetadata {
2778 user_facing_name: (Arc::clone(&fns.user_facing_name))(meta_input),
2779 tool_use_summary: fns
2780 .get_tool_use_summary
2781 .as_ref()
2782 .and_then(|f| f(meta_input)),
2783 activity_description: fns
2784 .get_activity_description
2785 .as_ref()
2786 .and_then(|f| f(meta_input)),
2787 });
2788
2789 if let Some(can_use_fn) = can_use_tool {
2791 if let Some(tool_def) = tools.iter().find(|t| &t.name == &name) {
2792 match can_use_fn(tool_def.clone(), backfilled_args.clone()) {
2793 crate::permission::PermissionResult::Allow(_)
2794 | crate::permission::PermissionResult::Passthrough { .. } => {}
2795 crate::permission::PermissionResult::Deny(d) => {
2796 return Err(crate::error::AgentError::Tool(format!(
2797 "Tool '{}' permission denied: {}",
2798 name, d.message
2799 )));
2800 }
2801 crate::permission::PermissionResult::Ask(a) => {
2802 return Err(crate::error::AgentError::Tool(format!(
2803 "Tool '{}' requires user confirmation (Ask not supported in SDK): {}",
2804 name, a.message
2805 )));
2806 }
2807 }
2808 }
2809 }
2810
2811 {
2813 let registry_clone = hook_registry.lock().unwrap().as_ref().cloned();
2814 if let Some(registry) = registry_clone {
2815 if let Err(e) =
2816 crate::hooks::run_pre_tool_use_hooks(®istry, &name, &backfilled_args, &tool_call_id, &cwd_clone)
2817 .await
2818 {
2819 return Err(e);
2820 }
2821 }
2822 }
2823
2824 let result = executor_fn(args, &context).await;
2826
2827 {
2829 let registry_clone = hook_registry.lock().unwrap().as_ref().cloned();
2830 if let Some(registry) = registry_clone {
2831 match &result {
2832 Ok(tool_result) => {
2833 let _ = crate::hooks::run_post_tool_use_hooks(®istry, &name, tool_result, &tool_call_id, &cwd_clone).await;
2834 }
2835 Err(e) => {
2836 let _ = crate::hooks::run_post_tool_use_failure_hooks(®istry, &name, &e.to_string(), &tool_call_id, &cwd_clone).await;
2837 }
2838 }
2839 }
2840 }
2841
2842 if let Some(ref cb) = on_event {
2844 match &result {
2845 Ok(tool_result) => {
2846 let rendered_result = tool_render_fns
2847 .get(&name)
2848 .and_then(|fns| fns.render(&tool_result.content, &tools));
2849 if let Some(ref meta) = metadata {
2850 let display = format!(
2851 "{}({})",
2852 meta.user_facing_name,
2853 meta.tool_use_summary.as_deref().unwrap_or("?")
2854 );
2855 cb(AgentEvent::ToolComplete {
2856 tool_name: name.clone(),
2857 tool_call_id: tool_call_id.clone(),
2858 result: tool_result.clone(),
2859 display_name: Some(display),
2860 rendered_result: rendered_result.clone(),
2861 });
2862 } else {
2863 cb(AgentEvent::ToolComplete {
2864 tool_name: name.clone(),
2865 tool_call_id: tool_call_id.clone(),
2866 result: tool_result.clone(),
2867 display_name: None,
2868 rendered_result: rendered_result,
2869 });
2870 }
2871 }
2872 Err(e) => {
2873 cb(AgentEvent::ToolError {
2874 tool_name: name.clone(),
2875 tool_call_id: tool_call_id.clone(),
2876 error: e.to_string(),
2877 });
2878 }
2879 }
2880 }
2881
2882 result
2883 } else {
2884 let err =
2885 crate::error::AgentError::Tool(format!("Tool '{}' not found", name));
2886 if let Some(ref cb) = on_event {
2887 cb(AgentEvent::ToolError {
2888 tool_name: name.clone(),
2889 tool_call_id: tool_call_id.clone(),
2890 error: err.to_string(),
2891 });
2892 }
2893 Err(err)
2894 }
2895 }
2896 };
2897
2898 let assistant_msg = crate::types::Message {
2901 role: crate::types::MessageRole::Assistant,
2902 content: format!(
2903 "Calling tool(s): {:?}",
2904 tool_calls
2905 .iter()
2906 .map(|tc| tc.get("name").and_then(|n| n.as_str()).unwrap_or(""))
2907 .collect::<Vec<_>>()
2908 ),
2909 tool_calls: Some(tool_call_structs.clone()),
2910 ..Default::default()
2911 };
2912 self.messages.push(assistant_msg);
2913
2914 let updates = orchestration::run_tools(
2915 tool_call_structs,
2916 self.config.tools.clone(),
2917 tool_context,
2918 executor,
2919 Some(self.config.cwd.clone()),
2920 None,
2921 )
2922 .await;
2923
2924 for update in updates {
2926 if let Some(message) = update.message {
2927 let truncated_content = truncate_tool_result_content(&message.content, "");
2930 let mut msg = message;
2931 msg.content = truncated_content;
2932 self.messages.push(msg);
2933 }
2934 }
2935
2936 if let Some(ref mut state) = self.content_replacement_state {
2938 crate::services::compact::apply_tool_result_budget(&mut self.messages, Some(state));
2939 }
2940
2941 let next_turn_count = self.turn_count + 1;
2943 if self.config.max_turns > 0 && next_turn_count > self.config.max_turns {
2944 if let Some(ref cb) = self.config.on_event {
2946 cb(AgentEvent::MaxTurnsReached {
2947 max_turns: self.config.max_turns,
2948 turn_count: next_turn_count,
2949 });
2950 }
2951 let final_text = self
2953 .messages
2954 .iter()
2955 .filter(|m| m.role == crate::types::MessageRole::Assistant)
2956 .last()
2957 .map(|m| m.content.clone())
2958 .unwrap_or_else(|| "Max turns reached".to_string());
2959 let final_text = final_text;
2961 if let Some(ref cb) = self.config.on_event {
2962 cb(AgentEvent::Done {
2963 result: crate::types::QueryResult {
2964 text: final_text.clone(),
2965 usage: self.total_usage.clone(),
2966 num_turns: self.turn_count,
2967 duration_ms: self.query_duration_ms(),
2968 exit_reason: crate::types::ExitReason::default(),
2969 },
2970 });
2971 }
2972 return Ok((final_text, crate::types::ExitReason::default()));
2973 }
2974
2975 self.turn_count = next_turn_count;
2978
2979 if self.auto_compact_tracking.compacted {
2982 self.auto_compact_tracking.turn_counter += 1;
2983 }
2984
2985 if let Some(ref cb) = self.config.on_event {
2987 cb(AgentEvent::Thinking {
2988 turn: self.turn_count + 1,
2989 });
2990 }
2991
2992 continue;
2994 }
2995
2996 let final_text = self
2998 .messages
2999 .iter()
3000 .filter(|m| m.role == crate::types::MessageRole::Assistant)
3001 .last()
3002 .map(|m| m.content.clone())
3003 .unwrap_or_else(|| "Max tool execution turns reached".to_string());
3004
3005 let final_text = final_text;
3007
3008 if let Some(ref cb) = self.config.on_event {
3010 cb(AgentEvent::Done {
3011 result: crate::types::QueryResult {
3012 text: final_text.clone(),
3013 usage: self.total_usage.clone(),
3014 num_turns: self.turn_count,
3015 duration_ms: self.query_duration_ms(),
3016 exit_reason: crate::types::ExitReason::Completed,
3017 },
3018 });
3019 }
3020
3021 Ok((final_text, crate::types::ExitReason::Completed))
3022 }
3023
3024 fn build_api_messages(&self) -> Result<Vec<serde_json::Value>, AgentError> {
3025 let base_url = self
3027 .config
3028 .base_url
3029 .as_deref()
3030 .unwrap_or("https://api.anthropic.com");
3031 let is_anthropic = base_url.contains("anthropic.com");
3032
3033 let mut all_messages = self.messages.clone();
3035 if !self.config.user_context.is_empty() {
3036 let context_parts: Vec<String> = self
3037 .config
3038 .user_context
3039 .iter()
3040 .map(|(key, value)| format!("# {}\n{}", key, value))
3041 .collect();
3042 let context_content = format!(
3043 "<system-reminder>\nAs you answer the user's questions, you can use the following context:\n{}\n\nIMPORTANT: this context may or may not be relevant to your tasks. You should not respond to this context unless it's highly relevant to the work you're doing.\n</system-reminder>\n",
3044 context_parts.join("\n")
3045 );
3046 let context_msg = crate::types::Message {
3047 role: crate::types::MessageRole::User,
3048 content: context_content,
3049 is_meta: Some(true),
3050 ..Default::default()
3051 };
3052 all_messages.insert(0, context_msg);
3053 }
3054
3055 let mut api_messages: Vec<serde_json::Value> = Vec::new();
3056
3057 for msg in &all_messages {
3060 match msg.role {
3061 crate::types::MessageRole::User => {
3062 api_messages.push(serde_json::json!({
3064 "role": "user",
3065 "content": msg.content
3066 }));
3067 }
3068 crate::types::MessageRole::Assistant => {
3069 if let Some(tool_calls) = &msg.tool_calls {
3071 if is_anthropic {
3072 let mut content_blocks: Vec<serde_json::Value> = Vec::new();
3074
3075 if !msg.content.is_empty()
3077 && msg.content
3078 != format!(
3079 "Calling tool: {} with args: ",
3080 tool_calls.first().map(|t| t.name.as_str()).unwrap_or("")
3081 )
3082 {
3083 content_blocks.push(serde_json::json!({
3084 "type": "text",
3085 "text": msg.content
3086 }));
3087 }
3088
3089 for tc in tool_calls {
3091 content_blocks.push(serde_json::json!({
3092 "type": "tool_use",
3093 "id": tc.id,
3094 "name": tc.name,
3095 "input": tc.arguments
3096 }));
3097 }
3098
3099 api_messages.push(serde_json::json!({
3100 "role": "assistant",
3101 "content": content_blocks
3102 }));
3103 } else {
3104 let mut openai_tool_calls: Vec<serde_json::Value> = Vec::new();
3107 for tc in tool_calls {
3108 openai_tool_calls.push(serde_json::json!({
3109 "id": tc.id,
3110 "type": "function",
3111 "function": {
3112 "name": tc.name,
3113 "arguments": serde_json::to_string(&tc.arguments).unwrap_or_default()
3114 }
3115 }));
3116 }
3117
3118 api_messages.push(serde_json::json!({
3119 "role": "assistant",
3120 "content": msg.content,
3121 "tool_calls": openai_tool_calls
3122 }));
3123 }
3124 } else {
3125 api_messages.push(serde_json::json!({
3127 "role": "assistant",
3128 "content": msg.content
3129 }));
3130 }
3131 }
3132 crate::types::MessageRole::Tool => {
3133 let tool_use_id = msg.tool_call_id.clone().unwrap_or_default();
3135
3136 let content = if msg.is_error == Some(true) {
3138 format!("<tool_use_error>{}</tool_use_error>", msg.content)
3139 } else {
3140 msg.content.clone()
3141 };
3142
3143 if is_anthropic {
3144 api_messages.push(serde_json::json!({
3146 "role": "user",
3147 "content": [
3148 {
3149 "type": "tool_result",
3150 "tool_use_id": tool_use_id,
3151 "content": content
3152 }
3153 ]
3154 }));
3155 } else {
3156 api_messages.push(serde_json::json!({
3158 "role": "tool",
3159 "content": content,
3160 "tool_call_id": tool_use_id
3161 }));
3162 }
3163 }
3164 crate::types::MessageRole::System => {
3165 api_messages.push(serde_json::json!({
3167 "role": "user",
3168 "content": msg.content
3169 }));
3170 }
3171 }
3172 }
3173 self.maybe_inject_deferred_tools_block(&mut api_messages);
3175
3176 Ok(api_messages)
3177 }
3178}
3179
3180fn calculate_compaction_messages(
3184 messages: &[crate::types::Message],
3185 target_tokens: u32,
3186) -> Vec<crate::types::Message> {
3187 if messages.len() <= 4 {
3188 return messages.to_vec();
3190 }
3191
3192 let avg_tokens_per_msg = 500;
3194 let target_message_count = (target_tokens / avg_tokens_per_msg).max(10) as usize;
3195
3196 let keep_first = 2;
3199 let keep_last = target_message_count.saturating_sub(keep_first);
3200
3201 if messages.len() <= keep_first + keep_last {
3202 return messages.to_vec();
3203 }
3204
3205 let first_part = &messages[..keep_first];
3206 let last_part = &messages[messages.len() - keep_last..];
3207
3208 let mut result = Vec::with_capacity(keep_first + keep_last);
3209 result.extend(first_part.iter().cloned());
3210 result.extend(last_part.iter().cloned());
3211 result
3212}
3213
3214fn extract_text_from_response(response: &serde_json::Value) -> String {
3216 if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3218 if let Some(first_choice) = choices.first() {
3219 if let Some(content) = first_choice.get("message").and_then(|m| m.get("content")) {
3220 if let Some(text) = content.as_str() {
3221 return text.to_string();
3222 }
3223 }
3224 }
3225 }
3226
3227 if let Some(content) = response.get("content").and_then(|c| c.as_array()) {
3229 for block in content {
3230 if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
3231 return text.to_string();
3232 }
3233 }
3234 }
3235
3236 String::new()
3237}
3238
/// Extracts the usable summary text from a raw compaction response.
///
/// If the input contains a `<summary>…</summary>` pair, the trimmed text
/// between the tags is returned. Any text following the closing tag is
/// appended after a blank line, unless it is empty or starts with `<`. When
/// the result would be empty, the whole trimmed input is returned instead.
///
/// Without a complete summary pair, a single `<analysis>…</analysis>` section
/// is cut out (if present) and the remaining text is returned trimmed.
///
/// Closing tags are searched for only after their opening tag, and offsets are
/// derived from the tag lengths, so malformed or tightly packed input can no
/// longer cause an out-of-range slice.
fn parse_compact_summary(raw_summary: &str) -> String {
    const SUMMARY_OPEN: &str = "<summary>";
    const SUMMARY_CLOSE: &str = "</summary>";
    const ANALYSIS_OPEN: &str = "<analysis>";
    const ANALYSIS_CLOSE: &str = "</analysis>";

    if let Some(open_at) = raw_summary.find(SUMMARY_OPEN) {
        let body_start = open_at + SUMMARY_OPEN.len();
        // Search from the end of the opening tag so a stray earlier close tag is ignored.
        if let Some(close_offset) = raw_summary[body_start..].find(SUMMARY_CLOSE) {
            let body_end = body_start + close_offset;
            let mut summary = raw_summary[body_start..body_end].trim().to_string();

            let remaining = raw_summary[body_end + SUMMARY_CLOSE.len()..].trim();
            if !remaining.is_empty() && !remaining.starts_with('<') {
                summary.push_str("\n\n");
                summary.push_str(remaining);
            }

            return if summary.is_empty() {
                raw_summary.trim().to_string()
            } else {
                summary
            };
        }
    }

    if let Some(open_at) = raw_summary.find(ANALYSIS_OPEN) {
        let search_from = open_at + ANALYSIS_OPEN.len();
        if let Some(close_offset) = raw_summary[search_from..].find(ANALYSIS_CLOSE) {
            let after_close = search_from + close_offset + ANALYSIS_CLOSE.len();
            let without_analysis = format!(
                "{}{}",
                &raw_summary[..open_at],
                raw_summary[after_close..].trim()
            );
            return without_analysis.trim().to_string();
        }
    }

    raw_summary.trim().to_string()
}
3279
3280fn extract_tool_calls(response: &serde_json::Value) -> Vec<serde_json::Value> {
3281 if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3283 if let Some(first_choice) = choices.first() {
3284 if let Some(message) = first_choice.get("message") {
3285 if let Some(tool_calls) = message.get("tool_calls").and_then(|t| t.as_array()) {
3286 if !tool_calls.is_empty() {
3287 return tool_calls
3288 .iter()
3289 .map(|tc| {
3290 let func = tc.get("function");
3291 let name = func
3292 .and_then(|f| f.get("name"))
3293 .cloned()
3294 .unwrap_or_else(|| empty_json_value());
3295 let args = func.and_then(|f| f.get("arguments"));
3297 let arguments = if let Some(args_val) = args {
3298 if let Some(arg_str) = args_val.as_str() {
3299 serde_json::from_str(arg_str).unwrap_or(args_val.clone())
3301 } else {
3302 args_val.clone()
3303 }
3304 } else {
3305 serde_json::Value::Null
3306 };
3307 let id = tc.get("id").cloned();
3309 let mut result = serde_json::json!({
3310 "name": name,
3311 "arguments": arguments,
3312 });
3313 if let Some(id_val) = id {
3314 result["id"] = id_val;
3315 }
3316 result
3317 })
3318 .collect();
3319 }
3320 }
3321 }
3322 }
3323 }
3324
3325 vec![]
3326}
3327fn extract_response_text(response: &serde_json::Value) -> String {
3330 if let Some(choices) = response.get("choices").and_then(|c| c.as_array()) {
3332 if let Some(first_choice) = choices.first() {
3333 if let Some(message) = first_choice.get("message") {
3334 if let Some(content) = message.get("content").and_then(|c| c.as_str()) {
3335 return content.to_string();
3336 }
3337 }
3338 }
3339 }
3340
3341 if let Some(content) = response.get("content").and_then(|c| c.as_array()) {
3343 for block in content {
3344 if let Some(block_type) = block.get("type").and_then(|t| t.as_str()) {
3345 match block_type {
3346 "text" => {
3347 if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
3348 return t.to_string();
3349 }
3350 }
3351 _ => {}
3352 }
3353 }
3354 }
3355 }
3356
3357 String::new()
3358}
3359
3360fn extract_usage(response: &serde_json::Value) -> TokenUsage {
3361 if let Some(usage) = response.get("usage") {
3363 return TokenUsage {
3364 input_tokens: usage
3365 .get("prompt_tokens")
3366 .and_then(|v| v.as_u64())
3367 .unwrap_or(0)
3368 + usage
3369 .get("completion_tokens")
3370 .and_then(|v| v.as_u64())
3371 .unwrap_or(0),
3372 output_tokens: usage
3373 .get("completion_tokens")
3374 .and_then(|v| v.as_u64())
3375 .unwrap_or(0),
3376 cache_creation_input_tokens: None,
3377 cache_read_input_tokens: None,
3378 iterations: None,
3379 };
3380 }
3381
3382 let usage = response.get("usage");
3384 TokenUsage {
3385 input_tokens: usage
3386 .and_then(|u| u.get("input_tokens"))
3387 .and_then(|v| v.as_u64())
3388 .unwrap_or(0),
3389 output_tokens: usage
3390 .and_then(|u| u.get("output_tokens"))
3391 .and_then(|v| v.as_u64())
3392 .unwrap_or(0),
3393 cache_creation_input_tokens: usage
3394 .and_then(|u| u.get("cache_creation_input_tokens"))
3395 .and_then(|v| v.as_u64()),
3396 cache_read_input_tokens: usage
3397 .and_then(|u| u.get("cache_read_input_tokens"))
3398 .and_then(|v| v.as_u64()),
3399 iterations: None,
3400 }
3401}
3402
/// Upper bound of the attempt counter in `make_api_request_with_429_retry`.
/// The loop runs over `0..=MAX_429_RETRIES`, i.e. one initial try plus this many retries;
/// the same bound is applied to both 429 and 529 retries.
const MAX_429_RETRIES: u32 = 5;
/// Base backoff in milliseconds; doubled per attempt by `calculate_retry_delay`.
const _429_RETRY_BASE_MS: u64 = 2000;
/// Ceiling in milliseconds applied to the backoff before jitter is added.
const _429_RETRY_MAX_MS: u64 = 30_000;
/// NOTE(review): not referenced in this part of the file — presumably the retry cap for
/// structured-output validation; confirm against its call site.
const MAX_STRUCTURED_OUTPUT_RETRIES: u32 = 5;
3411
/// Outcome reported by `make_api_request_with_429_retry`.
enum RetryResult {
    /// A request attempt succeeded; carries its result.
    Success(StreamingResult),
    /// Too many consecutive 529 errors occurred and a fallback model is configured.
    FallbackTriggered(FallbackTriggeredError),
    /// The error was classified as a stale-connection or auth error; the error is passed along.
    /// NOTE(review): the variant name suggests the caller rebuilds the HTTP client — confirm at the call site.
    RecreateClient(AgentError),
    /// The request failed and will not be retried any further.
    Terminal(AgentError),
}
3424
3425fn error_to_message_for_retry(error: &AgentError) -> String {
3426 match error {
3427 AgentError::Api(msg) => msg.clone(),
3428 AgentError::Http(e) => format!("{}", e),
3429 other => other.to_string(),
3430 }
3431}
3432
3433fn calculate_retry_delay(attempt: u32) -> u64 {
3435 let base = _429_RETRY_BASE_MS * 2u64.saturating_pow(attempt.saturating_sub(1));
3436 let capped = base.min(_429_RETRY_MAX_MS);
3437 let nanos = std::time::SystemTime::now()
3439 .duration_since(std::time::UNIX_EPOCH)
3440 .unwrap_or_default()
3441 .subsec_nanos();
3442 let jitter = (capped as f64 * 0.25 * (nanos as f64 / u32::MAX as f64)) as u64;
3443 capped + jitter
3444}
3445
3446async fn async_make_api_request(
3449 client: &reqwest::Client,
3450 url: &str,
3451 api_key: &str,
3452 request_body: serde_json::Value,
3453 on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3454) -> Result<StreamingResult, AgentError> {
3455 match make_anthropic_streaming_request(
3457 client,
3458 url,
3459 api_key,
3460 request_body.clone(),
3461 on_event.clone(),
3462 Arc::new(AtomicBool::new(false)),
3463 )
3464 .await
3465 {
3466 Ok(result) => return Ok(result),
3467 Err(_) => {} }
3469
3470 make_nonstreaming_request(client, url, api_key, request_body, on_event).await
3472}
3473
3474async fn make_api_request_with_429_retry(
3489 client: &reqwest::Client,
3490 url: &str,
3491 api_key: &str,
3492 request_body: serde_json::Value,
3493 on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3494 fallback_model: Option<String>,
3495 current_model: &str,
3496 thinking_budget_tokens: Option<u32>,
3497) -> RetryResult {
3498 let mut consecutive_529s: u32 = 0;
3499 let mut last_error_str: Option<String> = None;
3500
3501 let mut mutable_request = request_body.clone();
3503
3504 for attempt in 0..=MAX_429_RETRIES {
3505 match async_make_api_request(
3506 client,
3507 url,
3508 api_key,
3509 mutable_request.clone(),
3510 on_event.clone(),
3511 )
3512 .await
3513 {
3514 Ok(result) => return RetryResult::Success(result),
3515 Err(e) => {
3516 last_error_str = Some(e.to_string());
3517
3518 if is_529_error(&e) {
3520 consecutive_529s += 1;
3521 if consecutive_529s >= MAX_529_RETRIES {
3522 if let Some(ref fb) = fallback_model {
3523 return RetryResult::FallbackTriggered(FallbackTriggeredError {
3524 original_model: current_model.to_string(),
3525 fallback_model: fb.clone(),
3526 });
3527 }
3528 if attempt >= MAX_429_RETRIES {
3530 return RetryResult::Terminal(e);
3531 }
3532 }
3533 } else {
3534 consecutive_529s = 0;
3536 }
3537
3538 if is_stale_connection_error(&e) || is_auth_error(&e) {
3540 return RetryResult::RecreateClient(e);
3541 }
3542
3543 if let Some((input_tokens, _max_tokens, context_limit)) =
3545 parse_max_tokens_context_overflow(&e)
3546 {
3547 let safety_buffer: u64 = 1000;
3548 let available = context_limit.saturating_sub(input_tokens).saturating_sub(safety_buffer);
3549 if available < FLOOR_OUTPUT_TOKENS {
3550 return RetryResult::Terminal(e);
3551 }
3552 let min_required = (thinking_budget_tokens.unwrap_or(0) as u64).saturating_add(1);
3554 let adjusted = std::cmp::max(FLOOR_OUTPUT_TOKENS, std::cmp::max(available, min_required));
3555 if let Some(max_t) = mutable_request.get_mut("max_tokens") {
3556 *max_t = serde_json::json!(adjusted as u32);
3557 }
3558 continue;
3560 }
3561
3562 if is_429_only_error(&e) && attempt < MAX_429_RETRIES {
3564 let delay = calculate_retry_delay(attempt + 1);
3565 emit_api_retry_event(
3566 on_event.as_ref().map(|a| a.as_ref()),
3567 attempt + 1,
3568 MAX_429_RETRIES,
3569 delay,
3570 None,
3571 &e.to_string(),
3572 );
3573 sleep_tokio(std::time::Duration::from_millis(delay)).await;
3574 continue;
3575 }
3576
3577 if is_529_error(&e) && attempt < MAX_429_RETRIES {
3579 let delay = calculate_retry_delay(attempt + 1);
3580 emit_api_retry_event(
3581 on_event.as_ref().map(|a| a.as_ref()),
3582 attempt + 1,
3583 MAX_429_RETRIES,
3584 delay,
3585 None,
3586 &e.to_string(),
3587 );
3588 sleep_tokio(std::time::Duration::from_millis(delay)).await;
3589 continue;
3590 }
3591
3592 return RetryResult::Terminal(e);
3594 }
3595 }
3596 }
3597
3598 RetryResult::Terminal(AgentError::Api(last_error_str.unwrap_or_else(|| {
3599 "Retry exhausted".to_string()
3600 })))
3601}
3602
3603async fn make_nonstreaming_request(
3606 client: &reqwest::Client,
3607 url: &str,
3608 api_key: &str,
3609 mut request_body: serde_json::Value,
3610 on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3611) -> Result<StreamingResult, AgentError> {
3612 request_body["stream"] = serde_json::json!(false);
3614
3615 let model = request_body
3617 .get("model")
3618 .and_then(|v| v.as_str())
3619 .unwrap_or("unknown")
3620 .to_string();
3621
3622 let is_anthropic = url.contains("anthropic.com");
3624
3625 let request_builder = if is_anthropic {
3627 client
3629 .post(url)
3630 .header("x-api-key", api_key)
3631 .header("anthropic-version", "2023-06-01")
3632 .header("Content-Type", "application/json")
3633 .header("User-Agent", get_user_agent())
3634 .json(&request_body)
3635 } else {
3636 client
3638 .post(url)
3639 .header("Authorization", format!("Bearer {}", api_key))
3640 .header("Content-Type", "application/json")
3641 .header("User-Agent", get_user_agent())
3642 .json(&request_body)
3643 };
3644
3645 let response = request_builder.send().await.map_err(AgentError::from)?;
3647
3648 let status = response.status();
3649 if !status.is_success() {
3650 let error_text = response.text().await.unwrap_or_default();
3651 return Err(AgentError::Api(format!(
3652 "Non-streaming API error {}: {}",
3653 status,
3654 sanitize_html_error(&error_text)
3655 )));
3656 }
3657
3658 if let Some(ref cb) = on_event {
3660 cb(AgentEvent::MessageStart {
3661 message_id: uuid::Uuid::new_v4().to_string(),
3662 });
3663 }
3664
3665 let response_text = response
3667 .text()
3668 .await
3669 .map_err(|e| AgentError::Api(format!("Failed to read non-streaming response: {}", e)))?;
3670
3671 let response_json: serde_json::Value = serde_json::from_str(&response_text).map_err(|e| {
3673 AgentError::Api(format!(
3674 "Failed to parse non-streaming response: {} - {}",
3675 e, response_text
3676 ))
3677 })?;
3678
3679 if let Some(error) = response_json.get("error") {
3681 if let Some(error_type) = error.get("type").and_then(|t| t.as_str()) {
3683 if error_type == "max_tokens" || error_type == "max_output_tokens" {
3684 let mut result = StreamingResult::default();
3686 result.api_error = Some("max_output_tokens".to_string());
3687 return Ok(result);
3688 }
3689 }
3690 let error_str = error.to_string().to_lowercase();
3692 if error_str.contains("413")
3693 || error_str.contains("prompt_too_long")
3694 || error_str.contains("prompt too long")
3695 {
3696 return Err(AgentError::Api("prompt_too_long: context size exceeded. The query engine will attempt reactive compact.".to_string()));
3697 }
3698 return Err(AgentError::Api(format!("API error: {}", error)));
3699 }
3700
3701 let mut result = StreamingResult::default();
3702
3703 if let Some(content) = response_json.get("content").and_then(|c| c.as_array()) {
3705 for block in content {
3706 let block_type = block.get("type").and_then(|t| t.as_str());
3707 match block_type {
3708 Some("text") => {
3709 if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
3710 result.content.push_str(text);
3711 }
3712 }
3713 Some("thinking") | Some("redacted_thinking") => {
3714 if let Some(thinking) = block.get("thinking").and_then(|t| t.as_str()) {
3718 result
3720 .content
3721 .push_str(&format!("【thinking:{}】", thinking));
3722 }
3723 }
3724 Some("tool_use") => {
3725 let tool_id = block.get("id").and_then(|i| i.as_str()).unwrap_or("");
3726 let tool_name = block.get("name").and_then(|n| n.as_str()).unwrap_or("");
3727 let tool_input = block
3728 .get("input")
3729 .cloned()
3730 .unwrap_or_else(|| empty_json_value());
3731
3732 result.tool_calls.push(serde_json::json!({
3733 "id": tool_id,
3734 "name": tool_name,
3735 "arguments": tool_input,
3736 }));
3737 }
3738 _ => {}
3739 }
3740 }
3741 if let Some(usage) = response_json.get("usage") {
3743 result.usage = parse_anthropic_usage(usage);
3744 }
3745 result.cost = calculate_streaming_cost(&result.usage, &model);
3747 }
3748 else if let Some(choices) = response_json.get("choices").and_then(|c| c.as_array()) {
3750 if let Some(first_choice) = choices.first() {
3751 if let Some(message) = first_choice.get("message") {
3752 if let Some(content) = message.get("content").and_then(|c| c.as_str()) {
3754 result.content = content.to_string();
3755 }
3756 if let Some(tool_calls) = message.get("tool_calls").and_then(|t| t.as_array()) {
3758 for tc in tool_calls {
3759 let id = tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
3760 let func = tc.get("function");
3761 let name = func
3762 .and_then(|f| f.get("name"))
3763 .and_then(|n| n.as_str())
3764 .unwrap_or("");
3765 let args = func.and_then(|f| f.get("arguments"));
3766 let args_val = if let Some(args_str) = args.and_then(|a| a.as_str()) {
3767 serde_json::from_str(args_str).unwrap_or_else(|_| empty_json_value())
3768 } else {
3769 args.cloned().unwrap_or_else(|| empty_json_value())
3770 };
3771 result.tool_calls.push(serde_json::json!({
3772 "id": id,
3773 "name": name,
3774 "arguments": args_val,
3775 }));
3776 }
3777 }
3778 }
3779 }
3780 if let Some(usage) = response_json.get("usage") {
3782 result.usage = TokenUsage {
3783 input_tokens: usage
3784 .get("prompt_tokens")
3785 .and_then(|v| v.as_u64())
3786 .unwrap_or(0),
3787 output_tokens: usage
3788 .get("completion_tokens")
3789 .and_then(|v| v.as_u64())
3790 .unwrap_or(0),
3791 cache_creation_input_tokens: None,
3792 cache_read_input_tokens: None,
3793 iterations: None,
3794 };
3795 }
3796 result.cost = calculate_streaming_cost(&result.usage, &model);
3798 }
3799
3800 if let Some(ref cb) = on_event {
3802 cb(AgentEvent::ContentBlockStart {
3803 index: 0,
3804 block_type: "text".to_string(),
3805 });
3806 if !result.content.is_empty() {
3807 cb(AgentEvent::ContentBlockDelta {
3808 index: 0,
3809 delta: ContentDelta::Text {
3810 text: result.content.clone(),
3811 },
3812 });
3813 }
3814 cb(AgentEvent::ContentBlockStop { index: 0 });
3815 cb(AgentEvent::MessageStop);
3816 }
3817
3818 Ok(result)
3819}
3820
3821async fn make_anthropic_streaming_request(
3826 client: &reqwest::Client,
3827 url: &str,
3828 api_key: &str,
3829 request_body: serde_json::Value,
3830 on_event: Option<Arc<dyn Fn(AgentEvent) + Send + Sync>>,
3831 abort_handle: Arc<AtomicBool>,
3832) -> Result<StreamingResult, AgentError> {
3833 use futures_util::stream::StreamExt;
3834
3835 let is_anthropic = url.contains("anthropic.com");
3837
3838 let model = request_body
3840 .get("model")
3841 .and_then(|v| v.as_str())
3842 .unwrap_or("unknown")
3843 .to_string();
3844
3845 let watchdog = StreamWatchdog::from_env();
3847 let watchdog_aborted = Arc::new(AtomicBool::new(false));
3848 let watchdog_aborted_clone = watchdog_aborted.clone();
3849
3850 let mut stall_stats = StallStats::default();
3852 let mut last_event_time: Option<std::time::Instant> = None;
3853 let mut is_first_chunk = true;
3854 let start_time = std::time::Instant::now();
3855
3856 let mut ttft_recorded = false;
3858
3859 let request_builder = if is_anthropic {
3862 client
3864 .post(url)
3865 .header("x-api-key", api_key)
3866 .header("anthropic-version", "2023-06-01")
3867 .header("Content-Type", "application/json")
3868 .header("Accept", "text/event-stream")
3869 .header("User-Agent", get_user_agent())
3870 .json(&request_body)
3871 } else {
3872 client
3874 .post(url)
3875 .header("Authorization", format!("Bearer {}", api_key))
3876 .header("Content-Type", "application/json")
3877 .header("Accept", "text/event-stream")
3878 .header("User-Agent", get_user_agent())
3879 .json(&request_body)
3880 };
3881
3882 let response = request_builder.send().await.map_err(AgentError::from)?;
3884
3885 if abort_handle.load(Ordering::SeqCst) {
3887 return Err(AgentError::UserAborted);
3888 }
3889
3890 let status = response.status();
3891 if !status.is_success() {
3892 let error_text = response.text().await.unwrap_or_default();
3893 let sanitized = sanitize_html_error(&error_text);
3894 if status.as_u16() == 404 {
3896 return Err(AgentError::Stream404CreationError(format!(
3897 "Streaming endpoint returned 404: {}",
3898 sanitized
3899 )));
3900 }
3901 return Err(AgentError::Api(format!(
3902 "Streaming API error {}: {}",
3903 status, sanitized
3904 )));
3905 }
3906
3907 let response_for_cleanup = Arc::new(Mutex::new(Some(response)));
3909 let response_for_cleanup_clone = response_for_cleanup.clone();
3910
3911 let reset_idle_timer = || {
3913 if watchdog.enabled {
3914 let watchdog_aborted_warning = watchdog_aborted_clone.clone();
3915 let watchdog_aborted_timeout = watchdog_aborted_clone.clone();
3916 let timeout_ms = watchdog.idle_timeout_ms;
3917 let warning_ms = watchdog.warning_threshold_ms;
3918 let response_for_cleanup_inner = response_for_cleanup.clone();
3919
3920 tokio::spawn(async move {
3922 tokio::time::sleep(std::time::Duration::from_millis(warning_ms)).await;
3923 if !watchdog_aborted_warning.load(Ordering::SeqCst) {
3924 eprintln!(
3925 "Streaming idle warning: no chunks received for {}s",
3926 warning_ms / 1000
3927 );
3928 }
3929 });
3930
3931 tokio::spawn(async move {
3933 tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
3934 if !watchdog_aborted_timeout.load(Ordering::SeqCst) {
3935 watchdog_aborted_timeout.store(true, Ordering::SeqCst);
3936 eprintln!(
3937 "Streaming idle timeout: no chunks received for {}s, aborting stream",
3938 timeout_ms / 1000
3939 );
3940 if let Ok(mut guard) = response_for_cleanup_inner.lock() {
3942 if let Some(resp) = guard.take() {
3943 let _ = resp.error_for_status_ref();
3944 }
3945 }
3946 }
3947 });
3948 }
3949 };
3950 reset_idle_timer();
3951
3952 let response = response_for_cleanup.lock().unwrap().take().unwrap();
3954 let body = response.bytes_stream();
3955 let mut stream: futures_util::stream::BoxStream<'_, Result<bytes::Bytes, reqwest::Error>> =
3956 Box::pin(body);
3957
3958 let mut result = StreamingResult::default();
3959 let mut current_tool_use: Option<(String, String, String)> = None; let mut openai_tool_calls: HashMap<u32, (String, String, String)> = HashMap::new();
3962 let mut openai_tool_finalized: HashSet<u32> = HashSet::new();
3963 let mut content_index: u32 = 0;
3964 let mut tool_use_index: u32 = 0;
3965 let mut thinking_index: u32 = 0;
3966 let mut in_tool_use = false;
3967 let mut text_block_started = false;
3968 let mut in_thinking = false;
3969 let mut thinking_content = String::new();
3970
3971 'stream_loop: while let Some(chunk_result) = stream.next().await {
3973 if abort_handle.load(Ordering::SeqCst) {
3975 release_stream_resources(&Some(abort_handle.clone()), &None);
3977 return Err(AgentError::UserAborted);
3978 }
3979
3980 if watchdog_aborted.load(Ordering::SeqCst) {
3982 release_stream_resources(&Some(abort_handle.clone()), &None);
3983 return Err(AgentError::Api(format!(
3984 "Stream idle timeout - no chunks received for {}ms",
3985 watchdog.idle_timeout_ms
3986 )));
3987 }
3988
3989 let chunk =
3990 chunk_result.map_err(|e| AgentError::Api(format!("Stream read error: {}", e)))?;
3991
3992 reset_idle_timer();
3994
3995 let now = std::time::Instant::now();
3997 if let Some(last) = last_event_time {
3998 let gap = now.duration_since(last).as_millis() as u64;
3999 if gap > STALL_THRESHOLD_MS {
4000 stall_stats.stall_count += 1;
4001 stall_stats.total_stall_time_ms += gap;
4002 stall_stats.stall_durations.push(gap);
4003 eprintln!(
4004 "Streaming stall detected: {:.1}s gap between events (stall #{})",
4005 gap as f64 / 1000.0,
4006 stall_stats.stall_count
4007 );
4008 }
4009 }
4010 last_event_time = Some(now);
4011
4012 if is_first_chunk {
4014 let ttft = now.duration_since(start_time).as_millis() as u64;
4015 result.ttft_ms = Some(ttft);
4016 ttft_recorded = true;
4017 is_first_chunk = false;
4018 }
4019
4020 if let Ok(text) = String::from_utf8(chunk.to_vec()) {
4022 if !text.starts_with("data: ") {
4024 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
4026 if json.get("content").is_some() && json.get("choices").is_none() {
4028 if let Some(content_array) = json.get("content").and_then(|c| c.as_array())
4030 {
4031 for block in content_array {
4032 let block_type = block.get("type").and_then(|t| t.as_str());
4033 match block_type {
4034 Some("text") => {
4035 if let Some(text) =
4036 block.get("text").and_then(|t| t.as_str())
4037 {
4038 result.content.push_str(text);
4039 }
4040 }
4041 Some("tool_use") => {
4042 let tool_id =
4043 block.get("id").and_then(|i| i.as_str()).unwrap_or("");
4044 let tool_name = block
4045 .get("name")
4046 .and_then(|n| n.as_str())
4047 .unwrap_or("");
4048 let tool_input = block
4049 .get("input")
4050 .cloned()
4051 .unwrap_or_else(|| empty_json_value());
4052 result.tool_calls.push(serde_json::json!({
4053 "id": tool_id,
4054 "name": tool_name,
4055 "arguments": tool_input,
4056 }));
4057 }
4058 _ => {}
4059 }
4060 }
4061 if let Some(usage) = json.get("usage") {
4062 result.usage = parse_anthropic_usage(usage);
4063 }
4064 result.message_started = true;
4065 result.content_blocks_started += 1;
4066 result.content_blocks_completed += 1;
4067 result.cost = calculate_streaming_cost(&result.usage, &model);
4069 if let Some(ref cb) = on_event {
4070 cb(AgentEvent::MessageStart {
4071 message_id: json
4072 .get("id")
4073 .and_then(|i| i.as_str())
4074 .unwrap_or("")
4075 .to_string(),
4076 });
4077 cb(AgentEvent::ContentBlockStart {
4078 index: 0,
4079 block_type: "text".to_string(),
4080 });
4081 if !result.content.is_empty() {
4082 cb(AgentEvent::ContentBlockDelta {
4083 index: 0,
4084 delta: ContentDelta::Text {
4085 text: result.content.clone(),
4086 },
4087 });
4088 }
4089 cb(AgentEvent::ContentBlockStop { index: 0 });
4090 cb(AgentEvent::MessageStop);
4091 }
4092 return Ok(result);
4093 }
4094 if let Some(content) = json.get("content").and_then(|c| c.as_str()) {
4096 result.content.push_str(content);
4097 }
4098 if let Some(stop_reason) = json.get("stop_reason") {
4100 if !stop_reason.is_null() {
4101 result.stop_reason = stop_reason.as_str().map(|s| s.to_string());
4102 if let Some(ref cb) = on_event {
4103 cb(AgentEvent::ContentBlockStart {
4104 index: 0,
4105 block_type: "text".to_string(),
4106 });
4107 if !result.content.is_empty() {
4108 cb(AgentEvent::ContentBlockDelta {
4109 index: 0,
4110 delta: ContentDelta::Text {
4111 text: result.content.clone(),
4112 },
4113 });
4114 }
4115 cb(AgentEvent::ContentBlockStop { index: 0 });
4116 cb(AgentEvent::MessageStop);
4117 }
4118 result.message_started = true;
4119 result.content_blocks_started += 1;
4120 result.content_blocks_completed += 1;
4121 result.cost = calculate_streaming_cost(&result.usage, &model);
4122 return Ok(result);
4123 }
4124 }
4125 continue;
4126 }
4127
4128 if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4130 if let Some(first) = choices.first() {
4131 if let Some(delta) = first.get("delta") {
4132 if let Some(content) = delta.get("content").and_then(|c| c.as_str())
4133 {
4134 result.content.push_str(content);
4135 }
4136 if let Some(tool_calls) =
4138 delta.get("tool_calls").and_then(|t| t.as_array())
4139 {
4140 for tc in tool_calls {
4141 let idx =
4142 tc.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4143 as u32;
4144 let id =
4145 tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4146 let func = tc.get("function");
4147 let name = func
4148 .and_then(|f| f.get("name"))
4149 .and_then(|n| n.as_str())
4150 .unwrap_or("");
4151 let args_str = func
4152 .and_then(|f| f.get("arguments"))
4153 .and_then(|a| a.as_str())
4154 .unwrap_or("");
4155
4156 if !openai_tool_finalized.contains(&idx) {
4158 let entry =
4159 openai_tool_calls.entry(idx).or_insert_with(|| {
4160 (
4161 id.to_string(),
4162 name.to_string(),
4163 String::new(),
4164 )
4165 });
4166 if entry.0.is_empty() && !id.is_empty() {
4167 entry.0 = id.to_string();
4168 }
4169 if entry.1.is_empty() && !name.is_empty() {
4170 entry.1 = name.to_string();
4171 }
4172 entry.2.push_str(args_str);
4173 }
4174 }
4175 }
4176 }
4177 if let Some(finish_reason) =
4179 first.get("finish_reason").and_then(|f| f.as_str())
4180 {
4181 if !finish_reason.is_empty()
4182 && finish_reason != "null"
4183 && (!result.content.is_empty()
4184 || !result.tool_calls.is_empty()
4185 || !openai_tool_calls.is_empty())
4186 {
4187 result.stop_reason = Some(finish_reason.to_string());
4188
4189 for (idx, (id, name, args)) in &openai_tool_calls {
4191 if !openai_tool_finalized.contains(idx) {
4192 let args_val: serde_json::Value =
4193 serde_json::from_str(args)
4194 .unwrap_or_else(|_| empty_json_value());
4195 result.tool_calls.push(serde_json::json!({
4196 "id": id,
4197 "name": name,
4198 "arguments": args_val,
4199 }));
4200 }
4201 }
4202 openai_tool_finalized.extend(openai_tool_calls.keys().copied());
4203
4204 if let Some(ref cb) = on_event {
4205 cb(AgentEvent::ContentBlockStop { index: 0 });
4206 cb(AgentEvent::MessageStop);
4207 }
4208 result.message_started = true;
4209 result.content_blocks_started += 1;
4210 result.content_blocks_completed += 1;
4211 result.cost = calculate_streaming_cost(&result.usage, &model);
4212 return Ok(result);
4213 }
4214 }
4215 }
4216 continue;
4217 }
4218
4219 if json.get("choices").is_some() {
4221 if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4222 if let Some(first) = choices.first() {
4223 if let Some(msg) = first.get("message") {
4224 if let Some(content) =
4225 msg.get("content").and_then(|c| c.as_str())
4226 {
4227 result.content = content.to_string();
4228 }
4229 if let Some(tool_calls) =
4230 msg.get("tool_calls").and_then(|t| t.as_array())
4231 {
4232 for tc in tool_calls {
4233 let id =
4234 tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4235 let func = tc.get("function");
4236 let name = func
4237 .and_then(|f| f.get("name"))
4238 .and_then(|n| n.as_str())
4239 .unwrap_or("");
4240 let args = func.and_then(|f| f.get("arguments"));
4241 let args_val = if let Some(args_str) =
4242 args.and_then(|a| a.as_str())
4243 {
4244 serde_json::from_str(args_str)
4245 .unwrap_or_else(|_| empty_json_value())
4246 } else {
4247 args.cloned().unwrap_or_else(|| empty_json_value())
4248 };
4249 result.tool_calls.push(serde_json::json!({
4250 "id": id,
4251 "name": name,
4252 "arguments": args_val,
4253 }));
4254 }
4255 }
4256 if let Some(finish_reason) =
4258 first.get("finish_reason").and_then(|f| f.as_str())
4259 {
4260 result.stop_reason = Some(finish_reason.to_string());
4261 }
4262 }
4263 }
4264 }
4265 if let Some(usage) = json.get("usage") {
4266 result.usage = TokenUsage {
4267 input_tokens: usage
4268 .get("prompt_tokens")
4269 .and_then(|v| v.as_u64())
4270 .unwrap_or(0),
4271 output_tokens: usage
4272 .get("completion_tokens")
4273 .and_then(|v| v.as_u64())
4274 .unwrap_or(0),
4275 cache_creation_input_tokens: None,
4276 cache_read_input_tokens: None,
4277 iterations: None,
4278 };
4279 }
4280 result.message_started = true;
4281 result.content_blocks_started += 1;
4282 result.content_blocks_completed += 1;
4283 result.cost = calculate_streaming_cost(&result.usage, &model);
4284 if let Some(ref cb) = on_event {
4285 cb(AgentEvent::ContentBlockStart {
4286 index: 0,
4287 block_type: "text".to_string(),
4288 });
4289 if !result.content.is_empty() {
4290 cb(AgentEvent::ContentBlockDelta {
4291 index: 0,
4292 delta: ContentDelta::Text {
4293 text: result.content.clone(),
4294 },
4295 });
4296 }
4297 cb(AgentEvent::ContentBlockStop { index: 0 });
4298 cb(AgentEvent::MessageStop);
4299 }
4300 return Ok(result);
4301 }
4302 }
4303 continue;
4304 }
4305
4306 for line in text.lines() {
4308 if line.starts_with("data: ") {
4309 let data = &line[6..];
4310
4311 if data == "[DONE]" {
4313 continue;
4314 }
4315
4316 if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
4318 if let Some(event_type) = json.get("type").and_then(|t| t.as_str()) {
4320 match event_type {
4321 "message_start" => {
4322 result.message_started = true;
4325 if let Some(usage) = json.get("usage") {
4326 result.usage = parse_anthropic_usage(usage);
4327 }
4328 if json.get("research").is_some() {
4330 result.research = json.get("research").cloned();
4331 }
4332 if let Some(ref cb) = on_event {
4334 cb(AgentEvent::MessageStart {
4335 message_id: json
4336 .get("message")
4337 .and_then(|m| m.get("id"))
4338 .and_then(|i| i.as_str())
4339 .unwrap_or("")
4340 .to_string(),
4341 });
4342 }
4343 }
4344 "content_block_start" => {
4345 let index =
4346 json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4347 as u32;
4348 let block_type = json
4349 .get("content_block")
4350 .and_then(|b| b.get("type"))
4351 .and_then(|t| t.as_str())
4352 .unwrap_or("text")
4353 .to_string();
4354
4355 result.content_blocks_started += 1;
4356
4357 if block_type == "tool_use" {
4358 tool_use_index = index;
4359 in_tool_use = true;
4360 let tool_name = json
4361 .get("content_block")
4362 .and_then(|b| b.get("name"))
4363 .and_then(|n| n.as_str())
4364 .unwrap_or("")
4365 .to_string();
4366 let tool_id = json
4367 .get("content_block")
4368 .and_then(|b| b.get("id"))
4369 .and_then(|i| i.as_str())
4370 .unwrap_or("")
4371 .to_string();
4372 current_tool_use =
4373 Some((tool_id, tool_name, String::new()));
4374 } else if block_type == "thinking"
4375 || block_type == "redacted_thinking"
4376 {
4377 in_thinking = true;
4378 thinking_index = index;
4379 thinking_content.clear();
4380 } else {
4381 content_index = index;
4382 text_block_started = true;
4383 }
4384
4385 if let Some(ref cb) = on_event {
4386 cb(AgentEvent::ContentBlockStart { index, block_type });
4387 }
4388 }
4389 "content_block_delta" => {
4390 let index =
4391 json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4392 as u32;
4393 if let Some(delta) = json.get("delta") {
4394 let delta_type = delta.get("type").and_then(|t| t.as_str());
4395
4396 match delta_type {
4397 Some("text_delta") => {
4398 if let Some(text) =
4399 delta.get("text").and_then(|t| t.as_str())
4400 {
4401 result.content.push_str(text);
4402 if let Some(ref cb) = on_event {
4403 cb(AgentEvent::ContentBlockDelta {
4404 index,
4405 delta: ContentDelta::Text {
4406 text: text.to_string(),
4407 },
4408 });
4409 }
4410 }
4411 }
4412 Some("thinking_delta") => {
4413 if let Some(thinking) =
4414 delta.get("thinking").and_then(|t| t.as_str())
4415 {
4416 thinking_content.push_str(thinking);
4417 if let Some(ref cb) = on_event {
4418 cb(AgentEvent::ContentBlockDelta {
4419 index,
4420 delta: ContentDelta::Thinking {
4421 text: thinking.to_string(),
4422 },
4423 });
4424 }
4425 }
4426 }
4427 Some("input_json_delta") => {
4428 let partial_json = delta
4429 .get("partial_json")
4430 .and_then(|p| p.as_str())
4431 .unwrap_or("");
4432
4433 if let Some(ref mut current) = current_tool_use {
4434 current.2.push_str(partial_json);
4435 }
4436
4437 if let Some(ref cb) = on_event {
4438 let tool_name = current_tool_use
4439 .as_ref()
4440 .map(|(_, n, _)| n.clone())
4441 .unwrap_or_default();
4442 let tool_id = current_tool_use
4443 .as_ref()
4444 .map(|(i, _, _)| i.clone())
4445 .unwrap_or_default();
4446 cb(AgentEvent::ContentBlockDelta {
4447 index,
4448 delta: ContentDelta::ToolUse {
4449 id: tool_id,
4450 name: tool_name,
4451 input: serde_json::json!({ "partial": partial_json }),
4452 is_complete: false,
4453 },
4454 });
4455 }
4456 }
4457 Some("signature_delta") => {
4458 }
4461 _ => {}
4462 }
4463 }
4464 }
4465 "content_block_stop" => {
4466 let index =
4467 json.get("index").and_then(|i| i.as_u64()).unwrap_or(0)
4468 as u32;
4469
4470 result.content_blocks_completed += 1;
4471
4472 if in_tool_use && index == tool_use_index {
4474 if let Some((id, name, args_str)) = current_tool_use.take()
4475 {
4476 let args: serde_json::Value =
4477 serde_json::from_str(&args_str)
4478 .unwrap_or_else(|_| empty_json_value());
4479
4480 result.tool_calls.push(serde_json::json!({
4481 "id": id,
4482 "name": name,
4483 "arguments": args,
4484 }));
4485 result.any_tool_use_completed = true;
4486 }
4487 in_tool_use = false;
4488 }
4489
4490 if in_thinking && index == thinking_index {
4492 if !thinking_content.is_empty() {
4493 result.content.push_str(&format!(
4494 "【thinking:{}】",
4495 thinking_content
4496 ));
4497 }
4498 in_thinking = false;
4499 thinking_content.clear();
4500 }
4501
4502 if let Some(ref cb) = on_event {
4503 cb(AgentEvent::ContentBlockStop { index });
4504 }
4505 }
4506 "message_delta" => {
4507 if let Some(usage) = json.get("usage") {
4512 result.usage = parse_anthropic_usage(usage);
4513 }
4514 if let Some(delta) = json.get("delta") {
4516 if let Some(stop_reason) =
4517 delta.get("stop_reason").and_then(|s| s.as_str())
4518 {
4519 result.stop_reason = Some(stop_reason.to_string());
4520 }
4521 }
4522 result.cost = calculate_streaming_cost(&result.usage, &model);
4524 if let Some(ref cb) = on_event {
4525 cb(AgentEvent::TokenUsage {
4526 usage: result.usage.clone(),
4527 cost: result.cost,
4528 });
4529 }
4530 }
4531 "message_stop" => {
4532 break 'stream_loop;
4537 }
4538 _ => {}
4539 }
4540 }
4541
4542 if let Some(choices) = json.get("choices").and_then(|c| c.as_array()) {
4544 if let Some(first) = choices.first() {
4545 if let Some(delta) = first.get("delta") {
4546 if let Some(content) =
4547 delta.get("content").and_then(|c| c.as_str())
4548 {
4549 if !content.is_empty() {
4550 result.content.push_str(content);
4551 if !result.message_started {
4553 result.message_started = true;
4554 if let Some(ref cb) = on_event {
4555 cb(AgentEvent::MessageStart {
4556 message_id: uuid::Uuid::new_v4()
4557 .to_string(),
4558 });
4559 cb(AgentEvent::ContentBlockStart {
4560 index: 0,
4561 block_type: "text".to_string(),
4562 });
4563 }
4564 }
4565 if let Some(ref cb) = on_event {
4566 cb(AgentEvent::ContentBlockDelta {
4567 index: 0,
4568 delta: ContentDelta::Text {
4569 text: content.to_string(),
4570 },
4571 });
4572 }
4573 }
4574 }
4575 if let Some(tool_calls) =
4577 delta.get("tool_calls").and_then(|t| t.as_array())
4578 {
4579 if !result.message_started {
4581 result.message_started = true;
4582 if let Some(ref cb) = on_event {
4583 cb(AgentEvent::MessageStart {
4584 message_id: uuid::Uuid::new_v4().to_string(),
4585 });
4586 }
4587 }
4588 for tc in tool_calls {
4589 let idx = tc
4590 .get("index")
4591 .and_then(|i| i.as_u64())
4592 .unwrap_or(0)
4593 as u32;
4594 let id =
4595 tc.get("id").and_then(|i| i.as_str()).unwrap_or("");
4596 let func = tc.get("function");
4597 let name = func
4598 .and_then(|f| f.get("name"))
4599 .and_then(|n| n.as_str())
4600 .unwrap_or("");
4601 let args_str = func
4602 .and_then(|f| f.get("arguments"))
4603 .and_then(|a| a.as_str())
4604 .unwrap_or("");
4605
4606 if !openai_tool_finalized.contains(&idx) {
4608 let entry = openai_tool_calls
4609 .entry(idx)
4610 .or_insert_with(|| {
4611 (
4612 id.to_string(),
4613 name.to_string(),
4614 String::new(),
4615 )
4616 });
4617 if entry.0.is_empty() && !id.is_empty() {
4619 entry.0 = id.to_string();
4620 }
4621 if entry.1.is_empty() && !name.is_empty() {
4622 entry.1 = name.to_string();
4623 }
4624 entry.2.push_str(args_str);
4625 }
4626 }
4627 }
4628 }
4629 if let Some(finish_reason) =
4631 first.get("finish_reason").and_then(|f| f.as_str())
4632 {
4633 if !finish_reason.is_empty() && finish_reason != "null" {
4634 result.stop_reason = Some(finish_reason.to_string());
4635 if let Some(ref cb) = on_event {
4636 cb(AgentEvent::ContentBlockStop { index: 0 });
4637 cb(AgentEvent::MessageStop);
4638 }
4639 result.message_started = true;
4640 result.content_blocks_started += 1;
4641 result.content_blocks_completed += 1;
4642 result.cost =
4643 calculate_streaming_cost(&result.usage, &model);
4644
4645 for (idx, (id, name, args)) in &openai_tool_calls {
4647 if !openai_tool_finalized.contains(idx) {
4648 let args_val: serde_json::Value =
4649 serde_json::from_str(args)
4650 .unwrap_or_else(|_| empty_json_value());
4651 result.tool_calls.push(serde_json::json!({
4652 "id": id,
4653 "name": name,
4654 "arguments": args_val,
4655 }));
4656 }
4657 }
4658 openai_tool_finalized
4659 .extend(openai_tool_calls.keys().copied());
4660
4661 return Ok(result);
4662 }
4663 }
4664 }
4665 continue;
4666 }
4667
4668 if json.get("content").is_some() || json.get("id").is_some() {
4670 if let Some(content_array) =
4671 json.get("content").and_then(|c| c.as_array())
4672 {
4673 for block in content_array {
4674 let block_type = block.get("type").and_then(|t| t.as_str());
4675 match block_type {
4676 Some("text") => {
4677 if let Some(text) =
4678 block.get("text").and_then(|t| t.as_str())
4679 {
4680 result.content.push_str(text);
4681 }
4682 }
4683 Some("tool_use") => {
4684 let tool_id = block
4685 .get("id")
4686 .and_then(|i| i.as_str())
4687 .unwrap_or("");
4688 let tool_name = block
4689 .get("name")
4690 .and_then(|n| n.as_str())
4691 .unwrap_or("");
4692 let tool_input = block
4693 .get("input")
4694 .cloned()
4695 .unwrap_or_else(|| empty_json_value());
4696
4697 result.tool_calls.push(serde_json::json!({
4698 "id": tool_id,
4699 "name": tool_name,
4700 "arguments": tool_input,
4701 }));
4702 result.any_tool_use_completed = true;
4703 }
4704 _ => {}
4705 }
4706 }
4707 }
4708
4709 if let Some(usage) = json.get("usage") {
4710 result.usage = parse_anthropic_usage(usage);
4711 }
4712 result.message_started = true;
4713 result.content_blocks_started += 1;
4714 result.content_blocks_completed += 1;
4715 result.cost = calculate_streaming_cost(&result.usage, &model);
4716
4717 if let Some(ref cb) = on_event {
4718 cb(AgentEvent::ContentBlockStart {
4719 index: 0,
4720 block_type: "text".to_string(),
4721 });
4722 if !result.content.is_empty() {
4723 cb(AgentEvent::ContentBlockDelta {
4724 index: 0,
4725 delta: ContentDelta::Text {
4726 text: result.content.clone(),
4727 },
4728 });
4729 }
4730 cb(AgentEvent::ContentBlockStop { index: 0 });
4731 cb(AgentEvent::MessageStop);
4732 }
4733 return Ok(result);
4734 }
4735 }
4736 }
4737 }
4738 }
4739 }
4740
4741 result.cost = calculate_streaming_cost(&result.usage, &model);
4745
4746 watchdog_aborted.store(true, Ordering::SeqCst);
4748
4749 if let Some(ref cb) = on_event {
4751 cb(AgentEvent::MessageStop);
4752 }
4753
4754 validate_stream_completion(&result)?;
4756
4757 Ok(result)
4758}
4759
4760async fn build_memory_prefetch_context(
4762 prompt: &str,
4763 config: &QueryEngineConfig,
4764 loaded_paths: &std::collections::HashSet<String>,
4765) -> Option<String> {
4766 use crate::memdir::{find_relevant_memories, get_memory_base_dir, is_auto_memory_enabled};
4767
4768 if !is_auto_memory_enabled() {
4769 return None;
4770 }
4771
4772 let memory_dir = get_memory_base_dir();
4773
4774 let relevant = find_relevant_memories(prompt, &memory_dir).await;
4775
4776 if relevant.is_empty() {
4777 return None;
4778 }
4779
4780 let new_paths: Vec<String> = relevant
4781 .into_iter()
4782 .filter(|p| !loaded_paths.contains(p.as_str()))
4783 .collect();
4784
4785 if new_paths.is_empty() {
4786 return None;
4787 }
4788
4789 let paths_display = new_paths.join("\n");
4790 Some(format!(
4791 "<relevant-memories>\nThe following memory files may be relevant to your query:\n{}\n</relevant-memories>",
4792 paths_display
4793 ))
4794}