Skip to main content

vtcode_core/open_responses/
events.rs

//! Semantic streaming events for Open Responses.
//!
//! Streaming is modeled as a series of semantic events, not raw text deltas.
//! Events describe meaningful transitions like state changes or content deltas.
5
6use serde::{Deserialize, Serialize};
7use std::sync::{Arc, Mutex};
8
9use super::{ContentPart, OutputItem, Response, ResponseId};
10
11/// Semantic streaming events per the Open Responses specification.
12///
13/// These events describe meaningful transitions during response generation,
14/// enabling predictable, provider-agnostic streaming clients.
15#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
16#[serde(tag = "type")]
17pub enum ResponseStreamEvent {
18    // ============================================================
19    // Response lifecycle events
20    // ============================================================
21    /// Initial response creation event.
22    #[serde(rename = "response.created")]
23    ResponseCreated {
24        /// The response object with initial state.
25        response: Response,
26    },
27
28    /// Response has started processing.
29    #[serde(rename = "response.in_progress")]
30    ResponseInProgress {
31        /// The response object.
32        response: Response,
33    },
34
35    /// Response completed successfully.
36    #[serde(rename = "response.completed")]
37    ResponseCompleted {
38        /// The final response object.
39        response: Response,
40    },
41
42    /// Response failed with an error.
43    #[serde(rename = "response.failed")]
44    ResponseFailed {
45        /// The response object with error details.
46        response: Response,
47    },
48
49    /// Response is incomplete (e.g., token limit reached).
50    #[serde(rename = "response.incomplete")]
51    ResponseIncomplete {
52        /// The response object with incomplete details.
53        response: Response,
54    },
55
56    // ============================================================
57    // Output item events
58    // ============================================================
59    /// New output item added to the response.
60    #[serde(rename = "response.output_item.added")]
61    OutputItemAdded {
62        /// ID of the containing response.
63        response_id: ResponseId,
64        /// Index of the item in the output array.
65        output_index: usize,
66        /// The output item being added.
67        item: OutputItem,
68    },
69
70    /// Output item is complete.
71    #[serde(rename = "response.output_item.done")]
72    OutputItemDone {
73        /// ID of the containing response.
74        response_id: ResponseId,
75        /// Index of the item in the output array.
76        output_index: usize,
77        /// The completed output item.
78        item: OutputItem,
79    },
80
81    // ============================================================
82    // Content part events
83    // ============================================================
84    /// New content part added to an output item.
85    #[serde(rename = "response.content_part.added")]
86    ContentPartAdded {
87        /// ID of the containing response.
88        response_id: ResponseId,
89        /// ID of the containing output item.
90        item_id: String,
91        /// Index of the item in the output array.
92        output_index: usize,
93        /// Index of the content part within the item.
94        content_index: usize,
95        /// The content part being added.
96        part: ContentPart,
97    },
98
99    /// Content part is complete.
100    #[serde(rename = "response.content_part.done")]
101    ContentPartDone {
102        /// ID of the containing response.
103        response_id: ResponseId,
104        /// ID of the containing output item.
105        item_id: String,
106        /// Index of the item in the output array.
107        output_index: usize,
108        /// Index of the content part within the item.
109        content_index: usize,
110        /// The completed content part.
111        part: ContentPart,
112    },
113
114    // ============================================================
115    // Text streaming events
116    // ============================================================
117    /// Text content delta for incremental streaming.
118    #[serde(rename = "response.output_text.delta")]
119    OutputTextDelta {
120        /// ID of the containing response.
121        response_id: ResponseId,
122        /// ID of the containing output item.
123        item_id: String,
124        /// Index of the item in the output array.
125        output_index: usize,
126        /// Index of the content part within the item.
127        content_index: usize,
128        /// The text delta to append.
129        delta: String,
130    },
131
132    /// Text content is complete.
133    #[serde(rename = "response.output_text.done")]
134    OutputTextDone {
135        /// ID of the containing response.
136        response_id: ResponseId,
137        /// ID of the containing output item.
138        item_id: String,
139        /// Index of the item in the output array.
140        output_index: usize,
141        /// Index of the content part within the item.
142        content_index: usize,
143        /// The complete text content.
144        text: String,
145    },
146
147    // ============================================================
148    // Function call streaming events
149    // ============================================================
150    /// Function call arguments delta.
151    #[serde(rename = "response.function_call_arguments.delta")]
152    FunctionCallArgumentsDelta {
153        /// ID of the containing response.
154        response_id: ResponseId,
155        /// ID of the function call item.
156        item_id: String,
157        /// Index of the item in the output array.
158        output_index: usize,
159        /// The arguments delta to append.
160        delta: String,
161    },
162
163    /// Function call arguments are complete.
164    #[serde(rename = "response.function_call_arguments.done")]
165    FunctionCallArgumentsDone {
166        /// ID of the containing response.
167        response_id: ResponseId,
168        /// ID of the function call item.
169        item_id: String,
170        /// Index of the item in the output array.
171        output_index: usize,
172        /// The complete arguments JSON string.
173        arguments: String,
174    },
175
176    // ============================================================
177    // Reasoning events
178    // ============================================================
179    /// Reasoning content delta.
180    #[serde(rename = "response.reasoning.delta")]
181    ReasoningDelta {
182        /// ID of the containing response.
183        response_id: ResponseId,
184        /// ID of the reasoning item.
185        item_id: String,
186        /// Index of the item in the output array.
187        output_index: usize,
188        /// The reasoning delta to append.
189        delta: String,
190    },
191
192    /// Reasoning content is complete.
193    #[serde(rename = "response.reasoning.done")]
194    ReasoningDone {
195        /// ID of the containing response.
196        response_id: ResponseId,
197        /// ID of the reasoning item.
198        item_id: String,
199        /// Index of the item in the output array.
200        output_index: usize,
201        /// The reasoning item with complete content.
202        item: OutputItem,
203    },
204
205    // ============================================================
206    // Extension events
207    // ============================================================
208    /// Custom/extension streaming event.
209    ///
210    /// Custom event types must be prefixed with the implementor slug
211    /// (e.g., `vtcode.trace_event`).
212    #[serde(rename = "response.custom_event")]
213    CustomEvent {
214        /// ID of the containing response.
215        response_id: ResponseId,
216        /// Custom event type (must be prefixed, e.g., `vtcode.telemetry`).
217        event_type: String,
218        /// Sequence number for ordering.
219        sequence_number: u64,
220        /// Custom event data.
221        data: serde_json::Value,
222    },
223}
224
225impl ResponseStreamEvent {
226    /// Returns the response ID associated with this event.
227    pub fn response_id(&self) -> &str {
228        match self {
229            Self::ResponseCreated { response, .. }
230            | Self::ResponseInProgress { response, .. }
231            | Self::ResponseCompleted { response, .. }
232            | Self::ResponseFailed { response, .. }
233            | Self::ResponseIncomplete { response, .. } => &response.id,
234
235            Self::OutputItemAdded { response_id, .. }
236            | Self::OutputItemDone { response_id, .. }
237            | Self::ContentPartAdded { response_id, .. }
238            | Self::ContentPartDone { response_id, .. }
239            | Self::OutputTextDelta { response_id, .. }
240            | Self::OutputTextDone { response_id, .. }
241            | Self::FunctionCallArgumentsDelta { response_id, .. }
242            | Self::FunctionCallArgumentsDone { response_id, .. }
243            | Self::ReasoningDelta { response_id, .. }
244            | Self::ReasoningDone { response_id, .. }
245            | Self::CustomEvent { response_id, .. } => response_id,
246        }
247    }
248
249    /// Returns the event type name.
250    pub fn event_type(&self) -> &'static str {
251        match self {
252            Self::ResponseCreated { .. } => "response.created",
253            Self::ResponseInProgress { .. } => "response.in_progress",
254            Self::ResponseCompleted { .. } => "response.completed",
255            Self::ResponseFailed { .. } => "response.failed",
256            Self::ResponseIncomplete { .. } => "response.incomplete",
257            Self::OutputItemAdded { .. } => "response.output_item.added",
258            Self::OutputItemDone { .. } => "response.output_item.done",
259            Self::ContentPartAdded { .. } => "response.content_part.added",
260            Self::ContentPartDone { .. } => "response.content_part.done",
261            Self::OutputTextDelta { .. } => "response.output_text.delta",
262            Self::OutputTextDone { .. } => "response.output_text.done",
263            Self::FunctionCallArgumentsDelta { .. } => "response.function_call_arguments.delta",
264            Self::FunctionCallArgumentsDone { .. } => "response.function_call_arguments.done",
265            Self::ReasoningDelta { .. } => "response.reasoning.delta",
266            Self::ReasoningDone { .. } => "response.reasoning.done",
267            Self::CustomEvent { .. } => "response.custom_event",
268        }
269    }
270
271    /// Returns true if this is a response lifecycle event.
272    pub fn is_response_event(&self) -> bool {
273        matches!(
274            self,
275            Self::ResponseCreated { .. }
276                | Self::ResponseInProgress { .. }
277                | Self::ResponseCompleted { .. }
278                | Self::ResponseFailed { .. }
279                | Self::ResponseIncomplete { .. }
280        )
281    }
282
283    /// Returns true if this is a terminal event.
284    pub fn is_terminal(&self) -> bool {
285        matches!(
286            self,
287            Self::ResponseCompleted { .. }
288                | Self::ResponseFailed { .. }
289                | Self::ResponseIncomplete { .. }
290        )
291    }
292}
293
294/// Callback type for streaming events.
295#[expect(dead_code)]
296pub type StreamEventCallback = Arc<Mutex<Box<dyn FnMut(&ResponseStreamEvent) + Send>>>;
297
298/// Trait for emitting Open Responses streaming events.
299pub trait StreamEventEmitter: Send {
300    /// Emit a streaming event.
301    fn emit(&mut self, event: ResponseStreamEvent);
302
303    /// Emit a response created event.
304    fn response_created(&mut self, response: Response) {
305        self.emit(ResponseStreamEvent::ResponseCreated { response });
306    }
307
308    /// Emit a response in progress event.
309    fn response_in_progress(&mut self, response: Response) {
310        self.emit(ResponseStreamEvent::ResponseInProgress { response });
311    }
312
313    /// Emit a response completed event.
314    fn response_completed(&mut self, response: Response) {
315        self.emit(ResponseStreamEvent::ResponseCompleted { response });
316    }
317
318    /// Emit a response failed event.
319    fn response_failed(&mut self, response: Response) {
320        self.emit(ResponseStreamEvent::ResponseFailed { response });
321    }
322
323    /// Emit an output item added event.
324    fn output_item_added(&mut self, response_id: &str, output_index: usize, item: OutputItem) {
325        self.emit(ResponseStreamEvent::OutputItemAdded {
326            response_id: response_id.to_string(),
327            output_index,
328            item,
329        });
330    }
331
332    /// Emit an output item done event.
333    fn output_item_done(&mut self, response_id: &str, output_index: usize, item: OutputItem) {
334        self.emit(ResponseStreamEvent::OutputItemDone {
335            response_id: response_id.to_string(),
336            output_index,
337            item,
338        });
339    }
340
341    /// Emit a text delta event.
342    fn output_text_delta(
343        &mut self,
344        response_id: &str,
345        item_id: &str,
346        output_index: usize,
347        content_index: usize,
348        delta: &str,
349    ) {
350        self.emit(ResponseStreamEvent::OutputTextDelta {
351            response_id: response_id.to_string(),
352            item_id: item_id.to_string(),
353            output_index,
354            content_index,
355            delta: delta.to_string(),
356        });
357    }
358
359    /// Emit a reasoning delta event.
360    fn reasoning_delta(
361        &mut self,
362        response_id: &str,
363        item_id: &str,
364        output_index: usize,
365        delta: &str,
366    ) {
367        self.emit(ResponseStreamEvent::ReasoningDelta {
368            response_id: response_id.to_string(),
369            item_id: item_id.to_string(),
370            output_index,
371            delta: delta.to_string(),
372        });
373    }
374}
375
376/// Vector-based event emitter for collecting events.
377#[derive(Debug, Default)]
378pub struct VecStreamEmitter {
379    events: Vec<ResponseStreamEvent>,
380}
381
382impl VecStreamEmitter {
383    /// Creates a new vector emitter.
384    pub fn new() -> Self {
385        Self::default()
386    }
387
388    /// Returns the collected events.
389    pub fn events(&self) -> &[ResponseStreamEvent] {
390        &self.events
391    }
392
393    /// Consumes and returns the collected events.
394    pub fn into_events(self) -> Vec<ResponseStreamEvent> {
395        self.events
396    }
397}
398
399impl StreamEventEmitter for VecStreamEmitter {
400    fn emit(&mut self, event: ResponseStreamEvent) {
401        self.events.push(event);
402    }
403}
404
405/// Wrapper for streaming events with sequence number for ordering.
406/// Used when serializing events for SSE transport.
407#[derive(Debug, Clone, Serialize)]
408pub struct SequencedEvent<'a> {
409    /// Monotonically increasing sequence number within the stream.
410    pub sequence_number: u64,
411    /// The underlying event.
412    #[serde(flatten)]
413    pub event: &'a ResponseStreamEvent,
414}
415
416impl<'a> SequencedEvent<'a> {
417    /// Creates a new sequenced event.
418    pub fn new(sequence_number: u64, event: &'a ResponseStreamEvent) -> Self {
419        Self {
420            sequence_number,
421            event,
422        }
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the response fixture shared by the tests in this module.
    fn sample_response() -> Response {
        Response::new("resp_1", "gpt-5")
    }

    /// `event_type` reports the dotted name of the variant.
    #[test]
    fn test_event_type() {
        let created = ResponseStreamEvent::ResponseCreated {
            response: sample_response(),
        };
        assert_eq!(created.event_type(), "response.created");
    }

    /// A created event is not terminal, while a completed event is.
    #[test]
    fn test_terminal_events() {
        let fixture = sample_response();
        let start = ResponseStreamEvent::ResponseCreated {
            response: fixture.clone(),
        };
        let finish = ResponseStreamEvent::ResponseCompleted { response: fixture };
        assert!(!start.is_terminal());
        assert!(finish.is_terminal());
    }

    /// The vector emitter records events produced by the trait helpers.
    #[test]
    fn test_vec_emitter() {
        let mut collector = VecStreamEmitter::new();
        collector.response_created(sample_response());
        assert_eq!(collector.events().len(), 1);
    }
}