Skip to main content

aico/llm/
executor.rs

1use crate::consts::*;
2use crate::diffing::parser::StreamParser;
3use crate::exceptions::AicoError;
4use crate::llm::api_models::{ChatCompletionRequest, Message, StreamOptions};
5use crate::llm::client::{LlmClient, parse_sse_line};
6use crate::models::{DisplayItem, InteractionResult, TokenUsage};
7use crate::session::Session;
8use futures_util::TryStreamExt;
9use std::time::Instant;
10
11pub fn append_reasoning_delta(
12    buffer: &mut String,
13    delta: &crate::llm::api_models::ChunkDelta,
14) -> bool {
15    let start_len = buffer.len();
16
17    // Priority 1: Direct reasoning_content
18    if let Some(ref r) = delta.reasoning_content
19        && !r.is_empty()
20    {
21        buffer.push_str(r);
22        return true;
23    }
24
25    // Priority 2: Structured reasoning_details
26    if let Some(ref details) = delta.reasoning_details {
27        for detail in details {
28            match detail {
29                crate::llm::api_models::ReasoningDetail::Text { text } => buffer.push_str(text),
30                crate::llm::api_models::ReasoningDetail::Summary { summary } => {
31                    buffer.push_str(summary)
32                }
33                _ => {}
34            }
35        }
36    }
37
38    buffer.len() > start_len
39}
40
41pub fn extract_reasoning_header(reasoning_buffer: &str) -> Option<&str> {
42    static RE: std::sync::LazyLock<regex::Regex> = std::sync::LazyLock::new(|| {
43        regex::Regex::new(r"(?m)(?:^#{1,6}\s+(?P<header>.+?)[\r\n])|(?:^[*]{2}(?P<bold>.+?)[*]{2})")
44            .unwrap()
45    });
46
47    RE.captures_iter(reasoning_buffer)
48        .last()
49        .and_then(|cap| {
50            cap.name("header")
51                .or_else(|| cap.name("bold"))
52                .map(|m| m.as_str().trim())
53        })
54        .filter(|s| !s.is_empty())
55}
56
57fn format_user_content(content: &str, piped: &Option<String>) -> String {
58    if let Some(p) = piped {
59        format!(
60            "<stdin_content>\n{}\n</stdin_content>\n<prompt>\n{}\n</prompt>",
61            p.trim(),
62            content.trim()
63        )
64    } else {
65        content.to_string()
66    }
67}
68
69pub async fn build_request(
70    session: &Session,
71    system_prompt: &str,
72    user_prompt: &str,
73    mode: crate::models::Mode,
74    no_history: bool,
75    passthrough: bool,
76) -> Result<ChatCompletionRequest, AicoError> {
77    let client = LlmClient::new(&session.view.model)?;
78    let config = crate::models::InteractionConfig {
79        mode,
80        no_history,
81        passthrough,
82        model_override: None,
83    };
84    build_request_with_piped(&client, session, system_prompt, user_prompt, &None, &config).await
85}
86
87pub fn append_file_context_xml(buffer: &mut String, path: &str, content: &str) {
88    use std::fmt::Write;
89    // write! handles the formatting directly into the buffer
90    let _ = writeln!(buffer, "  <file path=\"{}\">", path);
91    buffer.push_str(content);
92    if !content.ends_with('\n') {
93        buffer.push('\n');
94    }
95    buffer.push_str("  </file>\n");
96}
97
98fn merge_display_items(items: Vec<DisplayItem>) -> Vec<DisplayItem> {
99    let mut merged = Vec::with_capacity(items.len());
100    for item in items {
101        match (merged.last_mut(), item) {
102            (Some(DisplayItem::Markdown(last)), DisplayItem::Markdown(next)) => {
103                last.push_str(&next);
104            }
105            (_, item) => merged.push(item),
106        }
107    }
108    merged
109}
110
111fn format_file_block(mut files: Vec<(&str, &str)>, intro: &str, anchor: &str) -> Vec<Message> {
112    if files.is_empty() {
113        return vec![];
114    }
115    // Ensure deterministic ordering (alphabetical by path)
116    files.sort_by(|a, b| a.0.cmp(b.0));
117
118    let mut block = "<context>\n".to_string();
119    for (path, content) in files {
120        append_file_context_xml(&mut block, path, content);
121    }
122    block.push_str("</context>");
123
124    vec![
125        Message {
126            role: "user".to_string(),
127            content: format!("{}\n\n{}", intro, block),
128        },
129        Message {
130            role: "assistant".to_string(),
131            content: anchor.to_string(),
132        },
133    ]
134}
135
136pub async fn execute_interaction(
137    session: &Session,
138    system_prompt: &str,
139    prompt_text: &str,
140    piped_content: &Option<String>,
141    config: crate::models::InteractionConfig,
142) -> Result<InteractionResult, AicoError> {
143    let model_id = config
144        .model_override
145        .clone()
146        .unwrap_or_else(|| session.view.model.clone());
147    let client = LlmClient::new(&model_id)?;
148
149    let req = build_request_with_piped(
150        &client,
151        session,
152        system_prompt,
153        prompt_text,
154        piped_content,
155        &config,
156    )
157    .await?;
158
159    let start_time = Instant::now();
160
161    let response = client.stream_chat(req).await?;
162
163    let mut full_response = String::new();
164    let mut reasoning_buffer = String::new();
165    let mut usage_data: Option<TokenUsage> = None;
166
167    let should_show_live = (config.mode == crate::models::Mode::Conversation
168        || config.mode == crate::models::Mode::Diff)
169        && crate::console::is_stdout_terminal();
170
171    let mut live_display: Option<crate::ui::live_display::LiveDisplay> = if should_show_live {
172        let mut ld =
173            crate::ui::live_display::LiveDisplay::new(crate::console::get_terminal_width() as u16);
174        // Eagerly show the initial status
175        ld.render(&[]);
176        Some(ld)
177    } else {
178        None
179    };
180
181    let mut parser = StreamParser::new(&session.context_content);
182    let mut cumulative_yields = Vec::new();
183
184    use tokio::io::AsyncBufReadExt;
185    let stream = response.bytes_stream().map_err(std::io::Error::other);
186    let reader = tokio_util::io::StreamReader::new(stream);
187    let mut lines = tokio::io::BufReader::new(reader).lines();
188
189    loop {
190        match lines.next_line().await {
191            Ok(Some(line)) => {
192                if let Some(parsed) = parse_sse_line(&line) {
193                    if let Some(choice) = parsed.choices.first() {
194                        let did_update =
195                            append_reasoning_delta(&mut reasoning_buffer, &choice.delta);
196
197                        if did_update
198                            && let Some(ref mut ld) = live_display
199                            && full_response.is_empty()
200                        {
201                            let status = extract_reasoning_header(&reasoning_buffer)
202                                .unwrap_or("Thinking...");
203                            ld.update_status(status);
204                        }
205
206                        if let Some(ref content) = choice.delta.content {
207                            full_response.push_str(content);
208
209                            let yields = parser.parse_and_resolve(content, &session.root);
210
211                            if let Some(ref mut ld) = live_display {
212                                let mut ui_items: Vec<DisplayItem> = yields
213                                    .iter()
214                                    .cloned()
215                                    .filter_map(|i| i.to_display_item(false))
216                                    .collect();
217
218                                if parser.is_pending_displayable() {
219                                    let pending = parser.get_pending_content();
220                                    ui_items.push(DisplayItem::Markdown(pending));
221                                }
222
223                                if !ui_items.is_empty() {
224                                    ld.render(&ui_items);
225                                }
226                            }
227                            cumulative_yields.extend(yields);
228                        }
229                    }
230                    if let Some(u) = parsed.usage {
231                        let cached = u
232                            .prompt_tokens_details
233                            .and_then(|d| d.cached_tokens)
234                            .or(u.cached_tokens);
235                        let reasoning = u
236                            .completion_tokens_details
237                            .and_then(|d| d.reasoning_tokens)
238                            .or(u.reasoning_tokens);
239                        usage_data = Some(TokenUsage {
240                            prompt_tokens: u.prompt_tokens,
241                            completion_tokens: u.completion_tokens,
242                            total_tokens: u.total_tokens,
243                            cached_tokens: cached,
244                            reasoning_tokens: reasoning,
245                            cost: u.cost,
246                        });
247                    }
248                }
249            }
250            Ok(None) => break,
251            Err(e) => {
252                if !full_response.is_empty() {
253                    eprintln!(
254                        "\n[WARN] Stream interrupted: {}. Saving partial response.",
255                        e
256                    );
257                    break;
258                } else {
259                    return Err(AicoError::Provider(format!("Stream error: {}", e)));
260                }
261            }
262        }
263    }
264
265    let duration_ms = start_time.elapsed().as_millis() as u64;
266
267    if let Some(mut ld) = live_display {
268        // Finalize the live display using whatever was yielded incrementally.
269        ld.finish(&[]);
270    }
271
272    // --- Finalization Pass for Storage ---
273    let (unified_diff, mut final_display_items, final_warnings) =
274        parser.final_resolve(&session.root);
275
276    // Collect all warnings from incremental resolution and the final pass
277    let mut all_warnings = parser.collect_warnings(&cumulative_yields);
278    all_warnings.extend(final_warnings);
279
280    // Merge incremental yields with final resolution items
281    let mut raw_display_items: Vec<DisplayItem> = cumulative_yields
282        .into_iter()
283        .filter_map(|i| i.to_display_item(true))
284        .collect();
285    raw_display_items.append(&mut final_display_items);
286
287    let all_display_items = merge_display_items(raw_display_items);
288
289    if !all_warnings.is_empty() {
290        eprintln!("\nWarnings:");
291        for w in &all_warnings {
292            eprintln!("{}", w);
293        }
294    }
295
296    let mut message_cost = None;
297    if let Some(ref usage) = usage_data {
298        message_cost = crate::llm::tokens::calculate_cost(&model_id, usage).await;
299    }
300
301    Ok(InteractionResult {
302        content: full_response,
303        display_items: Some(all_display_items),
304        token_usage: usage_data,
305        cost: message_cost,
306        duration_ms,
307        unified_diff: if unified_diff.is_empty() {
308            None
309        } else {
310            Some(unified_diff)
311        },
312    })
313}
314
315pub async fn build_request_with_piped(
316    client: &LlmClient,
317    session: &Session,
318    system_prompt: &str,
319    user_prompt: &str,
320    piped_content: &Option<String>,
321    config: &crate::models::InteractionConfig,
322) -> Result<ChatCompletionRequest, AicoError> {
323    // 1. System Prompt
324    let mut full_system_prompt = system_prompt.to_string();
325    if config.mode == crate::models::Mode::Diff {
326        full_system_prompt.push_str(DIFF_MODE_INSTRUCTIONS);
327    }
328
329    let mut messages = vec![Message {
330        role: "system".to_string(),
331        content: full_system_prompt,
332    }];
333
334    let history_to_use = if config.no_history {
335        vec![]
336    } else {
337        session.history(false)?
338    };
339
340    if config.passthrough {
341        for item in &history_to_use {
342            messages.push(Message {
343                role: item.record.role.to_string(),
344                content: if item.record.passthrough {
345                    item.record.content.clone()
346                } else {
347                    format_user_content(&item.record.content, &item.record.piped_content)
348                },
349            });
350        }
351
352        let final_user_content = if let Some(p) = piped_content {
353            format!(
354                "<stdin_content>\n{}\n</stdin_content>\n<prompt>\n{}\n</prompt>",
355                p.trim(),
356                user_prompt.trim()
357            )
358        } else {
359            user_prompt.to_string()
360        };
361        messages.push(Message {
362            role: "user".to_string(),
363            content: final_user_content,
364        });
365    } else {
366        // --- 2. Resolve Context State ---
367        let state = session.resolve_context_state(&history_to_use)?;
368
369        // --- 3. Linear Assembly ---
370        // A. Static Context (Ground Truth)
371        messages.extend(format_file_block(
372            state.static_files,
373            STATIC_CONTEXT_INTRO,
374            STATIC_CONTEXT_ANCHOR,
375        ));
376
377        // B. History Segment 1 (Before Updates)
378        for item in &history_to_use[..state.splice_idx] {
379            messages.push(Message {
380                role: item.record.role.to_string(),
381                content: if item.record.passthrough {
382                    item.record.content.clone()
383                } else {
384                    format_user_content(&item.record.content, &item.record.piped_content)
385                },
386            });
387        }
388
389        // C. Floating Context (The Update)
390        messages.extend(format_file_block(
391            state.floating_files,
392            FLOATING_CONTEXT_INTRO,
393            FLOATING_CONTEXT_ANCHOR,
394        ));
395
396        // D. History Segment 2 (After Updates)
397        for item in &history_to_use[state.splice_idx..] {
398            messages.push(Message {
399                role: item.record.role.to_string(),
400                content: if item.record.passthrough {
401                    item.record.content.clone()
402                } else {
403                    format_user_content(&item.record.content, &item.record.piped_content)
404                },
405            });
406        }
407
408        // --- 5. Final Alignment and User Prompt ---
409        let (align_user, align_asst) = if config.mode == crate::models::Mode::Diff {
410            (ALIGNMENT_DIFF_USER, ALIGNMENT_DIFF_ASSISTANT)
411        } else {
412            (
413                ALIGNMENT_CONVERSATION_USER,
414                ALIGNMENT_CONVERSATION_ASSISTANT,
415            )
416        };
417        messages.push(Message {
418            role: "user".to_string(),
419            content: align_user.to_string(),
420        });
421        messages.push(Message {
422            role: "assistant".to_string(),
423            content: align_asst.to_string(),
424        });
425
426        let final_user_content = format_user_content(user_prompt, piped_content);
427        messages.push(Message {
428            role: "user".to_string(),
429            content: final_user_content,
430        });
431    }
432
433    // --- 6. Turn Alignment (Merge consecutive same-role messages) ---
434    // This provides robustness against dangling messages and ensures Turn-Based API compliance.
435    let mut aligned_messages: Vec<Message> = Vec::new();
436    for msg in messages {
437        let trimmed_content = msg.content.trim();
438        if trimmed_content.is_empty() {
439            continue;
440        }
441
442        if let Some(last) = aligned_messages.last_mut()
443            && last.role == msg.role
444        {
445            last.content.push_str("\n\n");
446            last.content.push_str(trimmed_content);
447            continue;
448        }
449        aligned_messages.push(Message {
450            role: msg.role,
451            content: trimmed_content.to_string(),
452        });
453    }
454
455    Ok(ChatCompletionRequest {
456        model: client.model_id.clone(),
457        messages: aligned_messages,
458        stream: true,
459        stream_options: Some(StreamOptions {
460            include_usage: true,
461        }),
462        extra_body: client.get_extra_params(),
463    })
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469    use crate::models::DisplayItem;
470
471    #[test]
472    fn test_merge_display_items_collapses_consecutive_markdown() {
473        let items = vec![
474            DisplayItem::Markdown("Hello ".into()),
475            DisplayItem::Markdown("World".into()),
476            DisplayItem::Diff("diff1".into()),
477            DisplayItem::Markdown("Part 1".into()),
478            DisplayItem::Markdown("Part 2".into()),
479            DisplayItem::Diff("diff2".into()),
480        ];
481
482        let merged = merge_display_items(items);
483
484        assert_eq!(merged.len(), 4);
485        assert_eq!(merged[0], DisplayItem::Markdown("Hello World".into()));
486        assert_eq!(merged[1], DisplayItem::Diff("diff1".into()));
487        assert_eq!(merged[2], DisplayItem::Markdown("Part 1Part 2".into()));
488        assert_eq!(merged[3], DisplayItem::Diff("diff2".into()));
489    }
490
491    #[test]
492    fn test_merge_display_items_handles_empty_or_single() {
493        let items: Vec<DisplayItem> = vec![];
494        assert_eq!(merge_display_items(items).len(), 0);
495
496        let items = vec![DisplayItem::Markdown("one".into())];
497        assert_eq!(merge_display_items(items).len(), 1);
498    }
499}