1use crate::ai::{
2 error::AiError, provider::AiProvider, Content, ContentBlock, ConversationRequest,
3 ConversationResponse, Message, MessageRole, ModelSettings, StreamEvent, ToolUseData,
4};
5use crate::chat::events::{ChatEvent, ChatMessage, ModelInfo};
6use crate::chat::request::{prepare_request, select_model_for_agent};
7use crate::chat::tool_extraction::extract_all_tool_calls;
8use crate::chat::tools::{self, current_agent_mut};
9
10use crate::ai::tweaks::resolve_from_settings;
11use crate::settings::config::ToolCallStyle;
12use anyhow::Result;
13use chrono::Utc;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::time::sleep;
18use tokio_stream::{Stream, StreamExt};
19use tracing::{info, warn};
20
21use super::actor::ActorState;
22
23pub async fn send_ai_request(state: &mut ActorState) -> Result<()> {
24 loop {
25 let (agent, conversation) =
26 tools::current_agent(state, |a| (a.agent.clone(), a.conversation.clone()));
27
28 let (request, model_settings) = prepare_request(
29 agent.as_ref(),
30 &conversation,
31 state.provider.as_ref(),
32 state.settings.clone(),
33 &state.steering,
34 state.tools.clone(),
35 &state.prompt_builder,
36 &state.context_builder,
37 &state.modules,
38 )
39 .await?;
40
41 state.transition_timing_state(crate::chat::actor::TimingState::ProcessingAI);
42
43 let stream = match send_request_streaming_with_retry(state, request).await {
44 Ok(stream) => stream,
45 Err(e) => {
46 state
47 .event_sender
48 .send_message(ChatMessage::error(format!("Error: {e:?}")));
49 return Ok(());
50 }
51 };
52
53 state.transition_timing_state(crate::chat::actor::TimingState::Idle);
54
55 let model = model_settings.model;
56 let tool_calls = consume_ai_stream(state, stream, model_settings).await?;
57
58 if tool_calls.is_empty() {
59 if !tools::current_agent(state, |a| a.agent.requires_tool_use()) {
60 break;
61 }
62 tools::current_agent_mut(state, |a| {
63 a.conversation.push(Message {
64 role: MessageRole::User,
65 content: Content::text_only("Tool use is required. Please use one of the available tools to complete your task.".to_string()),
66 })
67 });
68 continue;
69 }
70
71 match tools::execute_tool_calls(state, tool_calls, model).await {
72 Ok(tool_results) => {
73 if tool_results.continue_conversation {
74 continue;
75 } else {
76 break;
77 }
78 }
79 Err(e) => {
80 state.event_sender.send(ChatEvent::RetryAttempt {
81 attempt: 1,
82 max_retries: 1000,
83 error: e.to_string(),
84 backoff_ms: 0,
85 });
86
87 current_agent_mut(state, |a| {
88 let _last = a.conversation.pop();
89 a.conversation.push(Message {
90 role: MessageRole::User,
91 content: Content::text_only(format!(
92 "You attempted to use tools incorrectly; the system has removed the incorrect tool calls from the conversation history. Please incorporate the following feedback feedback and retry. Here are the errors from the (removed) tool calls: {}",
93 e.to_string()
94 )),
95 });
96 });
97 continue;
98 }
99 }
100 }
101
102 Ok(())
103}
104
/// Books a completed AI response into session accounting and the current
/// agent's conversation, and builds the outgoing chat message.
///
/// Returns the tool calls extracted from the response plus the `ChatMessage`
/// to display; the caller decides whether to send it as a plain message or
/// as the end of a stream.
fn finalize_ai_response(
    state: &mut ActorState,
    response: ConversationResponse,
    model_settings: ModelSettings,
) -> (Vec<ToolUseData>, ChatMessage) {
    // Copy the content up front: `response.usage` is moved into the chat
    // message below, while the content blocks are still needed afterwards.
    let content = response.content.clone();

    info!(?response, "AI response");

    // Accumulate session-wide token usage; optional counters are treated as
    // 0 when absent on either side.
    state.session_token_usage.input_tokens += response.usage.input_tokens;
    state.session_token_usage.output_tokens += response.usage.output_tokens;
    state.session_token_usage.total_tokens += response.usage.total_tokens;
    state.session_token_usage.cached_prompt_tokens = Some(
        state.session_token_usage.cached_prompt_tokens.unwrap_or(0)
            + response.usage.cached_prompt_tokens.unwrap_or(0),
    );
    state.session_token_usage.cache_creation_input_tokens = Some(
        state
            .session_token_usage
            .cache_creation_input_tokens
            .unwrap_or(0)
            + response.usage.cache_creation_input_tokens.unwrap_or(0),
    );
    state.session_token_usage.reasoning_tokens = Some(
        state.session_token_usage.reasoning_tokens.unwrap_or(0)
            + response.usage.reasoning_tokens.unwrap_or(0),
    );

    // Accumulate the monetary cost of this response for the session.
    let cost = state.provider.get_cost(&model_settings.model);
    let response_cost = cost.calculate_cost(&response.usage);
    state.session_cost += response_cost;

    // Only the first reasoning block is surfaced in the chat message.
    let reasoning = content.reasoning().first().map(|r| (*r).clone());

    let extraction = extract_all_tool_calls(&content);
    let tool_calls = extraction.tool_calls;
    let display_text = extraction.display_text;
    let xml_parse_error = extraction.xml_parse_error;
    let json_parse_error = extraction.json_parse_error;

    // On tool-call parse errors, push corrective user feedback into the
    // conversation. NOTE(review): these feedback messages are pushed before
    // the assistant message added further below, so they precede it in the
    // history — confirm this ordering is intentional.
    if let Some(parse_error) = xml_parse_error {
        warn!("XML tool call parse error: {parse_error}");
        tools::current_agent_mut(state, |a| {
            a.conversation.push(Message {
                role: MessageRole::User,
                content: Content::text_only(format!(
                    "Error parsing XML tool calls: {}. Please check your XML format and retry.",
                    parse_error
                )),
            })
        });
    }
    if let Some(parse_error) = json_parse_error {
        warn!("JSON tool call parse error: {parse_error}");
        tools::current_agent_mut(state, |a| {
            a.conversation.push(Message {
                role: MessageRole::User,
                content: Content::text_only(format!(
                    "Error parsing JSON tool calls: {}. Please check your JSON format and retry.",
                    parse_error
                )),
            })
        });
    }

    let message = ChatMessage::assistant(
        tools::current_agent(state, |a| a.agent.name().to_string()),
        display_text.clone(),
        tool_calls.clone(),
        ModelInfo {
            model: model_settings.model,
        },
        response.usage,
        reasoning,
    );

    let settings_snapshot = state.settings.settings();
    let resolved_tweaks = resolve_from_settings(
        &settings_snapshot,
        state.provider.as_ref(),
        model_settings.model,
    );

    // Rebuild the assistant message's content blocks for the conversation
    // history: reasoning first, then trimmed display text, then tool-use.
    let mut blocks: Vec<ContentBlock> = Vec::new();

    for r in content.reasoning() {
        blocks.push(ContentBlock::ReasoningContent(r.clone()));
    }

    let trimmed_text = display_text.trim();
    if !trimmed_text.is_empty() {
        blocks.push(ContentBlock::Text(trimmed_text.to_string()));
    }

    // Skip explicit ToolUse blocks for the XML style — presumably because
    // XML-style calls remain embedded in the text itself; TODO confirm.
    if resolved_tweaks.tool_call_style != ToolCallStyle::Xml {
        for tool_use in &tool_calls {
            blocks.push(ContentBlock::ToolUse(tool_use.clone()));
        }
    }

    tools::current_agent_mut(state, |a| {
        a.conversation.push(Message {
            role: MessageRole::Assistant,
            content: Content::new(blocks),
        })
    });

    // Reset the per-response command output cache now that the response has
    // been recorded.
    state.last_command_outputs.clear();

    (tool_calls, message)
}
216
217async fn consume_ai_stream(
218 state: &mut ActorState,
219 stream: Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>,
220 model_settings: ModelSettings,
221) -> Result<Vec<ToolUseData>> {
222 let disable_streaming = state.settings.settings().disable_streaming;
223 let message_id = format!("msg-{}", Utc::now().timestamp_millis());
224 let agent_name = tools::current_agent(state, |a| a.agent.name().to_string());
225
226 tokio::pin!(stream);
227
228 let mut tool_calls = Vec::new();
229 let mut received_text_deltas = false;
230 let mut stream_started = false;
231
232 while let Some(event) = stream.next().await {
233 let event: StreamEvent = event.map_err(|e| anyhow::anyhow!("Stream error: {e:?}"))?;
234 match event {
235 StreamEvent::TextDelta { text } => {
236 received_text_deltas = true;
237 if !disable_streaming {
238 if !stream_started {
239 state.event_sender.stream_start(
240 message_id.clone(),
241 agent_name.clone(),
242 model_settings.model,
243 );
244 stream_started = true;
245 }
246 state.event_sender.stream_delta(message_id.clone(), text);
247 }
248 }
249 StreamEvent::ReasoningDelta { text } => {
250 if !disable_streaming {
251 if !stream_started {
252 state.event_sender.stream_start(
253 message_id.clone(),
254 agent_name.clone(),
255 model_settings.model,
256 );
257 stream_started = true;
258 }
259 state
260 .event_sender
261 .stream_reasoning_delta(message_id.clone(), text);
262 }
263 }
264 StreamEvent::ContentBlockStart | StreamEvent::ContentBlockStop => {}
265 StreamEvent::MessageComplete { response } => {
266 if !disable_streaming && !received_text_deltas {
267 let full_text = response.content.text();
268 if !full_text.is_empty() {
269 if !stream_started {
270 state.event_sender.stream_start(
271 message_id.clone(),
272 agent_name.clone(),
273 model_settings.model,
274 );
275 stream_started = true;
276 }
277 state
278 .event_sender
279 .stream_delta(message_id.clone(), full_text);
280 }
281 }
282 let (calls, message) =
283 finalize_ai_response(state, response, model_settings.clone());
284 tool_calls = calls;
285 if disable_streaming {
286 state.event_sender.send_message(message);
287 } else {
288 state.event_sender.stream_end(message);
289 }
290 }
291 }
292 }
293
294 Ok(tool_calls)
295}
296
297async fn try_send_request_stream(
298 provider: &Arc<dyn AiProvider>,
299 request: &ConversationRequest,
300) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>, AiError> {
301 provider.converse_stream(request.clone()).await
302}
303
304fn truncate_recent_conversation(state: &mut ActorState) -> usize {
305 tools::current_agent_mut(state, |agent| {
306 let len = agent.conversation.len();
307 if len >= 2 {
308 agent.conversation.truncate(len - 2);
309 }
310 len
311 })
312}
313
314async fn send_request_streaming_with_retry(
315 state: &mut ActorState,
316 mut request: ConversationRequest,
317) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>> {
318 const MAX_RETRIES: u32 = 1000;
319 const MAX_TRANSIENT_RETRIES: u32 = 10;
320 const INITIAL_BACKOFF_MS: u64 = 100;
321 const MAX_BACKOFF_MS: u64 = 1000;
322 const BACKOFF_MULTIPLIER: f64 = 2.0;
323
324 let mut attempt = 0;
325
326 loop {
327 let result = try_send_request_stream(&state.provider, &request).await;
328
329 let max_retries = match &result {
330 Err(AiError::Transient(_)) => MAX_TRANSIENT_RETRIES,
331 _ => MAX_RETRIES,
332 };
333
334 match result {
335 Ok(stream) => {
336 if attempt > 0 {
337 info!("Streaming request succeeded after {} retries", attempt);
338 }
339 return Ok(stream);
340 }
341 Err(AiError::InputTooLong(_)) => {
342 state.event_sender.send_message(ChatMessage::warning(
343 "Context overflow detected, auto-compacting conversation...".to_string(),
344 ));
345 warn!("Input too long, compacting context");
346
347 let messages_before = truncate_recent_conversation(state);
348
349 compact_context(state).await?;
350
351 let messages_after = tools::current_agent(state, |a| a.conversation.len());
352 state.event_sender.send_message(ChatMessage::system(format!(
353 "Compaction complete: {} messages → {} (summary). Tracked files cleared.",
354 messages_before, messages_after
355 )));
356
357 request.messages = state
358 .spawn_module
359 .with_current_agent(|a| a.conversation.clone())
360 .unwrap_or_default();
361
362 continue;
363 }
364 Err(error) => {
365 if !should_retry(&error, attempt, max_retries) {
366 warn!(
367 attempt,
368 max_retries,
369 "Streaming request failed after {} retries: {}",
370 attempt,
371 error
372 );
373 return Err(error.into());
374 }
375
376 let backoff_ms = calculate_backoff(
377 attempt,
378 INITIAL_BACKOFF_MS,
379 MAX_BACKOFF_MS,
380 BACKOFF_MULTIPLIER,
381 );
382
383 emit_retry_event(state, attempt + 1, max_retries, &error, backoff_ms);
384
385 warn!(
386 attempt = attempt + 1,
387 max_retries = MAX_RETRIES,
388 backoff_ms,
389 error = %error,
390 "Streaming request failed, retrying after backoff"
391 );
392
393 sleep(Duration::from_millis(backoff_ms)).await;
394 attempt += 1;
395 }
396 }
397 }
398}
399
400async fn try_send_request(
401 provider: &Arc<dyn AiProvider>,
402 request: &ConversationRequest,
403) -> Result<ConversationResponse, AiError> {
404 provider.converse(request.clone()).await
405}
406
407fn should_retry(error: &AiError, attempt: u32, max_retries: u32) -> bool {
408 (matches!(error, AiError::Retryable(_)) || matches!(error, AiError::Transient(_)))
409 && attempt < max_retries
410}
411
/// Exponential backoff: `initial_ms * multiplier^attempt`, capped at
/// `max_ms`. Non-finite or sub-zero intermediates saturate via the float
/// branch and the `as u64` cast, matching `f64::min` semantics.
fn calculate_backoff(attempt: u32, initial_ms: u64, max_ms: u64, multiplier: f64) -> u64 {
    let scaled = (initial_ms as f64) * multiplier.powi(attempt as i32);
    if scaled < max_ms as f64 {
        scaled as u64
    } else {
        max_ms
    }
}
416
417fn emit_retry_event(
418 state: &mut ActorState,
419 attempt: u32,
420 max_retries: u32,
421 error: &AiError,
422 backoff_ms: u64,
423) {
424 let retry_event = ChatEvent::RetryAttempt {
425 attempt,
426 max_retries,
427 error: error.to_string(),
428 backoff_ms,
429 };
430
431 state.event_sender.send(retry_event);
432}
433
434async fn compact_context(state: &mut ActorState) -> Result<()> {
435 let (conversation, agent_name) = tools::current_agent(state, |a| {
436 (a.conversation.clone(), a.agent.name().to_string())
437 });
438
439 let settings_snapshot = state.settings.settings();
440 let model_settings =
441 select_model_for_agent(&settings_snapshot, state.provider.as_ref(), &agent_name)?;
442
443 let summarization_prompt = "Please provide a concise summary of the conversation so far, preserving all critical context, decisions, and important details. The summary will be used to continue the conversation efficiently. Focus on:
4441. Key decisions made
4452. Important context about the task
4463. Current state of work and remaining work
4474. Any critical information needed to continue effectively";
448
449 let filtered_messages: Vec<Message> = conversation
454 .clone()
455 .into_iter()
456 .map(|mut msg| {
457 let filtered_blocks: Vec<ContentBlock> = msg
458 .content
459 .into_blocks()
460 .into_iter()
461 .filter(|block| {
462 !matches!(block, ContentBlock::ToolUse { .. })
463 && !matches!(block, ContentBlock::ToolResult { .. })
464 })
465 .collect();
466 msg.content = Content::new(filtered_blocks);
467 msg
468 })
469 .collect();
470
471 let mut summary_request = ConversationRequest {
472 messages: filtered_messages,
473 model: model_settings.clone(),
474 system_prompt: "You are a conversation summarizer. Create concise, comprehensive summaries that preserve critical context.".to_string(),
475 stop_sequences: vec![],
476 tools: vec![],
477 };
478
479 summary_request.messages.push(Message {
480 role: MessageRole::User,
481 content: Content::text_only(summarization_prompt.to_string()),
482 });
483
484 let summary_response = try_send_request(&state.provider, &summary_request).await?;
485 let summary_text = summary_response.content.text();
486
487 tools::current_agent_mut(state, |agent| {
488 agent.conversation.clear();
489 agent.conversation.push(Message {
490 role: MessageRole::User,
491 content: Content::text_only(format!(
492 "Context summary from previous conversation:\n{}\n\nPlease continue assisting based on this context.",
493 summary_text
494 )),
495 });
496 });
497
498 state.tracked_files.clear();
499
500 Ok(())
501}