vtcode_core/llm/providers/openai/
stream_decoder.rs1use crate::llm::error_display;
4use crate::llm::provider;
5use crate::llm::providers::shared::StreamTelemetry;
6use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
7use crate::models_manager::model_family::find_family_for_model;
8use async_stream::try_stream;
9use futures::StreamExt;
10use serde_json::Value;
11use std::time::Instant;
12
13use super::responses_api::parse_responses_payload;
14use super::streaming::OpenAIStreamTelemetry;
15
16fn strip_reasoning_for_model(
17 model: &str,
18 mut response: provider::LLMResponse,
19) -> provider::LLMResponse {
20 if !find_family_for_model(model).supports_reasoning_summaries {
21 response.reasoning = None;
22 response.reasoning_details = None;
23 }
24
25 response
26}
27
28fn streamed_response_is_usable(response: &provider::LLMResponse) -> bool {
29 response
30 .content
31 .as_deref()
32 .is_some_and(|content| !content.is_empty())
33 || response
34 .tool_calls
35 .as_ref()
36 .is_some_and(|tool_calls| !tool_calls.is_empty())
37 || response
38 .reasoning
39 .as_deref()
40 .is_some_and(|reasoning| !reasoning.is_empty())
41 || response
42 .reasoning_details
43 .as_ref()
44 .is_some_and(|details| !details.is_empty())
45}
46
47fn final_response_output_is_empty(final_response: &Value) -> bool {
48 final_response
49 .get("output")
50 .and_then(Value::as_array)
51 .is_some_and(Vec::is_empty)
52}
53
54fn merge_final_response_metadata(
55 response: &mut provider::LLMResponse,
56 final_response: &Value,
57 include_cached_prompt_metrics: bool,
58) {
59 if let Some(usage_value) = final_response.get("usage") {
60 let cached_prompt_tokens = if include_cached_prompt_metrics {
61 usage_value
62 .get("prompt_tokens_details")
63 .and_then(|details| details.get("cached_tokens"))
64 .or_else(|| usage_value.get("prompt_cache_hit_tokens"))
65 .and_then(Value::as_u64)
66 .and_then(|value| u32::try_from(value).ok())
67 } else {
68 None
69 };
70
71 response.usage = Some(provider::Usage {
72 prompt_tokens: usage_value
73 .get("input_tokens")
74 .or_else(|| usage_value.get("prompt_tokens"))
75 .and_then(Value::as_u64)
76 .and_then(|value| u32::try_from(value).ok())
77 .unwrap_or(0),
78 completion_tokens: usage_value
79 .get("output_tokens")
80 .or_else(|| usage_value.get("completion_tokens"))
81 .and_then(Value::as_u64)
82 .and_then(|value| u32::try_from(value).ok())
83 .unwrap_or(0),
84 total_tokens: usage_value
85 .get("total_tokens")
86 .and_then(Value::as_u64)
87 .and_then(|value| u32::try_from(value).ok())
88 .unwrap_or(0),
89 cached_prompt_tokens,
90 cache_creation_tokens: None,
91 cache_read_tokens: None,
92 iterations: None,
93 });
94 }
95
96 if let Some(request_id) = final_response
97 .get("id")
98 .and_then(Value::as_str)
99 .or_else(|| final_response.get("request_id").and_then(Value::as_str))
100 {
101 response.request_id = Some(request_id.to_string());
102 }
103}
104
105pub(crate) fn create_chat_stream(
106 response: reqwest::Response,
107 model: String,
108) -> provider::LLMStream {
109 let stream = try_stream! {
110 let mut body_stream = response.bytes_stream();
111 let mut buffer = String::new();
112 let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
113 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
114 let telemetry = OpenAIStreamTelemetry;
115
116 while let Some(chunk_result) = body_stream.next().await {
117 let chunk = chunk_result.map_err(|err| {
118 let formatted_error = error_display::format_llm_error(
119 "OpenAI",
120 &format!("Streaming error: {}", err),
121 );
122 provider::LLMError::Network { message: formatted_error, metadata: None }
123 })?;
124
125 buffer.push_str(&String::from_utf8_lossy(&chunk));
126
127 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
128 let event = buffer[..split_idx].to_string();
129 buffer.drain(..split_idx + delimiter_len);
130
131 if let Some(data_payload) = extract_data_payload(&event) {
132 let trimmed_payload = data_payload.trim();
133 if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
134 continue;
135 }
136
137 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
138 StreamAssemblyError::InvalidPayload(err.to_string())
139 .into_llm_error("OpenAI")
140 })?;
141
142 if let Some(usage_val) = payload.get("usage")
143 && let Ok(u) = serde_json::from_value::<provider::Usage>(usage_val.clone()) {
144 aggregator.set_usage(u);
145 }
146
147 if let Some(choices) = payload.get("choices").and_then(|v| v.as_array())
148 && let Some(choice) = choices.first() {
149 if let Some(delta) = choice.get("delta") {
150 if let Some(content) = delta.get("content").and_then(|v| v.as_str()) {
151 telemetry.on_content_delta(content);
152 for event in aggregator.handle_content(content) {
153 yield event;
154 }
155 }
156
157 if retain_reasoning_summaries
158 && let Some(reasoning) = delta.get("reasoning_content").and_then(|v| v.as_str())
159 && let Some(delta) = aggregator.handle_reasoning(reasoning) {
160 telemetry.on_reasoning_delta(&delta);
161 yield provider::LLMStreamEvent::Reasoning { delta };
162 }
163
164 if let Some(tool_deltas) = delta.get("tool_calls").and_then(|v| v.as_array()) {
165 aggregator.handle_tool_calls(tool_deltas);
166 telemetry.on_tool_call_delta();
167 }
168 }
169
170 if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
171 aggregator.set_finish_reason(match reason {
172 "stop" => provider::FinishReason::Stop,
173 "length" => provider::FinishReason::Length,
174 "tool_calls" => provider::FinishReason::ToolCalls,
175 "content_filter" => provider::FinishReason::ContentFilter,
176 _ => provider::FinishReason::Stop,
177 });
178 }
179 }
180 }
181 }
182 }
183
184 let response = aggregator.finalize();
185 let response = strip_reasoning_for_model(&model, response);
186 yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
187 };
188
189 Box::pin(stream)
190}
191
192pub(crate) fn create_responses_stream(
193 response: reqwest::Response,
194 model: String,
195 include_metrics: bool,
196 _debug_model: Option<String>,
197 _request_timer: Option<Instant>,
198) -> provider::LLMStream {
199 let stream = try_stream! {
200 let mut body_stream = response.bytes_stream();
201 let mut buffer = String::new();
202 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
203 let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
204 let mut final_response: Option<Value> = None;
205 let mut done = false;
206 #[cfg(debug_assertions)]
207 let mut streamed_events_counter: usize = 0;
208 let telemetry = OpenAIStreamTelemetry;
209
210 while let Some(chunk_result) = body_stream.next().await {
211 let chunk = chunk_result.map_err(|err| {
212 let formatted_error = error_display::format_llm_error(
213 "OpenAI",
214 &format!("Streaming error: {}", err),
215 );
216 provider::LLMError::Network { message: formatted_error, metadata: None }
217 })?;
218
219 buffer.push_str(&String::from_utf8_lossy(&chunk));
220
221 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
222 let event = buffer[..split_idx].to_string();
223 buffer.drain(..split_idx + delimiter_len);
224 #[cfg(debug_assertions)]
225 {
226 streamed_events_counter = streamed_events_counter.saturating_add(1);
227 }
228
229 if let Some(data_payload) = extract_data_payload(&event) {
230 let trimmed_payload = data_payload.trim();
231 if trimmed_payload.is_empty() {
232 continue;
233 }
234
235 if trimmed_payload == "[DONE]" {
236 done = true;
237 break;
238 }
239
240 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
241 StreamAssemblyError::InvalidPayload(err.to_string())
242 .into_llm_error("OpenAI")
243 })?;
244
245 if let Some(event_type) = payload.get("type").and_then(|value| value.as_str()) {
246 match event_type {
247 "response.output_text.delta" => {
248 let delta = payload
249 .get("delta")
250 .and_then(|value| value.as_str())
251 .ok_or_else(|| {
252 StreamAssemblyError::MissingField("delta")
253 .into_llm_error("OpenAI")
254 })?;
255 telemetry.on_content_delta(delta);
256
257 for event in aggregator.handle_content(delta) {
258 yield event;
259 }
260 }
261 "response.refusal.delta" => {
262 let delta = payload
263 .get("delta")
264 .and_then(|value| value.as_str())
265 .ok_or_else(|| {
266 StreamAssemblyError::MissingField("delta")
267 .into_llm_error("OpenAI")
268 })?;
269 telemetry.on_content_delta(delta);
270 aggregator.content.push_str(delta);
271 }
272 "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => {
273 let delta = payload
274 .get("delta")
275 .and_then(|value| value.as_str())
276 .ok_or_else(|| {
277 StreamAssemblyError::MissingField("delta")
278 .into_llm_error("OpenAI")
279 })?;
280 if retain_reasoning_summaries
281 && let Some(delta) = aggregator.handle_reasoning(delta) {
282 telemetry.on_reasoning_delta(&delta);
283 yield provider::LLMStreamEvent::Reasoning { delta };
284 }
285 }
286 "response.function_call_arguments.delta" => {}
287 "response.completed" => {
288 if let Some(response_value) = payload.get("response") {
289 final_response = Some(response_value.clone());
290 }
291 done = true;
292 }
293 "response.failed" | "response.incomplete" => {
294 let error_message = if let Some(err) = payload.get("response")
295 .and_then(|r| r.get("error"))
296 {
297 err.get("message")
298 .and_then(|v| v.as_str())
299 .unwrap_or("Unknown error")
300 } else {
301 "Unknown error from Responses API"
302 };
303 let formatted_error = error_display::format_llm_error("OpenAI", error_message);
304 Err(provider::LLMError::Provider {
305 message: formatted_error,
306 metadata: None,
307 })?;
308 }
309 _ => {}
310 }
311 }
312 }
313
314 if done {
315 break;
316 }
317 }
318
319 if done {
320 break;
321 }
322 }
323
324 let response_value = match final_response {
325 Some(value) => value,
326 None => {
327 let formatted_error = error_display::format_llm_error(
328 "OpenAI",
329 "Stream ended without a completion event",
330 );
331 Err(provider::LLMError::Provider { message: formatted_error, metadata: None })?
332 }
333 };
334
335 let final_aggregator_response = aggregator.finalize();
336 let mut response = match parse_responses_payload(response_value.clone(), model.clone(), include_metrics) {
337 Ok(response) => response,
338 Err(_)
339 if final_response_output_is_empty(&response_value)
340 && streamed_response_is_usable(&final_aggregator_response) =>
341 {
342 let mut response = final_aggregator_response.clone();
343 merge_final_response_metadata(&mut response, &response_value, include_metrics);
344 response
345 }
346 Err(err) => Err(err)?,
347 };
348
349 if response.content.is_none() {
350 response.content = final_aggregator_response.content;
351 } else if let (Some(c), Some(agg_c)) = (&mut response.content, final_aggregator_response.content)
352 && !c.contains(&agg_c) {
353 c.push_str(&agg_c);
354 }
355
356 if response.reasoning.is_none() {
357 response.reasoning = final_aggregator_response.reasoning;
358 }
359
360 let response = strip_reasoning_for_model(&model, response);
361 yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
362 };
363
364 Box::pin(stream)
365}