rig/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::CompletionError;
4use crate::message::Text;
5use crate::providers::openai::responses_api::{
6    AssistantContent, ResponsesCompletionModel, ResponsesUsage,
7};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use async_stream::stream;
11use futures::StreamExt;
12use reqwest::RequestBuilder;
13use serde::{Deserialize, Serialize};
14use tracing::debug;
15
16use super::{CompletionResponse, Output};
17
18// ================================================================
19// OpenAI Responses Streaming API
20// ================================================================
21
22/// A streaming completion chunk.
23/// Streaming chunks can come in one of two forms:
24/// - A response chunk (where the completed response will have the total token usage)
25/// - An item chunk commonly referred to as a delta. In the completions API this would be referred to as the message delta.
26#[derive(Debug, Serialize, Deserialize, Clone)]
27#[serde(untagged)]
28pub enum StreamingCompletionChunk {
29    Response(Box<ResponseChunk>),
30    Delta(ItemChunk),
31}
32
33/// The final streaming response from the OpenAI Responses API.
34#[derive(Debug, Serialize, Deserialize, Clone)]
35pub struct StreamingCompletionResponse {
36    /// Token usage
37    pub usage: ResponsesUsage,
38}
39
40/// A response chunk from OpenAI's response API.
41#[derive(Debug, Serialize, Deserialize, Clone)]
42pub struct ResponseChunk {
43    /// The response chunk type
44    #[serde(rename = "type")]
45    pub kind: ResponseChunkKind,
46    /// The response itself
47    pub response: CompletionResponse,
48    /// The item sequence
49    pub sequence_number: u64,
50}
51
52/// Response chunk type.
53/// Renames are used to ensure that this type gets (de)serialized properly.
54#[derive(Debug, Serialize, Deserialize, Clone)]
55pub enum ResponseChunkKind {
56    #[serde(rename = "response.created")]
57    ResponseCreated,
58    #[serde(rename = "response.in_progress")]
59    ResponseInProgress,
60    #[serde(rename = "response.completed")]
61    ResponseCompleted,
62    #[serde(rename = "response.failed")]
63    ResponseFailed,
64    #[serde(rename = "response.incomplete")]
65    ResponseIncomplete,
66}
67
68/// An item message chunk from OpenAI's Responses API.
69/// See
70#[derive(Debug, Serialize, Deserialize, Clone)]
71pub struct ItemChunk {
72    /// Item ID. Optional.
73    pub item_id: Option<String>,
74    /// The output index of the item from a given streamed response.
75    pub output_index: u64,
76    /// The item type chunk, as well as the inner data.
77    #[serde(flatten)]
78    pub data: ItemChunkKind,
79}
80
81/// The item chunk type from OpenAI's Responses API.
82#[derive(Debug, Serialize, Deserialize, Clone)]
83#[serde(tag = "type")]
84pub enum ItemChunkKind {
85    #[serde(rename = "response.output_item.added")]
86    OutputItemAdded(StreamingItemDoneOutput),
87    #[serde(rename = "response.output_item.done")]
88    OutputItemDone(StreamingItemDoneOutput),
89    #[serde(rename = "response.content_part.added")]
90    ContentPartAdded(ContentPartChunk),
91    #[serde(rename = "response.content_part.done")]
92    ContentPartDone(ContentPartChunk),
93    #[serde(rename = "response.output_text.delta")]
94    OutputTextDelta(OutputTextChunk),
95    #[serde(rename = "response.output_text.done")]
96    OutputTextDone(OutputTextChunk),
97    #[serde(rename = "response.refusal.delta")]
98    RefusalDelta(OutputTextChunk),
99    #[serde(rename = "response.refusal.done")]
100    RefusalDone(OutputTextChunk),
101    #[serde(rename = "response.function_call_arguments.delta")]
102    FunctionCallArgsDelta(OutputTextChunk),
103    #[serde(rename = "response.function_call_arguments.done")]
104    FunctionCallArgsDone(OutputTextChunk),
105    #[serde(rename = "response.reasoning_summary_part.added")]
106    ReasoningSummaryPartAdded(SummaryPartChunk),
107    #[serde(rename = "response.reasoning_summary_part.done")]
108    ReasoningSummaryPartDone(SummaryPartChunk),
109    #[serde(rename = "response.reasoning_summary_text.added")]
110    ReasoningSummaryTextAdded(SummaryTextChunk),
111    #[serde(rename = "response.reasoning_summary_text.done")]
112    ReasoningSummaryTextDone(SummaryTextChunk),
113}
114
115#[derive(Debug, Serialize, Deserialize, Clone)]
116pub struct StreamingItemDoneOutput {
117    pub sequence_number: u64,
118    pub item: Output,
119}
120
121#[derive(Debug, Serialize, Deserialize, Clone)]
122pub struct ContentPartChunk {
123    pub content_index: u64,
124    pub sequence_number: u64,
125    pub part: ContentPartChunkPart,
126}
127
128#[derive(Debug, Serialize, Deserialize, Clone)]
129#[serde(tag = "type")]
130pub enum ContentPartChunkPart {
131    OutputText { text: String },
132    SummaryText { text: String },
133}
134
135#[derive(Debug, Serialize, Deserialize, Clone)]
136pub struct OutputTextChunk {
137    pub content_index: u64,
138    pub sequence_number: u64,
139    #[serde(flatten)]
140    pub data: OutputTextChunkData,
141}
142
143#[derive(Debug, Serialize, Deserialize, Clone)]
144#[serde(untagged)]
145pub enum OutputTextChunkData {
146    Delta { delta: String },
147    Text { text: String },
148    Refusal { refusal: String },
149    Arguments { arguments: serde_json::Value },
150}
151
152#[derive(Debug, Serialize, Deserialize, Clone)]
153pub struct SummaryPartChunk {
154    pub summary_index: u64,
155    pub sequence_number: u64,
156    pub part: SummaryPartChunkPart,
157}
158
159#[derive(Debug, Serialize, Deserialize, Clone)]
160pub struct SummaryTextChunk {
161    pub summary_index: u64,
162    pub sequence_number: u64,
163    pub part: OutputTextChunkData,
164}
165
166#[derive(Debug, Serialize, Deserialize, Clone)]
167#[serde(tag = "type")]
168pub enum SummaryPartChunkPart {
169    SummaryText { text: String },
170}
171
172impl ResponsesCompletionModel {
173    pub(crate) async fn stream(
174        &self,
175        completion_request: crate::completion::CompletionRequest,
176    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
177    {
178        let mut request = self.create_completion_request(completion_request)?;
179        request.stream = Some(true);
180
181        let builder = self.client.post("/responses").json(&request);
182        send_compatible_streaming_request(builder).await
183    }
184}
185
186pub async fn send_compatible_streaming_request(
187    request_builder: RequestBuilder,
188) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError> {
189    let response = request_builder.send().await?;
190
191    if !response.status().is_success() {
192        return Err(CompletionError::ProviderError(format!(
193            "{}: {}",
194            response.status(),
195            response.text().await?
196        )));
197    }
198
199    // Handle OpenAI Compatible SSE chunks
200    let inner = Box::pin(stream! {
201        let mut stream = response.bytes_stream();
202
203        let mut final_usage = ResponsesUsage::new();
204
205        let mut partial_data = None;
206
207        let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
208
209        while let Some(chunk_result) = stream.next().await {
210            let chunk = match chunk_result {
211                Ok(c) => c,
212                Err(e) => {
213                    yield Err(CompletionError::from(e));
214                    break;
215                }
216            };
217
218            let text = match String::from_utf8(chunk.to_vec()) {
219                Ok(t) => t,
220                Err(e) => {
221                    yield Err(CompletionError::ResponseError(e.to_string()));
222                    break;
223                }
224            };
225
226            for line in text.lines() {
227                let mut line = line.to_string();
228
229                // If there was a remaining part, concat with current line
230                if partial_data.is_some() {
231                    line = format!("{}{}", partial_data.unwrap(), line);
232                    partial_data = None;
233                }
234                // Otherwise full data line
235                else {
236                    let Some(data) = line.strip_prefix("data: ") else {
237                        continue;
238                    };
239
240                    // Partial data, split somewhere in the middle
241                    if !line.ends_with("}") {
242                        partial_data = Some(data.to_string());
243                    } else {
244                        line = data.to_string();
245                    }
246                }
247
248                let data = serde_json::from_str::<StreamingCompletionChunk>(&line);
249
250                let Ok(data) = data else {
251                    let err = data.unwrap_err();
252                    debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
253                    continue;
254                };
255
256                debug!("Data get: {data:?}");
257
258
259                if let StreamingCompletionChunk::Delta(chunk) = &data {
260                    match &chunk.data {
261                        ItemChunkKind::OutputItemDone(message) => {
262                            match message {
263                                StreamingItemDoneOutput {  item: Output::Message(message), .. } => {
264                                        let message = match message.content.first().unwrap() {
265                                            AssistantContent::OutputText(Text { text}) => text,
266                                            AssistantContent::Refusal { refusal } => refusal
267                                        };
268
269                                        yield Ok(streaming::RawStreamingChoice::Message(message.clone()))
270                                }
271                                StreamingItemDoneOutput {  item: Output::FunctionCall(func), .. } => {
272                                    tool_calls.push(streaming::RawStreamingChoice::ToolCall { id: func.id.clone(), call_id: Some(func.call_id.clone()), name: func.name.clone(), arguments: func.arguments.clone() });
273                                }
274                            }
275                        }
276                        ItemChunkKind::OutputTextDelta(delta) => {
277                            let OutputTextChunkData::Delta { ref delta } = delta.data else {
278                                panic!("Placeholder error!");
279                            };
280
281                            yield Ok(streaming::RawStreamingChoice::Message(delta.clone()))
282                        }
283
284                        _ => { continue }
285                    }
286                }
287
288                    if let StreamingCompletionChunk::Response(chunk) = data {
289                        if let Some(usage) = chunk.response.usage {
290                        final_usage = usage;
291                        }
292                    }
293            }
294        }
295
296        for tool_call in tool_calls {
297            yield Ok(tool_call)
298        }
299
300        yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
301            usage: final_usage.clone()
302        }))
303    });
304
305    Ok(streaming::StreamingCompletionResponse::stream(inner))
306}