rig/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::CompletionError;
4use crate::providers::openai::responses_api::{
5    ReasoningSummary, ResponsesCompletionModel, ResponsesUsage,
6};
7use crate::streaming;
8use crate::streaming::RawStreamingChoice;
9use async_stream::stream;
10use futures::StreamExt;
11use reqwest::RequestBuilder;
12use serde::{Deserialize, Serialize};
13use tracing::debug;
14
15use super::{CompletionResponse, Output};
16
17// ================================================================
18// OpenAI Responses Streaming API
19// ================================================================
20
21/// A streaming completion chunk.
22/// Streaming chunks can come in one of two forms:
23/// - A response chunk (where the completed response will have the total token usage)
24/// - An item chunk commonly referred to as a delta. In the completions API this would be referred to as the message delta.
25#[derive(Debug, Serialize, Deserialize, Clone)]
26#[serde(untagged)]
27pub enum StreamingCompletionChunk {
28    Response(Box<ResponseChunk>),
29    Delta(ItemChunk),
30}
31
32/// The final streaming response from the OpenAI Responses API.
33#[derive(Debug, Serialize, Deserialize, Clone)]
34pub struct StreamingCompletionResponse {
35    /// Token usage
36    pub usage: ResponsesUsage,
37}
38
39/// A response chunk from OpenAI's response API.
40#[derive(Debug, Serialize, Deserialize, Clone)]
41pub struct ResponseChunk {
42    /// The response chunk type
43    #[serde(rename = "type")]
44    pub kind: ResponseChunkKind,
45    /// The response itself
46    pub response: CompletionResponse,
47    /// The item sequence
48    pub sequence_number: u64,
49}
50
51/// Response chunk type.
52/// Renames are used to ensure that this type gets (de)serialized properly.
53#[derive(Debug, Serialize, Deserialize, Clone)]
54pub enum ResponseChunkKind {
55    #[serde(rename = "response.created")]
56    ResponseCreated,
57    #[serde(rename = "response.in_progress")]
58    ResponseInProgress,
59    #[serde(rename = "response.completed")]
60    ResponseCompleted,
61    #[serde(rename = "response.failed")]
62    ResponseFailed,
63    #[serde(rename = "response.incomplete")]
64    ResponseIncomplete,
65}
66
67/// An item message chunk from OpenAI's Responses API.
68/// See
69#[derive(Debug, Serialize, Deserialize, Clone)]
70pub struct ItemChunk {
71    /// Item ID. Optional.
72    pub item_id: Option<String>,
73    /// The output index of the item from a given streamed response.
74    pub output_index: u64,
75    /// The item type chunk, as well as the inner data.
76    #[serde(flatten)]
77    pub data: ItemChunkKind,
78}
79
80/// The item chunk type from OpenAI's Responses API.
81#[derive(Debug, Serialize, Deserialize, Clone)]
82#[serde(tag = "type")]
83pub enum ItemChunkKind {
84    #[serde(rename = "response.output_item.added")]
85    OutputItemAdded(StreamingItemDoneOutput),
86    #[serde(rename = "response.output_item.done")]
87    OutputItemDone(StreamingItemDoneOutput),
88    #[serde(rename = "response.content_part.added")]
89    ContentPartAdded(ContentPartChunk),
90    #[serde(rename = "response.content_part.done")]
91    ContentPartDone(ContentPartChunk),
92    #[serde(rename = "response.output_text.delta")]
93    OutputTextDelta(DeltaTextChunk),
94    #[serde(rename = "response.output_text.done")]
95    OutputTextDone(OutputTextChunk),
96    #[serde(rename = "response.refusal.delta")]
97    RefusalDelta(DeltaTextChunk),
98    #[serde(rename = "response.refusal.done")]
99    RefusalDone(RefusalTextChunk),
100    #[serde(rename = "response.function_call_arguments.delta")]
101    FunctionCallArgsDelta(DeltaTextChunk),
102    #[serde(rename = "response.function_call_arguments.done")]
103    FunctionCallArgsDone(ArgsTextChunk),
104    #[serde(rename = "response.reasoning_summary_part.added")]
105    ReasoningSummaryPartAdded(SummaryPartChunk),
106    #[serde(rename = "response.reasoning_summary_part.done")]
107    ReasoningSummaryPartDone(SummaryPartChunk),
108    #[serde(rename = "response.reasoning_summary_text.added")]
109    ReasoningSummaryTextAdded(SummaryTextChunk),
110    #[serde(rename = "response.reasoning_summary_text.done")]
111    ReasoningSummaryTextDone(SummaryTextChunk),
112}
113
114#[derive(Debug, Serialize, Deserialize, Clone)]
115pub struct StreamingItemDoneOutput {
116    pub sequence_number: u64,
117    pub item: Output,
118}
119
120#[derive(Debug, Serialize, Deserialize, Clone)]
121pub struct ContentPartChunk {
122    pub content_index: u64,
123    pub sequence_number: u64,
124    pub part: ContentPartChunkPart,
125}
126
127#[derive(Debug, Serialize, Deserialize, Clone)]
128#[serde(tag = "type")]
129pub enum ContentPartChunkPart {
130    OutputText { text: String },
131    SummaryText { text: String },
132}
133
134#[derive(Debug, Serialize, Deserialize, Clone)]
135pub struct DeltaTextChunk {
136    pub content_index: u64,
137    pub sequence_number: u64,
138    pub delta: String,
139}
140
141#[derive(Debug, Serialize, Deserialize, Clone)]
142pub struct OutputTextChunk {
143    pub content_index: u64,
144    pub sequence_number: u64,
145    pub text: String,
146}
147
148#[derive(Debug, Serialize, Deserialize, Clone)]
149pub struct RefusalTextChunk {
150    pub content_index: u64,
151    pub sequence_number: u64,
152    pub refusal: String,
153}
154
155#[derive(Debug, Serialize, Deserialize, Clone)]
156pub struct ArgsTextChunk {
157    pub content_index: u64,
158    pub sequence_number: u64,
159    pub arguments: serde_json::Value,
160}
161
162#[derive(Debug, Serialize, Deserialize, Clone)]
163pub struct SummaryPartChunk {
164    pub summary_index: u64,
165    pub sequence_number: u64,
166    pub part: SummaryPartChunkPart,
167}
168
169#[derive(Debug, Serialize, Deserialize, Clone)]
170pub struct SummaryTextChunk {
171    pub summary_index: u64,
172    pub sequence_number: u64,
173    pub delta: String,
174}
175
176#[derive(Debug, Serialize, Deserialize, Clone)]
177#[serde(tag = "type")]
178pub enum SummaryPartChunkPart {
179    SummaryText { text: String },
180}
181
182impl ResponsesCompletionModel {
183    pub(crate) async fn stream(
184        &self,
185        completion_request: crate::completion::CompletionRequest,
186    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
187    {
188        let mut request = self.create_completion_request(completion_request)?;
189        request.stream = Some(true);
190
191        tracing::debug!("Input: {}", serde_json::to_string_pretty(&request)?);
192
193        let builder = self.client.post("/responses").json(&request);
194        send_compatible_streaming_request(builder).await
195    }
196}
197
198pub async fn send_compatible_streaming_request(
199    request_builder: RequestBuilder,
200) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError> {
201    let response = request_builder.send().await?;
202
203    if !response.status().is_success() {
204        return Err(CompletionError::ProviderError(format!(
205            "{}: {}",
206            response.status(),
207            response.text().await?
208        )));
209    }
210
211    // Handle OpenAI Compatible SSE chunks
212    let inner = Box::pin(stream! {
213        let mut stream = response.bytes_stream();
214
215        let mut final_usage = ResponsesUsage::new();
216
217        let mut partial_data = None;
218
219        let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
220
221        while let Some(chunk_result) = stream.next().await {
222            let chunk = match chunk_result {
223                Ok(c) => c,
224                Err(e) => {
225                    yield Err(CompletionError::from(e));
226                    break;
227                }
228            };
229
230            let text = match String::from_utf8(chunk.to_vec()) {
231                Ok(t) => t,
232                Err(e) => {
233                    yield Err(CompletionError::ResponseError(e.to_string()));
234                    break;
235                }
236            };
237
238            for line in text.lines() {
239                let mut line = line.to_string();
240
241                // If there was a remaining part, concat with current line
242                if partial_data.is_some() {
243                    line = format!("{}{}", partial_data.unwrap(), line);
244                    partial_data = None;
245                }
246                // Otherwise full data line
247                else {
248                    let Some(data) = line.strip_prefix("data: ") else {
249                        continue;
250                    };
251
252                    // Partial data, split somewhere in the middle
253                    if !line.ends_with("}") {
254                        partial_data = Some(data.to_string());
255                    } else {
256                        line = data.to_string();
257                    }
258                }
259
260                let data = serde_json::from_str::<StreamingCompletionChunk>(&line);
261
262                let Ok(data) = data else {
263                    let err = data.unwrap_err();
264                    debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
265                    continue;
266                };
267
268                debug!("Data get: {data:?}");
269
270
271                if let StreamingCompletionChunk::Delta(chunk) = &data {
272                    match &chunk.data {
273                        ItemChunkKind::OutputItemDone(message) => {
274                            match message {
275                                StreamingItemDoneOutput {  item: Output::FunctionCall(func), .. } => {
276                                    tracing::debug!("Function call received: {func:?}");
277                                    tool_calls.push(streaming::RawStreamingChoice::ToolCall { id: func.id.clone(), call_id: Some(func.call_id.clone()), name: func.name.clone(), arguments: func.arguments.clone() });
278                                }
279
280                                StreamingItemDoneOutput {  item: Output::Reasoning { summary }, .. } => {
281                                    let reasoning = summary
282                                        .iter()
283                                        .map(|x| {
284                                            let ReasoningSummary::SummaryText { text } = x;
285                                            text.to_owned()
286                                        })
287                                        .collect::<Vec<String>>()
288                                        .join("\n");
289                                    yield Ok(streaming::RawStreamingChoice::Reasoning { reasoning })
290                                }
291                                _ => continue
292                            }
293                        }
294                        ItemChunkKind::OutputTextDelta(delta) => {
295                            yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
296                        }
297                        ItemChunkKind::RefusalDelta(delta) => {
298                            yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
299                        }
300
301                        _ => { continue }
302                    }
303                }
304
305                    if let StreamingCompletionChunk::Response(chunk) = data {
306                        if let Some(usage) = chunk.response.usage {
307                        final_usage = usage;
308                        }
309                    }
310            }
311        }
312
313        for tool_call in tool_calls {
314            yield Ok(tool_call)
315        }
316
317        yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
318            usage: final_usage.clone()
319        }))
320    });
321
322    Ok(streaming::StreamingCompletionResponse::stream(inner))
323}