vtcode_core/llm/providers/openai/
stream_decoder.rs1use crate::llm::error_display;
4use crate::llm::provider;
5use crate::llm::providers::shared::StreamTelemetry;
6use crate::llm::providers::shared::{StreamAssemblyError, extract_data_payload, find_sse_boundary};
7use crate::models_manager::model_family::find_family_for_model;
8use async_stream::try_stream;
9use futures::StreamExt;
10use serde_json::Value;
11use std::time::Instant;
12
13use super::responses_api::parse_responses_payload;
14use super::streaming::OpenAIStreamTelemetry;
15
16fn strip_reasoning_for_model(
17 model: &str,
18 mut response: provider::LLMResponse,
19) -> provider::LLMResponse {
20 if !find_family_for_model(model).supports_reasoning_summaries {
21 response.reasoning = None;
22 response.reasoning_details = None;
23 }
24
25 response
26}
27
28pub(crate) fn create_chat_stream(
29 response: reqwest::Response,
30 model: String,
31) -> provider::LLMStream {
32 let stream = try_stream! {
33 let mut body_stream = response.bytes_stream();
34 let mut buffer = String::new();
35 let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
36 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
37 let telemetry = OpenAIStreamTelemetry;
38
39 while let Some(chunk_result) = body_stream.next().await {
40 let chunk = chunk_result.map_err(|err| {
41 let formatted_error = error_display::format_llm_error(
42 "OpenAI",
43 &format!("Streaming error: {}", err),
44 );
45 provider::LLMError::Network { message: formatted_error, metadata: None }
46 })?;
47
48 buffer.push_str(&String::from_utf8_lossy(&chunk));
49
50 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
51 let event = buffer[..split_idx].to_string();
52 buffer.drain(..split_idx + delimiter_len);
53
54 if let Some(data_payload) = extract_data_payload(&event) {
55 let trimmed_payload = data_payload.trim();
56 if trimmed_payload.is_empty() || trimmed_payload == "[DONE]" {
57 continue;
58 }
59
60 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
61 StreamAssemblyError::InvalidPayload(err.to_string())
62 .into_llm_error("OpenAI")
63 })?;
64
65 if let Some(usage_val) = payload.get("usage")
66 && let Ok(u) = serde_json::from_value::<provider::Usage>(usage_val.clone()) {
67 aggregator.set_usage(u);
68 }
69
70 if let Some(choices) = payload.get("choices").and_then(|v| v.as_array())
71 && let Some(choice) = choices.first() {
72 if let Some(delta) = choice.get("delta") {
73 if let Some(content) = delta.get("content").and_then(|v| v.as_str()) {
74 telemetry.on_content_delta(content);
75 for event in aggregator.handle_content(content) {
76 yield event;
77 }
78 }
79
80 if retain_reasoning_summaries
81 && let Some(reasoning) = delta.get("reasoning_content").and_then(|v| v.as_str())
82 && let Some(delta) = aggregator.handle_reasoning(reasoning) {
83 telemetry.on_reasoning_delta(&delta);
84 yield provider::LLMStreamEvent::Reasoning { delta };
85 }
86
87 if let Some(tool_deltas) = delta.get("tool_calls").and_then(|v| v.as_array()) {
88 aggregator.handle_tool_calls(tool_deltas);
89 telemetry.on_tool_call_delta();
90 }
91 }
92
93 if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
94 aggregator.set_finish_reason(match reason {
95 "stop" => provider::FinishReason::Stop,
96 "length" => provider::FinishReason::Length,
97 "tool_calls" => provider::FinishReason::ToolCalls,
98 "content_filter" => provider::FinishReason::ContentFilter,
99 _ => provider::FinishReason::Stop,
100 });
101 }
102 }
103 }
104 }
105 }
106
107 let response = aggregator.finalize();
108 let response = strip_reasoning_for_model(&model, response);
109 yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
110 };
111
112 Box::pin(stream)
113}
114
115pub(crate) fn create_responses_stream(
116 response: reqwest::Response,
117 model: String,
118 include_metrics: bool,
119 _debug_model: Option<String>,
120 _request_timer: Option<Instant>,
121) -> provider::LLMStream {
122 let stream = try_stream! {
123 let mut body_stream = response.bytes_stream();
124 let mut buffer = String::new();
125 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model.clone());
126 let retain_reasoning_summaries = find_family_for_model(&model).supports_reasoning_summaries;
127 let mut final_response: Option<Value> = None;
128 let mut done = false;
129 #[cfg(debug_assertions)]
130 let mut streamed_events_counter: usize = 0;
131 let telemetry = OpenAIStreamTelemetry;
132
133 while let Some(chunk_result) = body_stream.next().await {
134 let chunk = chunk_result.map_err(|err| {
135 let formatted_error = error_display::format_llm_error(
136 "OpenAI",
137 &format!("Streaming error: {}", err),
138 );
139 provider::LLMError::Network { message: formatted_error, metadata: None }
140 })?;
141
142 buffer.push_str(&String::from_utf8_lossy(&chunk));
143
144 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
145 let event = buffer[..split_idx].to_string();
146 buffer.drain(..split_idx + delimiter_len);
147 #[cfg(debug_assertions)]
148 {
149 streamed_events_counter = streamed_events_counter.saturating_add(1);
150 }
151
152 if let Some(data_payload) = extract_data_payload(&event) {
153 let trimmed_payload = data_payload.trim();
154 if trimmed_payload.is_empty() {
155 continue;
156 }
157
158 if trimmed_payload == "[DONE]" {
159 done = true;
160 break;
161 }
162
163 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
164 StreamAssemblyError::InvalidPayload(err.to_string())
165 .into_llm_error("OpenAI")
166 })?;
167
168 if let Some(event_type) = payload.get("type").and_then(|value| value.as_str()) {
169 match event_type {
170 "response.output_text.delta" => {
171 let delta = payload
172 .get("delta")
173 .and_then(|value| value.as_str())
174 .ok_or_else(|| {
175 StreamAssemblyError::MissingField("delta")
176 .into_llm_error("OpenAI")
177 })?;
178 telemetry.on_content_delta(delta);
179
180 for event in aggregator.handle_content(delta) {
181 yield event;
182 }
183 }
184 "response.refusal.delta" => {
185 let delta = payload
186 .get("delta")
187 .and_then(|value| value.as_str())
188 .ok_or_else(|| {
189 StreamAssemblyError::MissingField("delta")
190 .into_llm_error("OpenAI")
191 })?;
192 telemetry.on_content_delta(delta);
193 aggregator.content.push_str(delta);
194 }
195 "response.reasoning_text.delta" | "response.reasoning_summary_text.delta" => {
196 let delta = payload
197 .get("delta")
198 .and_then(|value| value.as_str())
199 .ok_or_else(|| {
200 StreamAssemblyError::MissingField("delta")
201 .into_llm_error("OpenAI")
202 })?;
203 if retain_reasoning_summaries
204 && let Some(delta) = aggregator.handle_reasoning(delta) {
205 telemetry.on_reasoning_delta(&delta);
206 yield provider::LLMStreamEvent::Reasoning { delta };
207 }
208 }
209 "response.function_call_arguments.delta" => {}
210 "response.completed" => {
211 if let Some(response_value) = payload.get("response") {
212 final_response = Some(response_value.clone());
213 }
214 done = true;
215 }
216 "response.failed" | "response.incomplete" => {
217 let error_message = if let Some(err) = payload.get("response")
218 .and_then(|r| r.get("error"))
219 {
220 err.get("message")
221 .and_then(|v| v.as_str())
222 .unwrap_or("Unknown error")
223 } else {
224 "Unknown error from Responses API"
225 };
226 let formatted_error = error_display::format_llm_error("OpenAI", error_message);
227 Err(provider::LLMError::Provider {
228 message: formatted_error,
229 metadata: None,
230 })?;
231 }
232 _ => {}
233 }
234 }
235 }
236
237 if done {
238 break;
239 }
240 }
241
242 if done {
243 break;
244 }
245 }
246
247 let response_value = match final_response {
248 Some(value) => value,
249 None => {
250 let formatted_error = error_display::format_llm_error(
251 "OpenAI",
252 "Stream ended without a completion event",
253 );
254 Err(provider::LLMError::Provider { message: formatted_error, metadata: None })?
255 }
256 };
257
258 let mut response = parse_responses_payload(response_value, model.clone(), include_metrics)?;
259
260 let final_aggregator_response = aggregator.finalize();
261
262 if response.content.is_none() {
263 response.content = final_aggregator_response.content;
264 } else if let (Some(c), Some(agg_c)) = (&mut response.content, final_aggregator_response.content)
265 && !c.contains(&agg_c) {
266 c.push_str(&agg_c);
267 }
268
269 if response.reasoning.is_none() {
270 response.reasoning = final_aggregator_response.reasoning;
271 }
272
273 let response = strip_reasoning_for_model(&model, response);
274 yield provider::LLMStreamEvent::Completed { response: Box::new(response) };
275 };
276
277 Box::pin(stream)
278}