Skip to main content

vtcode_core/llm/providers/openresponses/
streaming.rs

1//! OpenResponses streaming event types.
2//!
3//! This module defines the semantic streaming events used by the OpenResponses specification.
4//! See <https://www.openresponses.org/specification#streaming> for details.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9// ============================================================================
10// Streaming Event Types
11// ============================================================================
12
13/// All possible streaming event types in OpenResponses.
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub enum StreamEventType {
16    // Response lifecycle events
17    #[serde(rename = "response.created")]
18    ResponseCreated,
19    #[serde(rename = "response.in_progress")]
20    ResponseInProgress,
21    #[serde(rename = "response.completed")]
22    ResponseCompleted,
23    #[serde(rename = "response.failed")]
24    ResponseFailed,
25    #[serde(rename = "response.incomplete")]
26    ResponseIncomplete,
27
28    // Output item events
29    #[serde(rename = "response.output_item.added")]
30    OutputItemAdded,
31    #[serde(rename = "response.output_item.done")]
32    OutputItemDone,
33
34    // Text delta events
35    #[serde(rename = "response.output_text.delta")]
36    OutputTextDelta,
37    #[serde(rename = "response.output_text.done")]
38    OutputTextDone,
39
40    // Content part events
41    #[serde(rename = "response.content_part.added")]
42    ContentPartAdded,
43    #[serde(rename = "response.content_part.done")]
44    ContentPartDone,
45
46    // Function call events
47    #[serde(rename = "response.function_call_arguments.delta")]
48    FunctionCallArgumentsDelta,
49    #[serde(rename = "response.function_call_arguments.done")]
50    FunctionCallArgumentsDone,
51
52    // Reasoning events
53    #[serde(rename = "response.reasoning_summary_text.delta")]
54    ReasoningSummaryTextDelta,
55    #[serde(rename = "response.reasoning_summary_text.done")]
56    ReasoningSummaryTextDone,
57
58    // Reasoning content events
59    #[serde(rename = "response.reasoning_content.delta")]
60    ReasoningContentDelta,
61    #[serde(rename = "response.reasoning_content.done")]
62    ReasoningContentDone,
63
64    // Error event
65    #[serde(rename = "error")]
66    Error,
67}
68
69/// A streaming event from the OpenResponses API.
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct StreamEvent {
72    #[serde(rename = "type")]
73    pub event_type: String,
74    #[serde(default)]
75    pub sequence_number: u32,
76    #[serde(flatten)]
77    pub data: StreamEventData,
78}
79
80/// Data payload for different streaming events.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82#[serde(untagged)]
83pub enum StreamEventData {
84    /// Response lifecycle event data.
85    Response(ResponseEventData),
86    /// Output item event data.
87    OutputItem(OutputItemEventData),
88    /// Text delta event data.
89    TextDelta(TextDeltaEventData),
90    /// Function call arguments delta.
91    FunctionCallDelta(FunctionCallDeltaEventData),
92    /// Reasoning content delta.
93    ReasoningContentDelta(ReasoningContentDeltaEventData),
94    /// Error event data.
95    Error(ErrorEventData),
96    /// Generic/unknown event data.
97    Generic(Value),
98}
99
100/// Data for response lifecycle events.
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct ResponseEventData {
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub response: Option<Value>,
105}
106
107/// Data for output item events.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct OutputItemEventData {
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub item: Option<Value>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub output_index: Option<u32>,
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub item_id: Option<String>,
116}
117
118/// Data for text delta events.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct TextDeltaEventData {
121    pub delta: String,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub item_id: Option<String>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub output_index: Option<u32>,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub content_index: Option<u32>,
128}
129
130/// Data for function call argument delta events.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct FunctionCallDeltaEventData {
133    pub delta: String,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub item_id: Option<String>,
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub output_index: Option<u32>,
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub call_id: Option<String>,
140}
141
142/// Data for reasoning content delta events.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct ReasoningContentDeltaEventData {
145    pub delta: String,
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub item_id: Option<String>,
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub output_index: Option<u32>,
150}
151
152/// Data for error events.
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ErrorEventData {
155    pub error: StreamError,
156}
157
158/// Error details in streaming.
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct StreamError {
161    pub code: String,
162    pub message: String,
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub param: Option<String>,
165}
166
167// ============================================================================
168// Stream Parsing Utilities
169// ============================================================================
170
171/// Parse a Server-Sent Events (SSE) line into a stream event.
172pub fn parse_sse_event(line: &str) -> Option<StreamEvent> {
173    // SSE format: "data: {...}"
174    let line = line.trim();
175    if line.is_empty() || line == "[DONE]" {
176        return None;
177    }
178
179    if let Some(data) = line.strip_prefix("data: ") {
180        if data == "[DONE]" {
181            return None;
182        }
183        serde_json::from_str(data).ok()
184    } else if line.starts_with('{') {
185        // Some implementations send raw JSON
186        serde_json::from_str(line).ok()
187    } else {
188        None
189    }
190}
191
192/// Extract the event type from an SSE event line.
193pub fn extract_event_type(line: &str) -> Option<String> {
194    let line = line.trim();
195    line.strip_prefix("event: ")
196        .map(|event_type| event_type.to_string())
197}
198
199/// Accumulator for building responses from streaming events.
200#[derive(Debug, Default)]
201pub struct StreamAccumulator {
202    pub text_content: String,
203    pub reasoning_content: String,
204    pub reasoning_summary: String,
205    pub function_calls: Vec<AccumulatedFunctionCall>,
206    pub current_function_call: Option<AccumulatingFunctionCall>,
207    pub output_items: Vec<Value>,
208    pub response_id: Option<String>,
209    pub model: Option<String>,
210    pub usage: Option<Value>,
211    pub is_complete: bool,
212    pub error: Option<StreamError>,
213}
214
215/// A function call being accumulated from streaming deltas.
216#[derive(Debug, Clone, Default)]
217pub struct AccumulatingFunctionCall {
218    pub id: String,
219    pub call_id: String,
220    pub name: String,
221    pub arguments: String,
222}
223
224/// A completed accumulated function call.
225#[derive(Debug, Clone)]
226pub struct AccumulatedFunctionCall {
227    pub id: String,
228    pub call_id: String,
229    pub name: String,
230    pub arguments: String,
231}
232
233impl StreamAccumulator {
234    pub fn new() -> Self {
235        Self::default()
236    }
237
238    /// Process a streaming event and update the accumulator state.
239    pub fn process_event(&mut self, event: &StreamEvent) {
240        match event.event_type.as_str() {
241            "response.created" | "response.in_progress" => {
242                if let StreamEventData::Response(data) = &event.data
243                    && let Some(response) = &data.response
244                {
245                    self.response_id = response
246                        .get("id")
247                        .and_then(|v| v.as_str())
248                        .map(String::from);
249                    self.model = response
250                        .get("model")
251                        .and_then(|v| v.as_str())
252                        .map(String::from);
253                }
254            }
255            "response.output_text.delta" => {
256                if let StreamEventData::TextDelta(data) = &event.data {
257                    self.text_content.push_str(&data.delta);
258                }
259            }
260            "response.reasoning_summary_text.delta" => {
261                // Summary reasoning (sanitized version)
262                if let StreamEventData::TextDelta(data) = &event.data {
263                    self.reasoning_summary.push_str(&data.delta);
264                }
265            }
266            "response.reasoning_content.delta" => {
267                // Raw reasoning traces (preferred over summary)
268                if let StreamEventData::ReasoningContentDelta(data) = &event.data {
269                    self.reasoning_content.push_str(&data.delta);
270                }
271            }
272            "response.function_call_arguments.delta" => {
273                if let StreamEventData::FunctionCallDelta(data) = &event.data
274                    && let Some(ref mut fc) = self.current_function_call
275                {
276                    fc.arguments.push_str(&data.delta);
277                }
278            }
279            "response.output_item.added" => {
280                if let StreamEventData::OutputItem(data) = &event.data
281                    && let Some(item) = &data.item
282                {
283                    // Check if this is a function call item
284                    if item.get("type").and_then(|v| v.as_str()) == Some("function_call") {
285                        let fc = AccumulatingFunctionCall {
286                            id: item
287                                .get("id")
288                                .and_then(|v| v.as_str())
289                                .unwrap_or_default()
290                                .to_string(),
291                            call_id: item
292                                .get("call_id")
293                                .and_then(|v| v.as_str())
294                                .unwrap_or_default()
295                                .to_string(),
296                            name: item
297                                .get("name")
298                                .and_then(|v| v.as_str())
299                                .unwrap_or_default()
300                                .to_string(),
301                            arguments: String::new(),
302                        };
303                        self.current_function_call = Some(fc);
304                    }
305                    self.output_items.push(item.clone());
306                }
307            }
308            "response.output_item.done" => {
309                // Finalize current function call if any
310                if let Some(fc) = self.current_function_call.take() {
311                    self.function_calls.push(AccumulatedFunctionCall {
312                        id: fc.id,
313                        call_id: fc.call_id,
314                        name: fc.name,
315                        arguments: fc.arguments,
316                    });
317                }
318            }
319            "response.completed" => {
320                self.is_complete = true;
321                if let StreamEventData::Response(data) = &event.data
322                    && let Some(response) = &data.response
323                {
324                    self.usage = response.get("usage").cloned();
325                }
326            }
327            "response.failed" => {
328                self.is_complete = true;
329            }
330            "error" => {
331                if let StreamEventData::Error(data) = &event.data {
332                    self.error = Some(data.error.clone());
333                }
334                self.is_complete = true;
335            }
336            _ => {}
337        }
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344
345    #[test]
346    fn test_parse_sse_text_delta() {
347        let line =
348            r#"data: {"type":"response.output_text.delta","sequence_number":1,"delta":"Hello"}"#;
349        let event = parse_sse_event(line).unwrap();
350        assert_eq!(event.event_type, "response.output_text.delta");
351    }
352
353    #[test]
354    fn test_parse_done_signal() {
355        assert!(parse_sse_event("[DONE]").is_none());
356        assert!(parse_sse_event("data: [DONE]").is_none());
357    }
358
359    #[test]
360    fn test_stream_accumulator_text() {
361        let mut acc = StreamAccumulator::new();
362
363        let event1 = StreamEvent {
364            event_type: "response.output_text.delta".to_string(),
365            sequence_number: 1,
366            data: StreamEventData::TextDelta(TextDeltaEventData {
367                delta: "Hello, ".to_string(),
368                item_id: None,
369                output_index: None,
370                content_index: None,
371            }),
372        };
373
374        let event2 = StreamEvent {
375            event_type: "response.output_text.delta".to_string(),
376            sequence_number: 2,
377            data: StreamEventData::TextDelta(TextDeltaEventData {
378                delta: "world!".to_string(),
379                item_id: None,
380                output_index: None,
381                content_index: None,
382            }),
383        };
384
385        acc.process_event(&event1);
386        acc.process_event(&event2);
387
388        assert_eq!(acc.text_content, "Hello, world!");
389    }
390}