Skip to main content

vtcode_core/llm/providers/openai/
stream_decoder.rs

1//! Streaming decoders for OpenAI Chat Completions and Responses APIs.
2
3use crate::llm::error_display;
4use crate::llm::provider;
5use crate::llm::providers::shared::StreamTelemetry;
6use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
7use crate::models_manager::model_family::find_family_for_model;
8use async_stream::try_stream;
9use futures::StreamExt;
10use serde_json::Value;
11use std::time::Instant;
12
13use super::responses_api::parse_responses_payload;
14use super::streaming::OpenAIStreamTelemetry;
15
16fn strip_reasoning_for_model(
17    model: &str,
18    mut response: provider::LLMResponse,
19) -> provider::LLMResponse {
20    if !find_family_for_model(model).supports_reasoning_summaries {
21        response.reasoning = None;
22        response.reasoning_details = None;
23    }
24
25    response
26}
27
28fn streamed_response_is_usable(response: &provider::LLMResponse) -> bool {
29    response
30        .content
31        .as_deref()
32        .is_some_and(|content| !content.is_empty())
33        || response
34            .tool_calls
35            .as_ref()
36            .is_some_and(|tool_calls| !tool_calls.is_empty())
37        || response
38            .reasoning
39            .as_deref()
40            .is_some_and(|reasoning| !reasoning.is_empty())
41        || response
42            .reasoning_details
43            .as_ref()
44            .is_some_and(|details| !details.is_empty())
45}
46
47fn final_response_output_is_empty(final_response: &Value) -> bool {
48    final_response
49        .get("output")
50        .and_then(Value::as_array)
51        .is_some_and(Vec::is_empty)
52}
53
54fn merge_final_response_metadata(
55    response: &mut provider::LLMResponse,
56    final_response: &Value,
57    include_cached_prompt_metrics: bool,
58) {
59    if let Some(usage_value) = final_response.get("usage") {
60        let cached_prompt_tokens = if include_cached_prompt_metrics {
61            usage_value
62                .get("prompt_tokens_details")
63                .and_then(|details| details.get("cached_tokens"))
64                .or_else(|| usage_value.get("prompt_cache_hit_tokens"))
65                .and_then(Value::as_u64)
66                .and_then(|value| u32::try_from(value).ok())
67        } else {
68            None
69        };
70
71        response.usage = Some(provider::Usage {
72            prompt_tokens: usage_value
73                .get("input_tokens")
74                .or_else(|| usage_value.get("prompt_tokens"))
75                .and_then(Value::as_u64)
76                .and_then(|value| u32::try_from(value).ok())
77                .unwrap_or(0),
78            completion_tokens: usage_value
79                .get("output_tokens")
80                .or_else(|| usage_value.get("completion_tokens"))
81                .and_then(Value::as_u64)
82                .and_then(|value| u32::try_from(value).ok())
83                .unwrap_or(0),
84            total_tokens: usage_value
85                .get("total_tokens")
86                .and_then(Value::as_u64)
87                .and_then(|value| u32::try_from(value).ok())
88                .unwrap_or(0),
89            cached_prompt_tokens,
90            cache_creation_tokens: None,
91            cache_read_tokens: None,
92            iterations: None,
93        });
94    }
95
96    if let Some(request_id) = final_response
97        .get("id")
98        .and_then(Value::as_str)
99        .or_else(|| final_response.get("request_id").and_then(Value::as_str))
100    {
101        response.request_id = Some(request_id.to_string());
102    }
103}
104
105pub(crate) fn create_chat_stream(
106    response: reqwest::Response,
107    model: String,
108) -> provider::LLMStream {
109    let stream = try_stream! {
110        let mut body_stream = response.bytes_stream();
111        let mut buffer = String::new();
112        let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
113        let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
114        let telemetry = OpenAIStreamTelemetry;
115
116        while let Some(chunk_result) = body_stream.next().await {
117            let chunk = chunk_result.map_err(|err| {
118                let formatted_error = error_display::format_llm_error(
119                    "OpenAI",
120                    &format!("Streaming error: {}", err),
121                );
122                provider::LLMError::Network { message: formatted_error, metadata: None }
123            })?;
124
125            buffer.push_str(&String::from_utf8_lossy(&chunk));
126
127            while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
128                let event = buffer[..split_idx].to_string();
129                buffer.drain(..split_idx + delimiter_len);
130
131                if let Some(data_payload) = extract_data_payload(&event) {
132                    let trimmed_payload = data_payload.trim();
133                    if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
134                        continue;
135                    }
136
137                    let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
138                        StreamAssemblyError::InvalidPayload(err.to_string())
139                            .into_llm_error("OpenAI")
140                    })?;
141
142                    if let Some(usage_val) = payload.get("usage")
143                        && let Ok(u) = serde_json::from_value::<provider::Usage>(usage_val.clone()) {
144                            aggregator.set_usage(u);
145                        }
146
147                    if let Some(choices) = payload.get("choices").and_then(|v| v.as_array())
148                        && let Some(choice) = choices.first() {
149                            if let Some(delta) = choice.get("delta") {
150                                if let Some(content) = delta.get("content").and_then(|v| v.as_str()) {
151                                    telemetry.on_content_delta(content);
152                                    for event in aggregator.handle_content(content) {
153                                        yield event;
154                                    }
155                                }
156
157                                if retain_reasoning_summaries
158                                    && let Some(reasoning) = delta.get("reasoning_content").and_then(|v| v.as_str())
159                                    && let Some(delta) = aggregator.handle_reasoning(reasoning) {
160                                        telemetry.on_reasoning_delta(&delta);
161                                        yield provider::LLMStreamEvent::Reasoning { delta };
162                                    }
163
164                                if let Some(tool_deltas) = delta.get("tool_calls").and_then(|v| v.as_array()) {
165                                    aggregator.handle_tool_calls(tool_deltas);
166                                    telemetry.on_tool_call_delta();
167                                }
168                            }
169
170                            if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
171                                aggregator.set_finish_reason(match reason {
172                                    "stop" => provider::FinishReason::Stop,
173                                    "length" => provider::FinishReason::Length,
174                                    "tool_calls" => provider::FinishReason::ToolCalls,
175                                    "content_filter" => provider::FinishReason::ContentFilter,
176                                    _ => provider::FinishReason::Stop,
177                                });
178                            }
179                        }
180                }
181            }
182        }
183
184        let response = aggregator.finalize();
185        let response = strip_reasoning_for_model(&model, response);
186        yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
187    };
188
189    Box::pin(stream)
190}
191
192pub(crate) fn create_responses_stream(
193    response: reqwest::Response,
194    model: String,
195    include_metrics: bool,
196    _debug_model: Option<String>,
197    _request_timer: Option<Instant>,
198) -> provider::LLMStream {
199    let stream = try_stream! {
200        let mut body_stream = response.bytes_stream();
201        let mut buffer = String::new();
202        let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
203        let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
204        let mut final_response: Option<Value> = None;
205        let mut done = false;
206        #[cfg(debug_assertions)]
207        let mut streamed_events_counter: usize = 0;
208        let telemetry = OpenAIStreamTelemetry;
209
210        while let Some(chunk_result) = body_stream.next().await {
211            let chunk = chunk_result.map_err(|err| {
212                let formatted_error = error_display::format_llm_error(
213                    "OpenAI",
214                    &format!("Streaming error: {}", err),
215                );
216                provider::LLMError::Network { message: formatted_error, metadata: None }
217            })?;
218
219            buffer.push_str(&String::from_utf8_lossy(&chunk));
220
221            while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
222                let event = buffer[..split_idx].to_string();
223                buffer.drain(..split_idx + delimiter_len);
224                #[cfg(debug_assertions)]
225                {
226                    streamed_events_counter = streamed_events_counter.saturating_add(1);
227                }
228
229                if let Some(data_payload) = extract_data_payload(&event) {
230                    let trimmed_payload = data_payload.trim();
231                    if trimmed_payload.is_empty() {
232                        continue;
233                    }
234
235                    if trimmed_payload == "[DONE]" {
236                        done = true;
237                        break;
238                    }
239
240                    let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
241                        StreamAssemblyError::InvalidPayload(err.to_string())
242                            .into_llm_error("OpenAI")
243                    })?;
244
245                    if let Some(event_type) = payload.get("type").and_then(|value| value.as_str()) {
246                        match event_type {
247                            "response.output_text.delta" => {
248                                let delta = payload
249                                    .get("delta")
250                                    .and_then(|value| value.as_str())
251                                    .ok_or_else(|| {
252                                        StreamAssemblyError::MissingField("delta")
253                                            .into_llm_error("OpenAI")
254                                    })?;
255                                telemetry.on_content_delta(delta);
256
257                                for event in aggregator.handle_content(delta) {
258                                    yield event;
259                                }
260                            }
261                            "response.refusal.delta" => {
262                                let delta = payload
263                                    .get("delta")
264                                    .and_then(|value| value.as_str())
265                                    .ok_or_else(|| {
266                                        StreamAssemblyError::MissingField("delta")
267                                            .into_llm_error("OpenAI")
268                                    })?;
269                                telemetry.on_content_delta(delta);
270                                aggregator.content.push_str(delta);
271                            }
272                            "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => {
273                                let delta = payload
274                                    .get("delta")
275                                    .and_then(|value| value.as_str())
276                                    .ok_or_else(|| {
277                                        StreamAssemblyError::MissingField("delta")
278                                            .into_llm_error("OpenAI")
279                                    })?;
280                                if retain_reasoning_summaries
281                                    && let Some(delta) = aggregator.handle_reasoning(delta) {
282                                    telemetry.on_reasoning_delta(&delta);
283                                    yield provider::LLMStreamEvent::Reasoning { delta };
284                                }
285                            }
286                            "response.function_call_arguments.delta" => {}
287                            "response.completed" => {
288                                if let Some(response_value) = payload.get("response") {
289                                    final_response = Some(response_value.clone());
290                                }
291                                done = true;
292                            }
293                            "response.failed" | "response.incomplete" => {
294                                let error_message = if let Some(err) = payload.get("response")
295                                    .and_then(|r| r.get("error"))
296                                {
297                                    err.get("message")
298                                        .and_then(|v| v.as_str())
299                                        .unwrap_or("Unknown error")
300                                } else {
301                                    "Unknown error from Responses API"
302                                };
303                                let formatted_error = error_display::format_llm_error("OpenAI", error_message);
304                                Err(provider::LLMError::Provider {
305                                    message: formatted_error,
306                                    metadata: None,
307                                })?;
308                            }
309                            _ => {}
310                        }
311                    }
312                }
313
314                if done {
315                    break;
316                }
317            }
318
319            if done {
320                break;
321            }
322        }
323
324        let response_value = match final_response {
325            Some(value) => value,
326            None => {
327                let formatted_error = error_display::format_llm_error(
328                    "OpenAI",
329                    "Stream ended without a completion event",
330                );
331                Err(provider::LLMError::Provider { message: formatted_error, metadata: None })?
332            }
333        };
334
335        let final_aggregator_response = aggregator.finalize();
336        let mut response = match parse_responses_payload(response_value.clone(), model.clone(), include_metrics) {
337            Ok(response) => response,
338            Err(_)
339                if final_response_output_is_empty(&response_value)
340                    && streamed_response_is_usable(&final_aggregator_response) =>
341            {
342                let mut response = final_aggregator_response.clone();
343                merge_final_response_metadata(&mut response, &response_value, include_metrics);
344                response
345            }
346            Err(err) => Err(err)?,
347        };
348
349        if response.content.is_none() {
350            response.content = final_aggregator_response.content;
351        } else if let (Some(c), Some(agg_c)) = (&mut response.content, final_aggregator_response.content)
352            && !c.contains(&agg_c) {
353                c.push_str(&agg_c);
354            }
355
356        if response.reasoning.is_none() {
357            response.reasoning = final_aggregator_response.reasoning;
358        }
359
360        let response = strip_reasoning_for_model(&model, response);
361        yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
362    };
363
364    Box::pin(stream)
365}