Skip to main content

vtcode_core/llm/providers/openai/
stream_decoder.rs

1//! Streaming decoders for OpenAI Chat Completions and Responses APIs.
2
3use crate::llm::error_display;
4use crate::llm::provider;
5use crate::llm::providers::shared::StreamTelemetry;
6use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
7use crate::models_manager::model_family::find_family_for_model;
8use async_stream::try_stream;
9use futures::StreamExt;
10use serde_json::Value;
11use std::time::Instant;
12
13use super::responses_api::parse_responses_payload;
14use super::streaming::OpenAIStreamTelemetry;
15
16fn strip_reasoning_for_model(
17    model: &str,
18    mut response: provider::LLMResponse,
19) -> provider::LLMResponse {
20    if !find_family_for_model(model).supports_reasoning_summaries {
21        response.reasoning = None;
22        response.reasoning_details = None;
23    }
24
25    response
26}
27
28pub(crate) fn create_chat_stream(
29    response: reqwest::Response,
30    model: String,
31) -> provider::LLMStream {
32    let stream = try_stream! {
33        let mut body_stream = response.bytes_stream();
34        let mut buffer = String::new();
35        let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
36        let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
37        let telemetry = OpenAIStreamTelemetry;
38
39        while let Some(chunk_result) = body_stream.next().await {
40            let chunk = chunk_result.map_err(|err| {
41                let formatted_error = error_display::format_llm_error(
42                    "OpenAI",
43                    &format!("Streaming error: {}", err),
44                );
45                provider::LLMError::Network { message: formatted_error, metadata: None }
46            })?;
47
48            buffer.push_str(&String::from_utf8_lossy(&chunk));
49
50            while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
51                let event = buffer[..split_idx].to_string();
52                buffer.drain(..split_idx + delimiter_len);
53
54                if let Some(data_payload) = extract_data_payload(&event) {
55                    let trimmed_payload = data_payload.trim();
56                    if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
57                        continue;
58                    }
59
60                    let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
61                        StreamAssemblyError::InvalidPayload(err.to_string())
62                            .into_llm_error("OpenAI")
63                    })?;
64
65                    if let Some(usage_val) = payload.get("usage")
66                        && let Ok(u) = serde_json::from_value::<provider::Usage>(usage_val.clone()) {
67                            aggregator.set_usage(u);
68                        }
69
70                    if let Some(choices) = payload.get("choices").and_then(|v| v.as_array())
71                        && let Some(choice) = choices.first() {
72                            if let Some(delta) = choice.get("delta") {
73                                if let Some(content) = delta.get("content").and_then(|v| v.as_str()) {
74                                    telemetry.on_content_delta(content);
75                                    for event in aggregator.handle_content(content) {
76                                        yield event;
77                                    }
78                                }
79
80                                if retain_reasoning_summaries
81                                    && let Some(reasoning) = delta.get("reasoning_content").and_then(|v| v.as_str())
82                                    && let Some(delta) = aggregator.handle_reasoning(reasoning) {
83                                        telemetry.on_reasoning_delta(&delta);
84                                        yield provider::LLMStreamEvent::Reasoning { delta };
85                                    }
86
87                                if let Some(tool_deltas) = delta.get("tool_calls").and_then(|v| v.as_array()) {
88                                    aggregator.handle_tool_calls(tool_deltas);
89                                    telemetry.on_tool_call_delta();
90                                }
91                            }
92
93                            if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
94                                aggregator.set_finish_reason(match reason {
95                                    "stop" => provider::FinishReason::Stop,
96                                    "length" => provider::FinishReason::Length,
97                                    "tool_calls" => provider::FinishReason::ToolCalls,
98                                    "content_filter" => provider::FinishReason::ContentFilter,
99                                    _ => provider::FinishReason::Stop,
100                                });
101                            }
102                        }
103                }
104            }
105        }
106
107        let response = aggregator.finalize();
108        let response = strip_reasoning_for_model(&model, response);
109        yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
110    };
111
112    Box::pin(stream)
113}
114
115pub(crate) fn create_responses_stream(
116    response: reqwest::Response,
117    model: String,
118    include_metrics: bool,
119    _debug_model: Option<String>,
120    _request_timer: Option<Instant>,
121) -> provider::LLMStream {
122    let stream = try_stream! {
123        let mut body_stream = response.bytes_stream();
124        let mut buffer = String::new();
125        let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
126        let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
127        let mut final_response: Option<Value> = None;
128        let mut done = false;
129        #[cfg(debug_assertions)]
130        let mut streamed_events_counter: usize = 0;
131        let telemetry = OpenAIStreamTelemetry;
132
133        while let Some(chunk_result) = body_stream.next().await {
134            let chunk = chunk_result.map_err(|err| {
135                let formatted_error = error_display::format_llm_error(
136                    "OpenAI",
137                    &format!("Streaming error: {}", err),
138                );
139                provider::LLMError::Network { message: formatted_error, metadata: None }
140            })?;
141
142            buffer.push_str(&String::from_utf8_lossy(&chunk));
143
144            while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
145                let event = buffer[..split_idx].to_string();
146                buffer.drain(..split_idx + delimiter_len);
147                #[cfg(debug_assertions)]
148                {
149                    streamed_events_counter = streamed_events_counter.saturating_add(1);
150                }
151
152                if let Some(data_payload) = extract_data_payload(&event) {
153                    let trimmed_payload = data_payload.trim();
154                    if trimmed_payload.is_empty() {
155                        continue;
156                    }
157
158                    if trimmed_payload == "[DONE]" {
159                        done = true;
160                        break;
161                    }
162
163                    let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
164                        StreamAssemblyError::InvalidPayload(err.to_string())
165                            .into_llm_error("OpenAI")
166                    })?;
167
168                    if let Some(event_type) = payload.get("type").and_then(|value| value.as_str()) {
169                        match event_type {
170                            "response.output_text.delta" => {
171                                let delta = payload
172                                    .get("delta")
173                                    .and_then(|value| value.as_str())
174                                    .ok_or_else(|| {
175                                        StreamAssemblyError::MissingField("delta")
176                                            .into_llm_error("OpenAI")
177                                    })?;
178                                telemetry.on_content_delta(delta);
179
180                                for event in aggregator.handle_content(delta) {
181                                    yield event;
182                                }
183                            }
184                            "response.refusal.delta" => {
185                                let delta = payload
186                                    .get("delta")
187                                    .and_then(|value| value.as_str())
188                                    .ok_or_else(|| {
189                                        StreamAssemblyError::MissingField("delta")
190                                            .into_llm_error("OpenAI")
191                                    })?;
192                                telemetry.on_content_delta(delta);
193                                aggregator.content.push_str(delta);
194                            }
195                            "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => {
196                                let delta = payload
197                                    .get("delta")
198                                    .and_then(|value| value.as_str())
199                                    .ok_or_else(|| {
200                                        StreamAssemblyError::MissingField("delta")
201                                            .into_llm_error("OpenAI")
202                                    })?;
203                                if retain_reasoning_summaries
204                                    && let Some(delta) = aggregator.handle_reasoning(delta) {
205                                    telemetry.on_reasoning_delta(&delta);
206                                    yield provider::LLMStreamEvent::Reasoning { delta };
207                                }
208                            }
209                            "response.function_call_arguments.delta" => {}
210                            "response.completed" => {
211                                if let Some(response_value) = payload.get("response") {
212                                    final_response = Some(response_value.clone());
213                                }
214                                done = true;
215                            }
216                            "response.failed" | "response.incomplete" => {
217                                let error_message = if let Some(err) = payload.get("response")
218                                    .and_then(|r| r.get("error"))
219                                {
220                                    err.get("message")
221                                        .and_then(|v| v.as_str())
222                                        .unwrap_or("Unknown error")
223                                } else {
224                                    "Unknown error from Responses API"
225                                };
226                                let formatted_error = error_display::format_llm_error("OpenAI", error_message);
227                                Err(provider::LLMError::Provider {
228                                    message: formatted_error,
229                                    metadata: None,
230                                })?;
231                            }
232                            _ => {}
233                        }
234                    }
235                }
236
237                if done {
238                    break;
239                }
240            }
241
242            if done {
243                break;
244            }
245        }
246
247        let response_value = match final_response {
248            Some(value) => value,
249            None => {
250                let formatted_error = error_display::format_llm_error(
251                    "OpenAI",
252                    "Stream ended without a completion event",
253                );
254                Err(provider::LLMError::Provider { message: formatted_error, metadata: None })?
255            }
256        };
257
258        let mut response = parse_responses_payload(response_value, model.clone(), include_metrics)?;
259
260        let final_aggregator_response = aggregator.finalize();
261
262        if response.content.is_none() {
263            response.content = final_aggregator_response.content;
264        } else if let (Some(c), Some(agg_c)) = (&mut response.content, final_aggregator_response.content)
265            && !c.contains(&agg_c) {
266                c.push_str(&agg_c);
267            }
268
269        if response.reasoning.is_none() {
270            response.reasoning = final_aggregator_response.reasoning;
271        }
272
273        let response = strip_reasoning_for_model(&model, response);
274        yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
275    };
276
277    Box::pin(stream)
278}