aico/llm/executor.rs

use crate::consts::*;
use crate::diffing::parser::StreamParser;
use crate::exceptions::AicoError;
use crate::historystore::reconstruct::reconstruct_history;
use crate::llm::api_models::{ChatCompletionRequest, Message, StreamOptions};
use crate::llm::client::{LlmClient, parse_sse_line};
use crate::models::{DisplayItem, InteractionResult, TokenUsage};
use crate::session::Session;
use futures_util::TryStreamExt;
use std::time::Instant;

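/// Appends any reasoning text carried by a streaming delta to `buffer`.
///
/// `reasoning_content` takes priority; when it is absent or empty, text is pulled
/// from the structured `reasoning_details` entries instead. Returns `true` if the
/// buffer grew.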
pub fn append_reasoning_delta(
    buffer: &mut String,
    delta: &crate::llm::api_models::ChunkDelta,
) -> bool {
    let start_len = buffer.len();

    // Priority 1: Direct reasoning_content
    if let Some(ref r) = delta.reasoning_content
        && !r.is_empty()
    {
        buffer.push_str(r);
        return true;
    }

    // Priority 2: Structured reasoning_details
    if let Some(ref details) = delta.reasoning_details {
        for detail in details {
            match detail {
                crate::llm::api_models::ReasoningDetail::Text { text } => buffer.push_str(text),
                crate::llm::api_models::ReasoningDetail::Summary { summary } => {
                    buffer.push_str(summary)
                }
                _ => {}
            }
        }
    }

    buffer.len() > start_len
}

pub fn extract_reasoning_header(reasoning_buffer: &str) -> Option<&str> {
    static RE: std::sync::LazyLock<regex::Regex> = std::sync::LazyLock::new(|| {
        regex::Regex::new(r"(?m)(?:^#{1,6}\s+(?P<header>.+?)[\r\n])|(?:^[*]{2}(?P<bold>.+?)[*]{2})")
            .unwrap()
    });

    RE.captures_iter(reasoning_buffer)
        .last()
        .and_then(|cap| {
            cap.name("header")
                .or_else(|| cap.name("bold"))
                .map(|m| m.as_str().trim())
        })
        .filter(|s| !s.is_empty())
}
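
// A minimal sketch of example-based tests for `extract_reasoning_header`, using
// hand-written reasoning snippets; it illustrates the "latest header or bold line
// wins" behaviour rather than serving as an exhaustive specification.
#[cfg(test)]
mod reasoning_header_example_tests {
    use super::*;

    #[test]
    fn picks_the_most_recent_header_or_bold_line() {
        // A finished markdown header line is recognized.
        let buf = "Let me think.\n## Reading the config\ndetails here\n";
        assert_eq!(extract_reasoning_header(buf), Some("Reading the config"));

        // A later bold line at the start of a line takes precedence over earlier headers.
        let buf = "## Reading the config\ndetails\n**Drafting a plan** for the change";
        assert_eq!(extract_reasoning_header(buf), Some("Drafting a plan"));

        // Plain prose with no marker yields None.
        assert_eq!(extract_reasoning_header("just plain prose"), None);
    }
}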

fn format_user_content(content: &str, piped: &Option<String>) -> String {
    if let Some(p) = piped {
        format!(
            "<stdin_content>\n{}\n</stdin_content>\n<prompt>\n{}\n</prompt>",
            p.trim(),
            content.trim()
        )
    } else {
        content.to_string()
    }
}
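
// A minimal illustrative test for `format_user_content`, covering both branches
// (with and without piped stdin); the inputs here are made up for the example.
#[cfg(test)]
mod format_user_content_example_tests {
    use super::*;

    #[test]
    fn wraps_piped_stdin_and_prompt_in_tags() {
        let piped = Some("fn main() {}\n".to_string());
        assert_eq!(
            format_user_content("Explain this file.", &piped),
            "<stdin_content>\nfn main() {}\n</stdin_content>\n<prompt>\nExplain this file.\n</prompt>"
        );

        // Without piped content the prompt passes through untouched.
        assert_eq!(format_user_content("Explain this file.", &None), "Explain this file.");
    }
}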
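/// Convenience wrapper around `build_request_with_piped` for callers with no piped
/// stdin and no model override; the client is created from the session's model.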
pub async fn build_request(
    session: &Session,
    system_prompt: &str,
    user_prompt: &str,
    mode: crate::models::Mode,
    no_history: bool,
    passthrough: bool,
) -> Result<ChatCompletionRequest, AicoError> {
    let client = LlmClient::new(&session.view.model)?;
    let config = crate::models::InteractionConfig {
        mode,
        no_history,
        passthrough,
        model_override: None,
    };
    build_request_with_piped(&client, session, system_prompt, user_prompt, &None, &config).await
}

pub fn append_file_context_xml(buffer: &mut String, path: &str, content: &str) {
    use std::fmt::Write;
    // writeln! formats directly into the String buffer; writing to a String cannot fail.
    let _ = writeln!(buffer, "  <file path=\"{}\">", path);
    buffer.push_str(content);
    if !content.ends_with('\n') {
        buffer.push('\n');
    }
    buffer.push_str("  </file>\n");
}
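
// A small illustrative test for `append_file_context_xml`; a sketch of the emitted
// <file> fragment, including the trailing-newline normalization, with made-up paths.
#[cfg(test)]
mod file_context_xml_example_tests {
    use super::*;

    #[test]
    fn wraps_content_in_a_file_tag_and_normalizes_the_trailing_newline() {
        let mut buf = String::new();
        append_file_context_xml(&mut buf, "src/main.rs", "fn main() {}");
        assert_eq!(buf, "  <file path=\"src/main.rs\">\nfn main() {}\n  </file>\n");

        // Content that already ends with a newline does not get a second one.
        let mut buf = String::new();
        append_file_context_xml(&mut buf, "notes.txt", "hello\n");
        assert_eq!(buf, "  <file path=\"notes.txt\">\nhello\n  </file>\n");
    }
}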

fn merge_display_items(items: Vec<DisplayItem>) -> Vec<DisplayItem> {
    let mut merged = Vec::with_capacity(items.len());
    for item in items {
        match (merged.last_mut(), item) {
            (Some(DisplayItem::Markdown(last)), DisplayItem::Markdown(next)) => {
                last.push_str(&next);
            }
            (_, item) => merged.push(item),
        }
    }
    merged
}

fn format_file_block(mut files: Vec<(&str, &str)>, intro: &str, anchor: &str) -> Vec<Message> {
    if files.is_empty() {
        return vec![];
    }
    // Ensure deterministic ordering (alphabetical by path)
    files.sort_by(|a, b| a.0.cmp(b.0));

    let mut block = "<context>\n".to_string();
    for (path, content) in files {
        append_file_context_xml(&mut block, path, content);
    }
    block.push_str("</context>");

    vec![
        Message {
            role: "user".to_string(),
            content: format!("{}\n\n{}", intro, block),
        },
        Message {
            role: "assistant".to_string(),
            content: anchor.to_string(),
        },
    ]
}
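
// An illustrative test for `format_file_block`; the intro/anchor strings and file
// paths are made up for the example rather than taken from the real `consts` values.
#[cfg(test)]
mod format_file_block_example_tests {
    use super::*;

    #[test]
    fn emits_sorted_context_plus_anchor_or_nothing_when_empty() {
        assert!(format_file_block(vec![], "intro", "anchor").is_empty());

        let msgs = format_file_block(
            vec![("b.rs", "fn b() {}"), ("a.rs", "fn a() {}")],
            "Here is the project context:",
            "Acknowledged.",
        );
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].role, "user");
        assert!(msgs[0]
            .content
            .starts_with("Here is the project context:\n\n<context>\n"));
        // Files are ordered alphabetically by path regardless of input order.
        let a_pos = msgs[0].content.find("a.rs").unwrap();
        let b_pos = msgs[0].content.find("b.rs").unwrap();
        assert!(a_pos < b_pos);
        assert_eq!(msgs[1].role, "assistant");
        assert_eq!(msgs[1].content, "Acknowledged.");
    }
}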
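/// Runs a single LLM interaction end to end: builds the request, streams the SSE
/// response, optionally renders a live display (Conversation or Diff mode on a
/// terminal), feeds content deltas through the diff `StreamParser`, and returns the
/// accumulated content, display items, token usage, cost, and duration. A stream
/// error after partial output is downgraded to a warning so the partial response can
/// still be saved.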
pub async fn execute_interaction(
    session: &Session,
    system_prompt: &str,
    prompt_text: &str,
    piped_content: &Option<String>,
    config: crate::models::InteractionConfig,
) -> Result<InteractionResult, AicoError> {
    let model_id = config
        .model_override
        .clone()
        .unwrap_or_else(|| session.view.model.clone());
    let client = LlmClient::new(&model_id)?;

    let req = build_request_with_piped(
        &client,
        session,
        system_prompt,
        prompt_text,
        piped_content,
        &config,
    )
    .await?;

    let start_time = Instant::now();

    let response = client.stream_chat(req).await?;

    let mut full_response = String::new();
    let mut reasoning_buffer = String::new();
    let mut usage_data: Option<TokenUsage> = None;

    let should_show_live = (config.mode == crate::models::Mode::Conversation
        || config.mode == crate::models::Mode::Diff)
        && crate::console::is_stdout_terminal();

    let mut live_display: Option<crate::ui::live_display::LiveDisplay> = if should_show_live {
        let mut ld =
            crate::ui::live_display::LiveDisplay::new(crate::console::get_terminal_width() as u16);
        // Eagerly show the initial status
        ld.render(&[]);
        Some(ld)
    } else {
        None
    };

    let mut parser = StreamParser::new(&session.context_content);
    let mut cumulative_yields = Vec::new();

    use tokio::io::AsyncBufReadExt;
    let stream = response.bytes_stream().map_err(std::io::Error::other);
    let reader = tokio_util::io::StreamReader::new(stream);
    let mut lines = tokio::io::BufReader::new(reader).lines();

    loop {
        match lines.next_line().await {
            Ok(Some(line)) => {
                if let Some(parsed) = parse_sse_line(&line) {
                    if let Some(choice) = parsed.choices.first() {
                        let did_update =
                            append_reasoning_delta(&mut reasoning_buffer, &choice.delta);

                        if did_update
                            && let Some(ref mut ld) = live_display
                            && full_response.is_empty()
                        {
                            let status = extract_reasoning_header(&reasoning_buffer)
                                .unwrap_or("Thinking...");
                            ld.update_status(status);
                        }

                        if let Some(ref content) = choice.delta.content {
                            full_response.push_str(content);

                            let yields = parser.parse_and_resolve(content, &session.root);

                            if let Some(ref mut ld) = live_display {
                                let mut ui_items: Vec<DisplayItem> = yields
                                    .iter()
                                    .cloned()
                                    .filter_map(|i| i.to_display_item(false))
                                    .collect();

                                let pending = parser.get_pending_content();
                                if !pending.is_empty() {
                                    let maybe_header = pending.trim_start().starts_with("File:");
                                    let maybe_marker = pending.trim_start().starts_with("<<<");
                                    if !maybe_header && !maybe_marker {
                                        ui_items.push(DisplayItem::Markdown(pending.to_string()));
                                    }
                                }

                                if !ui_items.is_empty() {
                                    ld.render(&ui_items);
                                }
                            }
                            cumulative_yields.extend(yields);
                        }
                    }
                    if let Some(u) = parsed.usage {
                        let cached = u
                            .prompt_tokens_details
                            .and_then(|d| d.cached_tokens)
                            .or(u.cached_tokens);
                        let reasoning = u
                            .completion_tokens_details
                            .and_then(|d| d.reasoning_tokens)
                            .or(u.reasoning_tokens);
                        usage_data = Some(TokenUsage {
                            prompt_tokens: u.prompt_tokens,
                            completion_tokens: u.completion_tokens,
                            total_tokens: u.total_tokens,
                            cached_tokens: cached,
                            reasoning_tokens: reasoning,
                            cost: u.cost,
                        });
                    }
                }
            }
            Ok(None) => break,
            Err(e) => {
                if !full_response.is_empty() {
                    eprintln!(
                        "\n[WARN] Stream interrupted: {}. Saving partial response.",
                        e
                    );
                    break;
                } else {
                    return Err(AicoError::Provider(format!("Stream error: {}", e)));
                }
            }
        }
    }

    let duration_ms = start_time.elapsed().as_millis() as u64;

    if let Some(mut ld) = live_display {
        // Finalize the live display using whatever was yielded incrementally.
        ld.finish(&[]);
    }

    // --- Finalization Pass for Storage ---
    let (unified_diff, mut final_display_items, final_warnings) =
        parser.final_resolve(&session.root);

    // Collect all warnings from incremental resolution and the final pass
    let mut all_warnings = parser.collect_warnings(&cumulative_yields);
    all_warnings.extend(final_warnings);

    // Merge incremental yields with final resolution items
    let mut raw_display_items: Vec<DisplayItem> = cumulative_yields
        .into_iter()
        .filter_map(|i| i.to_display_item(true))
        .collect();
    raw_display_items.append(&mut final_display_items);

    let all_display_items = merge_display_items(raw_display_items);

    if !all_warnings.is_empty() {
        eprintln!("\nWarnings:");
        for w in &all_warnings {
            eprintln!("{}", w);
        }
    }

    let mut message_cost = None;
    if let Some(ref usage) = usage_data {
        message_cost = crate::llm::tokens::calculate_cost(&model_id, usage).await;
    }

    Ok(InteractionResult {
        content: full_response,
        display_items: Some(all_display_items),
        token_usage: usage_data,
        cost: message_cost,
        duration_ms,
        unified_diff: if unified_diff.is_empty() {
            None
        } else {
            Some(unified_diff)
        },
    })
}
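/// Assembles the full message list for a chat completion request. In passthrough
/// mode, history and the prompt are forwarded as-is; otherwise the static file
/// context, the history split at `splice_idx`, the floating (updated) file context,
/// and the mode-specific alignment messages are laid out in order before the final
/// user prompt. Consecutive same-role messages are then merged so the request
/// satisfies strictly alternating turn-based APIs.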
pub async fn build_request_with_piped(
    client: &LlmClient,
    session: &Session,
    system_prompt: &str,
    user_prompt: &str,
    piped_content: &Option<String>,
    config: &crate::models::InteractionConfig,
) -> Result<ChatCompletionRequest, AicoError> {
    // 1. System Prompt
    let mut full_system_prompt = system_prompt.to_string();
    if config.mode == crate::models::Mode::Diff {
        full_system_prompt.push_str(DIFF_MODE_INSTRUCTIONS);
    }

    let mut messages = vec![Message {
        role: "system".to_string(),
        content: full_system_prompt,
    }];

    let history_to_use = if config.no_history {
        vec![]
    } else {
        reconstruct_history(&session.store, &session.view, false)?
    };

    if config.passthrough {
        for item in &history_to_use {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        let final_user_content = format_user_content(user_prompt, piped_content);
        messages.push(Message {
            role: "user".to_string(),
            content: final_user_content,
        });
    } else {
        // --- 2. Resolve Context State ---
        let state = session.resolve_context_state(&history_to_use)?;

        // --- 3. Linear Assembly ---
        // A. Static Context (Ground Truth)
        messages.extend(format_file_block(
            state.static_files,
            STATIC_CONTEXT_INTRO,
            STATIC_CONTEXT_ANCHOR,
        ));

        // B. History Segment 1 (Before Updates)
        for item in &history_to_use[..state.splice_idx] {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        // C. Floating Context (The Update)
        messages.extend(format_file_block(
            state.floating_files,
            FLOATING_CONTEXT_INTRO,
            FLOATING_CONTEXT_ANCHOR,
        ));

        // D. History Segment 2 (After Updates)
        for item in &history_to_use[state.splice_idx..] {
            messages.push(Message {
                role: item.record.role.to_string(),
                content: if item.record.passthrough {
                    item.record.content.clone()
                } else {
                    format_user_content(&item.record.content, &item.record.piped_content)
                },
            });
        }

        // --- 4. Final Alignment and User Prompt ---
        let (align_user, align_asst) = if config.mode == crate::models::Mode::Diff {
            (ALIGNMENT_DIFF_USER, ALIGNMENT_DIFF_ASSISTANT)
        } else {
            (
                ALIGNMENT_CONVERSATION_USER,
                ALIGNMENT_CONVERSATION_ASSISTANT,
            )
        };
        messages.push(Message {
            role: "user".to_string(),
            content: align_user.to_string(),
        });
        messages.push(Message {
            role: "assistant".to_string(),
            content: align_asst.to_string(),
        });

        let final_user_content = format_user_content(user_prompt, piped_content);
        messages.push(Message {
            role: "user".to_string(),
            content: final_user_content,
        });
    }

    // --- 5. Turn Alignment (Merge consecutive same-role messages) ---
    // This provides robustness against dangling messages and ensures compliance with
    // strictly turn-based APIs.
    let mut aligned_messages: Vec<Message> = Vec::new();
    for msg in messages {
        let trimmed_content = msg.content.trim();
        if trimmed_content.is_empty() {
            continue;
        }

        if let Some(last) = aligned_messages.last_mut()
            && last.role == msg.role
        {
            last.content.push_str("\n\n");
            last.content.push_str(trimmed_content);
            continue;
        }
        aligned_messages.push(Message {
            role: msg.role,
            content: trimmed_content.to_string(),
        });
    }

    Ok(ChatCompletionRequest {
        model: client.model_id.clone(),
        messages: aligned_messages,
        stream: true,
        stream_options: Some(StreamOptions {
            include_usage: true,
        }),
        extra_body: client.get_extra_params(),
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::DisplayItem;

    #[test]
    fn test_merge_display_items_collapses_consecutive_markdown() {
        let items = vec![
            DisplayItem::Markdown("Hello ".into()),
            DisplayItem::Markdown("World".into()),
            DisplayItem::Diff("diff1".into()),
            DisplayItem::Markdown("Part 1".into()),
            DisplayItem::Markdown("Part 2".into()),
            DisplayItem::Diff("diff2".into()),
        ];

        let merged = merge_display_items(items);

        assert_eq!(merged.len(), 4);
        assert_eq!(merged[0], DisplayItem::Markdown("Hello World".into()));
        assert_eq!(merged[1], DisplayItem::Diff("diff1".into()));
        assert_eq!(merged[2], DisplayItem::Markdown("Part 1Part 2".into()));
        assert_eq!(merged[3], DisplayItem::Diff("diff2".into()));
    }

    #[test]
    fn test_merge_display_items_handles_empty_or_single() {
        let items: Vec<DisplayItem> = vec![];
        assert_eq!(merge_display_items(items).len(), 0);

        let items = vec![DisplayItem::Markdown("one".into())];
        assert_eq!(merge_display_items(items).len(), 1);
    }
}