1use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
/// Shorthand for the raw streaming choice emitted by every decoder in this
/// module; its final-response payload is [`StreamingCompletionResponse`].
type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
/// One decoded Responses API SSE `data:` payload.
///
/// The enum is `#[serde(untagged)]`, so variants are tried in declaration
/// order: a payload is matched first against [`ResponseChunk`] (whole-response
/// lifecycle events) and only then against [`ItemChunk`] (per-output-item
/// events, which require an `output_index`). Payloads matching neither shape
/// fail to deserialize.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    /// Lifecycle event carrying a snapshot of the whole response.
    Response(Box<ResponseChunk>),
    /// Event scoped to a single output item (text, reasoning, tool-call deltas).
    Delta(ItemChunk),
}
35
/// Final-response payload yielded as the last choice of a stream.
///
/// It carries only token usage: the usage from `response.completed` when the
/// provider reported one, otherwise the usage the accumulator was seeded with.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    /// Accumulated Responses API usage for the whole stream.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44 id: &str,
45 summary: &[ReasoningSummary],
46 encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48 let mut choices = summary
49 .iter()
50 .map(|reasoning_summary| match reasoning_summary {
51 ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52 id: Some(id.to_owned()),
53 content: ReasoningContent::Summary(text.to_owned()),
54 },
55 })
56 .collect::<Vec<_>>();
57
58 if let Some(encrypted_content) = encrypted_content {
59 choices.push(RawStreamingChoice::Reasoning {
60 id: Some(id.to_owned()),
61 content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62 });
63 }
64
65 choices
66}
67
impl GetTokenUsage for StreamingCompletionResponse {
    /// Converts the accumulated Responses usage into rig's generic usage by
    /// delegating to `ResponsesUsage`'s own `token_usage`.
    fn token_usage(&self) -> Option<crate::completion::Usage> {
        self.usage.token_usage()
    }
}
73
/// A response lifecycle event (`response.created`, `response.completed`, ...)
/// carrying a snapshot of the whole response object.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// Event type, read from and written to the `type` field.
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// Response object as of this event.
    pub response: CompletionResponse,
    /// Sequence number from the event payload; not inspected by the decoders
    /// in this module.
    pub sequence_number: u64,
}
85
/// Lifecycle event types that can appear in a [`ResponseChunk`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    /// `response.created`
    #[serde(rename = "response.created")]
    ResponseCreated,
    /// `response.in_progress`
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    /// `response.completed`: its usage (if any) becomes the stream's final usage.
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    /// `response.failed`: reported as a provider error by the decoders.
    #[serde(rename = "response.failed")]
    ResponseFailed,
    /// `response.incomplete`: reported as a provider error that includes the
    /// incomplete reason.
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
101
102fn response_chunk_error_message(
103 kind: &ResponseChunkKind,
104 response: &CompletionResponse,
105 provider_name: &str,
106) -> Option<String> {
107 match kind {
108 ResponseChunkKind::ResponseFailed => Some(response_error_message(
109 response.error.as_ref(),
110 &format!("{provider_name} response stream returned a failed response"),
111 )),
112 ResponseChunkKind::ResponseIncomplete => {
113 let reason = response
114 .incomplete_details
115 .as_ref()
116 .map(|details| details.reason.as_str())
117 .unwrap_or("unknown reason");
118
119 Some(format!(
120 "{provider_name} response stream was incomplete: {reason}"
121 ))
122 }
123 _ => None,
124 }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128 if let Some(error) = error {
129 if error.code.is_empty() {
130 error.message.clone()
131 } else {
132 format!("{}: {}", error.code, error.message)
133 }
134 } else {
135 fallback.to_string()
136 }
137}
138
/// Decoding policy for Responses API streams.
///
/// Both modes turn `response.failed` / `response.incomplete` into errors; they
/// differ only in when completed tool calls are emitted.
#[derive(Clone, Copy)]
pub(crate) enum ResponsesStreamOptions {
    /// Buffer completed tool calls and emit them just before the final response.
    Strict,
    /// Emit each completed tool call as soon as its output item is done.
    StrictWithImmediateToolCalls,
}

impl ResponsesStreamOptions {
    /// The default policy: strict, with buffered tool calls.
    pub(crate) const fn strict() -> Self {
        ResponsesStreamOptions::Strict
    }

    /// Strict policy that emits completed tool calls immediately.
    pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
        ResponsesStreamOptions::StrictWithImmediateToolCalls
    }

    /// Whether failed/incomplete terminal responses become errors (every mode).
    const fn errors_on_terminal_response(self) -> bool {
        match self {
            Self::Strict | Self::StrictWithImmediateToolCalls => true,
        }
    }

    /// Whether completed tool calls are emitted as soon as they are done
    /// rather than being held until the stream finishes.
    const fn emits_completed_tool_calls_immediately(self) -> bool {
        match self {
            Self::Strict => false,
            Self::StrictWithImmediateToolCalls => true,
        }
    }
}
162
163pub(crate) fn parse_sse_completion_body(
164 body: &str,
165 provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167 let mut completed = None;
168 let mut provider_error = None;
169
170 for line in body.lines() {
171 let data = line
172 .strip_prefix("data:")
173 .map(str::trim)
174 .unwrap_or_default();
175 if data.is_empty() || data == "[DONE]" {
176 continue;
177 }
178
179 if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180 if let StreamingCompletionChunk::Response(chunk) = chunk {
181 let ResponseChunk { kind, response, .. } = *chunk;
182 match kind {
183 ResponseChunkKind::ResponseCompleted => {
184 completed = Some(response);
185 break;
186 }
187 ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188 provider_error =
189 response_chunk_error_message(&kind, &response, provider_name);
190 }
191 _ => {}
192 }
193 }
194 continue;
195 }
196
197 let value = match serde_json::from_str::<serde_json::Value>(data) {
198 Ok(value) => value,
199 Err(_) => continue,
200 };
201
202 match value.get("type").and_then(serde_json::Value::as_str) {
203 Some("response.completed") => {
204 if let Some(response) = value.get("response") {
205 completed = Some(serde_json::from_value(response.clone())?);
206 break;
207 }
208 }
209 Some("response.failed") | Some("response.incomplete") => {
210 provider_error = value
211 .get("response")
212 .cloned()
213 .and_then(|response| {
214 serde_json::from_value::<CompletionResponse>(response).ok()
215 })
216 .and_then(|response| {
217 let kind = if value.get("type").and_then(serde_json::Value::as_str)
218 == Some("response.failed")
219 {
220 ResponseChunkKind::ResponseFailed
221 } else {
222 ResponseChunkKind::ResponseIncomplete
223 };
224 response_chunk_error_message(&kind, &response, provider_name)
225 })
226 .or_else(|| {
227 value
228 .get("error")
229 .and_then(|error| error.get("message"))
230 .and_then(serde_json::Value::as_str)
231 .map(ToOwned::to_owned)
232 })
233 .or_else(|| Some(data.to_string()));
234 }
235 Some("error") => {
236 provider_error = value
237 .get("error")
238 .and_then(|error| error.get("message"))
239 .and_then(serde_json::Value::as_str)
240 .map(ToOwned::to_owned)
241 .or_else(|| Some(data.to_string()));
242 }
243 _ => {}
244 }
245 }
246
247 completed.ok_or_else(|| {
248 CompletionError::ProviderError(
249 provider_error.unwrap_or_else(|| {
250 format!("{provider_name} stream did not yield response.completed")
251 }),
252 )
253 })
254}
255
/// Mutable state carried across SSE events while decoding one Responses stream.
struct RawChoiceAccumulator {
    /// Usage reported with the final response; replaced by the usage of a
    /// `response.completed` event when that event carries one.
    final_usage: ResponsesUsage,
    /// Completed tool calls held back until `finish` when the stream options
    /// do not emit them immediately.
    tool_calls: Vec<StreamingRawChoice>,
    /// Maps provider item ids to locally minted internal call ids, so the name
    /// delta, argument deltas and completed call of one item share an id.
    tool_call_internal_ids: std::collections::HashMap<String, String>,
}
261
262impl RawChoiceAccumulator {
263 fn new(initial_usage: ResponsesUsage) -> Self {
264 Self {
265 final_usage: initial_usage,
266 tool_calls: Vec::new(),
267 tool_call_internal_ids: std::collections::HashMap::new(),
268 }
269 }
270
271 fn decode_item_chunk(
272 &mut self,
273 chunk: ItemChunk,
274 options: ResponsesStreamOptions,
275 ) -> Vec<StreamingRawChoice> {
276 let mut immediate = Vec::new();
277
278 let ItemChunk {
279 item_id: outer_item_id,
280 data: item,
281 ..
282 } = chunk;
283
284 match item {
285 ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
286 item: Output::FunctionCall(func),
287 ..
288 }) => {
289 let internal_call_id = self
290 .tool_call_internal_ids
291 .entry(func.id.clone())
292 .or_insert_with(|| nanoid::nanoid!())
293 .clone();
294 immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
295 id: func.id,
296 internal_call_id,
297 content: streaming::ToolCallDeltaContent::Name(func.name),
298 });
299 }
300 ItemChunkKind::OutputItemDone(message) => {
301 self.push_output_item_done(
302 message.item,
303 &mut immediate,
304 options.emits_completed_tool_calls_immediately(),
305 );
306 }
307 ItemChunkKind::OutputTextDelta(delta) => {
308 immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
309 }
310 ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
311 immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
312 id: None,
313 reasoning: delta.delta,
314 });
315 }
316 ItemChunkKind::RefusalDelta(delta) => {
317 immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
318 }
319 ItemChunkKind::FunctionCallArgsDelta(delta) => {
320 if let Some(item_id) = outer_item_id {
321 let internal_call_id = self
322 .tool_call_internal_ids
323 .entry(item_id.clone())
324 .or_insert_with(|| nanoid::nanoid!())
325 .clone();
326 immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
327 id: item_id,
328 internal_call_id,
329 content: streaming::ToolCallDeltaContent::Delta(delta.delta),
330 });
331 }
332 }
333 _ => {}
334 }
335
336 immediate
337 }
338
339 fn record_response_chunk(
340 &mut self,
341 kind: ResponseChunkKind,
342 response: CompletionResponse,
343 provider_name: &str,
344 options: ResponsesStreamOptions,
345 ) -> Result<(), CompletionError> {
346 match kind {
347 ResponseChunkKind::ResponseCompleted => {
348 if let Some(usage) = response.usage {
349 self.final_usage = usage;
350 }
351 Ok(())
352 }
353 ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
354 if options.errors_on_terminal_response() =>
355 {
356 let error_message = response_chunk_error_message(&kind, &response, provider_name)
357 .unwrap_or_else(|| {
358 format!(
359 "{provider_name} returned terminal response {:?} without an error message",
360 kind
361 )
362 });
363 Err(CompletionError::ProviderError(error_message))
364 }
365 _ => Ok(()),
366 }
367 }
368
369 fn push_output_item_done(
370 &mut self,
371 item: Output,
372 immediate: &mut Vec<StreamingRawChoice>,
373 emit_completed_tool_calls_immediately: bool,
374 ) {
375 match item {
376 Output::FunctionCall(func) => {
377 let internal_call_id = self
378 .tool_call_internal_ids
379 .entry(func.id.clone())
380 .or_insert_with(|| nanoid::nanoid!())
381 .clone();
382 let tool_call =
383 streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
384 .with_internal_call_id(internal_call_id)
385 .with_call_id(func.call_id);
386
387 if emit_completed_tool_calls_immediately {
388 immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
389 } else {
390 self.tool_calls
391 .push(streaming::RawStreamingChoice::ToolCall(tool_call));
392 }
393 }
394 Output::Reasoning {
395 id,
396 summary,
397 encrypted_content,
398 ..
399 } => {
400 immediate.extend(reasoning_choices_from_done_item(
401 &id,
402 &summary,
403 encrypted_content.as_deref(),
404 ));
405 }
406 Output::Message(message) => {
407 immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
408 }
409 Output::Unknown => {}
410 }
411 }
412
413 fn finish(mut self) -> Vec<StreamingRawChoice> {
414 let mut choices = Vec::new();
415 choices.append(&mut self.tool_calls);
416 choices.push(RawStreamingChoice::FinalResponse(
417 StreamingCompletionResponse {
418 usage: self.final_usage,
419 },
420 ));
421 choices
422 }
423}
424
425pub(crate) fn raw_choices_from_sse_body(
426 body: &str,
427 initial_usage: ResponsesUsage,
428 provider_name: &str,
429) -> Result<Vec<StreamingRawChoice>, CompletionError> {
430 let mut raw_choices = Vec::new();
431 let mut accumulator = RawChoiceAccumulator::new(initial_usage);
432 let options = ResponsesStreamOptions::strict();
433
434 for line in body.lines() {
435 let data = line
436 .strip_prefix("data:")
437 .map(str::trim)
438 .unwrap_or_default();
439 if data.is_empty() || data == "[DONE]" {
440 continue;
441 }
442
443 if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
444 match chunk {
445 StreamingCompletionChunk::Delta(chunk) => {
446 raw_choices.extend(accumulator.decode_item_chunk(chunk, options));
447 }
448 StreamingCompletionChunk::Response(chunk) => {
449 let ResponseChunk { kind, response, .. } = *chunk;
450 accumulator.record_response_chunk(kind, response, provider_name, options)?;
451 }
452 }
453 continue;
454 }
455
456 let value = match serde_json::from_str::<serde_json::Value>(data) {
457 Ok(value) => value,
458 Err(_) => continue,
459 };
460
461 match value.get("type").and_then(serde_json::Value::as_str) {
462 Some("response.output_text.delta") | Some("response.refusal.delta") => {
463 if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
464 raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
465 }
466 }
467 Some("response.reasoning_summary_text.delta") => {
468 if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
469 raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
470 id: None,
471 reasoning: delta.to_owned(),
472 });
473 }
474 }
475 Some("response.output_item.added") => {
476 if let Some(item) = value
477 .get("item")
478 .cloned()
479 .and_then(|item| serde_json::from_value::<Output>(item).ok())
480 && let Output::FunctionCall(func) = item
481 {
482 let internal_call_id = accumulator
483 .tool_call_internal_ids
484 .entry(func.id.clone())
485 .or_insert_with(|| nanoid::nanoid!())
486 .clone();
487 raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
488 id: func.id,
489 internal_call_id,
490 content: streaming::ToolCallDeltaContent::Name(func.name),
491 });
492 }
493 }
494 Some("response.output_item.done") => {
495 if let Some(item) = value
496 .get("item")
497 .cloned()
498 .and_then(|item| serde_json::from_value::<Output>(item).ok())
499 {
500 accumulator.push_output_item_done(item, &mut raw_choices, false);
501 }
502 }
503 Some("response.function_call_arguments.delta") => {
504 if let (Some(item_id), Some(delta)) = (
505 value.get("item_id").and_then(serde_json::Value::as_str),
506 value.get("delta").and_then(serde_json::Value::as_str),
507 ) {
508 let internal_call_id = accumulator
509 .tool_call_internal_ids
510 .entry(item_id.to_owned())
511 .or_insert_with(|| nanoid::nanoid!())
512 .clone();
513 raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
514 id: item_id.to_owned(),
515 internal_call_id,
516 content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
517 });
518 }
519 }
520 Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
521 if let Some(response) = value.get("response").cloned() {
522 let response = serde_json::from_value::<CompletionResponse>(response)?;
523 let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
524 Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
525 Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
526 Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
527 _ => None,
528 }) else {
529 continue;
530 };
531 accumulator.record_response_chunk(kind, response, provider_name, options)?;
532 }
533 }
534 Some("error") => {
535 let message = value
536 .get("error")
537 .and_then(|error| error.get("message"))
538 .and_then(serde_json::Value::as_str)
539 .unwrap_or(data);
540 return Err(CompletionError::ProviderError(message.to_owned()));
541 }
542 _ => {}
543 }
544 }
545
546 raw_choices.extend(accumulator.finish());
547 Ok(raw_choices)
548}
549
550pub(crate) async fn completion_response_from_sse_body(
551 body: &str,
552 raw_response: CompletionResponse,
553 provider_name: &str,
554) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
555 let raw_choices = raw_choices_from_sse_body(
556 body,
557 raw_response
558 .usage
559 .clone()
560 .unwrap_or_else(ResponsesUsage::new),
561 provider_name,
562 )?;
563 let stream = futures::stream::iter(
564 raw_choices
565 .into_iter()
566 .map(Ok::<_, CompletionError>)
567 .collect::<Vec<_>>(),
568 );
569 let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));
570
571 while let Some(item) = stream.next().await {
572 item?;
573 }
574
575 if choice_is_empty(&stream.choice) {
576 return Err(CompletionError::ResponseError(
577 "Response contained no parts".to_owned(),
578 ));
579 }
580
581 Ok(completion::CompletionResponse {
582 usage: stream
583 .response
584 .as_ref()
585 .and_then(GetTokenUsage::token_usage)
586 .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
587 message_id: stream
588 .message_id
589 .clone()
590 .or_else(|| message_id_from_response(&raw_response)),
591 choice: stream.choice,
592 raw_response,
593 })
594}
595
596fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
597 choice.iter().all(|content| match content {
598 completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
599 completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
600 completion::AssistantContent::Image(_) => false,
601 completion::AssistantContent::ToolCall(_) => false,
602 })
603}
604
605fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
606 response.output.iter().find_map(|item| match item {
607 Output::Message(message) => Some(message.id.clone()),
608 _ => None,
609 })
610}
611
612fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
613 response
614 .usage
615 .as_ref()
616 .and_then(GetTokenUsage::token_usage)
617 .unwrap_or_default()
618}
619
620pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
621 event_source: GenericEventSource<HttpClient, RequestBody>,
622 span: tracing::Span,
623 provider_name: &'static str,
624) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
625where
626 HttpClient: HttpClientExt + Clone + 'static,
627 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
628{
629 stream_from_event_source_with_options(
630 event_source,
631 span,
632 provider_name,
633 ResponsesStreamOptions::strict(),
634 )
635}
636
637pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
638 mut event_source: GenericEventSource<HttpClient, RequestBody>,
639 span: tracing::Span,
640 provider_name: &'static str,
641 options: ResponsesStreamOptions,
642) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
643where
644 HttpClient: HttpClientExt + Clone + 'static,
645 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
646{
647 let stream = stream! {
648 let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
649 let span = tracing::Span::current();
650
651 let mut terminated_with_error = false;
652
653 while let Some(event_result) = event_source.next().await {
654 match event_result {
655 Ok(Event::Open) => {
656 tracing::trace!("SSE connection opened");
657 continue;
658 }
659 Ok(Event::Message(evt)) => {
660 if evt.data.trim().is_empty() || evt.data == "[DONE]" {
661 continue;
662 }
663
664 let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
665
666 let Ok(data) = data else {
667 let Err(err) = data else {
668 continue;
669 };
670 debug!(
671 "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
672 err
673 );
674 continue;
675 };
676
677 match data {
678 StreamingCompletionChunk::Delta(chunk) => {
679 for choice in accumulator.decode_item_chunk(chunk, options) {
680 yield Ok(choice);
681 }
682 }
683 StreamingCompletionChunk::Response(chunk) => {
684 let ResponseChunk { kind, response, .. } = *chunk;
685 if matches!(kind, ResponseChunkKind::ResponseCompleted) {
686 span.record("gen_ai.response.id", response.id.as_str());
687 span.record("gen_ai.response.model", response.model.as_str());
688 }
689 if let Err(error) =
690 accumulator.record_response_chunk(kind, response, provider_name, options)
691 {
692 terminated_with_error = true;
693 yield Err(error);
694 break;
695 }
696 }
697 }
698 }
699 Err(crate::http_client::Error::StreamEnded) => {
700 event_source.close();
701 }
702 Err(error) => {
703 tracing::error!(?error, "SSE error");
704 terminated_with_error = true;
705 yield Err(CompletionError::ProviderError(error.to_string()));
706 break;
707 }
708 }
709 }
710
711 event_source.close();
712
713 if terminated_with_error {
714 return;
715 }
716
717 let final_usage = accumulator.final_usage.clone();
718
719 for tool_call in accumulator.finish() {
720 yield Ok(tool_call)
721 }
722
723 span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
724 span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
725 let cached_tokens = final_usage
726 .input_tokens_details
727 .as_ref()
728 .map(|d| d.cached_tokens)
729 .unwrap_or(0);
730 span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
731
732 }
733 .instrument(span);
734
735 streaming::StreamingCompletionResponse::stream(Box::pin(stream))
736}
737
/// An event scoped to a single output item. The `type`-tagged payload is
/// flattened into [`ItemChunkKind`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Id of the output item the event refers to, when the payload has one.
    /// Function-call argument deltas are attributed to their call through it.
    pub item_id: Option<String>,
    /// Output index from the payload. It is required, so payloads without it
    /// do not deserialize as an `ItemChunk`.
    pub output_index: u64,
    /// Event-specific payload, selected by the `type` field.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
750
/// Per-item Responses API event kinds, discriminated by the `type` field.
///
/// Only some variants produce streaming choices (output item added/done, text
/// and refusal deltas, reasoning summary deltas, function-call argument
/// deltas); the rest are parsed but ignored by the decoders in this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    /// An output item was added; function calls announce their name here.
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    /// An output item is complete (tool call, reasoning, or message).
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    /// Fragment of assistant message text.
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    /// Fragment of refusal text (emitted as message text by the decoders).
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    /// Fragment of a function call's arguments; the call is identified by the
    /// enclosing `ItemChunk::item_id`.
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    /// Fragment of reasoning summary text.
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
    /// Any other `type` value.
    #[serde(other)]
    Unknown,
}
788
/// Payload shared by `response.output_item.added` and
/// `response.output_item.done`: the output item as of that event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// The output item (function call, reasoning, message, ...).
    pub item: Output,
}
794
/// Payload of `response.content_part.added` / `response.content_part.done`.
/// Parsed but not turned into streaming choices by this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    /// Content-part index from the event payload.
    pub content_index: u64,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// The content part itself.
    pub part: ContentPartChunkPart,
}
801
/// Content part carried by a content-part event, tagged by its snake_case
/// `type` (`output_text` or `summary_text`).
///
/// NOTE(review): other part types (e.g. refusal parts) are not modeled and
/// would make the whole event fail to deserialize; confirm whether that is
/// acceptable since these events are ignored by the decoders anyway.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    OutputText { text: String },
    SummaryText { text: String },
}
808
/// Payload of `response.output_text.delta` and `response.refusal.delta`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    /// Content-part index from the event payload.
    pub content_index: u64,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// Text fragment to append.
    pub delta: String,
}
815
/// Payload of `response.function_call_arguments.delta`.
///
/// Despite the name, the item id is not a field here: it is read from the
/// enclosing [`ItemChunk::item_id`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    /// Optional content-part index; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_index: Option<u64>,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// Argument text fragment to append.
    pub delta: String,
}
823
/// Payload of `response.output_text.done`. Parsed but not turned into
/// streaming choices by this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    /// Content-part index from the event payload.
    pub content_index: u64,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// Text carried by the done event.
    pub text: String,
}
830
/// Payload of `response.refusal.done`. Parsed but not turned into streaming
/// choices by this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    /// Content-part index from the event payload.
    pub content_index: u64,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// Refusal text carried by the done event.
    pub refusal: String,
}
837
/// Payload of `response.function_call_arguments.done`. Parsed but not turned
/// into streaming choices by this module; completed calls are taken from
/// `response.output_item.done` instead.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    /// Optional content-part index; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_index: Option<u64>,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// Arguments exactly as sent by the provider, kept as untyped JSON.
    pub arguments: serde_json::Value,
}
845
/// Payload of `response.reasoning_summary_part.added` / `.done`. Parsed but
/// not turned into streaming choices by this module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    /// Summary-part index from the event payload.
    pub summary_index: u64,
    /// Sequence number from the event payload.
    pub sequence_number: u64,
    /// The summary part itself.
    pub part: SummaryPartChunkPart,
}
852
853#[derive(Debug, Serialize, Deserialize, Clone)]
854pub struct SummaryTextChunk {
855 pub summary_index: u64,
856 pub sequence_number: u64,
857 pub delta: String,
858}
859
/// Reasoning summary part, tagged by its snake_case `type`; only
/// `summary_text` parts are modeled.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    SummaryText { text: String },
}
865
impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
where
    crate::client::Client<Ext, H>:
        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
    Ext: crate::client::Provider + Clone + 'static,
    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
{
    /// Sends `completion_request` to the `/responses` endpoint with
    /// `stream: true` and returns the decoded SSE stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or serialized, or if the
    /// HTTP request cannot be constructed. Errors that occur while the stream
    /// is consumed are yielded by the returned stream instead.
    pub(crate) async fn stream(
        &self,
        completion_request: crate::completion::CompletionRequest,
    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
    {
        let mut request = self.create_completion_request(completion_request)?;
        request.stream = Some(true);

        // Pretty-printing the request is only done when TRACE is enabled.
        if enabled!(Level::TRACE) {
            tracing::trace!(
                target: "rig::completions",
                "OpenAI Responses streaming completion request: {}",
                serde_json::to_string_pretty(&request)?
            );
        }

        let body = serde_json::to_vec(&request)?;

        let req = self
            .client
            .post("/responses")?
            .body(body)
            .map_err(|e| CompletionError::HttpError(e.into()))?;

        // Reuse the caller's span when one is active; otherwise open a
        // dedicated span whose empty GenAI fields are recorded later (here and
        // by the stream once the response completes).
        let span = if tracing::Span::current().is_disabled() {
            info_span!(
                target: "rig::completions",
                "chat_streaming",
                gen_ai.operation.name = "chat_streaming",
                gen_ai.provider.name = tracing::field::Empty,
                gen_ai.request.model = tracing::field::Empty,
                gen_ai.response.id = tracing::field::Empty,
                gen_ai.response.model = tracing::field::Empty,
                gen_ai.usage.output_tokens = tracing::field::Empty,
                gen_ai.usage.input_tokens = tracing::field::Empty,
                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
            )
        } else {
            tracing::Span::current()
        };
        // NOTE(review): the provider name is hard-coded to "openai" / "OpenAI"
        // even though this impl is generic over `Ext`; confirm this is intended
        // for non-OpenAI providers that reuse it.
        span.record("gen_ai.provider.name", "openai");
        span.record("gen_ai.request.model", &self.model);
        let client = self.client.clone();
        let event_source = GenericEventSource::new(client, req);

        Ok(stream_from_event_source(event_source, span, "OpenAI"))
    }
}
923
924#[cfg(test)]
925mod tests {
926 use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
927 use crate::completion::CompletionModel;
928 use crate::message::ReasoningContent;
929 use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
930 use crate::providers::openai::responses_api::{
931 AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
932 ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
933 };
934 use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
935 use crate::test_utils::MockStreamingClient;
936 use futures::StreamExt;
937 use serde_json::{self, json};
938
939 use crate::{
940 client::CompletionClient, completion::Message, providers::openai, streaming::StreamingChat,
941 test_utils::MockExampleTool,
942 };
943
944 fn sample_response(status: ResponseStatus) -> CompletionResponse {
945 CompletionResponse {
946 id: "resp_123".to_string(),
947 object: ResponseObject::Response,
948 created_at: 0,
949 status,
950 error: None,
951 incomplete_details: None,
952 instructions: None,
953 max_output_tokens: None,
954 model: "gpt-5.4".to_string(),
955 provider_reasoning: None,
956 usage: None,
957 output: Vec::new(),
958 tools: Vec::new(),
959 additional_parameters: AdditionalParameters::default(),
960 }
961 }
962
963 async fn first_error_from_event(
964 event: serde_json::Value,
965 ) -> crate::completion::CompletionError {
966 let client = openai::Client::builder()
967 .http_client(MockStreamingClient {
968 sse_bytes: sse_bytes_from_json_events(&[event]),
969 })
970 .api_key("test-key")
971 .build()
972 .expect("client should build");
973 let model = client.completion_model("gpt-5.4");
974 let request = model.completion_request("hello").build();
975 let mut stream = model.stream(request).await.expect("stream should start");
976
977 stream
978 .next()
979 .await
980 .expect("stream should yield an item")
981 .expect_err("stream should surface a provider error")
982 }
983
984 async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
985 let client = openai::Client::builder()
986 .http_client(MockStreamingClient {
987 sse_bytes: sse_bytes_from_json_events(&[event]),
988 })
989 .api_key("test-key")
990 .build()
991 .expect("client should build");
992 let model = client.completion_model("gpt-5.4");
993 let request = model.completion_request("hello").build();
994 let mut stream = model.stream(request).await.expect("stream should start");
995
996 while let Some(item) = stream.next().await {
997 match item.expect("completed stream should not error") {
998 StreamedAssistantContent::Final(res) => return res.usage,
999 _ => continue,
1000 }
1001 }
1002
1003 panic!("stream should yield a final response");
1004 }
1005
1006 #[test]
1007 fn reasoning_done_item_emits_summary_then_encrypted() {
1008 let summary = vec![
1009 ReasoningSummary::SummaryText {
1010 text: "step 1".to_string(),
1011 },
1012 ReasoningSummary::SummaryText {
1013 text: "step 2".to_string(),
1014 },
1015 ];
1016 let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1017
1018 assert_eq!(choices.len(), 3);
1019 assert!(matches!(
1020 choices.first(),
1021 Some(RawStreamingChoice::Reasoning {
1022 id: Some(id),
1023 content: ReasoningContent::Summary(text),
1024 }) if id == "rs_1" && text == "step 1"
1025 ));
1026 assert!(matches!(
1027 choices.get(1),
1028 Some(RawStreamingChoice::Reasoning {
1029 id: Some(id),
1030 content: ReasoningContent::Summary(text),
1031 }) if id == "rs_1" && text == "step 2"
1032 ));
1033 assert!(matches!(
1034 choices.get(2),
1035 Some(RawStreamingChoice::Reasoning {
1036 id: Some(id),
1037 content: ReasoningContent::Encrypted(data),
1038 }) if id == "rs_1" && data == "enc_blob"
1039 ));
1040 }
1041
1042 #[test]
1043 fn reasoning_done_item_without_encrypted_emits_summary_only() {
1044 let summary = vec![ReasoningSummary::SummaryText {
1045 text: "only summary".to_string(),
1046 }];
1047 let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1048
1049 assert_eq!(choices.len(), 1);
1050 assert!(matches!(
1051 choices.first(),
1052 Some(RawStreamingChoice::Reasoning {
1053 id: Some(id),
1054 content: ReasoningContent::Summary(text),
1055 }) if id == "rs_2" && text == "only summary"
1056 ));
1057 }
1058
1059 #[test]
1060 fn content_part_added_deserializes_snake_case_part_type() {
1061 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1062 "type": "response.content_part.added",
1063 "item_id": "msg_1",
1064 "output_index": 0,
1065 "content_index": 0,
1066 "sequence_number": 3,
1067 "part": {
1068 "type": "output_text",
1069 "text": "hello"
1070 }
1071 }))
1072 .expect("content part event should deserialize");
1073
1074 assert!(matches!(
1075 chunk,
1076 StreamingCompletionChunk::Delta(chunk)
1077 if matches!(
1078 chunk.data,
1079 ItemChunkKind::ContentPartAdded(_)
1080 )
1081 ));
1082 }
1083
/// A `response.content_part.done` event whose part uses the snake_case
/// `summary_text` type should deserialize into the `ContentPartDone` delta.
#[test]
fn content_part_done_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.content_part.done",
        "item_id": "msg_1",
        "output_index": 0,
        "content_index": 0,
        "sequence_number": 4,
        "part": {
            "type": "summary_text",
            "text": "done"
        }
    });

    let chunk = serde_json::from_value::<StreamingCompletionChunk>(payload)
        .expect("content part done event should deserialize");

    // Destructure straight through the delta wrapper to the event kind.
    assert!(matches!(
        chunk,
        StreamingCompletionChunk::Delta(ItemChunk {
            data: ItemChunkKind::ContentPartDone(_),
            ..
        })
    ));
}
1108
/// A `response.reasoning_summary_part.added` event with a `summary_text` part
/// should deserialize into the `ReasoningSummaryPartAdded` delta.
#[test]
fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.reasoning_summary_part.added",
        "item_id": "rs_1",
        "output_index": 0,
        "summary_index": 0,
        "sequence_number": 5,
        "part": {
            "type": "summary_text",
            "text": "step 1"
        }
    });

    let chunk = serde_json::from_value::<StreamingCompletionChunk>(payload)
        .expect("reasoning summary part event should deserialize");

    let is_summary_part_added = match chunk {
        StreamingCompletionChunk::Delta(item) => {
            matches!(item.data, ItemChunkKind::ReasoningSummaryPartAdded(_))
        }
        StreamingCompletionChunk::Response(_) => false,
    };
    assert!(is_summary_part_added);
}
1133
/// A `response.reasoning_summary_part.done` event with a `summary_text` part
/// should deserialize into the `ReasoningSummaryPartDone` delta.
#[test]
fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
    let payload = json!({
        "type": "response.reasoning_summary_part.done",
        "item_id": "rs_1",
        "output_index": 0,
        "summary_index": 0,
        "sequence_number": 6,
        "part": {
            "type": "summary_text",
            "text": "step 2"
        }
    });

    let chunk = serde_json::from_value::<StreamingCompletionChunk>(payload)
        .expect("reasoning summary part done event should deserialize");

    // Destructure straight through the delta wrapper to the event kind.
    assert!(matches!(
        chunk,
        StreamingCompletionChunk::Delta(ItemChunk {
            data: ItemChunkKind::ReasoningSummaryPartDone(_),
            ..
        })
    ));
}
1158
1159 #[tokio::test]
1160 async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1161 let mut response = sample_response(ResponseStatus::Failed);
1162 response.error = Some(ResponseError {
1163 code: String::new(),
1164 message: "maximum context length exceeded".to_string(),
1165 });
1166
1167 let event = json!({
1168 "type": "response.failed",
1169 "sequence_number": 1,
1170 "response": response,
1171 });
1172
1173 let err = first_error_from_event(event).await;
1174
1175 assert_eq!(
1176 err.to_string(),
1177 "ProviderError: maximum context length exceeded"
1178 );
1179 }
1180
1181 #[tokio::test]
1182 async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1183 let mut response = sample_response(ResponseStatus::Failed);
1184 response.error = Some(ResponseError {
1185 code: "context_length_exceeded".to_string(),
1186 message: "maximum context length exceeded".to_string(),
1187 });
1188
1189 let event = json!({
1190 "type": "response.failed",
1191 "sequence_number": 1,
1192 "response": response,
1193 });
1194
1195 let err = first_error_from_event(event).await;
1196
1197 assert_eq!(
1198 err.to_string(),
1199 "ProviderError: context_length_exceeded: maximum context length exceeded"
1200 );
1201 }
1202
1203 #[tokio::test]
1204 async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1205 let mut response = sample_response(ResponseStatus::Incomplete);
1206 response.incomplete_details = Some(IncompleteDetailsReason {
1207 reason: "max_output_tokens".to_string(),
1208 });
1209
1210 let event = json!({
1211 "type": "response.incomplete",
1212 "sequence_number": 1,
1213 "response": response,
1214 });
1215
1216 let err = first_error_from_event(event).await;
1217
1218 assert_eq!(
1219 err.to_string(),
1220 "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1221 );
1222 }
1223
1224 #[tokio::test]
1225 async fn response_failed_chunk_terminates_stream_without_followup_items() {
1226 let tool_call_done = json!({
1227 "type": "response.output_item.done",
1228 "sequence_number": 1,
1229 "item": {
1230 "type": "function_call",
1231 "id": "fc_123",
1232 "arguments": "{}",
1233 "call_id": "call_123",
1234 "name": "example_tool",
1235 "status": "completed"
1236 }
1237 });
1238
1239 let mut response = sample_response(ResponseStatus::Failed);
1240 response.error = Some(ResponseError {
1241 code: "server_error".to_string(),
1242 message: "response stream failed".to_string(),
1243 });
1244
1245 let failed = json!({
1246 "type": "response.failed",
1247 "sequence_number": 2,
1248 "response": response,
1249 });
1250
1251 let client = openai::Client::builder()
1252 .http_client(MockStreamingClient {
1253 sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1254 })
1255 .api_key("test-key")
1256 .build()
1257 .expect("client should build");
1258 let model = client.completion_model("gpt-5.4");
1259 let request = model.completion_request("hello").build();
1260 let mut stream = model.stream(request).await.expect("stream should start");
1261
1262 let err = stream
1263 .next()
1264 .await
1265 .expect("stream should yield an item")
1266 .expect_err("stream should surface a provider error");
1267 assert_eq!(
1268 err.to_string(),
1269 "ProviderError: server_error: response stream failed"
1270 );
1271 assert!(
1272 stream.next().await.is_none(),
1273 "stream should terminate immediately after the first terminal error"
1274 );
1275 }
1276
1277 #[tokio::test]
1278 async fn response_completed_chunk_populates_final_usage() {
1279 let mut response = sample_response(ResponseStatus::Completed);
1280 response.usage = Some(ResponsesUsage {
1281 input_tokens: 10,
1282 input_tokens_details: None,
1283 output_tokens: 5,
1284 output_tokens_details: Some(OutputTokensDetails {
1285 reasoning_tokens: 0,
1286 }),
1287 total_tokens: 15,
1288 });
1289
1290 let event = json!({
1291 "type": "response.completed",
1292 "sequence_number": 1,
1293 "response": response,
1294 });
1295
1296 let usage = final_usage_from_event(event).await;
1297 assert_eq!(usage.input_tokens, 10);
1298 assert_eq!(usage.output_tokens, 5);
1299 assert_eq!(usage.total_tokens, 15);
1300 }
1301
1302 #[tokio::test]
1303 async fn done_sentinel_is_ignored_without_debug_parse_noise() {
1304 use std::io::{self, Write};
1305 use std::sync::{Arc, Mutex};
1306
1307 #[derive(Clone)]
1308 struct SharedWriter(Arc<Mutex<Vec<u8>>>);
1309
1310 impl Write for SharedWriter {
1311 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1312 self.0
1313 .lock()
1314 .expect("log buffer mutex should not be poisoned")
1315 .extend_from_slice(buf);
1316 Ok(buf.len())
1317 }
1318
1319 fn flush(&mut self) -> io::Result<()> {
1320 Ok(())
1321 }
1322 }
1323
1324 let mut response = sample_response(ResponseStatus::Completed);
1325 response.usage = Some(ResponsesUsage {
1326 input_tokens: 4,
1327 input_tokens_details: None,
1328 output_tokens: 2,
1329 output_tokens_details: Some(OutputTokensDetails {
1330 reasoning_tokens: 0,
1331 }),
1332 total_tokens: 6,
1333 });
1334
1335 let captured = Arc::new(Mutex::new(Vec::new()));
1336 let subscriber = tracing_subscriber::fmt()
1337 .with_max_level(tracing::Level::DEBUG)
1338 .with_ansi(false)
1339 .without_time()
1340 .with_writer({
1341 let captured = captured.clone();
1342 move || SharedWriter(captured.clone())
1343 })
1344 .finish();
1345 let _guard = tracing::subscriber::set_default(subscriber);
1346
1347 let client = openai::Client::builder()
1348 .http_client(MockStreamingClient {
1349 sse_bytes: bytes::Bytes::from(format!(
1350 "data: {}\n\ndata: [DONE]\n\n",
1351 serde_json::to_string(&json!({
1352 "type": "response.completed",
1353 "sequence_number": 1,
1354 "response": response,
1355 }))
1356 .expect("response event should serialize")
1357 )),
1358 })
1359 .api_key("test-key")
1360 .build()
1361 .expect("client should build");
1362 let model = client.completion_model("gpt-5.4");
1363 let request = model.completion_request("hello").build();
1364 let mut stream = model.stream(request).await.expect("stream should start");
1365
1366 let mut final_usage = None;
1367 while let Some(item) = stream.next().await {
1368 if let StreamedAssistantContent::Final(response) =
1369 item.expect("stream should complete successfully")
1370 {
1371 final_usage = Some(response.usage);
1372 }
1373 }
1374
1375 let usage = final_usage.expect("expected final response");
1376 assert_eq!(usage.input_tokens, 4);
1377 assert_eq!(usage.output_tokens, 2);
1378 assert_eq!(usage.total_tokens, 6);
1379
1380 let logs = String::from_utf8(
1381 captured
1382 .lock()
1383 .expect("log buffer mutex should not be poisoned")
1384 .clone(),
1385 )
1386 .expect("captured logs should be valid UTF-8");
1387 assert!(
1388 !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
1389 "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
1390 );
1391 }
1392
1393 #[tokio::test]
1395 #[ignore = "requires API key"]
1396 async fn test_openai_streaming_tools_reasoning() {
1397 let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1398 let client = openai::Client::new(&api_key).expect("Failed to build client");
1399 let agent = client
1400 .agent("gpt-5.2")
1401 .max_tokens(8192)
1402 .tool(MockExampleTool)
1403 .additional_params(serde_json::json!({
1404 "reasoning": {"effort": "high"}
1405 }))
1406 .build();
1407
1408 let chat_history: Vec<Message> = Vec::new();
1409 let mut stream = agent
1410 .stream_chat("Call my example tool", &chat_history)
1411 .multi_turn(5)
1412 .await;
1413
1414 while let Some(item) = stream.next().await {
1415 println!("Got item: {item:?}");
1416 }
1417 }
1418}