// rig/providers/openai/responses_api/streaming.rs
use crate::completion::CompletionError;
4use crate::providers::openai::responses_api::{
5 ReasoningSummary, ResponsesCompletionModel, ResponsesUsage,
6};
7use crate::streaming;
8use crate::streaming::RawStreamingChoice;
9use async_stream::stream;
10use futures::StreamExt;
11use reqwest::RequestBuilder;
12use serde::{Deserialize, Serialize};
13use tracing::debug;
14
15use super::{CompletionResponse, Output};
16
17#[derive(Debug, Serialize, Deserialize, Clone)]
26#[serde(untagged)]
27pub enum StreamingCompletionChunk {
28 Response(Box<ResponseChunk>),
29 Delta(ItemChunk),
30}
31
32#[derive(Debug, Serialize, Deserialize, Clone)]
34pub struct StreamingCompletionResponse {
35 pub usage: ResponsesUsage,
37}
38
39#[derive(Debug, Serialize, Deserialize, Clone)]
41pub struct ResponseChunk {
42 #[serde(rename = "type")]
44 pub kind: ResponseChunkKind,
45 pub response: CompletionResponse,
47 pub sequence_number: u64,
49}
50
51#[derive(Debug, Serialize, Deserialize, Clone)]
54pub enum ResponseChunkKind {
55 #[serde(rename = "response.created")]
56 ResponseCreated,
57 #[serde(rename = "response.in_progress")]
58 ResponseInProgress,
59 #[serde(rename = "response.completed")]
60 ResponseCompleted,
61 #[serde(rename = "response.failed")]
62 ResponseFailed,
63 #[serde(rename = "response.incomplete")]
64 ResponseIncomplete,
65}
66
67#[derive(Debug, Serialize, Deserialize, Clone)]
70pub struct ItemChunk {
71 pub item_id: Option<String>,
73 pub output_index: u64,
75 #[serde(flatten)]
77 pub data: ItemChunkKind,
78}
79
80#[derive(Debug, Serialize, Deserialize, Clone)]
82#[serde(tag = "type")]
83pub enum ItemChunkKind {
84 #[serde(rename = "response.output_item.added")]
85 OutputItemAdded(StreamingItemDoneOutput),
86 #[serde(rename = "response.output_item.done")]
87 OutputItemDone(StreamingItemDoneOutput),
88 #[serde(rename = "response.content_part.added")]
89 ContentPartAdded(ContentPartChunk),
90 #[serde(rename = "response.content_part.done")]
91 ContentPartDone(ContentPartChunk),
92 #[serde(rename = "response.output_text.delta")]
93 OutputTextDelta(DeltaTextChunk),
94 #[serde(rename = "response.output_text.done")]
95 OutputTextDone(OutputTextChunk),
96 #[serde(rename = "response.refusal.delta")]
97 RefusalDelta(DeltaTextChunk),
98 #[serde(rename = "response.refusal.done")]
99 RefusalDone(RefusalTextChunk),
100 #[serde(rename = "response.function_call_arguments.delta")]
101 FunctionCallArgsDelta(DeltaTextChunk),
102 #[serde(rename = "response.function_call_arguments.done")]
103 FunctionCallArgsDone(ArgsTextChunk),
104 #[serde(rename = "response.reasoning_summary_part.added")]
105 ReasoningSummaryPartAdded(SummaryPartChunk),
106 #[serde(rename = "response.reasoning_summary_part.done")]
107 ReasoningSummaryPartDone(SummaryPartChunk),
108 #[serde(rename = "response.reasoning_summary_text.added")]
109 ReasoningSummaryTextAdded(SummaryTextChunk),
110 #[serde(rename = "response.reasoning_summary_text.done")]
111 ReasoningSummaryTextDone(SummaryTextChunk),
112}
113
114#[derive(Debug, Serialize, Deserialize, Clone)]
115pub struct StreamingItemDoneOutput {
116 pub sequence_number: u64,
117 pub item: Output,
118}
119
120#[derive(Debug, Serialize, Deserialize, Clone)]
121pub struct ContentPartChunk {
122 pub content_index: u64,
123 pub sequence_number: u64,
124 pub part: ContentPartChunkPart,
125}
126
127#[derive(Debug, Serialize, Deserialize, Clone)]
128#[serde(tag = "type")]
129pub enum ContentPartChunkPart {
130 OutputText { text: String },
131 SummaryText { text: String },
132}
133
134#[derive(Debug, Serialize, Deserialize, Clone)]
135pub struct DeltaTextChunk {
136 pub content_index: u64,
137 pub sequence_number: u64,
138 pub delta: String,
139}
140
141#[derive(Debug, Serialize, Deserialize, Clone)]
142pub struct OutputTextChunk {
143 pub content_index: u64,
144 pub sequence_number: u64,
145 pub text: String,
146}
147
148#[derive(Debug, Serialize, Deserialize, Clone)]
149pub struct RefusalTextChunk {
150 pub content_index: u64,
151 pub sequence_number: u64,
152 pub refusal: String,
153}
154
155#[derive(Debug, Serialize, Deserialize, Clone)]
156pub struct ArgsTextChunk {
157 pub content_index: u64,
158 pub sequence_number: u64,
159 pub arguments: serde_json::Value,
160}
161
162#[derive(Debug, Serialize, Deserialize, Clone)]
163pub struct SummaryPartChunk {
164 pub summary_index: u64,
165 pub sequence_number: u64,
166 pub part: SummaryPartChunkPart,
167}
168
169#[derive(Debug, Serialize, Deserialize, Clone)]
170pub struct SummaryTextChunk {
171 pub summary_index: u64,
172 pub sequence_number: u64,
173 pub delta: String,
174}
175
176#[derive(Debug, Serialize, Deserialize, Clone)]
177#[serde(tag = "type")]
178pub enum SummaryPartChunkPart {
179 SummaryText { text: String },
180}
181
182impl ResponsesCompletionModel {
183 pub(crate) async fn stream(
184 &self,
185 completion_request: crate::completion::CompletionRequest,
186 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
187 {
188 let mut request = self.create_completion_request(completion_request)?;
189 request.stream = Some(true);
190
191 tracing::debug!("Input: {}", serde_json::to_string_pretty(&request)?);
192
193 let builder = self.client.post("/responses").json(&request);
194 send_compatible_streaming_request(builder).await
195 }
196}
197
198pub async fn send_compatible_streaming_request(
199 request_builder: RequestBuilder,
200) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError> {
201 let response = request_builder.send().await?;
202
203 if !response.status().is_success() {
204 return Err(CompletionError::ProviderError(format!(
205 "{}: {}",
206 response.status(),
207 response.text().await?
208 )));
209 }
210
211 let inner = Box::pin(stream! {
213 let mut stream = response.bytes_stream();
214
215 let mut final_usage = ResponsesUsage::new();
216
217 let mut partial_data = None;
218
219 let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
220
221 while let Some(chunk_result) = stream.next().await {
222 let chunk = match chunk_result {
223 Ok(c) => c,
224 Err(e) => {
225 yield Err(CompletionError::from(e));
226 break;
227 }
228 };
229
230 let text = match String::from_utf8(chunk.to_vec()) {
231 Ok(t) => t,
232 Err(e) => {
233 yield Err(CompletionError::ResponseError(e.to_string()));
234 break;
235 }
236 };
237
238 for line in text.lines() {
239 let mut line = line.to_string();
240
241 if partial_data.is_some() {
243 line = format!("{}{}", partial_data.unwrap(), line);
244 partial_data = None;
245 }
246 else {
248 let Some(data) = line.strip_prefix("data: ") else {
249 continue;
250 };
251
252 if !line.ends_with("}") {
254 partial_data = Some(data.to_string());
255 } else {
256 line = data.to_string();
257 }
258 }
259
260 let data = serde_json::from_str::<StreamingCompletionChunk>(&line);
261
262 let Ok(data) = data else {
263 let err = data.unwrap_err();
264 debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
265 continue;
266 };
267
268 debug!("Data get: {data:?}");
269
270
271 if let StreamingCompletionChunk::Delta(chunk) = &data {
272 match &chunk.data {
273 ItemChunkKind::OutputItemDone(message) => {
274 match message {
275 StreamingItemDoneOutput { item: Output::FunctionCall(func), .. } => {
276 tracing::debug!("Function call received: {func:?}");
277 tool_calls.push(streaming::RawStreamingChoice::ToolCall { id: func.id.clone(), call_id: Some(func.call_id.clone()), name: func.name.clone(), arguments: func.arguments.clone() });
278 }
279
280 StreamingItemDoneOutput { item: Output::Reasoning { summary }, .. } => {
281 let reasoning = summary
282 .iter()
283 .map(|x| {
284 let ReasoningSummary::SummaryText { text } = x;
285 text.to_owned()
286 })
287 .collect::<Vec<String>>()
288 .join("\n");
289 yield Ok(streaming::RawStreamingChoice::Reasoning { reasoning })
290 }
291 _ => continue
292 }
293 }
294 ItemChunkKind::OutputTextDelta(delta) => {
295 yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
296 }
297 ItemChunkKind::RefusalDelta(delta) => {
298 yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
299 }
300
301 _ => { continue }
302 }
303 }
304
305 if let StreamingCompletionChunk::Response(chunk) = data {
306 if let Some(usage) = chunk.response.usage {
307 final_usage = usage;
308 }
309 }
310 }
311 }
312
313 for tool_call in tool_calls {
314 yield Ok(tool_call)
315 }
316
317 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
318 usage: final_usage.clone()
319 }))
320 });
321
322 Ok(streaming::StreamingCompletionResponse::stream(inner))
323}