aico/llm/
executor.rs
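//! Executes LLM interactions for a session: assembles chat completion
//! requests from history and file context, streams the SSE response, and
//! resolves diffs, display items, and token usage from the stream.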

use crate::consts::*;
use crate::diffing::parser::StreamParser;
use crate::exceptions::AicoError;
use crate::historystore::reconstruct::reconstruct_history;
use crate::llm::api_models::{ChatCompletionRequest, Message, StreamOptions};
use crate::llm::client::{LlmClient, parse_sse_line};
use crate::models::{DisplayItem, InteractionResult, TokenUsage};
use crate::session::Session;
use futures_util::TryStreamExt;
use std::time::Instant;

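/// Appends any reasoning text carried by `delta` to `buffer`, preferring the
/// plain `reasoning_content` field over structured `reasoning_details`.
/// Returns `true` if the buffer grew.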
pub fn append_reasoning_delta(
    buffer: &mut String,
    delta: &crate::llm::api_models::ChunkDelta,
) -> bool {
    let start_len = buffer.len();

    // Priority 1: Direct reasoning_content
    if let Some(ref r) = delta.reasoning_content
        && !r.is_empty()
    {
        buffer.push_str(r);
        return true;
    }

    // Priority 2: Structured reasoning_details
    if let Some(ref details) = delta.reasoning_details {
        for detail in details {
            match detail {
                crate::llm::api_models::ReasoningDetail::Text { text } => buffer.push_str(text),
                crate::llm::api_models::ReasoningDetail::Summary { summary } => {
                    buffer.push_str(summary)
                }
                _ => {}
            }
        }
    }

    buffer.len() > start_len
}

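/// Returns the most recent Markdown heading or bold line in the reasoning
/// buffer, used as a short live status label while the model is thinking.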
pub fn extract_reasoning_header(reasoning_buffer: &str) -> Option<&str> {
    static RE: std::sync::LazyLock<regex::Regex> = std::sync::LazyLock::new(|| {
        regex::Regex::new(r"(?m)(?:^#{1,6}\s+(?P<header>.+?)[\r\n])|(?:^[*]{2}(?P<bold>.+?)[*]{2})")
            .unwrap()
    });

    RE.captures_iter(reasoning_buffer)
        .last()
        .and_then(|cap| {
            cap.name("header")
                .or_else(|| cap.name("bold"))
                .map(|m| m.as_str().trim())
        })
        .filter(|s| !s.is_empty())
}

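/// Wraps piped stdin (if any) and the prompt in tagged blocks so the model
/// can distinguish the two; otherwise returns the prompt unchanged.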
fn format_user_content(content: &str, piped: &Option<String>) -> String {
    if let Some(p) = piped {
        format!(
            "<stdin_content>\n{}\n</stdin_content>\n<prompt>\n{}\n</prompt>",
            p.trim(),
            content.trim()
        )
    } else {
        content.to_string()
    }
}

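/// Convenience wrapper around `build_request_with_piped` for callers that
/// have no piped stdin and no model override.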
pub async fn build_request(
    session: &Session,
    system_prompt: &str,
    user_prompt: &str,
    mode: crate::models::Mode,
    no_history: bool,
    passthrough: bool,
) -> Result<ChatCompletionRequest, AicoError> {
    let client = LlmClient::new(&session.view.model)?;
    let config = crate::models::InteractionConfig {
        mode,
        no_history,
        passthrough,
        model_override: None,
    };
    build_request_with_piped(&client, session, system_prompt, user_prompt, &None, &config).await
}

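/// Appends a single `<file path="...">...</file>` entry to the context
/// buffer, ensuring the file content ends with a newline.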
pub fn append_file_context_xml(buffer: &mut String, path: &str, content: &str) {
    use std::fmt::Write;
    // writeln! formats the opening tag directly into the buffer
    let _ = writeln!(buffer, "  <file path=\"{}\">", path);
    buffer.push_str(content);
    if !content.ends_with('\n') {
        buffer.push('\n');
    }
    buffer.push_str("  </file>\n");
}

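/// Builds a `<context>` block for the given files (sorted by path) and
/// returns it as a user message followed by a fixed assistant anchor reply,
/// or an empty Vec when there are no files.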
fn format_file_block(mut files: Vec<(&str, &str)>, intro: &str, anchor: &str) -> Vec<Message> {
    if files.is_empty() {
        return vec![];
    }
    // Ensure deterministic ordering (alphabetical by path)
    files.sort_by(|a, b| a.0.cmp(b.0));

    let mut block = "<context>\n".to_string();
    for (path, content) in files {
        append_file_context_xml(&mut block, path, content);
    }
    block.push_str("</context>");

    vec![
        Message {
            role: "user".to_string(),
            content: format!("{}\n\n{}", intro, block),
        },
        Message {
            role: "assistant".to_string(),
            content: anchor.to_string(),
        },
    ]
}

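/// Runs a full interaction: builds the request, streams the completion over
/// SSE, drives the live display and diff parser incrementally, and returns
/// the final content, display items, usage, cost, and unified diff.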
pub async fn execute_interaction(
    session: &Session,
    system_prompt: &str,
    prompt_text: &str,
    piped_content: &Option<String>,
    config: crate::models::InteractionConfig,
) -> Result<InteractionResult, AicoError> {
    let model_id = config
        .model_override
        .clone()
        .unwrap_or_else(|| session.view.model.clone());
    let client = LlmClient::new(&model_id)?;

    let req = build_request_with_piped(
        &client,
        session,
        system_prompt,
        prompt_text,
        piped_content,
        &config,
    )
    .await?;

    let start_time = Instant::now();

    let response = client.stream_chat(req).await?;

    let mut full_response = String::new();
    let mut reasoning_buffer = String::new();
    let mut usage_data: Option<TokenUsage> = None;

    let should_show_live = (config.mode == crate::models::Mode::Conversation
        || config.mode == crate::models::Mode::Diff)
        && crate::console::is_stdout_terminal();

    let mut live_display: Option<crate::ui::live_display::LiveDisplay> = if should_show_live {
        let mut ld =
            crate::ui::live_display::LiveDisplay::new(crate::console::get_terminal_width() as u16);
        // Eagerly show the initial status
        ld.render(&[]);
        Some(ld)
    } else {
        None
    };

    let mut parser = StreamParser::new(&session.context_content);
    let mut cumulative_yields = Vec::new();

    use tokio::io::AsyncBufReadExt;
    let stream = response.bytes_stream().map_err(std::io::Error::other);
    let reader = tokio_util::io::StreamReader::new(stream);
    let mut lines = tokio::io::BufReader::new(reader).lines();

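    // Consume the SSE stream line by line: reasoning deltas drive the live
    // status header, content deltas feed the diff parser and display, and the
    // final usage chunk (if present) is captured for token accounting.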
    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                if let Some(parsed) = parse_sse_line(&line) {
                    if let Some(choice) = parsed.choices.first() {
                        let did_update =
                            append_reasoning_delta(&mut reasoning_buffer, &choice.delta);

                        if did_update
                            && let Some(ref mut ld) = live_display
                            && full_response.is_empty()
                        {
                            let status = extract_reasoning_header(&reasoning_buffer)
                                .unwrap_or("Thinking...");
                            ld.update_status(status);
                        }

                        if let Some(ref content) = choice.delta.content {
                            full_response.push_str(content);

                            let yields = parser.parse_and_resolve(content, &session.root);

                            if let Some(ref mut ld) = live_display {
                                let mut ui_items: Vec<DisplayItem> = yields
                                    .iter()
                                    .cloned()
                                    .filter_map(|i| i.to_display_item(false))
                                    .collect();

                                let pending = parser.get_pending_content();
                                if !pending.is_empty() {
                                    let maybe_header = pending.trim_start().starts_with("File:");
                                    let maybe_marker = pending.trim_start().starts_with("<<<");
                                    if !maybe_header && !maybe_marker {
                                        ui_items.push(DisplayItem::Markdown(pending.to_string()));
                                    }
                                }

                                if !ui_items.is_empty() {
                                    ld.render(&ui_items);
                                }
                            }
                            cumulative_yields.extend(yields);
                        }
                    }
                    if let Some(u) = parsed.usage {
                        let cached = u
                            .prompt_tokens_details
                            .and_then(|d| d.cached_tokens)
                            .or(u.cached_tokens);
                        let reasoning = u
                            .completion_tokens_details
                            .and_then(|d| d.reasoning_tokens)
                            .or(u.reasoning_tokens);
                        usage_data = Some(TokenUsage {
                            prompt_tokens: u.prompt_tokens,
                            completion_tokens: u.completion_tokens,
                            total_tokens: u.total_tokens,
                            cached_tokens: cached,
                            reasoning_tokens: reasoning,
                            cost: u.cost,
                        });
                    }
                }
            }
            Ok(None) => break,
            Err(e) => {
                if !full_response.is_empty() {
                    eprintln!(
                        "\n[WARN] Stream interrupted: {}. Saving partial response.",
                        e
                    );
                    break;
                } else {
                    return Err(AicoError::Provider(format!("Stream error: {}", e)));
                }
            }
        }
    }

    let duration_ms = start_time.elapsed().as_millis() as u64;

    if let Some(mut ld) = live_display {
        // Finalize the live display using whatever was yielded incrementally.
        ld.finish(&[]);
    }

    // --- Finalization Pass for Storage ---
    let (unified_diff, mut final_display_items, final_warnings) =
        parser.final_resolve(&session.root);

    // Collect all warnings from incremental resolution and the final pass
    let mut all_warnings = parser.collect_warnings(&cumulative_yields);
    all_warnings.extend(final_warnings);

    // Merge incremental yields with final resolution items
    let mut all_display_items: Vec<DisplayItem> = cumulative_yields
        .into_iter()
        .filter_map(|i| i.to_display_item(true))
        .collect();
    all_display_items.append(&mut final_display_items);

    if !all_warnings.is_empty() {
        eprintln!("\nWarnings:");
        for w in &all_warnings {
            eprintln!("{}", w);
        }
    }

    let mut message_cost = None;
    if let Some(ref usage) = usage_data {
        message_cost = crate::llm::tokens::calculate_cost(&model_id, usage).await;
    }

    Ok(InteractionResult {
        content: full_response,
        display_items: Some(all_display_items),
        token_usage: usage_data,
        cost: message_cost,
        duration_ms,
        unified_diff: if unified_diff.is_empty() {
            None
        } else {
            Some(unified_diff)
        },
    })
}

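/// Assembles the full message list for a chat completion: system prompt,
/// reconstructed history, static and floating file context, alignment turns,
/// and the final user prompt, then merges consecutive same-role messages.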
pub async fn build_request_with_piped(
    client: &LlmClient,
    session: &Session,
    system_prompt: &str,
    user_prompt: &str,
    piped_content: &Option<String>,
    config: &crate::models::InteractionConfig,
) -> Result<ChatCompletionRequest, AicoError> {
    // --- 1. System Prompt ---
    let mut full_system_prompt = system_prompt.to_string();
    if config.mode == crate::models::Mode::Diff {
        full_system_prompt.push_str(DIFF_MODE_INSTRUCTIONS);
    }

    let mut messages = vec![Message {
        role: "system".to_string(),
        content: full_system_prompt,
    }];

    let history_to_use = if config.no_history {
        vec![]
    } else {
        reconstruct_history(&session.store, &session.view, false)?
    };

    if config.passthrough {
        for item in &history_to_use {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        let final_user_content = format_user_content(user_prompt, piped_content);
        messages.push(Message {
            role: "user".to_string(),
            content: final_user_content,
        });
    } else {
        // --- 2. Resolve Context State ---
        let state = session.resolve_context_state(&history_to_use)?;

        // --- 3. Linear Assembly ---
        // A. Static Context (Ground Truth)
        messages.extend(format_file_block(
            state.static_files,
            STATIC_CONTEXT_INTRO,
            STATIC_CONTEXT_ANCHOR,
        ));

        // B. History Segment 1 (Before Updates)
        for item in &history_to_use[..state.splice_idx] {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        // C. Floating Context (The Update)
        messages.extend(format_file_block(
            state.floating_files,
            FLOATING_CONTEXT_INTRO,
            FLOATING_CONTEXT_ANCHOR,
        ));

        // D. History Segment 2 (After Updates)
        for item in &history_to_use[state.splice_idx..] {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        // --- 4. Final Alignment and User Prompt ---
        let (align_user, align_asst) = if config.mode == crate::models::Mode::Diff {
            (ALIGNMENT_DIFF_USER, ALIGNMENT_DIFF_ASSISTANT)
        } else {
            (
                ALIGNMENT_CONVERSATION_USER,
                ALIGNMENT_CONVERSATION_ASSISTANT,
            )
        };
        messages.push(Message {
            role: "user".to_string(),
            content: align_user.to_string(),
        });
        messages.push(Message {
            role: "assistant".to_string(),
            content: align_asst.to_string(),
        });

        let final_user_content = format_user_content(user_prompt, piped_content);
        messages.push(Message {
            role: "user".to_string(),
            content: final_user_content,
        });
    }

    // --- 5. Turn Alignment (Merge consecutive same-role messages) ---
    // This provides robustness against dangling messages and ensures turn-based API compliance.
    let mut aligned_messages: Vec<Message> = Vec::new();
    for msg in messages {
        let trimmed_content = msg.content.trim();
        if trimmed_content.is_empty() {
            continue;
        }

        if let Some(last) = aligned_messages.last_mut()
            && last.role == msg.role
        {
            last.content.push_str("\n\n");
            last.content.push_str(trimmed_content);
            continue;
        }
        aligned_messages.push(Message {
            role: msg.role,
            content: trimmed_content.to_string(),
        });
    }

    Ok(ChatCompletionRequest {
        model: client.model_id.clone(),
        messages: aligned_messages,
        stream: true,
        stream_options: Some(StreamOptions {
            include_usage: true,
        }),
        extra_body: client.get_extra_params(),
    })
}