1use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
/// Shorthand for a raw streaming choice carrying the Responses-API final
/// streaming payload.
type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
/// A single decoded SSE event from the Responses API stream: either a
/// response-level lifecycle event or an item-level delta/done event.
///
/// Untagged: serde tries `ResponseChunk` first, then `ItemChunk`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    // Boxed because `CompletionResponse` is large relative to `ItemChunk`.
    Response(Box<ResponseChunk>),
    Delta(ItemChunk),
}
35
/// Final payload emitted at the end of a stream (via
/// `RawStreamingChoice::FinalResponse`), carrying the usage totals.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    /// Token usage, taken from the terminal `response.completed` chunk when
    /// one was seen.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44 id: &str,
45 summary: &[ReasoningSummary],
46 encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48 let mut choices = summary
49 .iter()
50 .map(|reasoning_summary| match reasoning_summary {
51 ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52 id: Some(id.to_owned()),
53 content: ReasoningContent::Summary(text.to_owned()),
54 },
55 })
56 .collect::<Vec<_>>();
57
58 if let Some(encrypted_content) = encrypted_content {
59 choices.push(RawStreamingChoice::Reasoning {
60 id: Some(id.to_owned()),
61 content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62 });
63 }
64
65 choices
66}
67
impl GetTokenUsage for StreamingCompletionResponse {
    /// Delegates token accounting to the wrapped [`ResponsesUsage`].
    fn token_usage(&self) -> Option<crate::completion::Usage> {
        self.usage.token_usage()
    }
}
73
/// Response-level SSE event: a lifecycle marker plus the full response
/// snapshot at that point in the stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// Lifecycle event type (created / in_progress / completed / failed /
    /// incomplete), taken from the SSE `type` field.
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// Snapshot of the response as of this event.
    pub response: CompletionResponse,
    /// Monotonic event ordinal assigned by the provider.
    pub sequence_number: u64,
}
85
/// Lifecycle event types emitted at the response level of the stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    #[serde(rename = "response.created")]
    ResponseCreated,
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    /// Terminal success: carries the final usage totals.
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    /// Terminal failure: the response's `error` field describes the cause.
    #[serde(rename = "response.failed")]
    ResponseFailed,
    /// Terminal truncation: see `incomplete_details` on the response.
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
101
102fn response_chunk_error_message(
103 kind: &ResponseChunkKind,
104 response: &CompletionResponse,
105 provider_name: &str,
106) -> Option<String> {
107 match kind {
108 ResponseChunkKind::ResponseFailed => Some(response_error_message(
109 response.error.as_ref(),
110 &format!("{provider_name} response stream returned a failed response"),
111 )),
112 ResponseChunkKind::ResponseIncomplete => {
113 let reason = response
114 .incomplete_details
115 .as_ref()
116 .map(|details| details.reason.as_str())
117 .unwrap_or("unknown reason");
118
119 Some(format!(
120 "{provider_name} response stream was incomplete: {reason}"
121 ))
122 }
123 _ => None,
124 }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128 if let Some(error) = error {
129 if error.code.is_empty() {
130 error.message.clone()
131 } else {
132 format!("{}: {}", error.code, error.message)
133 }
134 } else {
135 fallback.to_string()
136 }
137}
138
/// Tuning knobs for how the Responses SSE stream is decoded.
#[derive(Clone, Copy)]
pub(crate) enum ResponsesStreamOptions {
    /// Terminal failures error; completed tool calls are buffered until the
    /// end of the stream.
    Strict,
    /// Like `Strict`, but completed tool calls are yielded as soon as their
    /// `output_item.done` event arrives.
    StrictWithImmediateToolCalls,
}
144
145impl ResponsesStreamOptions {
146 pub(crate) const fn strict() -> Self {
147 Self::Strict
148 }
149
150 pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
151 Self::StrictWithImmediateToolCalls
152 }
153
154 const fn errors_on_terminal_response(self) -> bool {
155 true
156 }
157
158 const fn emits_completed_tool_calls_immediately(self) -> bool {
159 matches!(self, Self::StrictWithImmediateToolCalls)
160 }
161}
162
/// Parses a fully-buffered SSE body and extracts the terminal
/// [`CompletionResponse`] from its `response.completed` event.
///
/// Each `data:` line is first decoded as a [`StreamingCompletionChunk`];
/// lines that don't match that envelope are retried as loose JSON so that
/// provider-specific shapes still surface completion or failure information.
///
/// # Errors
/// Returns [`CompletionError::ProviderError`] when no `response.completed`
/// event was found, preferring any failure/incomplete/error message observed
/// while scanning.
pub(crate) fn parse_sse_completion_body(
    body: &str,
    provider_name: &str,
) -> Result<CompletionResponse, CompletionError> {
    let mut completed = None;
    let mut provider_error = None;

    for line in body.lines() {
        // Strip SSE framing; lines without a "data:" prefix are skipped.
        let data = line
            .strip_prefix("data:")
            .map(str::trim)
            .unwrap_or_default();
        if data.is_empty() || data == "[DONE]" {
            continue;
        }

        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
            if let StreamingCompletionChunk::Response(chunk) = chunk {
                let ResponseChunk { kind, response, .. } = *chunk;
                match kind {
                    ResponseChunkKind::ResponseCompleted => {
                        completed = Some(response);
                        break;
                    }
                    // Remember the failure message but keep scanning — a later
                    // `response.completed` still wins.
                    ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
                        provider_error =
                            response_chunk_error_message(&kind, &response, provider_name);
                    }
                    _ => {}
                }
            }
            continue;
        }

        // Fallback path: decode as loose JSON for envelopes that don't match
        // StreamingCompletionChunk exactly.
        let value = match serde_json::from_str::<serde_json::Value>(data) {
            Ok(value) => value,
            Err(_) => continue,
        };

        match value.get("type").and_then(serde_json::Value::as_str) {
            Some("response.completed") => {
                if let Some(response) = value.get("response") {
                    completed = Some(serde_json::from_value(response.clone())?);
                    break;
                }
            }
            Some("response.failed") | Some("response.incomplete") => {
                // Try the structured message first, then the raw error
                // message, finally the raw data line itself.
                provider_error = value
                    .get("response")
                    .cloned()
                    .and_then(|response| {
                        serde_json::from_value::<CompletionResponse>(response).ok()
                    })
                    .and_then(|response| {
                        let kind = if value.get("type").and_then(serde_json::Value::as_str)
                            == Some("response.failed")
                        {
                            ResponseChunkKind::ResponseFailed
                        } else {
                            ResponseChunkKind::ResponseIncomplete
                        };
                        response_chunk_error_message(&kind, &response, provider_name)
                    })
                    .or_else(|| {
                        value
                            .get("error")
                            .and_then(|error| error.get("message"))
                            .and_then(serde_json::Value::as_str)
                            .map(ToOwned::to_owned)
                    })
                    .or_else(|| Some(data.to_string()));
            }
            Some("error") => {
                provider_error = value
                    .get("error")
                    .and_then(|error| error.get("message"))
                    .and_then(serde_json::Value::as_str)
                    .map(ToOwned::to_owned)
                    .or_else(|| Some(data.to_string()));
            }
            _ => {}
        }
    }

    completed.ok_or_else(|| {
        CompletionError::ProviderError(
            provider_error.unwrap_or_else(|| {
                format!("{provider_name} stream did not yield response.completed")
            }),
        )
    })
}
255
/// Mutable state threaded through stream decoding: tracks the latest usage,
/// buffered tool calls, and the mapping from provider item ids to internal
/// call ids.
struct RawChoiceAccumulator {
    // Last-known usage; overwritten by `response.completed` when seen.
    final_usage: ResponsesUsage,
    // Tool calls held back until `finish()` (unless options emit immediately).
    tool_calls: Vec<StreamingRawChoice>,
    // provider item id -> generated internal call id, so deltas and the done
    // item for the same call share one id.
    tool_call_internal_ids: std::collections::HashMap<String, String>,
}
261
impl RawChoiceAccumulator {
    /// Creates an accumulator seeded with `initial_usage`; this is replaced
    /// if a `response.completed` event later carries usage.
    fn new(initial_usage: ResponsesUsage) -> Self {
        Self {
            final_usage: initial_usage,
            tool_calls: Vec::new(),
            tool_call_internal_ids: std::collections::HashMap::new(),
        }
    }

    /// Converts one item-level event into the choices to emit right away.
    /// Completed tool calls may instead be buffered on `self`, depending on
    /// `options`.
    fn decode_item_chunk(
        &mut self,
        item: ItemChunkKind,
        options: ResponsesStreamOptions,
    ) -> Vec<StreamingRawChoice> {
        let mut immediate = Vec::new();

        match item {
            // A function-call item was announced: emit its name as the first
            // tool-call delta, keyed by a stable internal id.
            ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
                item: Output::FunctionCall(func),
                ..
            }) => {
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(func.id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
                    id: func.id,
                    internal_call_id,
                    content: streaming::ToolCallDeltaContent::Name(func.name),
                });
            }
            ItemChunkKind::OutputItemDone(message) => {
                self.push_output_item_done(
                    message.item,
                    &mut immediate,
                    options.emits_completed_tool_calls_immediately(),
                );
            }
            ItemChunkKind::OutputTextDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
            }
            ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
                    id: None,
                    reasoning: delta.delta,
                });
            }
            // Refusal text is surfaced as plain message content.
            ItemChunkKind::RefusalDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
            }
            ItemChunkKind::FunctionCallArgsDelta(delta) => {
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(delta.item_id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
                    id: delta.item_id,
                    internal_call_id,
                    content: streaming::ToolCallDeltaContent::Delta(delta.delta),
                });
            }
            // All other item events carry nothing we need to forward.
            _ => {}
        }

        immediate
    }

    /// Handles a response-level lifecycle event: records final usage on
    /// completion.
    ///
    /// # Errors
    /// Returns [`CompletionError::ProviderError`] for failed/incomplete
    /// responses when `options` treats terminal responses as errors.
    fn record_response_chunk(
        &mut self,
        kind: ResponseChunkKind,
        response: CompletionResponse,
        provider_name: &str,
        options: ResponsesStreamOptions,
    ) -> Result<(), CompletionError> {
        match kind {
            ResponseChunkKind::ResponseCompleted => {
                if let Some(usage) = response.usage {
                    self.final_usage = usage;
                }
                Ok(())
            }
            ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
                if options.errors_on_terminal_response() =>
            {
                let error_message = response_chunk_error_message(&kind, &response, provider_name)
                    .unwrap_or_else(|| {
                        format!(
                            "{provider_name} returned terminal response {:?} without an error message",
                            kind
                        )
                    });
                Err(CompletionError::ProviderError(error_message))
            }
            _ => Ok(()),
        }
    }

    /// Routes a completed output item: function calls become full tool calls
    /// (emitted now or buffered per the flag), reasoning items expand into
    /// summary/encrypted choices, and messages yield their id.
    fn push_output_item_done(
        &mut self,
        item: Output,
        immediate: &mut Vec<StreamingRawChoice>,
        emit_completed_tool_calls_immediately: bool,
    ) {
        match item {
            Output::FunctionCall(func) => {
                // Reuse the internal id any earlier deltas established.
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(func.id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                let tool_call =
                    streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
                        .with_internal_call_id(internal_call_id)
                        .with_call_id(func.call_id);

                if emit_completed_tool_calls_immediately {
                    immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
                } else {
                    self.tool_calls
                        .push(streaming::RawStreamingChoice::ToolCall(tool_call));
                }
            }
            Output::Reasoning {
                id,
                summary,
                encrypted_content,
                ..
            } => {
                immediate.extend(reasoning_choices_from_done_item(
                    &id,
                    &summary,
                    encrypted_content.as_deref(),
                ));
            }
            Output::Message(message) => {
                immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
            }
            Output::Unknown => {}
        }
    }

    /// Consumes the accumulator: buffered tool calls first, then the final
    /// usage frame.
    fn finish(mut self) -> Vec<StreamingRawChoice> {
        let mut choices = Vec::new();
        choices.append(&mut self.tool_calls);
        choices.push(RawStreamingChoice::FinalResponse(
            StreamingCompletionResponse {
                usage: self.final_usage,
            },
        ));
        choices
    }
}
416
/// Replays a fully-buffered SSE body into the ordered list of raw streaming
/// choices a live stream would have produced, using strict options.
///
/// Like [`parse_sse_completion_body`], each `data:` line is first decoded as
/// a [`StreamingCompletionChunk`], with a loose-JSON fallback for envelopes
/// that don't match exactly.
///
/// # Errors
/// Propagates terminal failure events, malformed `response.*` payloads, and
/// explicit `error` events as [`CompletionError`]s.
pub(crate) fn raw_choices_from_sse_body(
    body: &str,
    initial_usage: ResponsesUsage,
    provider_name: &str,
) -> Result<Vec<StreamingRawChoice>, CompletionError> {
    let mut raw_choices = Vec::new();
    let mut accumulator = RawChoiceAccumulator::new(initial_usage);
    let options = ResponsesStreamOptions::strict();

    for line in body.lines() {
        // Strip SSE framing; skip blanks and the end-of-stream sentinel.
        let data = line
            .strip_prefix("data:")
            .map(str::trim)
            .unwrap_or_default();
        if data.is_empty() || data == "[DONE]" {
            continue;
        }

        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
            match chunk {
                StreamingCompletionChunk::Delta(chunk) => {
                    raw_choices.extend(accumulator.decode_item_chunk(chunk.data, options));
                }
                StreamingCompletionChunk::Response(chunk) => {
                    let ResponseChunk { kind, response, .. } = *chunk;
                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
                }
            }
            continue;
        }

        // Fallback path: hand-decode the event types we care about from loose
        // JSON, mirroring decode_item_chunk's handling.
        let value = match serde_json::from_str::<serde_json::Value>(data) {
            Ok(value) => value,
            Err(_) => continue,
        };

        match value.get("type").and_then(serde_json::Value::as_str) {
            Some("response.output_text.delta") | Some("response.refusal.delta") => {
                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
                    raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
                }
            }
            Some("response.reasoning_summary_text.delta") => {
                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
                    raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
                        id: None,
                        reasoning: delta.to_owned(),
                    });
                }
            }
            Some("response.output_item.added") => {
                // Only function-call items need an "added" announcement (the
                // tool-call name delta); other item types are ignored here.
                if let Some(item) = value
                    .get("item")
                    .cloned()
                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
                    && let Output::FunctionCall(func) = item
                {
                    let internal_call_id = accumulator
                        .tool_call_internal_ids
                        .entry(func.id.clone())
                        .or_insert_with(|| nanoid::nanoid!())
                        .clone();
                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
                        id: func.id,
                        internal_call_id,
                        content: streaming::ToolCallDeltaContent::Name(func.name),
                    });
                }
            }
            Some("response.output_item.done") => {
                if let Some(item) = value
                    .get("item")
                    .cloned()
                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
                {
                    // `false`: tool calls are buffered until finish() here.
                    accumulator.push_output_item_done(item, &mut raw_choices, false);
                }
            }
            Some("response.function_call_arguments.delta") => {
                if let (Some(item_id), Some(delta)) = (
                    value.get("item_id").and_then(serde_json::Value::as_str),
                    value.get("delta").and_then(serde_json::Value::as_str),
                ) {
                    let internal_call_id = accumulator
                        .tool_call_internal_ids
                        .entry(item_id.to_owned())
                        .or_insert_with(|| nanoid::nanoid!())
                        .clone();
                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
                        id: item_id.to_owned(),
                        internal_call_id,
                        content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
                    });
                }
            }
            Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
                if let Some(response) = value.get("response").cloned() {
                    let response = serde_json::from_value::<CompletionResponse>(response)?;
                    let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
                        Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
                        Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
                        Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
                        _ => None,
                    }) else {
                        continue;
                    };
                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
                }
            }
            Some("error") => {
                let message = value
                    .get("error")
                    .and_then(|error| error.get("message"))
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or(data);
                return Err(CompletionError::ProviderError(message.to_owned()));
            }
            _ => {}
        }
    }

    raw_choices.extend(accumulator.finish());
    Ok(raw_choices)
}
541
/// Converts a buffered SSE body into a non-streaming
/// [`completion::CompletionResponse`] by replaying the raw choices through
/// the streaming aggregation machinery.
///
/// Usage and message id fall back to `raw_response` when the replayed stream
/// didn't provide them.
///
/// # Errors
/// Returns a `ResponseError` when aggregation produced no content, and
/// propagates replay/parse errors from [`raw_choices_from_sse_body`].
pub(crate) async fn completion_response_from_sse_body(
    body: &str,
    raw_response: CompletionResponse,
    provider_name: &str,
) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
    let raw_choices = raw_choices_from_sse_body(
        body,
        raw_response
            .usage
            .clone()
            .unwrap_or_else(ResponsesUsage::new),
        provider_name,
    )?;
    // Feed the replayed choices through the shared aggregator so the final
    // `choice`/`message_id`/`response` fields are built the same way as for a
    // live stream.
    let stream = futures::stream::iter(
        raw_choices
            .into_iter()
            .map(Ok::<_, CompletionError>)
            .collect::<Vec<_>>(),
    );
    let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));

    // Drain the stream; aggregation happens as a side effect of polling.
    while let Some(item) = stream.next().await {
        item?;
    }

    if choice_is_empty(&stream.choice) {
        return Err(CompletionError::ResponseError(
            "Response contained no parts".to_owned(),
        ));
    }

    Ok(completion::CompletionResponse {
        usage: stream
            .response
            .as_ref()
            .and_then(GetTokenUsage::token_usage)
            .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
        message_id: stream
            .message_id
            .clone()
            .or_else(|| message_id_from_response(&raw_response)),
        choice: stream.choice,
        raw_response,
    })
}
587
588fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
589 choice.iter().all(|content| match content {
590 completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
591 completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
592 completion::AssistantContent::Image(_) => false,
593 completion::AssistantContent::ToolCall(_) => false,
594 })
595}
596
597fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
598 response.output.iter().find_map(|item| match item {
599 Output::Message(message) => Some(message.id.clone()),
600 _ => None,
601 })
602}
603
604fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
605 response
606 .usage
607 .as_ref()
608 .and_then(GetTokenUsage::token_usage)
609 .unwrap_or_default()
610}
611
/// Streams a Responses-API completion from `event_source` using the strict
/// option set (terminal failures error; tool calls buffered until the end).
pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
    event_source: GenericEventSource<HttpClient, RequestBody>,
    span: tracing::Span,
    provider_name: &'static str,
) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
where
    HttpClient: HttpClientExt + Clone + 'static,
    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
{
    stream_from_event_source_with_options(
        event_source,
        span,
        provider_name,
        ResponsesStreamOptions::strict(),
    )
}
628
629pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
630 mut event_source: GenericEventSource<HttpClient, RequestBody>,
631 span: tracing::Span,
632 provider_name: &'static str,
633 options: ResponsesStreamOptions,
634) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
635where
636 HttpClient: HttpClientExt + Clone + 'static,
637 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
638{
639 let stream = stream! {
640 let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
641 let span = tracing::Span::current();
642
643 let mut terminated_with_error = false;
644
645 while let Some(event_result) = event_source.next().await {
646 match event_result {
647 Ok(Event::Open) => {
648 tracing::trace!("SSE connection opened");
649 continue;
650 }
651 Ok(Event::Message(evt)) => {
652 if evt.data.trim().is_empty() || evt.data == "[DONE]" {
653 continue;
654 }
655
656 let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
657
658 let Ok(data) = data else {
659 let Err(err) = data else {
660 continue;
661 };
662 debug!(
663 "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
664 err
665 );
666 continue;
667 };
668
669 match data {
670 StreamingCompletionChunk::Delta(chunk) => {
671 for choice in accumulator.decode_item_chunk(chunk.data, options) {
672 yield Ok(choice);
673 }
674 }
675 StreamingCompletionChunk::Response(chunk) => {
676 let ResponseChunk { kind, response, .. } = *chunk;
677 if matches!(kind, ResponseChunkKind::ResponseCompleted) {
678 span.record("gen_ai.response.id", response.id.as_str());
679 span.record("gen_ai.response.model", response.model.as_str());
680 }
681 if let Err(error) =
682 accumulator.record_response_chunk(kind, response, provider_name, options)
683 {
684 terminated_with_error = true;
685 yield Err(error);
686 break;
687 }
688 }
689 }
690 }
691 Err(crate::http_client::Error::StreamEnded) => {
692 event_source.close();
693 }
694 Err(error) => {
695 tracing::error!(?error, "SSE error");
696 terminated_with_error = true;
697 yield Err(CompletionError::ProviderError(error.to_string()));
698 break;
699 }
700 }
701 }
702
703 event_source.close();
704
705 if terminated_with_error {
706 return;
707 }
708
709 let final_usage = accumulator.final_usage.clone();
710
711 for tool_call in accumulator.finish() {
712 yield Ok(tool_call)
713 }
714
715 span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
716 span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
717 let cached_tokens = final_usage
718 .input_tokens_details
719 .as_ref()
720 .map(|d| d.cached_tokens)
721 .unwrap_or(0);
722 span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
723
724 }
725 .instrument(span);
726
727 streaming::StreamingCompletionResponse::stream(Box::pin(stream))
728}
729
/// Envelope shared by all item-level SSE events: identifies the output item
/// the event belongs to, plus the type-specific payload.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Id of the item this event refers to; absent on some event types.
    pub item_id: Option<String>,
    /// Position of the item within the response's output list.
    pub output_index: u64,
    /// Event payload, discriminated by the SSE `type` field.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
742
/// Item-level SSE event payloads, internally tagged by the event `type`.
///
/// Unrecognized event types fall through to [`ItemChunkKind::Unknown`] via
/// `#[serde(other)]`, so new provider event types don't break decoding.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
    /// Catch-all for event types this client doesn't model.
    #[serde(other)]
    Unknown,
}
780
/// Payload for `output_item.added` / `output_item.done`: the (possibly
/// partial) output item itself.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    pub sequence_number: u64,
    pub item: Output,
}
786
/// Payload for `content_part.added` / `content_part.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub part: ContentPartChunkPart,
}
793
/// Body of a content part: output text or a summary-text part.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    OutputText { text: String },
    SummaryText { text: String },
}
800
/// Incremental text payload for output-text and refusal delta events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
807
/// Incremental payload for function-call argument deltas; carries the item id
/// so the delta can be tied back to its tool call.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    pub item_id: String,
    pub content_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
815
/// Payload for `output_text.done`: the full finalized text.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub text: String,
}
822
/// Payload for `refusal.done`: the full refusal text.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub refusal: String,
}
829
/// Payload for `function_call_arguments.done`: the complete arguments value.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub arguments: serde_json::Value,
}
836
/// Payload for `reasoning_summary_part.added` / `.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub part: SummaryPartChunkPart,
}
843
/// Payload for reasoning-summary text events (used for both the `.delta` and
/// `.done` variants of `response.reasoning_summary_text`).
// NOTE(review): the provider's `.done` event may carry `text` rather than
// `delta` — confirm against the Responses API event reference before relying
// on ReasoningSummaryTextDone deserializing successfully.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryTextChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
850
/// Body of a reasoning summary part; only summary text is modeled.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    SummaryText { text: String },
}
856
impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
where
    crate::client::Client<Ext, H>:
        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
    Ext: crate::client::Provider + Clone + 'static,
    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
{
    /// Sends a streaming `/responses` request and returns the decoded SSE
    /// stream.
    ///
    /// Reuses the caller's current tracing span when one is active; otherwise
    /// creates a `chat_streaming` span with the standard `gen_ai.*` fields.
    ///
    /// # Errors
    /// Fails when the request cannot be serialized or the HTTP request cannot
    /// be built.
    pub(crate) async fn stream(
        &self,
        completion_request: crate::completion::CompletionRequest,
    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
    {
        let mut request = self.create_completion_request(completion_request)?;
        request.stream = Some(true);

        // Pretty-print the outgoing body only when TRACE logging is active.
        if enabled!(Level::TRACE) {
            tracing::trace!(
                target: "rig::completions",
                "OpenAI Responses streaming completion request: {}",
                serde_json::to_string_pretty(&request)?
            );
        }

        let body = serde_json::to_vec(&request)?;

        let req = self
            .client
            .post("/responses")?
            .body(body)
            .map_err(|e| CompletionError::HttpError(e.into()))?;

        // Only create a fresh span when the caller isn't already tracing.
        let span = if tracing::Span::current().is_disabled() {
            info_span!(
                target: "rig::completions",
                "chat_streaming",
                gen_ai.operation.name = "chat_streaming",
                gen_ai.provider.name = tracing::field::Empty,
                gen_ai.request.model = tracing::field::Empty,
                gen_ai.response.id = tracing::field::Empty,
                gen_ai.response.model = tracing::field::Empty,
                gen_ai.usage.output_tokens = tracing::field::Empty,
                gen_ai.usage.input_tokens = tracing::field::Empty,
                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
            )
        } else {
            tracing::Span::current()
        };
        span.record("gen_ai.provider.name", "openai");
        span.record("gen_ai.request.model", &self.model);
        let client = self.client.clone();
        let event_source = GenericEventSource::new(client, req);

        Ok(stream_from_event_source(event_source, span, "OpenAI"))
    }
}
914
915#[cfg(test)]
916mod tests {
917 use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
918 use crate::completion::CompletionModel;
919 use crate::message::ReasoningContent;
920 use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
921 use crate::providers::openai::responses_api::{
922 AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
923 ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
924 };
925 use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
926 use crate::test_utils::MockStreamingClient;
927 use futures::StreamExt;
928 use serde_json::{self, json};
929
930 use crate::{
931 client::CompletionClient, completion::Message, providers::openai, streaming::StreamingChat,
932 test_utils::MockExampleTool,
933 };
934
935 fn sample_response(status: ResponseStatus) -> CompletionResponse {
936 CompletionResponse {
937 id: "resp_123".to_string(),
938 object: ResponseObject::Response,
939 created_at: 0,
940 status,
941 error: None,
942 incomplete_details: None,
943 instructions: None,
944 max_output_tokens: None,
945 model: "gpt-5.4".to_string(),
946 usage: None,
947 output: Vec::new(),
948 tools: Vec::new(),
949 additional_parameters: AdditionalParameters::default(),
950 }
951 }
952
953 async fn first_error_from_event(
954 event: serde_json::Value,
955 ) -> crate::completion::CompletionError {
956 let client = openai::Client::builder()
957 .http_client(MockStreamingClient {
958 sse_bytes: sse_bytes_from_json_events(&[event]),
959 })
960 .api_key("test-key")
961 .build()
962 .expect("client should build");
963 let model = client.completion_model("gpt-5.4");
964 let request = model.completion_request("hello").build();
965 let mut stream = model.stream(request).await.expect("stream should start");
966
967 stream
968 .next()
969 .await
970 .expect("stream should yield an item")
971 .expect_err("stream should surface a provider error")
972 }
973
974 async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
975 let client = openai::Client::builder()
976 .http_client(MockStreamingClient {
977 sse_bytes: sse_bytes_from_json_events(&[event]),
978 })
979 .api_key("test-key")
980 .build()
981 .expect("client should build");
982 let model = client.completion_model("gpt-5.4");
983 let request = model.completion_request("hello").build();
984 let mut stream = model.stream(request).await.expect("stream should start");
985
986 while let Some(item) = stream.next().await {
987 match item.expect("completed stream should not error") {
988 StreamedAssistantContent::Final(res) => return res.usage,
989 _ => continue,
990 }
991 }
992
993 panic!("stream should yield a final response");
994 }
995
996 #[test]
997 fn reasoning_done_item_emits_summary_then_encrypted() {
998 let summary = vec![
999 ReasoningSummary::SummaryText {
1000 text: "step 1".to_string(),
1001 },
1002 ReasoningSummary::SummaryText {
1003 text: "step 2".to_string(),
1004 },
1005 ];
1006 let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1007
1008 assert_eq!(choices.len(), 3);
1009 assert!(matches!(
1010 choices.first(),
1011 Some(RawStreamingChoice::Reasoning {
1012 id: Some(id),
1013 content: ReasoningContent::Summary(text),
1014 }) if id == "rs_1" && text == "step 1"
1015 ));
1016 assert!(matches!(
1017 choices.get(1),
1018 Some(RawStreamingChoice::Reasoning {
1019 id: Some(id),
1020 content: ReasoningContent::Summary(text),
1021 }) if id == "rs_1" && text == "step 2"
1022 ));
1023 assert!(matches!(
1024 choices.get(2),
1025 Some(RawStreamingChoice::Reasoning {
1026 id: Some(id),
1027 content: ReasoningContent::Encrypted(data),
1028 }) if id == "rs_1" && data == "enc_blob"
1029 ));
1030 }
1031
1032 #[test]
1033 fn reasoning_done_item_without_encrypted_emits_summary_only() {
1034 let summary = vec![ReasoningSummary::SummaryText {
1035 text: "only summary".to_string(),
1036 }];
1037 let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1038
1039 assert_eq!(choices.len(), 1);
1040 assert!(matches!(
1041 choices.first(),
1042 Some(RawStreamingChoice::Reasoning {
1043 id: Some(id),
1044 content: ReasoningContent::Summary(text),
1045 }) if id == "rs_2" && text == "only summary"
1046 ));
1047 }
1048
1049 #[test]
1050 fn content_part_added_deserializes_snake_case_part_type() {
1051 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1052 "type": "response.content_part.added",
1053 "item_id": "msg_1",
1054 "output_index": 0,
1055 "content_index": 0,
1056 "sequence_number": 3,
1057 "part": {
1058 "type": "output_text",
1059 "text": "hello"
1060 }
1061 }))
1062 .expect("content part event should deserialize");
1063
1064 assert!(matches!(
1065 chunk,
1066 StreamingCompletionChunk::Delta(chunk)
1067 if matches!(
1068 chunk.data,
1069 ItemChunkKind::ContentPartAdded(_)
1070 )
1071 ));
1072 }
1073
1074 #[test]
1075 fn content_part_done_deserializes_snake_case_part_type() {
1076 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1077 "type": "response.content_part.done",
1078 "item_id": "msg_1",
1079 "output_index": 0,
1080 "content_index": 0,
1081 "sequence_number": 4,
1082 "part": {
1083 "type": "summary_text",
1084 "text": "done"
1085 }
1086 }))
1087 .expect("content part done event should deserialize");
1088
1089 assert!(matches!(
1090 chunk,
1091 StreamingCompletionChunk::Delta(chunk)
1092 if matches!(
1093 chunk.data,
1094 ItemChunkKind::ContentPartDone(_)
1095 )
1096 ));
1097 }
1098
1099 #[test]
1100 fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
1101 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1102 "type": "response.reasoning_summary_part.added",
1103 "item_id": "rs_1",
1104 "output_index": 0,
1105 "summary_index": 0,
1106 "sequence_number": 5,
1107 "part": {
1108 "type": "summary_text",
1109 "text": "step 1"
1110 }
1111 }))
1112 .expect("reasoning summary part event should deserialize");
1113
1114 assert!(matches!(
1115 chunk,
1116 StreamingCompletionChunk::Delta(chunk)
1117 if matches!(
1118 chunk.data,
1119 ItemChunkKind::ReasoningSummaryPartAdded(_)
1120 )
1121 ));
1122 }
1123
1124 #[test]
1125 fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
1126 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1127 "type": "response.reasoning_summary_part.done",
1128 "item_id": "rs_1",
1129 "output_index": 0,
1130 "summary_index": 0,
1131 "sequence_number": 6,
1132 "part": {
1133 "type": "summary_text",
1134 "text": "step 2"
1135 }
1136 }))
1137 .expect("reasoning summary part done event should deserialize");
1138
1139 assert!(matches!(
1140 chunk,
1141 StreamingCompletionChunk::Delta(chunk)
1142 if matches!(
1143 chunk.data,
1144 ItemChunkKind::ReasoningSummaryPartDone(_)
1145 )
1146 ));
1147 }
1148
1149 #[tokio::test]
1150 async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1151 let mut response = sample_response(ResponseStatus::Failed);
1152 response.error = Some(ResponseError {
1153 code: String::new(),
1154 message: "maximum context length exceeded".to_string(),
1155 });
1156
1157 let event = json!({
1158 "type": "response.failed",
1159 "sequence_number": 1,
1160 "response": response,
1161 });
1162
1163 let err = first_error_from_event(event).await;
1164
1165 assert_eq!(
1166 err.to_string(),
1167 "ProviderError: maximum context length exceeded"
1168 );
1169 }
1170
1171 #[tokio::test]
1172 async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1173 let mut response = sample_response(ResponseStatus::Failed);
1174 response.error = Some(ResponseError {
1175 code: "context_length_exceeded".to_string(),
1176 message: "maximum context length exceeded".to_string(),
1177 });
1178
1179 let event = json!({
1180 "type": "response.failed",
1181 "sequence_number": 1,
1182 "response": response,
1183 });
1184
1185 let err = first_error_from_event(event).await;
1186
1187 assert_eq!(
1188 err.to_string(),
1189 "ProviderError: context_length_exceeded: maximum context length exceeded"
1190 );
1191 }
1192
1193 #[tokio::test]
1194 async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1195 let mut response = sample_response(ResponseStatus::Incomplete);
1196 response.incomplete_details = Some(IncompleteDetailsReason {
1197 reason: "max_output_tokens".to_string(),
1198 });
1199
1200 let event = json!({
1201 "type": "response.incomplete",
1202 "sequence_number": 1,
1203 "response": response,
1204 });
1205
1206 let err = first_error_from_event(event).await;
1207
1208 assert_eq!(
1209 err.to_string(),
1210 "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1211 );
1212 }
1213
    /// A terminal `response.failed` chunk must end the stream: the provider
    /// error is yielded exactly once and no further items follow, even when
    /// other SSE events accompany the failure in the same feed.
    #[tokio::test]
    async fn response_failed_chunk_terminates_stream_without_followup_items() {
        // A completed tool-call item that accompanies the failure in the feed.
        let tool_call_done = json!({
            "type": "response.output_item.done",
            "sequence_number": 1,
            "item": {
                "type": "function_call",
                "id": "fc_123",
                "arguments": "{}",
                "call_id": "call_123",
                "name": "example_tool",
                "status": "completed"
            }
        });

        let mut response = sample_response(ResponseStatus::Failed);
        response.error = Some(ResponseError {
            code: "server_error".to_string(),
            message: "response stream failed".to_string(),
        });

        let failed = json!({
            "type": "response.failed",
            "sequence_number": 2,
            "response": response,
        });

        // Serve both events through a mock HTTP client so the model consumes
        // a tool-call item followed by the terminal failure.
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        // The first yielded item must be the provider error, code-prefixed.
        let err = stream
            .next()
            .await
            .expect("stream should yield an item")
            .expect_err("stream should surface a provider error");
        assert_eq!(
            err.to_string(),
            "ProviderError: server_error: response stream failed"
        );
        // After the terminal error the stream must be fully exhausted.
        assert!(
            stream.next().await.is_none(),
            "stream should terminate immediately after the first terminal error"
        );
    }
1266
1267 #[tokio::test]
1268 async fn response_completed_chunk_populates_final_usage() {
1269 let mut response = sample_response(ResponseStatus::Completed);
1270 response.usage = Some(ResponsesUsage {
1271 input_tokens: 10,
1272 input_tokens_details: None,
1273 output_tokens: 5,
1274 output_tokens_details: OutputTokensDetails {
1275 reasoning_tokens: 0,
1276 },
1277 total_tokens: 15,
1278 });
1279
1280 let event = json!({
1281 "type": "response.completed",
1282 "sequence_number": 1,
1283 "response": response,
1284 });
1285
1286 let usage = final_usage_from_event(event).await;
1287 assert_eq!(usage.input_tokens, 10);
1288 assert_eq!(usage.output_tokens, 5);
1289 assert_eq!(usage.total_tokens, 15);
1290 }
1291
    /// The `[DONE]` SSE sentinel must be recognized and skipped rather than
    /// fed to the JSON parser, so the stream completes normally and no
    /// "Couldn't deserialize" debug log is emitted.
    #[tokio::test]
    async fn done_sentinel_is_ignored_without_debug_parse_noise() {
        use std::io::{self, Write};
        use std::sync::{Arc, Mutex};

        // Cloneable writer that appends all log output to a shared byte buffer
        // so the test can inspect what tracing wrote.
        #[derive(Clone)]
        struct SharedWriter(Arc<Mutex<Vec<u8>>>);

        impl Write for SharedWriter {
            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
                self.0
                    .lock()
                    .expect("log buffer mutex should not be poisoned")
                    .extend_from_slice(buf);
                Ok(buf.len())
            }

            fn flush(&mut self) -> io::Result<()> {
                Ok(())
            }
        }

        let mut response = sample_response(ResponseStatus::Completed);
        response.usage = Some(ResponsesUsage {
            input_tokens: 4,
            input_tokens_details: None,
            output_tokens: 2,
            output_tokens_details: OutputTokensDetails {
                reasoning_tokens: 0,
            },
            total_tokens: 6,
        });

        // Install a DEBUG-level subscriber writing into `captured` for the
        // duration of this test (scoped via the `_guard` default-dispatch guard).
        let captured = Arc::new(Mutex::new(Vec::new()));
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .with_ansi(false)
            .without_time()
            .with_writer({
                let captured = captured.clone();
                move || SharedWriter(captured.clone())
            })
            .finish();
        let _guard = tracing::subscriber::set_default(subscriber);

        // SSE body: one valid `response.completed` event followed by the
        // literal `[DONE]` sentinel line.
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: bytes::Bytes::from(format!(
                    "data: {}\n\ndata: [DONE]\n\n",
                    serde_json::to_string(&json!({
                        "type": "response.completed",
                        "sequence_number": 1,
                        "response": response,
                    }))
                    .expect("response event should serialize")
                )),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        // Drain the stream; the usage from the completed event must arrive as
        // the final item despite the trailing sentinel.
        let mut final_usage = None;
        while let Some(item) = stream.next().await {
            if let StreamedAssistantContent::Final(response) =
                item.expect("stream should complete successfully")
            {
                final_usage = Some(response.usage);
            }
        }

        let usage = final_usage.expect("expected final response");
        assert_eq!(usage.input_tokens, 4);
        assert_eq!(usage.output_tokens, 2);
        assert_eq!(usage.total_tokens, 6);

        // The parse-failure debug message must not appear in captured logs —
        // its presence would mean `[DONE]` reached the JSON deserializer.
        let logs = String::from_utf8(
            captured
                .lock()
                .expect("log buffer mutex should not be poisoned")
                .clone(),
        )
        .expect("captured logs should be valid UTF-8");
        assert!(
            !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
            "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
        );
    }
1382
1383 #[tokio::test]
1385 #[ignore = "requires API key"]
1386 async fn test_openai_streaming_tools_reasoning() {
1387 let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1388 let client = openai::Client::new(&api_key).expect("Failed to build client");
1389 let agent = client
1390 .agent("gpt-5.2")
1391 .max_tokens(8192)
1392 .tool(MockExampleTool)
1393 .additional_params(serde_json::json!({
1394 "reasoning": {"effort": "high"}
1395 }))
1396 .build();
1397
1398 let chat_history: Vec<Message> = Vec::new();
1399 let mut stream = agent
1400 .stream_chat("Call my example tool", &chat_history)
1401 .multi_turn(5)
1402 .await;
1403
1404 while let Some(item) = stream.next().await {
1405 println!("Got item: {item:?}");
1406 }
1407 }
1408}