rig/providers/openai/responses_api/
streaming.rs1use crate::completion::CompletionError;
4use crate::message::Text;
5use crate::providers::openai::responses_api::{
6 AssistantContent, ResponsesCompletionModel, ResponsesUsage,
7};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use async_stream::stream;
11use futures::StreamExt;
12use reqwest::RequestBuilder;
13use serde::{Deserialize, Serialize};
14use tracing::debug;
15
16use super::{CompletionResponse, Output};
17
18#[derive(Debug, Serialize, Deserialize, Clone)]
27#[serde(untagged)]
28pub enum StreamingCompletionChunk {
29 Response(Box<ResponseChunk>),
30 Delta(ItemChunk),
31}
32
33#[derive(Debug, Serialize, Deserialize, Clone)]
35pub struct StreamingCompletionResponse {
36 pub usage: ResponsesUsage,
38}
39
40#[derive(Debug, Serialize, Deserialize, Clone)]
42pub struct ResponseChunk {
43 #[serde(rename = "type")]
45 pub kind: ResponseChunkKind,
46 pub response: CompletionResponse,
48 pub sequence_number: u64,
50}
51
52#[derive(Debug, Serialize, Deserialize, Clone)]
55pub enum ResponseChunkKind {
56 #[serde(rename = "response.created")]
57 ResponseCreated,
58 #[serde(rename = "response.in_progress")]
59 ResponseInProgress,
60 #[serde(rename = "response.completed")]
61 ResponseCompleted,
62 #[serde(rename = "response.failed")]
63 ResponseFailed,
64 #[serde(rename = "response.incomplete")]
65 ResponseIncomplete,
66}
67
68#[derive(Debug, Serialize, Deserialize, Clone)]
71pub struct ItemChunk {
72 pub item_id: Option<String>,
74 pub output_index: u64,
76 #[serde(flatten)]
78 pub data: ItemChunkKind,
79}
80
81#[derive(Debug, Serialize, Deserialize, Clone)]
83#[serde(tag = "type")]
84pub enum ItemChunkKind {
85 #[serde(rename = "response.output_item.added")]
86 OutputItemAdded(StreamingItemDoneOutput),
87 #[serde(rename = "response.output_item.done")]
88 OutputItemDone(StreamingItemDoneOutput),
89 #[serde(rename = "response.content_part.added")]
90 ContentPartAdded(ContentPartChunk),
91 #[serde(rename = "response.content_part.done")]
92 ContentPartDone(ContentPartChunk),
93 #[serde(rename = "response.output_text.delta")]
94 OutputTextDelta(OutputTextChunk),
95 #[serde(rename = "response.output_text.done")]
96 OutputTextDone(OutputTextChunk),
97 #[serde(rename = "response.refusal.delta")]
98 RefusalDelta(OutputTextChunk),
99 #[serde(rename = "response.refusal.done")]
100 RefusalDone(OutputTextChunk),
101 #[serde(rename = "response.function_call_arguments.delta")]
102 FunctionCallArgsDelta(OutputTextChunk),
103 #[serde(rename = "response.function_call_arguments.done")]
104 FunctionCallArgsDone(OutputTextChunk),
105 #[serde(rename = "response.reasoning_summary_part.added")]
106 ReasoningSummaryPartAdded(SummaryPartChunk),
107 #[serde(rename = "response.reasoning_summary_part.done")]
108 ReasoningSummaryPartDone(SummaryPartChunk),
109 #[serde(rename = "response.reasoning_summary_text.added")]
110 ReasoningSummaryTextAdded(SummaryTextChunk),
111 #[serde(rename = "response.reasoning_summary_text.done")]
112 ReasoningSummaryTextDone(SummaryTextChunk),
113}
114
115#[derive(Debug, Serialize, Deserialize, Clone)]
116pub struct StreamingItemDoneOutput {
117 pub sequence_number: u64,
118 pub item: Output,
119}
120
121#[derive(Debug, Serialize, Deserialize, Clone)]
122pub struct ContentPartChunk {
123 pub content_index: u64,
124 pub sequence_number: u64,
125 pub part: ContentPartChunkPart,
126}
127
128#[derive(Debug, Serialize, Deserialize, Clone)]
129#[serde(tag = "type")]
130pub enum ContentPartChunkPart {
131 OutputText { text: String },
132 SummaryText { text: String },
133}
134
135#[derive(Debug, Serialize, Deserialize, Clone)]
136pub struct OutputTextChunk {
137 pub content_index: u64,
138 pub sequence_number: u64,
139 #[serde(flatten)]
140 pub data: OutputTextChunkData,
141}
142
143#[derive(Debug, Serialize, Deserialize, Clone)]
144#[serde(untagged)]
145pub enum OutputTextChunkData {
146 Delta { delta: String },
147 Text { text: String },
148 Refusal { refusal: String },
149 Arguments { arguments: serde_json::Value },
150}
151
152#[derive(Debug, Serialize, Deserialize, Clone)]
153pub struct SummaryPartChunk {
154 pub summary_index: u64,
155 pub sequence_number: u64,
156 pub part: SummaryPartChunkPart,
157}
158
159#[derive(Debug, Serialize, Deserialize, Clone)]
160pub struct SummaryTextChunk {
161 pub summary_index: u64,
162 pub sequence_number: u64,
163 pub part: OutputTextChunkData,
164}
165
166#[derive(Debug, Serialize, Deserialize, Clone)]
167#[serde(tag = "type")]
168pub enum SummaryPartChunkPart {
169 SummaryText { text: String },
170}
171
172impl ResponsesCompletionModel {
173 pub(crate) async fn stream(
174 &self,
175 completion_request: crate::completion::CompletionRequest,
176 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
177 {
178 let mut request = self.create_completion_request(completion_request)?;
179 request.stream = Some(true);
180
181 let builder = self.client.post("/responses").json(&request);
182 send_compatible_streaming_request(builder).await
183 }
184}
185
186pub async fn send_compatible_streaming_request(
187 request_builder: RequestBuilder,
188) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError> {
189 let response = request_builder.send().await?;
190
191 if !response.status().is_success() {
192 return Err(CompletionError::ProviderError(format!(
193 "{}: {}",
194 response.status(),
195 response.text().await?
196 )));
197 }
198
199 let inner = Box::pin(stream! {
201 let mut stream = response.bytes_stream();
202
203 let mut final_usage = ResponsesUsage::new();
204
205 let mut partial_data = None;
206
207 let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
208
209 while let Some(chunk_result) = stream.next().await {
210 let chunk = match chunk_result {
211 Ok(c) => c,
212 Err(e) => {
213 yield Err(CompletionError::from(e));
214 break;
215 }
216 };
217
218 let text = match String::from_utf8(chunk.to_vec()) {
219 Ok(t) => t,
220 Err(e) => {
221 yield Err(CompletionError::ResponseError(e.to_string()));
222 break;
223 }
224 };
225
226 for line in text.lines() {
227 let mut line = line.to_string();
228
229 if partial_data.is_some() {
231 line = format!("{}{}", partial_data.unwrap(), line);
232 partial_data = None;
233 }
234 else {
236 let Some(data) = line.strip_prefix("data: ") else {
237 continue;
238 };
239
240 if !line.ends_with("}") {
242 partial_data = Some(data.to_string());
243 } else {
244 line = data.to_string();
245 }
246 }
247
248 let data = serde_json::from_str::<StreamingCompletionChunk>(&line);
249
250 let Ok(data) = data else {
251 let err = data.unwrap_err();
252 debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
253 continue;
254 };
255
256 debug!("Data get: {data:?}");
257
258
259 if let StreamingCompletionChunk::Delta(chunk) = &data {
260 match &chunk.data {
261 ItemChunkKind::OutputItemDone(message) => {
262 match message {
263 StreamingItemDoneOutput { item: Output::Message(message), .. } => {
264 let message = match message.content.first().unwrap() {
265 AssistantContent::OutputText(Text { text}) => text,
266 AssistantContent::Refusal { refusal } => refusal
267 };
268
269 yield Ok(streaming::RawStreamingChoice::Message(message.clone()))
270 }
271 StreamingItemDoneOutput { item: Output::FunctionCall(func), .. } => {
272 tool_calls.push(streaming::RawStreamingChoice::ToolCall { id: func.id.clone(), call_id: Some(func.call_id.clone()), name: func.name.clone(), arguments: func.arguments.clone() });
273 }
274 }
275 }
276 ItemChunkKind::OutputTextDelta(delta) => {
277 let OutputTextChunkData::Delta { ref delta } = delta.data else {
278 panic!("Placeholder error!");
279 };
280
281 yield Ok(streaming::RawStreamingChoice::Message(delta.clone()))
282 }
283
284 _ => { continue }
285 }
286 }
287
288 if let StreamingCompletionChunk::Response(chunk) = data {
289 if let Some(usage) = chunk.response.usage {
290 final_usage = usage;
291 }
292 }
293 }
294 }
295
296 for tool_call in tool_calls {
297 yield Ok(tool_call)
298 }
299
300 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
301 usage: final_usage.clone()
302 }))
303 });
304
305 Ok(streaming::StreamingCompletionResponse::stream(inner))
306}