//! tycode_core/chat/ai.rs
//!
//! AI request/response handling for the chat actor: sending conversation
//! requests, consuming response streams, retry/backoff, and context compaction.
1use crate::ai::{
2    error::AiError, provider::AiProvider, Content, ContentBlock, ConversationRequest,
3    ConversationResponse, Message, MessageRole, ModelSettings, StreamEvent, ToolUseData,
4};
5use crate::chat::events::{ChatEvent, ChatMessage, ModelInfo};
6use crate::chat::request::{prepare_request, select_model_for_agent};
7use crate::chat::tool_extraction::extract_all_tool_calls;
8use crate::chat::tools::{self, current_agent_mut};
9
10use crate::ai::tweaks::resolve_from_settings;
11use crate::settings::config::ToolCallStyle;
12use anyhow::Result;
13use chrono::Utc;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::time::sleep;
18use tokio_stream::{Stream, StreamExt};
19use tracing::{info, warn};
20
21use super::actor::ActorState;
22
23pub async fn send_ai_request(state: &mut ActorState) -> Result<()> {
24    loop {
25        let (agent, conversation) =
26            tools::current_agent(state, |a| (a.agent.clone(), a.conversation.clone()));
27
28        let (request, model_settings) = prepare_request(
29            agent.as_ref(),
30            &conversation,
31            state.provider.as_ref(),
32            state.settings.clone(),
33            &state.steering,
34            state.tools.clone(),
35            &state.prompt_builder,
36            &state.context_builder,
37            &state.modules,
38        )
39        .await?;
40
41        state.transition_timing_state(crate::chat::actor::TimingState::ProcessingAI);
42
43        let stream = match send_request_streaming_with_retry(state, request).await {
44            Ok(stream) => stream,
45            Err(e) => {
46                state
47                    .event_sender
48                    .send_message(ChatMessage::error(format!("Error: {e:?}")));
49                return Ok(());
50            }
51        };
52
53        state.transition_timing_state(crate::chat::actor::TimingState::Idle);
54
55        let model = model_settings.model;
56        let tool_calls = consume_ai_stream(state, stream, model_settings).await?;
57
58        if tool_calls.is_empty() {
59            if !tools::current_agent(state, |a| a.agent.requires_tool_use()) {
60                break;
61            }
62            tools::current_agent_mut(state, |a| {
63                a.conversation.push(Message {
64                role: MessageRole::User,
65                content: Content::text_only("Tool use is required. Please use one of the available tools to complete your task.".to_string()),
66            })
67            });
68            continue;
69        }
70
71        match tools::execute_tool_calls(state, tool_calls, model).await {
72            Ok(tool_results) => {
73                if tool_results.continue_conversation {
74                    continue;
75                } else {
76                    break;
77                }
78            }
79            Err(e) => {
80                state.event_sender.send(ChatEvent::RetryAttempt {
81                    attempt: 1,
82                    max_retries: 1000,
83                    error: e.to_string(),
84                    backoff_ms: 0,
85                });
86
87                current_agent_mut(state, |a| {
88                    let _last = a.conversation.pop();
89                    a.conversation.push(Message {
90                        role: MessageRole::User,
91                        content: Content::text_only(format!(
92                            "You attempted to use tools incorrectly; the system has removed the incorrect tool calls from the conversation history. Please incorporate the following feedback feedback and retry. Here are the errors from the (removed) tool calls: {}",
93                            e.to_string()
94                        )),
95                    });
96                });
97                continue;
98            }
99        }
100    }
101
102    Ok(())
103}
104
105fn finalize_ai_response(
106    state: &mut ActorState,
107    response: ConversationResponse,
108    model_settings: ModelSettings,
109) -> (Vec<ToolUseData>, ChatMessage) {
110    let content = response.content.clone();
111
112    info!(?response, "AI response");
113
114    state.session_token_usage.input_tokens += response.usage.input_tokens;
115    state.session_token_usage.output_tokens += response.usage.output_tokens;
116    state.session_token_usage.total_tokens += response.usage.total_tokens;
117    state.session_token_usage.cached_prompt_tokens = Some(
118        state.session_token_usage.cached_prompt_tokens.unwrap_or(0)
119            + response.usage.cached_prompt_tokens.unwrap_or(0),
120    );
121    state.session_token_usage.cache_creation_input_tokens = Some(
122        state
123            .session_token_usage
124            .cache_creation_input_tokens
125            .unwrap_or(0)
126            + response.usage.cache_creation_input_tokens.unwrap_or(0),
127    );
128    state.session_token_usage.reasoning_tokens = Some(
129        state.session_token_usage.reasoning_tokens.unwrap_or(0)
130            + response.usage.reasoning_tokens.unwrap_or(0),
131    );
132
133    let cost = state.provider.get_cost(&model_settings.model);
134    let response_cost = cost.calculate_cost(&response.usage);
135    state.session_cost += response_cost;
136
137    let reasoning = content.reasoning().first().map(|r| (*r).clone());
138
139    let extraction = extract_all_tool_calls(&content);
140    let tool_calls = extraction.tool_calls;
141    let display_text = extraction.display_text;
142    let xml_parse_error = extraction.xml_parse_error;
143    let json_parse_error = extraction.json_parse_error;
144
145    if let Some(parse_error) = xml_parse_error {
146        warn!("XML tool call parse error: {parse_error}");
147        tools::current_agent_mut(state, |a| {
148            a.conversation.push(Message {
149                role: MessageRole::User,
150                content: Content::text_only(format!(
151                    "Error parsing XML tool calls: {}. Please check your XML format and retry.",
152                    parse_error
153                )),
154            })
155        });
156    }
157    if let Some(parse_error) = json_parse_error {
158        warn!("JSON tool call parse error: {parse_error}");
159        tools::current_agent_mut(state, |a| {
160            a.conversation.push(Message {
161                role: MessageRole::User,
162                content: Content::text_only(format!(
163                    "Error parsing JSON tool calls: {}. Please check your JSON format and retry.",
164                    parse_error
165                )),
166            })
167        });
168    }
169
170    let message = ChatMessage::assistant(
171        tools::current_agent(state, |a| a.agent.name().to_string()),
172        display_text.clone(),
173        tool_calls.clone(),
174        ModelInfo {
175            model: model_settings.model,
176        },
177        response.usage,
178        reasoning,
179    );
180
181    let settings_snapshot = state.settings.settings();
182    let resolved_tweaks = resolve_from_settings(
183        &settings_snapshot,
184        state.provider.as_ref(),
185        model_settings.model,
186    );
187
188    let mut blocks: Vec<ContentBlock> = Vec::new();
189
190    for r in content.reasoning() {
191        blocks.push(ContentBlock::ReasoningContent(r.clone()));
192    }
193
194    let trimmed_text = display_text.trim();
195    if !trimmed_text.is_empty() {
196        blocks.push(ContentBlock::Text(trimmed_text.to_string()));
197    }
198
199    if resolved_tweaks.tool_call_style != ToolCallStyle::Xml {
200        for tool_use in &tool_calls {
201            blocks.push(ContentBlock::ToolUse(tool_use.clone()));
202        }
203    }
204
205    tools::current_agent_mut(state, |a| {
206        a.conversation.push(Message {
207            role: MessageRole::Assistant,
208            content: Content::new(blocks),
209        })
210    });
211
212    state.last_command_outputs.clear();
213
214    (tool_calls, message)
215}
216
217async fn consume_ai_stream(
218    state: &mut ActorState,
219    stream: Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>,
220    model_settings: ModelSettings,
221) -> Result<Vec<ToolUseData>> {
222    let disable_streaming = state.settings.settings().disable_streaming;
223    let message_id = format!("msg-{}", Utc::now().timestamp_millis());
224    let agent_name = tools::current_agent(state, |a| a.agent.name().to_string());
225
226    tokio::pin!(stream);
227
228    let mut tool_calls = Vec::new();
229    let mut received_text_deltas = false;
230    let mut stream_started = false;
231
232    while let Some(event) = stream.next().await {
233        let event: StreamEvent = event.map_err(|e| anyhow::anyhow!("Stream error: {e:?}"))?;
234        match event {
235            StreamEvent::TextDelta { text } => {
236                received_text_deltas = true;
237                if !disable_streaming {
238                    if !stream_started {
239                        state.event_sender.stream_start(
240                            message_id.clone(),
241                            agent_name.clone(),
242                            model_settings.model,
243                        );
244                        stream_started = true;
245                    }
246                    state.event_sender.stream_delta(message_id.clone(), text);
247                }
248            }
249            StreamEvent::ReasoningDelta { text } => {
250                if !disable_streaming {
251                    if !stream_started {
252                        state.event_sender.stream_start(
253                            message_id.clone(),
254                            agent_name.clone(),
255                            model_settings.model,
256                        );
257                        stream_started = true;
258                    }
259                    state
260                        .event_sender
261                        .stream_reasoning_delta(message_id.clone(), text);
262                }
263            }
264            StreamEvent::ContentBlockStart | StreamEvent::ContentBlockStop => {}
265            StreamEvent::MessageComplete { response } => {
266                if !disable_streaming && !received_text_deltas {
267                    let full_text = response.content.text();
268                    if !full_text.is_empty() {
269                        if !stream_started {
270                            state.event_sender.stream_start(
271                                message_id.clone(),
272                                agent_name.clone(),
273                                model_settings.model,
274                            );
275                            stream_started = true;
276                        }
277                        state
278                            .event_sender
279                            .stream_delta(message_id.clone(), full_text);
280                    }
281                }
282                let (calls, message) =
283                    finalize_ai_response(state, response, model_settings.clone());
284                tool_calls = calls;
285                if disable_streaming {
286                    state.event_sender.send_message(message);
287                } else {
288                    state.event_sender.stream_end(message);
289                }
290            }
291        }
292    }
293
294    Ok(tool_calls)
295}
296
/// Forwards a streaming conversation request to the provider.
///
/// The request is cloned because `converse_stream` takes ownership while the
/// caller keeps the original around for retries.
async fn try_send_request_stream(
    provider: &Arc<dyn AiProvider>,
    request: &ConversationRequest,
) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>, AiError> {
    provider.converse_stream(request.clone()).await
}
303
304fn truncate_recent_conversation(state: &mut ActorState) -> usize {
305    tools::current_agent_mut(state, |agent| {
306        let len = agent.conversation.len();
307        if len >= 2 {
308            agent.conversation.truncate(len - 2);
309        }
310        len
311    })
312}
313
314async fn send_request_streaming_with_retry(
315    state: &mut ActorState,
316    mut request: ConversationRequest,
317) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, AiError>> + Send>>> {
318    const MAX_RETRIES: u32 = 1000;
319    const MAX_TRANSIENT_RETRIES: u32 = 10;
320    const INITIAL_BACKOFF_MS: u64 = 100;
321    const MAX_BACKOFF_MS: u64 = 1000;
322    const BACKOFF_MULTIPLIER: f64 = 2.0;
323
324    let mut attempt = 0;
325
326    loop {
327        let result = try_send_request_stream(&state.provider, &request).await;
328
329        let max_retries = match &result {
330            Err(AiError::Transient(_)) => MAX_TRANSIENT_RETRIES,
331            _ => MAX_RETRIES,
332        };
333
334        match result {
335            Ok(stream) => {
336                if attempt > 0 {
337                    info!("Streaming request succeeded after {} retries", attempt);
338                }
339                return Ok(stream);
340            }
341            Err(AiError::InputTooLong(_)) => {
342                state.event_sender.send_message(ChatMessage::warning(
343                    "Context overflow detected, auto-compacting conversation...".to_string(),
344                ));
345                warn!("Input too long, compacting context");
346
347                let messages_before = truncate_recent_conversation(state);
348
349                compact_context(state).await?;
350
351                let messages_after = tools::current_agent(state, |a| a.conversation.len());
352                state.event_sender.send_message(ChatMessage::system(format!(
353                    "Compaction complete: {} messages → {} (summary). Tracked files cleared.",
354                    messages_before, messages_after
355                )));
356
357                request.messages = state
358                    .spawn_module
359                    .with_current_agent(|a| a.conversation.clone())
360                    .unwrap_or_default();
361
362                continue;
363            }
364            Err(error) => {
365                if !should_retry(&error, attempt, max_retries) {
366                    warn!(
367                        attempt,
368                        max_retries,
369                        "Streaming request failed after {} retries: {}",
370                        attempt,
371                        error
372                    );
373                    return Err(error.into());
374                }
375
376                let backoff_ms = calculate_backoff(
377                    attempt,
378                    INITIAL_BACKOFF_MS,
379                    MAX_BACKOFF_MS,
380                    BACKOFF_MULTIPLIER,
381                );
382
383                emit_retry_event(state, attempt + 1, max_retries, &error, backoff_ms);
384
385                warn!(
386                    attempt = attempt + 1,
387                    max_retries = MAX_RETRIES,
388                    backoff_ms,
389                    error = %error,
390                    "Streaming request failed, retrying after backoff"
391                );
392
393                sleep(Duration::from_millis(backoff_ms)).await;
394                attempt += 1;
395            }
396        }
397    }
398}
399
/// Forwards a non-streaming conversation request to the provider.
///
/// The request is cloned because `converse` takes ownership while the caller
/// retains the original (e.g. for inspection or retry).
async fn try_send_request(
    provider: &Arc<dyn AiProvider>,
    request: &ConversationRequest,
) -> Result<ConversationResponse, AiError> {
    provider.converse(request.clone()).await
}
406
407fn should_retry(error: &AiError, attempt: u32, max_retries: u32) -> bool {
408    (matches!(error, AiError::Retryable(_)) || matches!(error, AiError::Transient(_)))
409        && attempt < max_retries
410}
411
/// Exponential backoff: `initial_ms * multiplier^attempt`, capped at `max_ms`.
/// Attempt 0 yields `initial_ms` (truncated through f64 arithmetic).
fn calculate_backoff(attempt: u32, initial_ms: u64, max_ms: u64, multiplier: f64) -> u64 {
    let exponential = multiplier.powi(attempt as i32) * initial_ms as f64;
    let capped = exponential.min(max_ms as f64);
    capped as u64
}
416
417fn emit_retry_event(
418    state: &mut ActorState,
419    attempt: u32,
420    max_retries: u32,
421    error: &AiError,
422    backoff_ms: u64,
423) {
424    let retry_event = ChatEvent::RetryAttempt {
425        attempt,
426        max_retries,
427        error: error.to_string(),
428        backoff_ms,
429    };
430
431    state.event_sender.send(retry_event);
432}
433
434async fn compact_context(state: &mut ActorState) -> Result<()> {
435    let (conversation, agent_name) = tools::current_agent(state, |a| {
436        (a.conversation.clone(), a.agent.name().to_string())
437    });
438
439    let settings_snapshot = state.settings.settings();
440    let model_settings =
441        select_model_for_agent(&settings_snapshot, state.provider.as_ref(), &agent_name)?;
442
443    let summarization_prompt = "Please provide a concise summary of the conversation so far, preserving all critical context, decisions, and important details. The summary will be used to continue the conversation efficiently. Focus on:
4441. Key decisions made
4452. Important context about the task
4463. Current state of work and remaining work
4474. Any critical information needed to continue effectively";
448
449    // Filter ToolUse/ToolResult blocks before summarization to avoid Bedrock's
450    // toolConfig validation error. Bedrock requires toolConfig when messages contain
451    // these blocks, but summarization requests don't offer tools (tools: vec![]).
452    // Only conversational content (Text, ReasoningContent) is needed for summarization.
453    let filtered_messages: Vec<Message> = conversation
454        .clone()
455        .into_iter()
456        .map(|mut msg| {
457            let filtered_blocks: Vec<ContentBlock> = msg
458                .content
459                .into_blocks()
460                .into_iter()
461                .filter(|block| {
462                    !matches!(block, ContentBlock::ToolUse { .. })
463                        && !matches!(block, ContentBlock::ToolResult { .. })
464                })
465                .collect();
466            msg.content = Content::new(filtered_blocks);
467            msg
468        })
469        .collect();
470
471    let mut summary_request = ConversationRequest {
472        messages: filtered_messages,
473        model: model_settings.clone(),
474        system_prompt: "You are a conversation summarizer. Create concise, comprehensive summaries that preserve critical context.".to_string(),
475        stop_sequences: vec![],
476        tools: vec![],
477    };
478
479    summary_request.messages.push(Message {
480        role: MessageRole::User,
481        content: Content::text_only(summarization_prompt.to_string()),
482    });
483
484    let summary_response = try_send_request(&state.provider, &summary_request).await?;
485    let summary_text = summary_response.content.text();
486
487    tools::current_agent_mut(state, |agent| {
488        agent.conversation.clear();
489        agent.conversation.push(Message {
490            role: MessageRole::User,
491            content: Content::text_only(format!(
492                "Context summary from previous conversation:\n{}\n\nPlease continue assisting based on this context.",
493                summary_text
494            )),
495        });
496    });
497
498    state.tracked_files.clear();
499
500    Ok(())
501}