1use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
/// Raw streaming choice specialised to this module's final-response payload
/// ([`StreamingCompletionResponse`]).
type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
/// One decoded Responses API SSE event.
///
/// Deserialized `untagged`, so serde tries the variants in declaration order: an event that
/// fits [`ResponseChunk`] (it carries a `response` object) is taken as `Response` before
/// [`ItemChunk`] is attempted. Keep `Response` listed first.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    /// Lifecycle event carrying a full response snapshot.
    Response(Box<ResponseChunk>),
    /// Event scoped to a single output item (deltas, item added/done, part boundaries).
    Delta(ItemChunk),
}
35
/// Payload of the `FinalResponse` choice yielded at the end of a Responses stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    /// Token usage for the whole response: the usage from `response.completed` when present,
    /// otherwise whatever usage the decoder was seeded with.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44 id: &str,
45 summary: &[ReasoningSummary],
46 encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48 let mut choices = summary
49 .iter()
50 .map(|reasoning_summary| match reasoning_summary {
51 ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52 id: Some(id.to_owned()),
53 content: ReasoningContent::Summary(text.to_owned()),
54 },
55 })
56 .collect::<Vec<_>>();
57
58 if let Some(encrypted_content) = encrypted_content {
59 choices.push(RawStreamingChoice::Reasoning {
60 id: Some(id.to_owned()),
61 content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62 });
63 }
64
65 choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69 fn token_usage(&self) -> Option<crate::completion::Usage> {
70 self.usage.token_usage()
71 }
72}
73
/// Lifecycle event (see [`ResponseChunkKind`]) carrying a snapshot of the whole response.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// Event name, read from the JSON `type` field.
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// Response snapshot at the time of the event. On `response.completed` its `usage`
    /// (when present) becomes the stream's final usage.
    pub response: CompletionResponse,
    /// Event sequence number as sent by the provider; the decoders in this module ignore it.
    pub sequence_number: u64,
}
85
/// The `type` of a [`ResponseChunk`] lifecycle event.
///
/// `ResponseCompleted` supplies the final usage; `ResponseFailed` and `ResponseIncomplete`
/// are turned into provider errors by the decoders in this module. The remaining kinds are
/// informational and ignored.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    #[serde(rename = "response.created")]
    ResponseCreated,
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    #[serde(rename = "response.failed")]
    ResponseFailed,
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
101
102fn response_chunk_error_message(
103 kind: &ResponseChunkKind,
104 response: &CompletionResponse,
105 provider_name: &str,
106) -> Option<String> {
107 match kind {
108 ResponseChunkKind::ResponseFailed => Some(response_error_message(
109 response.error.as_ref(),
110 &format!("{provider_name} response stream returned a failed response"),
111 )),
112 ResponseChunkKind::ResponseIncomplete => {
113 let reason = response
114 .incomplete_details
115 .as_ref()
116 .map(|details| details.reason.as_str())
117 .unwrap_or("unknown reason");
118
119 Some(format!(
120 "{provider_name} response stream was incomplete: {reason}"
121 ))
122 }
123 _ => None,
124 }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128 if let Some(error) = error {
129 if error.code.is_empty() {
130 error.message.clone()
131 } else {
132 format!("{}: {}", error.code, error.message)
133 }
134 } else {
135 fallback.to_string()
136 }
137}
138
/// Decoding policy shared by the buffered and streaming Responses decoders.
#[derive(Clone, Copy)]
pub(crate) enum ResponsesStreamOptions {
    /// Terminal failed/incomplete responses become errors; completed tool calls are held
    /// back and emitted when the stream finishes.
    Strict,
    /// Same as `Strict`, except each tool call is emitted as soon as its
    /// `response.output_item.done` event arrives.
    StrictWithImmediateToolCalls,
}

impl ResponsesStreamOptions {
    /// Policy that buffers completed tool calls until the end of the stream.
    pub(crate) const fn strict() -> Self {
        ResponsesStreamOptions::Strict
    }

    /// Policy that forwards completed tool calls immediately.
    pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
        ResponsesStreamOptions::StrictWithImmediateToolCalls
    }

    /// Whether `response.failed` / `response.incomplete` are reported as errors.
    /// Every existing policy is strict, so this is always `true`.
    const fn errors_on_terminal_response(self) -> bool {
        match self {
            Self::Strict | Self::StrictWithImmediateToolCalls => true,
        }
    }

    /// Whether completed tool calls are yielded immediately instead of at the end.
    const fn emits_completed_tool_calls_immediately(self) -> bool {
        match self {
            Self::Strict => false,
            Self::StrictWithImmediateToolCalls => true,
        }
    }
}
162
163pub(crate) fn parse_sse_completion_body(
164 body: &str,
165 provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167 let mut completed = None;
168 let mut provider_error = None;
169
170 for line in body.lines() {
171 let data = line
172 .strip_prefix("data:")
173 .map(str::trim)
174 .unwrap_or_default();
175 if data.is_empty() || data == "[DONE]" {
176 continue;
177 }
178
179 if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180 if let StreamingCompletionChunk::Response(chunk) = chunk {
181 let ResponseChunk { kind, response, .. } = *chunk;
182 match kind {
183 ResponseChunkKind::ResponseCompleted => {
184 completed = Some(response);
185 break;
186 }
187 ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188 provider_error =
189 response_chunk_error_message(&kind, &response, provider_name);
190 }
191 _ => {}
192 }
193 }
194 continue;
195 }
196
197 let value = match serde_json::from_str::<serde_json::Value>(data) {
198 Ok(value) => value,
199 Err(_) => continue,
200 };
201
202 match value.get("type").and_then(serde_json::Value::as_str) {
203 Some("response.completed") => {
204 if let Some(response) = value.get("response") {
205 completed = Some(serde_json::from_value(response.clone())?);
206 break;
207 }
208 }
209 Some("response.failed") | Some("response.incomplete") => {
210 provider_error = value
211 .get("response")
212 .cloned()
213 .and_then(|response| {
214 serde_json::from_value::<CompletionResponse>(response).ok()
215 })
216 .and_then(|response| {
217 let kind = if value.get("type").and_then(serde_json::Value::as_str)
218 == Some("response.failed")
219 {
220 ResponseChunkKind::ResponseFailed
221 } else {
222 ResponseChunkKind::ResponseIncomplete
223 };
224 response_chunk_error_message(&kind, &response, provider_name)
225 })
226 .or_else(|| {
227 value
228 .get("error")
229 .and_then(|error| error.get("message"))
230 .and_then(serde_json::Value::as_str)
231 .map(ToOwned::to_owned)
232 })
233 .or_else(|| Some(data.to_string()));
234 }
235 Some("error") => {
236 provider_error = value
237 .get("error")
238 .and_then(|error| error.get("message"))
239 .and_then(serde_json::Value::as_str)
240 .map(ToOwned::to_owned)
241 .or_else(|| Some(data.to_string()));
242 }
243 _ => {}
244 }
245 }
246
247 completed.ok_or_else(|| {
248 CompletionError::ProviderError(
249 provider_error.unwrap_or_else(|| {
250 format!("{provider_name} stream did not yield response.completed")
251 }),
252 )
253 })
254}
255
/// Incremental state shared by the buffered and streaming Responses decoders.
///
/// Converts individual SSE events into raw streaming choices while keeping the state that
/// must survive across events.
struct RawChoiceAccumulator {
    /// Usage reported on the final response; replaced when `response.completed` carries usage.
    final_usage: ResponsesUsage,
    /// Completed tool calls held back until `finish`, unless the options emit them immediately.
    tool_calls: Vec<StreamingRawChoice>,
    /// Provider item id -> locally generated internal call id. This lets every delta and the
    /// final tool call for one item share the same internal id.
    tool_call_internal_ids: std::collections::HashMap<String, String>,
}
261
262impl RawChoiceAccumulator {
263 fn new(initial_usage: ResponsesUsage) -> Self {
264 Self {
265 final_usage: initial_usage,
266 tool_calls: Vec::new(),
267 tool_call_internal_ids: std::collections::HashMap::new(),
268 }
269 }
270
271 fn decode_item_chunk(
272 &mut self,
273 chunk: ItemChunk,
274 options: ResponsesStreamOptions,
275 ) -> Vec<StreamingRawChoice> {
276 let mut immediate = Vec::new();
277
278 let ItemChunk {
279 item_id: outer_item_id,
280 data: item,
281 ..
282 } = chunk;
283
284 match item {
285 ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
286 item: Output::FunctionCall(func),
287 ..
288 }) => {
289 let internal_call_id = self
290 .tool_call_internal_ids
291 .entry(func.id.clone())
292 .or_insert_with(|| nanoid::nanoid!())
293 .clone();
294 immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
295 id: func.id,
296 internal_call_id,
297 content: streaming::ToolCallDeltaContent::Name(func.name),
298 });
299 }
300 ItemChunkKind::OutputItemDone(message) => {
301 self.push_output_item_done(
302 message.item,
303 &mut immediate,
304 options.emits_completed_tool_calls_immediately(),
305 );
306 }
307 ItemChunkKind::OutputTextDelta(delta) => {
308 immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
309 }
310 ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
311 immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
312 id: None,
313 reasoning: delta.delta,
314 });
315 }
316 ItemChunkKind::RefusalDelta(delta) => {
317 immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
318 }
319 ItemChunkKind::FunctionCallArgsDelta(delta) => {
320 if let Some(item_id) = outer_item_id {
321 let internal_call_id = self
322 .tool_call_internal_ids
323 .entry(item_id.clone())
324 .or_insert_with(|| nanoid::nanoid!())
325 .clone();
326 immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
327 id: item_id,
328 internal_call_id,
329 content: streaming::ToolCallDeltaContent::Delta(delta.delta),
330 });
331 }
332 }
333 _ => {}
334 }
335
336 immediate
337 }
338
339 fn record_response_chunk(
340 &mut self,
341 kind: ResponseChunkKind,
342 response: CompletionResponse,
343 provider_name: &str,
344 options: ResponsesStreamOptions,
345 ) -> Result<(), CompletionError> {
346 match kind {
347 ResponseChunkKind::ResponseCompleted => {
348 if let Some(usage) = response.usage {
349 self.final_usage = usage;
350 }
351 Ok(())
352 }
353 ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
354 if options.errors_on_terminal_response() =>
355 {
356 let error_message = response_chunk_error_message(&kind, &response, provider_name)
357 .unwrap_or_else(|| {
358 format!(
359 "{provider_name} returned terminal response {:?} without an error message",
360 kind
361 )
362 });
363 Err(CompletionError::ProviderError(error_message))
364 }
365 _ => Ok(()),
366 }
367 }
368
369 fn push_output_item_done(
370 &mut self,
371 item: Output,
372 immediate: &mut Vec<StreamingRawChoice>,
373 emit_completed_tool_calls_immediately: bool,
374 ) {
375 match item {
376 Output::FunctionCall(func) => {
377 let internal_call_id = self
378 .tool_call_internal_ids
379 .entry(func.id.clone())
380 .or_insert_with(|| nanoid::nanoid!())
381 .clone();
382 let tool_call =
383 streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
384 .with_internal_call_id(internal_call_id)
385 .with_call_id(func.call_id);
386
387 if emit_completed_tool_calls_immediately {
388 immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
389 } else {
390 self.tool_calls
391 .push(streaming::RawStreamingChoice::ToolCall(tool_call));
392 }
393 }
394 Output::Reasoning {
395 id,
396 summary,
397 encrypted_content,
398 ..
399 } => {
400 immediate.extend(reasoning_choices_from_done_item(
401 &id,
402 &summary,
403 encrypted_content.as_deref(),
404 ));
405 }
406 Output::Message(message) => {
407 immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
408 }
409 Output::Unknown => {}
410 }
411 }
412
413 fn finish(mut self) -> Vec<StreamingRawChoice> {
414 let mut choices = Vec::new();
415 choices.append(&mut self.tool_calls);
416 choices.push(RawStreamingChoice::FinalResponse(
417 StreamingCompletionResponse {
418 usage: self.final_usage,
419 },
420 ));
421 choices
422 }
423}
424
425pub(crate) fn raw_choices_from_sse_body(
426 body: &str,
427 initial_usage: ResponsesUsage,
428 provider_name: &str,
429) -> Result<Vec<StreamingRawChoice>, CompletionError> {
430 let mut raw_choices = Vec::new();
431 let mut accumulator = RawChoiceAccumulator::new(initial_usage);
432 let options = ResponsesStreamOptions::strict();
433
434 for line in body.lines() {
435 let data = line
436 .strip_prefix("data:")
437 .map(str::trim)
438 .unwrap_or_default();
439 if data.is_empty() || data == "[DONE]" {
440 continue;
441 }
442
443 if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
444 match chunk {
445 StreamingCompletionChunk::Delta(chunk) => {
446 raw_choices.extend(accumulator.decode_item_chunk(chunk, options));
447 }
448 StreamingCompletionChunk::Response(chunk) => {
449 let ResponseChunk { kind, response, .. } = *chunk;
450 accumulator.record_response_chunk(kind, response, provider_name, options)?;
451 }
452 }
453 continue;
454 }
455
456 let value = match serde_json::from_str::<serde_json::Value>(data) {
457 Ok(value) => value,
458 Err(_) => continue,
459 };
460
461 match value.get("type").and_then(serde_json::Value::as_str) {
462 Some("response.output_text.delta") | Some("response.refusal.delta") => {
463 if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
464 raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
465 }
466 }
467 Some("response.reasoning_summary_text.delta") => {
468 if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
469 raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
470 id: None,
471 reasoning: delta.to_owned(),
472 });
473 }
474 }
475 Some("response.output_item.added") => {
476 if let Some(item) = value
477 .get("item")
478 .cloned()
479 .and_then(|item| serde_json::from_value::<Output>(item).ok())
480 && let Output::FunctionCall(func) = item
481 {
482 let internal_call_id = accumulator
483 .tool_call_internal_ids
484 .entry(func.id.clone())
485 .or_insert_with(|| nanoid::nanoid!())
486 .clone();
487 raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
488 id: func.id,
489 internal_call_id,
490 content: streaming::ToolCallDeltaContent::Name(func.name),
491 });
492 }
493 }
494 Some("response.output_item.done") => {
495 if let Some(item) = value
496 .get("item")
497 .cloned()
498 .and_then(|item| serde_json::from_value::<Output>(item).ok())
499 {
500 accumulator.push_output_item_done(item, &mut raw_choices, false);
501 }
502 }
503 Some("response.function_call_arguments.delta") => {
504 if let (Some(item_id), Some(delta)) = (
505 value.get("item_id").and_then(serde_json::Value::as_str),
506 value.get("delta").and_then(serde_json::Value::as_str),
507 ) {
508 let internal_call_id = accumulator
509 .tool_call_internal_ids
510 .entry(item_id.to_owned())
511 .or_insert_with(|| nanoid::nanoid!())
512 .clone();
513 raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
514 id: item_id.to_owned(),
515 internal_call_id,
516 content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
517 });
518 }
519 }
520 Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
521 if let Some(response) = value.get("response").cloned() {
522 let response = serde_json::from_value::<CompletionResponse>(response)?;
523 let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
524 Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
525 Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
526 Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
527 _ => None,
528 }) else {
529 continue;
530 };
531 accumulator.record_response_chunk(kind, response, provider_name, options)?;
532 }
533 }
534 Some("error") => {
535 let message = value
536 .get("error")
537 .and_then(|error| error.get("message"))
538 .and_then(serde_json::Value::as_str)
539 .unwrap_or(data);
540 return Err(CompletionError::ProviderError(message.to_owned()));
541 }
542 _ => {}
543 }
544 }
545
546 raw_choices.extend(accumulator.finish());
547 Ok(raw_choices)
548}
549
550pub(crate) async fn completion_response_from_sse_body(
551 body: &str,
552 raw_response: CompletionResponse,
553 provider_name: &str,
554) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
555 let raw_choices = raw_choices_from_sse_body(
556 body,
557 raw_response
558 .usage
559 .clone()
560 .unwrap_or_else(ResponsesUsage::new),
561 provider_name,
562 )?;
563 let stream = futures::stream::iter(
564 raw_choices
565 .into_iter()
566 .map(Ok::<_, CompletionError>)
567 .collect::<Vec<_>>(),
568 );
569 let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));
570
571 while let Some(item) = stream.next().await {
572 item?;
573 }
574
575 if choice_is_empty(&stream.choice) {
576 return Err(CompletionError::ResponseError(
577 "Response contained no parts".to_owned(),
578 ));
579 }
580
581 Ok(completion::CompletionResponse {
582 usage: stream
583 .response
584 .as_ref()
585 .and_then(GetTokenUsage::token_usage)
586 .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
587 message_id: stream
588 .message_id
589 .clone()
590 .or_else(|| message_id_from_response(&raw_response)),
591 choice: stream.choice,
592 raw_response,
593 })
594}
595
596fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
597 choice.iter().all(|content| match content {
598 completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
599 completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
600 completion::AssistantContent::Image(_) => false,
601 completion::AssistantContent::ToolCall(_) => false,
602 })
603}
604
605fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
606 response.output.iter().find_map(|item| match item {
607 Output::Message(message) => Some(message.id.clone()),
608 _ => None,
609 })
610}
611
612fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
613 response
614 .usage
615 .as_ref()
616 .and_then(GetTokenUsage::token_usage)
617 .unwrap_or_default()
618}
619
620pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
621 event_source: GenericEventSource<HttpClient, RequestBody>,
622 span: tracing::Span,
623 provider_name: &'static str,
624) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
625where
626 HttpClient: HttpClientExt + Clone + 'static,
627 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
628{
629 stream_from_event_source_with_options(
630 event_source,
631 span,
632 provider_name,
633 ResponsesStreamOptions::strict(),
634 )
635}
636
637pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
638 mut event_source: GenericEventSource<HttpClient, RequestBody>,
639 span: tracing::Span,
640 provider_name: &'static str,
641 options: ResponsesStreamOptions,
642) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
643where
644 HttpClient: HttpClientExt + Clone + 'static,
645 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
646{
647 let stream = stream! {
648 let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
649 let span = tracing::Span::current();
650
651 let mut terminated_with_error = false;
652
653 while let Some(event_result) = event_source.next().await {
654 match event_result {
655 Ok(Event::Open) => {
656 tracing::trace!("SSE connection opened");
657 continue;
658 }
659 Ok(Event::Message(evt)) => {
660 if evt.data.trim().is_empty() || evt.data == "[DONE]" {
661 continue;
662 }
663
664 let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
665
666 let Ok(data) = data else {
667 let Err(err) = data else {
668 continue;
669 };
670 debug!(
671 "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
672 err
673 );
674 continue;
675 };
676
677 match data {
678 StreamingCompletionChunk::Delta(chunk) => {
679 for choice in accumulator.decode_item_chunk(chunk, options) {
680 yield Ok(choice);
681 }
682 }
683 StreamingCompletionChunk::Response(chunk) => {
684 let ResponseChunk { kind, response, .. } = *chunk;
685 if matches!(kind, ResponseChunkKind::ResponseCompleted) {
686 span.record("gen_ai.response.id", response.id.as_str());
687 span.record("gen_ai.response.model", response.model.as_str());
688 }
689 if let Err(error) =
690 accumulator.record_response_chunk(kind, response, provider_name, options)
691 {
692 terminated_with_error = true;
693 yield Err(error);
694 break;
695 }
696 }
697 }
698 }
699 Err(crate::http_client::Error::StreamEnded) => {
700 event_source.close();
701 }
702 Err(error) => {
703 tracing::error!(?error, "SSE error");
704 terminated_with_error = true;
705 yield Err(CompletionError::ProviderError(error.to_string()));
706 break;
707 }
708 }
709 }
710
711 event_source.close();
712
713 if terminated_with_error {
714 return;
715 }
716
717 let final_usage = accumulator.final_usage.clone();
718
719 for tool_call in accumulator.finish() {
720 yield Ok(tool_call)
721 }
722
723 span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
724 span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
725 let cached_tokens = final_usage
726 .input_tokens_details
727 .as_ref()
728 .map(|d| d.cached_tokens)
729 .unwrap_or(0);
730 span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
731
732 }
733 .instrument(span);
734
735 streaming::StreamingCompletionResponse::stream(Box::pin(stream))
736}
737
/// Event scoped to one output item: deltas, item added/done, and content/summary part
/// boundaries.
///
/// The event-specific payload, including its `type` tag, is flattened into [`ItemChunkKind`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Id of the output item the event belongs to, when present. Function-call argument
    /// deltas are keyed by this id.
    pub item_id: Option<String>,
    /// Position of the item in the response output. Required, so events without an
    /// `output_index` fail to decode as an `ItemChunk`.
    pub output_index: u64,
    /// Type-tagged event payload.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
750
/// Item-level event payload, internally tagged by the JSON `type` field.
///
/// Unrecognised `type` values decode as `Unknown` (via `#[serde(other)]`) rather than failing.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
    /// Catch-all for event types this decoder does not model.
    #[serde(other)]
    Unknown,
}
788
/// Payload of `response.output_item.added` / `response.output_item.done`: the output item
/// as it stands at that point.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    pub sequence_number: u64,
    pub item: Output,
}
794
/// Payload of `response.content_part.added` / `.done`. It is decoded but not forwarded by
/// this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub part: ContentPartChunkPart,
}
801
/// Content part body, tagged by a snake_case `type` (`output_text` or `summary_text`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    OutputText { text: String },
    SummaryText { text: String },
}
808
/// Incremental text for an output-text or refusal event. `delta` is forwarded as a
/// `Message` choice.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
815
/// Incremental function-call arguments (`response.function_call_arguments.delta`).
///
/// Despite the name, this struct has no item id field. The call is identified by the
/// enclosing [`ItemChunk::item_id`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    /// Optional because the event may omit it.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_index: Option<u64>,
    pub sequence_number: u64,
    pub delta: String,
}
823
/// Full text of a finished output-text content part (`response.output_text.done`).
/// Not forwarded; the text already arrived via deltas.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub text: String,
}
830
/// Full text of a finished refusal (`response.refusal.done`). It is not forwarded by this
/// module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub refusal: String,
}
837
/// Final function-call arguments (`response.function_call_arguments.done`). It is not
/// forwarded by this module; completed calls come from `response.output_item.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_index: Option<u64>,
    pub sequence_number: u64,
    /// Kept as an untyped JSON value. Presumably the provider sends the JSON-encoded
    /// argument string here; TODO confirm.
    pub arguments: serde_json::Value,
}
845
/// Payload of `response.reasoning_summary_part.added` / `.done`. It is decoded but not
/// forwarded by this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub part: SummaryPartChunkPart,
}
852
/// Reasoning-summary text event. For `.delta` events, `delta` is forwarded as a
/// `ReasoningDelta` choice.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryTextChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
859
/// Reasoning summary part body, tagged by a snake_case `type` (`summary_text`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    SummaryText { text: String },
}
865
866impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
867where
868 crate::client::Client<Ext, H>:
869 HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
870 Ext: crate::client::Provider + Clone + 'static,
871 H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
872{
873 pub(crate) async fn stream(
874 &self,
875 completion_request: crate::completion::CompletionRequest,
876 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
877 {
878 let mut request = self.create_completion_request(completion_request)?;
879 request.stream = Some(true);
880
881 if enabled!(Level::TRACE) {
882 tracing::trace!(
883 target: "rig::completions",
884 "OpenAI Responses streaming completion request: {}",
885 serde_json::to_string_pretty(&request)?
886 );
887 }
888
889 let body = serde_json::to_vec(&request)?;
890
891 let req = self
892 .client
893 .post("/responses")?
894 .body(body)
895 .map_err(|e| CompletionError::HttpError(e.into()))?;
896
897 let span = if tracing::Span::current().is_disabled() {
900 info_span!(
901 target: "rig::completions",
902 "chat_streaming",
903 gen_ai.operation.name = "chat_streaming",
904 gen_ai.provider.name = tracing::field::Empty,
905 gen_ai.request.model = tracing::field::Empty,
906 gen_ai.response.id = tracing::field::Empty,
907 gen_ai.response.model = tracing::field::Empty,
908 gen_ai.usage.output_tokens = tracing::field::Empty,
909 gen_ai.usage.input_tokens = tracing::field::Empty,
910 gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
911 )
912 } else {
913 tracing::Span::current()
914 };
915 span.record("gen_ai.provider.name", "openai");
916 span.record("gen_ai.request.model", &self.model);
917 let client = self.client.clone();
918 let event_source = GenericEventSource::new(client, req);
919
920 Ok(stream_from_event_source(event_source, span, "OpenAI"))
921 }
922}
923
924#[cfg(test)]
925mod tests {
926 use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
927 use crate::completion::CompletionModel;
928 use crate::message::ReasoningContent;
929 use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
930 use crate::providers::openai::responses_api::{
931 AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
932 ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
933 };
934 use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
935 use crate::test_utils::MockStreamingClient;
936 use futures::StreamExt;
937 use serde_json::{self, json};
938
939 use crate::{
940 client::CompletionClient, completion::Message, providers::openai, streaming::StreamingChat,
941 test_utils::MockExampleTool,
942 };
943
944 fn sample_response(status: ResponseStatus) -> CompletionResponse {
945 CompletionResponse {
946 id: "resp_123".to_string(),
947 object: ResponseObject::Response,
948 created_at: 0,
949 status,
950 error: None,
951 incomplete_details: None,
952 instructions: None,
953 max_output_tokens: None,
954 model: "gpt-5.4".to_string(),
955 usage: None,
956 output: Vec::new(),
957 tools: Vec::new(),
958 additional_parameters: AdditionalParameters::default(),
959 }
960 }
961
962 async fn first_error_from_event(
963 event: serde_json::Value,
964 ) -> crate::completion::CompletionError {
965 let client = openai::Client::builder()
966 .http_client(MockStreamingClient {
967 sse_bytes: sse_bytes_from_json_events(&[event]),
968 })
969 .api_key("test-key")
970 .build()
971 .expect("client should build");
972 let model = client.completion_model("gpt-5.4");
973 let request = model.completion_request("hello").build();
974 let mut stream = model.stream(request).await.expect("stream should start");
975
976 stream
977 .next()
978 .await
979 .expect("stream should yield an item")
980 .expect_err("stream should surface a provider error")
981 }
982
983 async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
984 let client = openai::Client::builder()
985 .http_client(MockStreamingClient {
986 sse_bytes: sse_bytes_from_json_events(&[event]),
987 })
988 .api_key("test-key")
989 .build()
990 .expect("client should build");
991 let model = client.completion_model("gpt-5.4");
992 let request = model.completion_request("hello").build();
993 let mut stream = model.stream(request).await.expect("stream should start");
994
995 while let Some(item) = stream.next().await {
996 match item.expect("completed stream should not error") {
997 StreamedAssistantContent::Final(res) => return res.usage,
998 _ => continue,
999 }
1000 }
1001
1002 panic!("stream should yield a final response");
1003 }
1004
1005 #[test]
1006 fn reasoning_done_item_emits_summary_then_encrypted() {
1007 let summary = vec![
1008 ReasoningSummary::SummaryText {
1009 text: "step 1".to_string(),
1010 },
1011 ReasoningSummary::SummaryText {
1012 text: "step 2".to_string(),
1013 },
1014 ];
1015 let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1016
1017 assert_eq!(choices.len(), 3);
1018 assert!(matches!(
1019 choices.first(),
1020 Some(RawStreamingChoice::Reasoning {
1021 id: Some(id),
1022 content: ReasoningContent::Summary(text),
1023 }) if id == "rs_1" && text == "step 1"
1024 ));
1025 assert!(matches!(
1026 choices.get(1),
1027 Some(RawStreamingChoice::Reasoning {
1028 id: Some(id),
1029 content: ReasoningContent::Summary(text),
1030 }) if id == "rs_1" && text == "step 2"
1031 ));
1032 assert!(matches!(
1033 choices.get(2),
1034 Some(RawStreamingChoice::Reasoning {
1035 id: Some(id),
1036 content: ReasoningContent::Encrypted(data),
1037 }) if id == "rs_1" && data == "enc_blob"
1038 ));
1039 }
1040
1041 #[test]
1042 fn reasoning_done_item_without_encrypted_emits_summary_only() {
1043 let summary = vec![ReasoningSummary::SummaryText {
1044 text: "only summary".to_string(),
1045 }];
1046 let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1047
1048 assert_eq!(choices.len(), 1);
1049 assert!(matches!(
1050 choices.first(),
1051 Some(RawStreamingChoice::Reasoning {
1052 id: Some(id),
1053 content: ReasoningContent::Summary(text),
1054 }) if id == "rs_2" && text == "only summary"
1055 ));
1056 }
1057
1058 #[test]
1059 fn content_part_added_deserializes_snake_case_part_type() {
1060 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1061 "type": "response.content_part.added",
1062 "item_id": "msg_1",
1063 "output_index": 0,
1064 "content_index": 0,
1065 "sequence_number": 3,
1066 "part": {
1067 "type": "output_text",
1068 "text": "hello"
1069 }
1070 }))
1071 .expect("content part event should deserialize");
1072
1073 assert!(matches!(
1074 chunk,
1075 StreamingCompletionChunk::Delta(chunk)
1076 if matches!(
1077 chunk.data,
1078 ItemChunkKind::ContentPartAdded(_)
1079 )
1080 ));
1081 }
1082
/// `response.content_part.done` events with a snake_case part `type` must
/// deserialize into the item-level `ContentPartDone` delta.
#[test]
fn content_part_done_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.content_part.done",
        "item_id": "msg_1",
        "output_index": 0,
        "content_index": 0,
        "sequence_number": 4,
        "part": {
            "type": "summary_text",
            "text": "done"
        }
    });
    let parsed: StreamingCompletionChunk =
        serde_json::from_value(payload).expect("content part done event should deserialize");

    let StreamingCompletionChunk::Delta(item) = parsed else {
        panic!("expected an item-level delta chunk");
    };
    assert!(matches!(item.data, ItemChunkKind::ContentPartDone(_)));
}
1107
/// `response.reasoning_summary_part.added` events with a snake_case part `type`
/// must deserialize into the item-level `ReasoningSummaryPartAdded` delta.
#[test]
fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.reasoning_summary_part.added",
        "item_id": "rs_1",
        "output_index": 0,
        "summary_index": 0,
        "sequence_number": 5,
        "part": {
            "type": "summary_text",
            "text": "step 1"
        }
    });
    let parsed: StreamingCompletionChunk = serde_json::from_value(payload)
        .expect("reasoning summary part event should deserialize");

    let StreamingCompletionChunk::Delta(item) = parsed else {
        panic!("expected an item-level delta chunk");
    };
    assert!(matches!(
        item.data,
        ItemChunkKind::ReasoningSummaryPartAdded(_)
    ));
}
1132
/// `response.reasoning_summary_part.done` events with a snake_case part `type`
/// must deserialize into the item-level `ReasoningSummaryPartDone` delta.
#[test]
fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.reasoning_summary_part.done",
        "item_id": "rs_1",
        "output_index": 0,
        "summary_index": 0,
        "sequence_number": 6,
        "part": {
            "type": "summary_text",
            "text": "step 2"
        }
    });
    let parsed: StreamingCompletionChunk = serde_json::from_value(payload)
        .expect("reasoning summary part done event should deserialize");

    let StreamingCompletionChunk::Delta(item) = parsed else {
        panic!("expected an item-level delta chunk");
    };
    assert!(matches!(
        item.data,
        ItemChunkKind::ReasoningSummaryPartDone(_)
    ));
}
1157
1158 #[tokio::test]
1159 async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1160 let mut response = sample_response(ResponseStatus::Failed);
1161 response.error = Some(ResponseError {
1162 code: String::new(),
1163 message: "maximum context length exceeded".to_string(),
1164 });
1165
1166 let event = json!({
1167 "type": "response.failed",
1168 "sequence_number": 1,
1169 "response": response,
1170 });
1171
1172 let err = first_error_from_event(event).await;
1173
1174 assert_eq!(
1175 err.to_string(),
1176 "ProviderError: maximum context length exceeded"
1177 );
1178 }
1179
1180 #[tokio::test]
1181 async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1182 let mut response = sample_response(ResponseStatus::Failed);
1183 response.error = Some(ResponseError {
1184 code: "context_length_exceeded".to_string(),
1185 message: "maximum context length exceeded".to_string(),
1186 });
1187
1188 let event = json!({
1189 "type": "response.failed",
1190 "sequence_number": 1,
1191 "response": response,
1192 });
1193
1194 let err = first_error_from_event(event).await;
1195
1196 assert_eq!(
1197 err.to_string(),
1198 "ProviderError: context_length_exceeded: maximum context length exceeded"
1199 );
1200 }
1201
1202 #[tokio::test]
1203 async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1204 let mut response = sample_response(ResponseStatus::Incomplete);
1205 response.incomplete_details = Some(IncompleteDetailsReason {
1206 reason: "max_output_tokens".to_string(),
1207 });
1208
1209 let event = json!({
1210 "type": "response.incomplete",
1211 "sequence_number": 1,
1212 "response": response,
1213 });
1214
1215 let err = first_error_from_event(event).await;
1216
1217 assert_eq!(
1218 err.to_string(),
1219 "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1220 );
1221 }
1222
1223 #[tokio::test]
1224 async fn response_failed_chunk_terminates_stream_without_followup_items() {
1225 let tool_call_done = json!({
1226 "type": "response.output_item.done",
1227 "sequence_number": 1,
1228 "item": {
1229 "type": "function_call",
1230 "id": "fc_123",
1231 "arguments": "{}",
1232 "call_id": "call_123",
1233 "name": "example_tool",
1234 "status": "completed"
1235 }
1236 });
1237
1238 let mut response = sample_response(ResponseStatus::Failed);
1239 response.error = Some(ResponseError {
1240 code: "server_error".to_string(),
1241 message: "response stream failed".to_string(),
1242 });
1243
1244 let failed = json!({
1245 "type": "response.failed",
1246 "sequence_number": 2,
1247 "response": response,
1248 });
1249
1250 let client = openai::Client::builder()
1251 .http_client(MockStreamingClient {
1252 sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1253 })
1254 .api_key("test-key")
1255 .build()
1256 .expect("client should build");
1257 let model = client.completion_model("gpt-5.4");
1258 let request = model.completion_request("hello").build();
1259 let mut stream = model.stream(request).await.expect("stream should start");
1260
1261 let err = stream
1262 .next()
1263 .await
1264 .expect("stream should yield an item")
1265 .expect_err("stream should surface a provider error");
1266 assert_eq!(
1267 err.to_string(),
1268 "ProviderError: server_error: response stream failed"
1269 );
1270 assert!(
1271 stream.next().await.is_none(),
1272 "stream should terminate immediately after the first terminal error"
1273 );
1274 }
1275
1276 #[tokio::test]
1277 async fn response_completed_chunk_populates_final_usage() {
1278 let mut response = sample_response(ResponseStatus::Completed);
1279 response.usage = Some(ResponsesUsage {
1280 input_tokens: 10,
1281 input_tokens_details: None,
1282 output_tokens: 5,
1283 output_tokens_details: OutputTokensDetails {
1284 reasoning_tokens: 0,
1285 },
1286 total_tokens: 15,
1287 });
1288
1289 let event = json!({
1290 "type": "response.completed",
1291 "sequence_number": 1,
1292 "response": response,
1293 });
1294
1295 let usage = final_usage_from_event(event).await;
1296 assert_eq!(usage.input_tokens, 10);
1297 assert_eq!(usage.output_tokens, 5);
1298 assert_eq!(usage.total_tokens, 15);
1299 }
1300
1301 #[tokio::test]
1302 async fn done_sentinel_is_ignored_without_debug_parse_noise() {
1303 use std::io::{self, Write};
1304 use std::sync::{Arc, Mutex};
1305
1306 #[derive(Clone)]
1307 struct SharedWriter(Arc<Mutex<Vec<u8>>>);
1308
1309 impl Write for SharedWriter {
1310 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1311 self.0
1312 .lock()
1313 .expect("log buffer mutex should not be poisoned")
1314 .extend_from_slice(buf);
1315 Ok(buf.len())
1316 }
1317
1318 fn flush(&mut self) -> io::Result<()> {
1319 Ok(())
1320 }
1321 }
1322
1323 let mut response = sample_response(ResponseStatus::Completed);
1324 response.usage = Some(ResponsesUsage {
1325 input_tokens: 4,
1326 input_tokens_details: None,
1327 output_tokens: 2,
1328 output_tokens_details: OutputTokensDetails {
1329 reasoning_tokens: 0,
1330 },
1331 total_tokens: 6,
1332 });
1333
1334 let captured = Arc::new(Mutex::new(Vec::new()));
1335 let subscriber = tracing_subscriber::fmt()
1336 .with_max_level(tracing::Level::DEBUG)
1337 .with_ansi(false)
1338 .without_time()
1339 .with_writer({
1340 let captured = captured.clone();
1341 move || SharedWriter(captured.clone())
1342 })
1343 .finish();
1344 let _guard = tracing::subscriber::set_default(subscriber);
1345
1346 let client = openai::Client::builder()
1347 .http_client(MockStreamingClient {
1348 sse_bytes: bytes::Bytes::from(format!(
1349 "data: {}\n\ndata: [DONE]\n\n",
1350 serde_json::to_string(&json!({
1351 "type": "response.completed",
1352 "sequence_number": 1,
1353 "response": response,
1354 }))
1355 .expect("response event should serialize")
1356 )),
1357 })
1358 .api_key("test-key")
1359 .build()
1360 .expect("client should build");
1361 let model = client.completion_model("gpt-5.4");
1362 let request = model.completion_request("hello").build();
1363 let mut stream = model.stream(request).await.expect("stream should start");
1364
1365 let mut final_usage = None;
1366 while let Some(item) = stream.next().await {
1367 if let StreamedAssistantContent::Final(response) =
1368 item.expect("stream should complete successfully")
1369 {
1370 final_usage = Some(response.usage);
1371 }
1372 }
1373
1374 let usage = final_usage.expect("expected final response");
1375 assert_eq!(usage.input_tokens, 4);
1376 assert_eq!(usage.output_tokens, 2);
1377 assert_eq!(usage.total_tokens, 6);
1378
1379 let logs = String::from_utf8(
1380 captured
1381 .lock()
1382 .expect("log buffer mutex should not be poisoned")
1383 .clone(),
1384 )
1385 .expect("captured logs should be valid UTF-8");
1386 assert!(
1387 !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
1388 "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
1389 );
1390 }
1391
1392 #[tokio::test]
1394 #[ignore = "requires API key"]
1395 async fn test_openai_streaming_tools_reasoning() {
1396 let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1397 let client = openai::Client::new(&api_key).expect("Failed to build client");
1398 let agent = client
1399 .agent("gpt-5.2")
1400 .max_tokens(8192)
1401 .tool(MockExampleTool)
1402 .additional_params(serde_json::json!({
1403 "reasoning": {"effort": "high"}
1404 }))
1405 .build();
1406
1407 let chat_history: Vec<Message> = Vec::new();
1408 let mut stream = agent
1409 .stream_chat("Call my example tool", &chat_history)
1410 .multi_turn(5)
1411 .await;
1412
1413 while let Some(item) = stream.next().await {
1414 println!("Got item: {item:?}");
1415 }
1416 }
1417}