Skip to main content

vtcode_core/llm/providers/shared/
mod.rs

1use crate::llm::error_display;
2use crate::llm::provider::{
3    AssistantPhase, ContentPart, LLMError, LLMResponse, LLMStreamEvent, Message, MessageContent,
4    MessageRole, ToolCall,
5};
6pub use crate::llm::providers::ReasoningBuffer;
7use crate::llm::providers::common::{
8    extract_reasoning_text_from_serialized_details, map_finish_reason_common,
9};
10mod responses_stream;
11mod tag_sanitizer;
12use crate::llm::providers::split_reasoning_from_text;
13pub use responses_stream::{ResponsesNormalizedStreamOptions, create_responses_normalized_stream};
14use serde_json::{Map, Value};
15pub use tag_sanitizer::TagStreamSanitizer;
16
17#[derive(Debug, thiserror::Error)]
18pub enum StreamAssemblyError {
19    #[error("missing field `{0}` in stream payload")]
20    MissingField(&'static str),
21    #[error("invalid stream payload: {0}")]
22    InvalidPayload(String),
23}
24
25impl StreamAssemblyError {
26    #[cold]
27    pub fn into_llm_error(self, provider: &str) -> LLMError {
28        let message = self.to_string();
29        let formatted = error_display::format_llm_error(provider, &message);
30        LLMError::Provider {
31            message: formatted,
32            metadata: None,
33        }
34    }
35}
36
/// Observer hooks for stream processing.
///
/// Every method has an empty default body, so implementors override only the
/// callbacks they care about. Implementations must be `Send + Sync`.
pub trait StreamTelemetry: Send + Sync {
    /// Called with a content text delta.
    fn on_content_delta(&self, _delta: &str) {}
    /// Called with a reasoning text delta.
    fn on_reasoning_delta(&self, _delta: &str) {}
    /// Called with the name of a reasoning stage.
    fn on_reasoning_stage(&self, _stage: &str) {}
    /// Called for a tool-call delta; no payload is passed.
    fn on_tool_call_delta(&self) {}
}
43
/// [`StreamTelemetry`] implementation that relies entirely on the trait's
/// empty default methods, i.e. it ignores every callback.
#[derive(Default)]
pub struct NoopStreamTelemetry;

impl StreamTelemetry for NoopStreamTelemetry {}
48
/// One contiguous piece of streamed text, tagged by kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamFragment {
    /// User-visible content text.
    Content(String),
    /// Reasoning text.
    Reasoning(String),
}

/// Ordered list of [`StreamFragment`]s in which adjacent fragments of the same
/// kind are always coalesced into a single fragment.
#[derive(Default, Debug)]
pub struct StreamDelta {
    fragments: Vec<StreamFragment>,
}

impl StreamDelta {
    /// Appends content text, merging it into a trailing `Content` fragment when
    /// one exists. Empty input is ignored.
    pub fn push_content(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }

        match self.fragments.last_mut() {
            Some(StreamFragment::Content(existing)) => existing.push_str(text),
            _ => self
                .fragments
                .push(StreamFragment::Content(text.to_string())),
        }
    }

    /// Appends reasoning text, merging it into a trailing `Reasoning` fragment
    /// when one exists. Empty input is ignored.
    pub fn push_reasoning(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }

        match self.fragments.last_mut() {
            Some(StreamFragment::Reasoning(existing)) => existing.push_str(text),
            _ => self
                .fragments
                .push(StreamFragment::Reasoning(text.to_string())),
        }
    }

    /// Returns `true` when no fragment has been recorded.
    pub fn is_empty(&self) -> bool {
        self.fragments.is_empty()
    }

    /// Consumes the delta and returns its fragments in order.
    pub fn into_fragments(self) -> Vec<StreamFragment> {
        self.fragments
    }

    /// Appends all fragments of `other`, preserving order.
    ///
    /// A fragment whose kind matches the current trailing fragment is merged
    /// into it, so the "no two adjacent fragments of the same kind" invariant
    /// maintained by `push_content` / `push_reasoning` also holds after an
    /// `extend`.
    pub fn extend(&mut self, other: StreamDelta) {
        for fragment in other.fragments {
            let merged = match (self.fragments.last_mut(), &fragment) {
                (Some(StreamFragment::Content(tail)), StreamFragment::Content(text)) => {
                    tail.push_str(text);
                    true
                }
                (Some(StreamFragment::Reasoning(tail)), StreamFragment::Reasoning(text)) => {
                    tail.push_str(text);
                    true
                }
                _ => false,
            };
            if !merged {
                self.fragments.push(fragment);
            }
        }
    }
}
99
100#[derive(Default, Clone)]
101pub struct ToolCallBuilder {
102    id: Option<String>,
103    namespace: Option<String>,
104    name: Option<String>,
105    arguments: String,
106}
107
108impl ToolCallBuilder {
109    pub fn apply_delta(&mut self, delta: &Value) {
110        if let Some(id) = delta.get("id").and_then(|value| value.as_str()) {
111            self.id = Some(id.to_string());
112        }
113
114        if let Some(namespace) = delta.get("namespace").and_then(|value| value.as_str()) {
115            self.namespace = Some(namespace.to_string());
116        }
117
118        if let Some(function) = delta.get("function") {
119            if let Some(namespace) = function.get("namespace").and_then(|value| value.as_str()) {
120                self.namespace = Some(namespace.to_string());
121            }
122
123            if let Some(name) = function.get("name").and_then(|value| value.as_str()) {
124                self.name = Some(name.to_string());
125            }
126
127            if let Some(arguments_value) = function.get("arguments") {
128                if let Some(arguments) = arguments_value.as_str() {
129                    self.arguments.push_str(arguments);
130                } else if arguments_value.is_object() || arguments_value.is_array() {
131                    self.arguments.push_str(&arguments_value.to_string());
132                }
133            }
134        }
135    }
136
137    pub fn finalize(self, fallback_index: usize) -> Option<ToolCall> {
138        let name = self.name?;
139        let id = self
140            .id
141            .unwrap_or_else(|| format!("tool_call_{fallback_index}"));
142        let arguments = if self.arguments.is_empty() {
143            "{}".to_string()
144        } else {
145            self.arguments
146        };
147
148        Some(ToolCall::function_with_namespace(
149            id,
150            self.namespace,
151            name,
152            arguments,
153        ))
154    }
155}
156
157pub fn update_tool_calls(builders: &mut Vec<ToolCallBuilder>, deltas: &[Value]) {
158    for (position, delta) in deltas.iter().enumerate() {
159        let index = delta
160            .get("index")
161            .and_then(|value| value.as_u64())
162            .map(|value| value as usize)
163            .unwrap_or(position);
164
165        if builders.len() <= index {
166            builders.resize_with(index + 1, ToolCallBuilder::default);
167        }
168        let Some(builder) = builders.get_mut(index) else {
169            continue;
170        };
171
172        builder.apply_delta(delta);
173    }
174}
175
176pub fn finalize_tool_calls(builders: Vec<ToolCallBuilder>) -> Option<Vec<ToolCall>> {
177    let calls: Vec<ToolCall> = builders
178        .into_iter()
179        .enumerate()
180        .filter_map(|(index, builder)| builder.finalize(index))
181        .collect();
182
183    (!calls.is_empty()).then_some(calls)
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
187pub(crate) enum FunctionOutputContentItem {
188    InputText { text: String },
189    InputImage { image_url: String },
190}
191
192impl FunctionOutputContentItem {
193    fn from_value(value: &Value) -> Option<Self> {
194        let item_type = value.get("type").and_then(Value::as_str)?;
195        match item_type {
196            "input_text" | "output_text" => Some(Self::InputText {
197                text: value.get("text").and_then(Value::as_str)?.to_string(),
198            }),
199            "input_image" => Some(Self::InputImage {
200                image_url: value.get("image_url").and_then(Value::as_str)?.to_string(),
201            }),
202            _ => None,
203        }
204    }
205
206    pub(crate) fn to_function_output_json(&self) -> Value {
207        match self {
208            Self::InputText { text } => serde_json::json!({
209                "type": "input_text",
210                "text": text
211            }),
212            Self::InputImage { image_url } => serde_json::json!({
213                "type": "input_image",
214                "image_url": image_url
215            }),
216        }
217    }
218
219    pub(crate) fn to_tool_result_json(&self) -> Value {
220        match self {
221            Self::InputText { text } => serde_json::json!({
222                "type": "output_text",
223                "text": text
224            }),
225            Self::InputImage { image_url } => serde_json::json!({
226                "type": "input_image",
227                "image_url": image_url
228            }),
229        }
230    }
231}
232
233fn parse_function_output_content_items_array(
234    items: &[Value],
235) -> Option<Vec<FunctionOutputContentItem>> {
236    items
237        .iter()
238        .map(FunctionOutputContentItem::from_value)
239        .collect::<Option<Vec<_>>>()
240}
241
242pub(crate) fn parse_function_output_content_items_value(
243    value: &Value,
244) -> Option<Vec<FunctionOutputContentItem>> {
245    match value {
246        Value::Array(items) => parse_function_output_content_items_array(items),
247        Value::Object(obj) => ["content_items", "content", "output", "body"]
248            .iter()
249            .find_map(|key| obj.get(*key))
250            .and_then(parse_function_output_content_items_value),
251        Value::String(text) => parse_function_output_content_items_text(text),
252        Value::Null | Value::Bool(_) | Value::Number(_) => None,
253    }
254}
255
256pub(crate) fn parse_function_output_content_items_text(
257    text: &str,
258) -> Option<Vec<FunctionOutputContentItem>> {
259    let trimmed = text.trim();
260    if !(trimmed.starts_with('[') || trimmed.starts_with('{')) {
261        return None;
262    }
263    let parsed: Value = serde_json::from_str(trimmed).ok()?;
264    parse_function_output_content_items_value(&parsed)
265}
266
267fn function_output_items_from_parts(parts: &[ContentPart]) -> Vec<FunctionOutputContentItem> {
268    let mut items = Vec::new();
269    for part in parts {
270        match part {
271            ContentPart::Text { text } => {
272                if text.trim().is_empty() {
273                    continue;
274                }
275                items.push(FunctionOutputContentItem::InputText { text: text.clone() });
276            }
277            ContentPart::Image {
278                data, mime_type, ..
279            } => {
280                items.push(FunctionOutputContentItem::InputImage {
281                    image_url: format!("data:{};base64,{}", mime_type, data),
282                });
283            }
284            ContentPart::File { .. } => {}
285        }
286    }
287    items
288}
289
290fn function_output_value_from_items(items: Vec<FunctionOutputContentItem>) -> Value {
291    if items.is_empty() {
292        return Value::String(String::new());
293    }
294    let has_image = items
295        .iter()
296        .any(|item| matches!(item, FunctionOutputContentItem::InputImage { .. }));
297    if has_image {
298        return Value::Array(
299            items
300                .iter()
301                .map(FunctionOutputContentItem::to_function_output_json)
302                .collect(),
303        );
304    }
305    Value::String(text_from_function_output_items(&items).unwrap_or_default())
306}
307
308pub(crate) fn function_output_value_from_message_content(content: &MessageContent) -> Value {
309    match content {
310        MessageContent::Text(text) => {
311            if let Some(items) = parse_function_output_content_items_text(text) {
312                return function_output_value_from_items(items);
313            }
314            Value::String(text.clone())
315        }
316        MessageContent::Parts(parts) => {
317            let items = function_output_items_from_parts(parts);
318            function_output_value_from_items(items)
319        }
320    }
321}
322
323pub(crate) fn tool_result_content_from_message_content(content: &MessageContent) -> Vec<Value> {
324    match content {
325        MessageContent::Text(text) => {
326            if text.trim().is_empty() {
327                return Vec::new();
328            }
329            if let Some(items) = parse_function_output_content_items_text(text) {
330                return items
331                    .iter()
332                    .map(FunctionOutputContentItem::to_tool_result_json)
333                    .collect();
334            }
335            vec![serde_json::json!({
336                "type": "output_text",
337                "text": text
338            })]
339        }
340        MessageContent::Parts(parts) => function_output_items_from_parts(parts)
341            .iter()
342            .map(FunctionOutputContentItem::to_tool_result_json)
343            .collect(),
344    }
345}
346
347fn text_from_function_output_items(items: &[FunctionOutputContentItem]) -> Option<String> {
348    let mut text = String::new();
349    for item in items {
350        match item {
351            FunctionOutputContentItem::InputText { text: segment } => text.push_str(segment),
352            FunctionOutputContentItem::InputImage { .. } => return None,
353        }
354    }
355    Some(text)
356}
357
358fn function_output_value_to_history_text(value: &Value) -> String {
359    if let Some(text) = value.as_str() {
360        return text.to_string();
361    }
362    if let Some(items) = parse_function_output_content_items_value(value)
363        && let Some(text) = text_from_function_output_items(&items)
364    {
365        return text;
366    }
367    if let Some(text) = value.get("content").and_then(Value::as_str) {
368        return text.to_string();
369    }
370    value.to_string()
371}
372
373fn append_output_item_text(value: &Value, text: &mut String) {
374    if let Some(part_text) = value.get("text").and_then(Value::as_str) {
375        text.push_str(part_text);
376    }
377    if let Some(part_output) = value.get("output").and_then(Value::as_str) {
378        text.push_str(part_output);
379    }
380    if let Some(refusal) = value.get("refusal").and_then(Value::as_str) {
381        text.push_str(refusal);
382    }
383
384    match value {
385        Value::String(s) => text.push_str(s),
386        Value::Array(parts) => {
387            for part in parts {
388                append_output_item_text(part, text);
389            }
390        }
391        Value::Object(_) => {
392            if let Some(content) = value.get("content") {
393                append_output_item_text(content, text);
394            }
395        }
396        _ => {}
397    }
398}
399
400fn output_item_text(content: &Value) -> String {
401    let mut text = String::new();
402    append_output_item_text(content, &mut text);
403    text
404}
405
406fn parse_function_call_item(item: &Value) -> Option<ToolCall> {
407    let function_obj = item.get("function").and_then(Value::as_object);
408    let namespace = item
409        .get("namespace")
410        .and_then(Value::as_str)
411        .or_else(|| function_obj.and_then(|f| f.get("namespace").and_then(Value::as_str)))
412        .map(ToOwned::to_owned);
413    let name = function_obj
414        .and_then(|f| f.get("name").and_then(Value::as_str))
415        .or_else(|| item.get("name").and_then(Value::as_str))?
416        .to_string();
417
418    let id = item
419        .get("id")
420        .and_then(Value::as_str)
421        .or_else(|| item.get("call_id").and_then(Value::as_str))
422        .filter(|value| !value.is_empty())
423        .unwrap_or("tool_call_compacted")
424        .to_string();
425
426    let arguments_value = function_obj
427        .and_then(|f| f.get("arguments"))
428        .or_else(|| item.get("arguments"));
429    let arguments = arguments_value.map_or_else(
430        || "{}".to_string(),
431        |value| {
432            value
433                .as_str()
434                .map(ToOwned::to_owned)
435                .unwrap_or_else(|| value.to_string())
436        },
437    );
438
439    Some(ToolCall::function_with_namespace(
440        id, namespace, name, arguments,
441    ))
442}
443
444fn parse_message_item(item: &Value) -> Option<Message> {
445    let role = item
446        .get("role")
447        .and_then(Value::as_str)
448        .unwrap_or("assistant");
449    let content_value = item.get("content").unwrap_or(&Value::Null);
450    let content = output_item_text(content_value).trim().to_string();
451
452    let tool_calls: Vec<ToolCall> = content_value
453        .as_array()
454        .into_iter()
455        .flatten()
456        .filter_map(|part| {
457            let part_type = part.get("type").and_then(Value::as_str).unwrap_or("");
458            if part_type == "function_call" || part_type == "tool_call" {
459                parse_function_call_item(part)
460            } else {
461                None
462            }
463        })
464        .collect();
465
466    let tool_result = content_value
467        .as_array()
468        .into_iter()
469        .flatten()
470        .find_map(|part| {
471            let part_type = part.get("type").and_then(Value::as_str).unwrap_or("");
472            if part_type != "tool_result" {
473                return None;
474            }
475
476            let tool_call_id = part
477                .get("tool_call_id")
478                .and_then(Value::as_str)
479                .or_else(|| item.get("tool_call_id").and_then(Value::as_str))
480                .or_else(|| item.get("call_id").and_then(Value::as_str))
481                .map(ToOwned::to_owned)?;
482
483            let tool_output = output_item_text(part.get("content").unwrap_or(&Value::Null))
484                .trim()
485                .to_string();
486            Some((tool_call_id, tool_output))
487        });
488
489    let assistant_phase = item
490        .get("phase")
491        .and_then(Value::as_str)
492        .and_then(AssistantPhase::from_wire_str);
493
494    match role {
495        "system" => Some(Message::system(content)),
496        "developer" => Some(Message::system(content)),
497        "user" => Some(Message::user(content)),
498        "assistant" => {
499            if tool_calls.is_empty() {
500                Some(Message::assistant(content).with_phase(assistant_phase))
501            } else {
502                Some(Message::assistant_with_tools(content, tool_calls).with_phase(assistant_phase))
503            }
504        }
505        "tool" => {
506            if let Some((tool_call_id, tool_output)) = tool_result {
507                return Some(Message::tool_response(tool_call_id, tool_output));
508            }
509
510            let tool_call_id = item
511                .get("tool_call_id")
512                .and_then(Value::as_str)
513                .or_else(|| item.get("call_id").and_then(Value::as_str))
514                .map(ToOwned::to_owned)?;
515            Some(Message::tool_response(tool_call_id, content))
516        }
517        _ => Some(Message {
518            role: MessageRole::Assistant,
519            content: MessageContent::text(content),
520            ..Message::default()
521        }),
522    }
523}
524
525#[inline]
526fn preserve_opaque_item(item: &Value) -> Message {
527    Message::assistant(String::new()).with_reasoning_details(Some(vec![item.clone()]))
528}
529
530/// Convert `/responses/compact` output items into VT Code message history.
531///
532/// Opaque/unmapped items are preserved in `reasoning_details` so they can be
533/// forwarded back to Responses-compatible providers on subsequent turns.
534pub(crate) fn parse_compacted_output_messages(output: &[Value]) -> Vec<Message> {
535    let mut messages = Vec::new();
536
537    for item in output {
538        let item_type = item.get("type").and_then(Value::as_str).unwrap_or("");
539        match item_type {
540            "message" => {
541                if let Some(message) = parse_message_item(item) {
542                    messages.push(message);
543                } else {
544                    messages.push(preserve_opaque_item(item));
545                }
546            }
547            "function_call" | "tool_call" => {
548                if let Some(tool_call) = parse_function_call_item(item) {
549                    messages.push(Message::assistant_with_tools(
550                        String::new(),
551                        vec![tool_call],
552                    ));
553                }
554            }
555            "function_call_output" => {
556                let call_id = item
557                    .get("call_id")
558                    .and_then(Value::as_str)
559                    .or_else(|| item.get("id").and_then(Value::as_str))
560                    .filter(|value| !value.is_empty());
561                if let Some(call_id) = call_id {
562                    let output_text = item
563                        .get("output")
564                        .map(function_output_value_to_history_text)
565                        .unwrap_or_default();
566                    messages.push(Message::tool_response(call_id.to_string(), output_text));
567                } else {
568                    messages.push(preserve_opaque_item(item));
569                }
570            }
571            _ => {
572                messages.push(preserve_opaque_item(item));
573            }
574        }
575    }
576
577    messages
578}
579
/// Helper to aggregate streaming events and produce a final LLMResponse.
pub struct StreamAggregator {
    /// Model identifier copied into the final response.
    pub model: String,
    /// Content text accumulated from content deltas.
    pub content: String,
    /// Reasoning text accumulated from reasoning deltas.
    pub reasoning: String,
    /// Structured reasoning details, each stored as a string.
    pub reasoning_details: Vec<String>,
    /// Buffer that reasoning deltas are pushed through before being recorded.
    pub reasoning_buffer: ReasoningBuffer,
    /// One builder per tool call being assembled from deltas.
    pub tool_builders: Vec<ToolCallBuilder>,
    /// Usage metrics, when reported.
    pub usage: Option<crate::llm::provider::Usage>,
    /// Finish reason for the final response.
    pub finish_reason: crate::llm::provider::FinishReason,
    /// Sanitizer that content deltas are run through to produce stream events.
    pub sanitizer: TagStreamSanitizer,
    /// Optional compaction value copied into the final response.
    pub compaction: Option<String>,
}
593
/// Order in which the reasoning and content members of a single delta are
/// emitted by `handle_openai_compatible_chunk`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpenAiDeltaOrder {
    /// Emit the reasoning delta before the content delta.
    ReasoningFirst,
    /// Emit the content delta before the reasoning delta.
    ContentFirst,
}
599
600fn emit_reasoning_delta(
601    aggregator: &mut StreamAggregator,
602    tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
603    delta: &Value,
604    reasoning_field: Option<&'static str>,
605) {
606    let Some(reasoning_field) = reasoning_field else {
607        return;
608    };
609    let Some(reasoning) = delta.get(reasoning_field).and_then(Value::as_str) else {
610        return;
611    };
612    let Some(delta) = aggregator.handle_reasoning(reasoning) else {
613        return;
614    };
615    let _ = tx.send(Ok(LLMStreamEvent::Reasoning { delta }));
616}
617
618fn emit_content_delta(
619    aggregator: &mut StreamAggregator,
620    tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
621    delta: &Value,
622) {
623    let Some(content) = delta.get("content").and_then(Value::as_str) else {
624        return;
625    };
626    for event in aggregator.handle_content(content) {
627        let _ = tx.send(Ok(event));
628    }
629}
630
631pub fn handle_openai_compatible_chunk(
632    value: &Value,
633    aggregator: &mut StreamAggregator,
634    tx: &tokio::sync::mpsc::UnboundedSender<Result<LLMStreamEvent, LLMError>>,
635    reasoning_field: Option<&'static str>,
636    delta_order: OpenAiDeltaOrder,
637) {
638    if let Some(choices) = value.get("choices").and_then(Value::as_array)
639        && let Some(choice) = choices.first()
640    {
641        if let Some(delta) = choice.get("delta") {
642            match delta_order {
643                OpenAiDeltaOrder::ReasoningFirst => {
644                    emit_reasoning_delta(aggregator, tx, delta, reasoning_field);
645                    emit_content_delta(aggregator, tx, delta);
646                }
647                OpenAiDeltaOrder::ContentFirst => {
648                    emit_content_delta(aggregator, tx, delta);
649                    emit_reasoning_delta(aggregator, tx, delta, reasoning_field);
650                }
651            }
652
653            if let Some(tool_calls) = delta.get("tool_calls").and_then(Value::as_array) {
654                aggregator.handle_tool_calls(tool_calls);
655            }
656        }
657
658        if let Some(reason) = choice.get("finish_reason").and_then(Value::as_str) {
659            aggregator.set_finish_reason(map_finish_reason_common(reason));
660        }
661    }
662
663    if let Some(_usage_value) = value.get("usage")
664        && let Some(usage) = crate::llm::providers::common::parse_usage_openai_format(value, true)
665    {
666        aggregator.set_usage(usage);
667    }
668}
669
670impl StreamAggregator {
671    pub fn new(model: String) -> Self {
672        Self {
673            model,
674            content: String::new(),
675            reasoning: String::new(),
676            reasoning_details: Vec::new(),
677            reasoning_buffer: ReasoningBuffer::default(),
678            tool_builders: Vec::new(),
679            usage: None,
680            finish_reason: crate::llm::provider::FinishReason::Stop,
681            sanitizer: TagStreamSanitizer::new(),
682            compaction: None,
683        }
684    }
685
686    /// Process a content delta, applying sanitization for reasoning tags.
687    pub fn handle_content(&mut self, delta: &str) -> Vec<LLMStreamEvent> {
688        self.content.push_str(delta);
689        self.sanitizer.process_chunk(delta)
690    }
691
692    /// Process a reasoning delta from a dedicated field.
693    pub fn handle_reasoning(&mut self, delta: &str) -> Option<String> {
694        let result = self.reasoning_buffer.push(delta);
695        if let Some(ref d) = result {
696            self.reasoning.push_str(d);
697        }
698        result
699    }
700
701    /// Store structured reasoning details received from streaming deltas.
702    pub fn set_reasoning_details(&mut self, details: &[Value]) {
703        if details.is_empty() {
704            return;
705        }
706
707        self.reasoning_details = details
708            .iter()
709            .map(|detail| {
710                detail
711                    .as_str()
712                    .map(ToOwned::to_owned)
713                    .unwrap_or_else(|| detail.to_string())
714            })
715            .collect();
716    }
717
718    /// Process tool call deltas.
719    pub fn handle_tool_calls(&mut self, deltas: &[Value]) {
720        update_tool_calls(&mut self.tool_builders, deltas);
721    }
722
723    /// Set usage metrics.
724    pub fn set_usage(&mut self, usage: crate::llm::provider::Usage) {
725        self.usage = Some(usage);
726    }
727
728    /// Set finish reason.
729    pub fn set_finish_reason(&mut self, reason: crate::llm::provider::FinishReason) {
730        self.finish_reason = reason;
731    }
732
733    /// Finalize and produce the completed LLMResponse.
734    pub fn finalize(mut self) -> LLMResponse {
735        // Collect any leftover bits from sanitizer
736        for event in self.sanitizer.finalize() {
737            match event {
738                LLMStreamEvent::Token { delta } => {
739                    self.content.push_str(&delta);
740                }
741                LLMStreamEvent::Reasoning { delta } => {
742                    self.reasoning.push_str(&delta);
743                }
744                _ => {}
745            }
746        }
747
748        let reasoning_details = if self.reasoning_details.is_empty() {
749            None
750        } else {
751            Some(self.reasoning_details)
752        };
753        let mut reasoning = if self.reasoning.is_empty() {
754            self.reasoning_buffer.finalize()
755        } else {
756            Some(self.reasoning)
757        };
758        if reasoning.is_none() {
759            reasoning = reasoning_details
760                .as_ref()
761                .and_then(|details| extract_reasoning_text_from_serialized_details(details));
762        }
763
764        LLMResponse {
765            content: if self.content.is_empty() {
766                None
767            } else {
768                Some(self.content)
769            },
770            tool_calls: finalize_tool_calls(self.tool_builders),
771            model: self.model,
772            usage: self.usage,
773            finish_reason: self.finish_reason,
774            reasoning,
775            reasoning_details,
776            tool_references: Vec::new(),
777            request_id: None,
778            organization_id: None,
779            compaction: self.compaction,
780        }
781    }
782}
783
784/// Common helper for processing OpenAI-compatible SSE streams.
785///
786/// This simplifies stream implementations across providers like DeepSeek, ZAI, Moonshot, etc.
787/// Especially optimized for high-performance models like Gemini 3 and GLM-5.
788pub async fn process_openai_stream<S, E, F>(
789    mut byte_stream: S,
790    provider_name: &'static str,
791    model: String,
792    mut on_chunk: F,
793) -> Result<LLMResponse, LLMError>
794where
795    S: futures::Stream<Item = Result<bytes::Bytes, E>> + Unpin,
796    E: std::fmt::Display,
797    F: FnMut(Value) -> Result<(), LLMError>,
798{
799    use crate::llm::providers::error_handling::format_network_error;
800    use futures::StreamExt;
801
802    let mut buffer = String::new();
803    let mut last_response_value = None;
804
805    while let Some(chunk_result) = byte_stream.next().await {
806        let chunk_bytes =
807            chunk_result.map_err(|e| format_network_error(provider_name, &e.to_string()))?;
808        let chunk_str = String::from_utf8_lossy(&chunk_bytes);
809        buffer.push_str(&chunk_str);
810
811        while let Some((boundary_idx, boundary_len)) = find_sse_boundary(&buffer) {
812            let event = buffer[..boundary_idx].to_string();
813            buffer.drain(..boundary_idx + boundary_len);
814
815            if let Some(data) = extract_data_payload(&event) {
816                if data == "[DONE]" {
817                    break;
818                }
819
820                for line in data.lines() {
821                    let trimmed = line.trim();
822                    if trimmed.is_empty() {
823                        continue;
824                    }
825
826                    if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
827                        on_chunk(value.clone())?;
828                        last_response_value = Some(value);
829                    }
830                }
831            }
832        }
833    }
834
835    // Attempt to extract final response metadata (usage, etc) from last chunk if not already done
836    let mut final_response = LLMResponse {
837        content: None,
838        tool_calls: None,
839        model,
840        usage: None,
841        finish_reason: crate::llm::provider::FinishReason::Stop,
842        reasoning: None,
843        reasoning_details: None,
844        tool_references: Vec::new(),
845        request_id: None,
846        organization_id: None,
847        compaction: None,
848    };
849
850    if let Some(value) = last_response_value
851        && value.get("usage").is_some()
852    {
853        final_response.usage =
854            crate::llm::providers::common::parse_usage_openai_format(&value, true);
855    }
856
857    Ok(final_response)
858}
859
860pub fn parse_openai_tool_calls(calls: &[Value]) -> Vec<ToolCall> {
861    calls
862        .iter()
863        .filter_map(|call| {
864            let id = call.get("id").and_then(|v| v.as_str())?;
865            let function = call.get("function")?;
866            let namespace = call
867                .get("namespace")
868                .and_then(|v| v.as_str())
869                .or_else(|| function.get("namespace").and_then(|v| v.as_str()))
870                .map(ToOwned::to_owned);
871            let name = function.get("name").and_then(|v| v.as_str())?;
872            let arguments = function.get("arguments");
873            let serialized = arguments.map_or_else(
874                || "{}".to_string(),
875                |value| {
876                    if value.is_string() {
877                        value.as_str().unwrap_or("").to_string()
878                    } else {
879                        value.to_string()
880                    }
881                },
882            );
883            Some(ToolCall::function_with_namespace(
884                id.to_string(),
885                namespace,
886                name.to_string(),
887                serialized,
888            ))
889        })
890        .collect()
891}
892
/// Appends `tool_name` to `tool_references` unless an equal entry is already
/// present, preserving first-seen order.
fn push_unique_tool_reference(tool_references: &mut Vec<String>, tool_name: &str) {
    let already_listed = tool_references
        .iter()
        .any(|existing| existing.as_str() == tool_name);
    if already_listed {
        return;
    }
    tool_references.push(tool_name.to_owned());
}
898
899pub(crate) fn collect_tool_references_from_tool_search_output(
900    value: &Value,
901    tool_references: &mut Vec<String>,
902) {
903    match value {
904        Value::Array(items) => {
905            for item in items {
906                collect_tool_references_from_tool_search_output(item, tool_references);
907            }
908        }
909        Value::Object(object) => {
910            if let Some(tools) = object.get("tools").and_then(Value::as_array) {
911                for tool in tools {
912                    collect_tool_references_from_tool_search_output(tool, tool_references);
913                }
914            } else if let Some(tool_name) = object.get("tool_name").and_then(Value::as_str) {
915                push_unique_tool_reference(tool_references, tool_name);
916            } else if let Some(function) = object.get("function").and_then(Value::as_object)
917                && let Some(tool_name) = function.get("name").and_then(Value::as_str)
918            {
919                push_unique_tool_reference(tool_references, tool_name);
920            } else if let Some(tool_name) = object.get("name").and_then(Value::as_str) {
921                push_unique_tool_reference(tool_references, tool_name);
922            }
923
924            if let Some(tool_refs) = object.get("tool_references").and_then(Value::as_array) {
925                for tool_ref in tool_refs {
926                    collect_tool_references_from_tool_search_output(tool_ref, tool_references);
927                }
928            }
929        }
930        _ => {}
931    }
932}
933
934pub fn append_text_with_reasoning(
935    text: &str,
936    aggregated_content: &mut String,
937    reasoning: &mut ReasoningBuffer,
938    deltas: &mut StreamDelta,
939    telemetry: &impl StreamTelemetry,
940) {
941    let (segments, cleaned) = split_reasoning_from_text(text);
942
943    if segments.is_empty() && cleaned.is_none() {
944        if !text.is_empty() {
945            aggregated_content.push_str(text);
946            deltas.push_content(text);
947            telemetry.on_content_delta(text);
948        }
949        return;
950    }
951
952    for segment in segments {
953        if let Some(stage) = &segment.stage {
954            telemetry.on_reasoning_stage(stage);
955        }
956        if let Some(delta) = reasoning.push(&segment.text) {
957            telemetry.on_reasoning_delta(&delta);
958            deltas.push_reasoning(&delta);
959        }
960    }
961
962    if let Some(cleaned_text) = cleaned
963        && !cleaned_text.is_empty()
964    {
965        aggregated_content.push_str(&cleaned_text);
966        telemetry.on_content_delta(&cleaned_text);
967        deltas.push_content(&cleaned_text);
968    }
969}
970
/// Extracts the combined `data:` payload from a single SSE event block.
///
/// Only lines beginning with `data:` contribute; everything else (blank lines,
/// `:` comments, other fields) is ignored. For each data line, trailing `\r`
/// characters and the whitespace following the colon are removed. Values are
/// separated by `\n`, with the separator inserted only once the accumulated
/// payload is non-empty.
///
/// Returns `None` when the resulting payload is empty.
#[inline]
pub fn extract_data_payload(event: &str) -> Option<String> {
    let payload = event
        .lines()
        .map(|raw_line| raw_line.trim_end_matches('\r'))
        // Blank and comment lines can never carry the `data:` prefix, so this
        // single filter is enough to drop them.
        .filter_map(|line| line.strip_prefix("data:"))
        .map(str::trim_start)
        .fold(String::new(), |mut joined, piece| {
            if !joined.is_empty() {
                joined.push('\n');
            }
            joined.push_str(piece);
            joined
        });

    if payload.is_empty() {
        None
    } else {
        Some(payload)
    }
}
991
/// Locates the first SSE event separator in `buffer`.
///
/// Both `"\n\n"` and `"\r\n\r\n"` are searched for. The result is
/// `(byte_index, separator_len)` for whichever separator starts earliest, or
/// `None` when neither occurs.
#[inline]
pub fn find_sse_boundary(buffer: &str) -> Option<(usize, usize)> {
    let lf_boundary = buffer.find("\n\n").map(|at| (at, 2));
    let crlf_boundary = buffer.find("\r\n\r\n").map(|at| (at, 4));

    // `min_by_key` returns the first of equal minima, so the LF candidate is
    // preferred on a tie.
    lf_boundary
        .into_iter()
        .chain(crlf_boundary)
        .min_by_key(|&(at, _)| at)
}
1010
1011pub fn apply_tool_call_delta_from_content(
1012    builders: &mut Vec<ToolCallBuilder>,
1013    container: &Map<String, Value>,
1014    telemetry: &impl StreamTelemetry,
1015) {
1016    apply_tool_call_delta_with_index(builders, container, telemetry, None, None);
1017}
1018
1019fn apply_tool_call_delta_with_index(
1020    builders: &mut Vec<ToolCallBuilder>,
1021    container: &Map<String, Value>,
1022    telemetry: &impl StreamTelemetry,
1023    fallback_index: Option<usize>,
1024    fallback_id: Option<Value>,
1025) {
1026    fn extract_tool_call_id(container: &Map<String, Value>) -> Option<Value> {
1027        container.get("id").cloned().or_else(|| {
1028            container
1029                .get("tool_call")
1030                .and_then(|value| value.as_object())
1031                .and_then(|inner| inner.get("id"))
1032                .cloned()
1033        })
1034    }
1035
1036    let explicit_index = container
1037        .get("tool_call")
1038        .and_then(|value| value.as_object())
1039        .and_then(|tool_call| tool_call.get("index"))
1040        .and_then(|value| value.as_u64())
1041        .or_else(|| container.get("index").and_then(|value| value.as_u64()));
1042
1043    let index = explicit_index
1044        .map(|value| value as usize)
1045        .or(fallback_index)
1046        .unwrap_or(0);
1047
1048    let current_id = extract_tool_call_id(container).or_else(|| fallback_id.clone());
1049
1050    if let Some(nested) = container.get("delta").and_then(|value| value.as_object()) {
1051        apply_tool_call_delta_with_index(
1052            builders,
1053            nested,
1054            telemetry,
1055            Some(index),
1056            current_id.clone(),
1057        );
1058    }
1059
1060    let delta_source = container
1061        .get("tool_call")
1062        .and_then(|value| value.as_object())
1063        .unwrap_or(container);
1064
1065    let mut delta_map = Map::new();
1066
1067    if let Some(id_value) = extract_tool_call_id(delta_source).or_else(|| current_id.clone()) {
1068        delta_map.insert("id".to_string(), id_value);
1069    }
1070
1071    if let Some(function_value) = delta_source
1072        .get("function")
1073        .or_else(|| container.get("function"))
1074    {
1075        delta_map.insert("function".to_string(), function_value.clone());
1076    }
1077
1078    if delta_map.is_empty() {
1079        return;
1080    }
1081
1082    if builders.len() <= index {
1083        builders.resize_with(index + 1, ToolCallBuilder::default);
1084    }
1085
1086    let mut deltas = vec![Value::Null; index + 1];
1087    deltas[index] = Value::Object(delta_map);
1088    update_tool_calls(builders, &deltas);
1089    telemetry.on_tool_call_delta();
1090}
1091
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Unwraps a JSON fixture that is expected to be an object.
    fn object_fixture(value: Value) -> Map<String, Value> {
        match value {
            Value::Object(map) => map,
            other => panic!("expected a JSON object fixture, got {other}"),
        }
    }

    /// A builder that never received data must not yield a tool call.
    #[test]
    fn finalize_tool_calls_drops_empty_builders() {
        let untouched = vec![ToolCallBuilder::default()];
        let finalized = finalize_tool_calls(untouched);
        assert!(finalized.is_none());
    }

    /// Plain text is appended to the aggregate and emitted as one content fragment.
    #[test]
    fn append_text_with_reasoning_tracks_segments() {
        let mut aggregated = String::new();
        let mut reasoning = ReasoningBuffer::default();
        let mut delta = StreamDelta::default();

        append_text_with_reasoning(
            "Hello",
            &mut aggregated,
            &mut reasoning,
            &mut delta,
            &NoopStreamTelemetry,
        );

        assert_eq!(aggregated, "Hello");
        let fragments = delta.into_fragments();
        assert_eq!(fragments, vec![StreamFragment::Content("Hello".into())]);
    }

    /// A flat delta with an index and a function populates the matching builder.
    #[test]
    fn apply_tool_call_delta_updates_builder() {
        let mut builders = Vec::new();
        let container = object_fixture(json!({
            "index": 0,
            "function": {"name": "foo", "arguments": "{}"}
        }));

        apply_tool_call_delta_from_content(&mut builders, &container, &NoopStreamTelemetry);

        let calls = finalize_tool_calls(builders).expect("call expected");
        let func = calls[0]
            .function
            .as_ref()
            .expect("function call should be present");
        assert_eq!(func.name, "foo");
    }

    /// The outer `index` and `id` apply to a `tool_call` nested under `delta`.
    #[test]
    fn apply_tool_call_delta_uses_outer_index_for_nested_delta() {
        let mut builders = Vec::new();
        let container = object_fixture(json!({
            "delta": {
                "tool_call": {
                    "function": {
                        "name": "foo",
                        "arguments": "{\"value\":1}"
                    }
                }
            },
            "index": 1,
            "id": "call-1"
        }));

        apply_tool_call_delta_from_content(&mut builders, &container, &NoopStreamTelemetry);

        let calls = finalize_tool_calls(builders).expect("call expected");
        assert_eq!(calls.len(), 1);
        let call = &calls[0];
        assert_eq!(call.id, "call-1");
        let func = call
            .function
            .as_ref()
            .expect("function call should be present");
        assert_eq!(func.arguments, "{\"value\":1}");
    }

    /// An explicit `index` inside the delta selects the builder slot.
    #[test]
    fn update_tool_calls_respects_explicit_index() {
        let mut builders = Vec::new();
        let delta = json!({
            "index": 2,
            "id": "call_3",
            "function": {
                "name": "get_weather",
                "arguments": "{\"city\":\"Beijing\"}"
            }
        });
        let deltas = vec![delta];

        update_tool_calls(&mut builders, &deltas);

        let calls = finalize_tool_calls(builders).expect("call expected");
        assert_eq!(calls.len(), 1);
        let call = &calls[0];
        assert_eq!(call.id, "call_3");
        let function = call.function.as_ref().expect("function expected");
        assert_eq!(function.name, "get_weather");
        assert_eq!(function.arguments, "{\"city\":\"Beijing\"}");
    }

    /// Comment lines are dropped and multiple `data:` lines are newline-joined.
    #[test]
    fn extract_data_payload_merges_lines() {
        let event = ": keep-alive\ndata: {\"a\":1}\ndata: {\"b\":2}\n";
        let payload = extract_data_payload(event);
        assert_eq!(payload.as_deref(), Some("{\"a\":1}\n{\"b\":2}"));
    }

    /// A bare LF separator is reported with its offset and length.
    #[test]
    fn find_sse_boundary_prefers_newline() {
        let boundary = find_sse_boundary("data: foo\n\nrest");
        assert_eq!(boundary, Some((9, 2)));
    }

    /// Assistant messages keep their role, phase, and text.
    #[test]
    fn parse_compacted_output_messages_keeps_messages() {
        let output = vec![json!({
            "type": "message",
            "role": "assistant",
            "phase": "final_answer",
            "content": [
                { "type": "output_text", "text": "Compacted response" }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);

        assert_eq!(parsed.len(), 1);
        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Assistant);
        assert_eq!(message.phase, Some(AssistantPhase::FinalAnswer));
        assert_eq!(message.content.as_text(), "Compacted response");
    }

    /// A function call and its output become an assistant/tool message pair.
    #[test]
    fn parse_compacted_output_messages_keeps_tool_pairs() {
        let call_item = json!({
            "type": "function_call",
            "id": "call_1",
            "name": "shell",
            "arguments": "{\"command\":\"pwd\"}"
        });
        let output_item = json!({
            "type": "function_call_output",
            "call_id": "call_1",
            "output": "/tmp/work"
        });
        let output = vec![call_item, output_item];

        let parsed = parse_compacted_output_messages(&output);

        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].role, MessageRole::Assistant);
        assert!(parsed[0].tool_calls.is_some());
        assert_eq!(parsed[1].role, MessageRole::Tool);
        assert_eq!(parsed[1].tool_call_id.as_deref(), Some("call_1"));
    }

    /// Array-valued function output is kept as serialized text on the tool message.
    #[test]
    fn parse_compacted_output_messages_serializes_multimodal_function_output() {
        let output = vec![json!({
            "type": "function_call_output",
            "call_id": "call_1",
            "output": [
                { "type": "input_text", "text": "inline image note" },
                { "type": "input_image", "image_url": "data:image/png;base64,abc" }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);

        assert_eq!(parsed.len(), 1);
        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Tool);
        assert_eq!(message.tool_call_id.as_deref(), Some("call_1"));
        let text = message.content.as_text();
        assert!(text.contains("\"input_image\""));
        assert!(text.contains("inline image note"));
    }

    /// Serialized multimodal output is parsed back into tool-result parts.
    #[test]
    fn tool_result_content_parses_multimodal_tool_output_text() {
        let serialized = r#"[{"type":"input_text","text":"note"},{"type":"input_image","image_url":"data:image/png;base64,abc"}]"#;
        let content = MessageContent::Text(serialized.to_string());

        let parts = tool_result_content_from_message_content(&content);

        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0]["type"], "output_text");
        assert_eq!(parts[0]["text"], "note");
        assert_eq!(parts[1]["type"], "input_image");
        assert_eq!(parts[1]["image_url"], "data:image/png;base64,abc");
    }

    /// Serialized multimodal output is parsed back into a function-output array.
    #[test]
    fn function_output_value_parses_multimodal_tool_output_text() {
        let serialized = r#"[{"type":"input_text","text":"note"},{"type":"input_image","image_url":"data:image/png;base64,abc"}]"#;
        let content = MessageContent::Text(serialized.to_string());

        let output = function_output_value_from_message_content(&content);

        let items = output.as_array().expect("expected array output");
        assert_eq!(items.len(), 2);
        assert_eq!(items[0]["type"], "input_text");
        assert_eq!(items[0]["text"], "note");
        assert_eq!(items[1]["type"], "input_image");
        assert_eq!(items[1]["image_url"], "data:image/png;base64,abc");
    }

    /// Compaction items survive as reasoning details on an assistant message.
    #[test]
    fn parse_compacted_output_messages_preserves_compaction_items() {
        let output = vec![json!({
            "type": "compaction",
            "encrypted_content": "opaque_state"
        })];

        let parsed = parse_compacted_output_messages(&output);

        assert_eq!(parsed.len(), 1);
        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Assistant);
        let preserved_type = message
            .reasoning_details
            .as_ref()
            .and_then(|items| items.first())
            .and_then(|item| item.get("type"))
            .and_then(Value::as_str);
        assert_eq!(preserved_type, Some("compaction"));
    }

    /// A `tool_result` part inside a tool-role message yields its id and text.
    #[test]
    fn parse_compacted_output_messages_parses_tool_result_messages() {
        let output = vec![json!({
            "type": "message",
            "role": "tool",
            "content": [
                {
                    "type": "tool_result",
                    "tool_call_id": "call_42",
                    "content": [
                        { "type": "output_text", "text": "done" }
                    ]
                }
            ]
        })];

        let parsed = parse_compacted_output_messages(&output);

        assert_eq!(parsed.len(), 1);
        let message = &parsed[0];
        assert_eq!(message.role, MessageRole::Tool);
        assert_eq!(message.tool_call_id.as_deref(), Some("call_42"));
        assert_eq!(message.content.as_text(), "done");
    }

    /// Reasoning text is derived from reasoning details when none was streamed.
    #[test]
    fn stream_aggregator_derives_reasoning_from_details_when_missing() {
        let mut aggregator = StreamAggregator::new("test-model".to_string());
        let details = [json!({
            "type": "reasoning.text",
            "text": "step one"
        })];
        aggregator.set_reasoning_details(&details);

        let response = aggregator.finalize();

        assert_eq!(response.reasoning.as_deref(), Some("step one"));
        assert!(response.reasoning_details.is_some());
    }
}