vtcode_core/llm/providers/openresponses/
streaming.rs1use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub enum StreamEventType {
16 #[serde(rename = "response.created")]
18 ResponseCreated,
19 #[serde(rename = "response.in_progress")]
20 ResponseInProgress,
21 #[serde(rename = "response.completed")]
22 ResponseCompleted,
23 #[serde(rename = "response.failed")]
24 ResponseFailed,
25 #[serde(rename = "response.incomplete")]
26 ResponseIncomplete,
27
28 #[serde(rename = "response.output_item.added")]
30 OutputItemAdded,
31 #[serde(rename = "response.output_item.done")]
32 OutputItemDone,
33
34 #[serde(rename = "response.output_text.delta")]
36 OutputTextDelta,
37 #[serde(rename = "response.output_text.done")]
38 OutputTextDone,
39
40 #[serde(rename = "response.content_part.added")]
42 ContentPartAdded,
43 #[serde(rename = "response.content_part.done")]
44 ContentPartDone,
45
46 #[serde(rename = "response.function_call_arguments.delta")]
48 FunctionCallArgumentsDelta,
49 #[serde(rename = "response.function_call_arguments.done")]
50 FunctionCallArgumentsDone,
51
52 #[serde(rename = "response.reasoning_summary_text.delta")]
54 ReasoningSummaryTextDelta,
55 #[serde(rename = "response.reasoning_summary_text.done")]
56 ReasoningSummaryTextDone,
57
58 #[serde(rename = "response.reasoning_content.delta")]
60 ReasoningContentDelta,
61 #[serde(rename = "response.reasoning_content.done")]
62 ReasoningContentDone,
63
64 #[serde(rename = "error")]
66 Error,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct StreamEvent {
72 #[serde(rename = "type")]
73 pub event_type: String,
74 #[serde(default)]
75 pub sequence_number: u32,
76 #[serde(flatten)]
77 pub data: StreamEventData,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(untagged)]
83pub enum StreamEventData {
84 Response(ResponseEventData),
86 OutputItem(OutputItemEventData),
88 TextDelta(TextDeltaEventData),
90 FunctionCallDelta(FunctionCallDeltaEventData),
92 ReasoningContentDelta(ReasoningContentDeltaEventData),
94 Error(ErrorEventData),
96 Generic(Value),
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct ResponseEventData {
103 #[serde(skip_serializing_if = "Option::is_none")]
104 pub response: Option<Value>,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct OutputItemEventData {
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub item: Option<Value>,
112 #[serde(skip_serializing_if = "Option::is_none")]
113 pub output_index: Option<u32>,
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub item_id: Option<String>,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct TextDeltaEventData {
121 pub delta: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub item_id: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub output_index: Option<u32>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub content_index: Option<u32>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct FunctionCallDeltaEventData {
133 pub delta: String,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 pub item_id: Option<String>,
136 #[serde(skip_serializing_if = "Option::is_none")]
137 pub output_index: Option<u32>,
138 #[serde(skip_serializing_if = "Option::is_none")]
139 pub call_id: Option<String>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct ReasoningContentDeltaEventData {
145 pub delta: String,
146 #[serde(skip_serializing_if = "Option::is_none")]
147 pub item_id: Option<String>,
148 #[serde(skip_serializing_if = "Option::is_none")]
149 pub output_index: Option<u32>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ErrorEventData {
155 pub error: StreamError,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct StreamError {
161 pub code: String,
162 pub message: String,
163 #[serde(skip_serializing_if = "Option::is_none")]
164 pub param: Option<String>,
165}
166
167pub fn parse_sse_event(line: &str) -> Option<StreamEvent> {
173 let line = line.trim();
175 if line.is_empty() || line == "[DONE]" {
176 return None;
177 }
178
179 if let Some(data) = line.strip_prefix("data: ") {
180 if data == "[DONE]" {
181 return None;
182 }
183 serde_json::from_str(data).ok()
184 } else if line.starts_with('{') {
185 serde_json::from_str(line).ok()
187 } else {
188 None
189 }
190}
191
/// Extracts the event name from an SSE `event:` field line.
///
/// Surrounding whitespace is trimmed first. The single space after the colon
/// is optional in the SSE format, so both `event: name` and `event:name` are
/// accepted. Returns `None` when the line is not an `event:` field or when the
/// event name is empty.
pub fn extract_event_type(line: &str) -> Option<String> {
    let value = line.trim().strip_prefix("event:")?;
    // Drop at most one leading space, as the SSE format prescribes.
    let name = value.strip_prefix(' ').unwrap_or(value);
    (!name.is_empty()).then(|| name.to_string())
}
198
199#[derive(Debug, Default)]
201pub struct StreamAccumulator {
202 pub text_content: String,
203 pub reasoning_content: String,
204 pub reasoning_summary: String,
205 pub function_calls: Vec<AccumulatedFunctionCall>,
206 pub current_function_call: Option<AccumulatingFunctionCall>,
207 pub output_items: Vec<Value>,
208 pub response_id: Option<String>,
209 pub model: Option<String>,
210 pub usage: Option<Value>,
211 pub is_complete: bool,
212 pub error: Option<StreamError>,
213}
214
/// A function call that is still being assembled from stream events.
///
/// `Default` yields a call whose fields are all empty strings.
#[derive(Clone, Debug, Default)]
pub struct AccumulatingFunctionCall {
    /// Identifier of the output item.
    pub id: String,
    /// Identifier of the call.
    pub call_id: String,
    /// Name of the function being called.
    pub name: String,
    /// Argument text received so far.
    pub arguments: String,
}
223
/// A function call whose assembly has finished.
#[derive(Clone, Debug)]
pub struct AccumulatedFunctionCall {
    /// Identifier of the output item.
    pub id: String,
    /// Identifier of the call.
    pub call_id: String,
    /// Name of the function being called.
    pub name: String,
    /// Complete argument text.
    pub arguments: String,
}
232
233impl StreamAccumulator {
234 pub fn new() -> Self {
235 Self::default()
236 }
237
238 pub fn process_event(&mut self, event: &StreamEvent) {
240 match event.event_type.as_str() {
241 "response.created" | "response.in_progress" => {
242 if let StreamEventData::Response(data) = &event.data
243 && let Some(response) = &data.response
244 {
245 self.response_id = response
246 .get("id")
247 .and_then(|v| v.as_str())
248 .map(String::from);
249 self.model = response
250 .get("model")
251 .and_then(|v| v.as_str())
252 .map(String::from);
253 }
254 }
255 "response.output_text.delta" => {
256 if let StreamEventData::TextDelta(data) = &event.data {
257 self.text_content.push_str(&data.delta);
258 }
259 }
260 "response.reasoning_summary_text.delta" => {
261 if let StreamEventData::TextDelta(data) = &event.data {
263 self.reasoning_summary.push_str(&data.delta);
264 }
265 }
266 "response.reasoning_content.delta" => {
267 if let StreamEventData::ReasoningContentDelta(data) = &event.data {
269 self.reasoning_content.push_str(&data.delta);
270 }
271 }
272 "response.function_call_arguments.delta" => {
273 if let StreamEventData::FunctionCallDelta(data) = &event.data
274 && let Some(ref mut fc) = self.current_function_call
275 {
276 fc.arguments.push_str(&data.delta);
277 }
278 }
279 "response.output_item.added" => {
280 if let StreamEventData::OutputItem(data) = &event.data
281 && let Some(item) = &data.item
282 {
283 if item.get("type").and_then(|v| v.as_str()) == Some("function_call") {
285 let fc = AccumulatingFunctionCall {
286 id: item
287 .get("id")
288 .and_then(|v| v.as_str())
289 .unwrap_or_default()
290 .to_string(),
291 call_id: item
292 .get("call_id")
293 .and_then(|v| v.as_str())
294 .unwrap_or_default()
295 .to_string(),
296 name: item
297 .get("name")
298 .and_then(|v| v.as_str())
299 .unwrap_or_default()
300 .to_string(),
301 arguments: String::new(),
302 };
303 self.current_function_call = Some(fc);
304 }
305 self.output_items.push(item.clone());
306 }
307 }
308 "response.output_item.done" => {
309 if let Some(fc) = self.current_function_call.take() {
311 self.function_calls.push(AccumulatedFunctionCall {
312 id: fc.id,
313 call_id: fc.call_id,
314 name: fc.name,
315 arguments: fc.arguments,
316 });
317 }
318 }
319 "response.completed" => {
320 self.is_complete = true;
321 if let StreamEventData::Response(data) = &event.data
322 && let Some(response) = &data.response
323 {
324 self.usage = response.get("usage").cloned();
325 }
326 }
327 "response.failed" => {
328 self.is_complete = true;
329 }
330 "error" => {
331 if let StreamEventData::Error(data) = &event.data {
332 self.error = Some(data.error.clone());
333 }
334 self.is_complete = true;
335 }
336 _ => {}
337 }
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `response.output_text.delta` event carrying `delta`.
    fn text_delta_event(sequence_number: u32, delta: &str) -> StreamEvent {
        let payload = TextDeltaEventData {
            delta: delta.to_string(),
            item_id: None,
            output_index: None,
            content_index: None,
        };
        StreamEvent {
            event_type: "response.output_text.delta".to_string(),
            sequence_number,
            data: StreamEventData::TextDelta(payload),
        }
    }

    /// A `data:` line holding a text delta parses and keeps its event type.
    #[test]
    fn test_parse_sse_text_delta() {
        let parsed = parse_sse_event(
            r#"data: {"type":"response.output_text.delta","sequence_number":1,"delta":"Hello"}"#,
        )
        .unwrap();
        assert_eq!(parsed.event_type, "response.output_text.delta");
    }

    /// The `[DONE]` sentinel yields no event, with or without the `data:` prefix.
    #[test]
    fn test_parse_done_signal() {
        for line in ["[DONE]", "data: [DONE]"] {
            assert!(parse_sse_event(line).is_none());
        }
    }

    /// Text deltas are concatenated in the order they are processed.
    #[test]
    fn test_stream_accumulator_text() {
        let mut acc = StreamAccumulator::new();
        let events = [
            text_delta_event(1, "Hello, "),
            text_delta_event(2, "world!"),
        ];

        for event in &events {
            acc.process_event(event);
        }

        assert_eq!(acc.text_content, "Hello, world!");
    }
}