Skip to main content

rig_core/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
/// Shorthand for a raw streaming choice whose final-response payload is this
/// module's [`StreamingCompletionResponse`].
type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
21// ================================================================
22// OpenAI Responses Streaming API
23// ================================================================
24
/// A streaming completion chunk.
///
/// Streaming chunks can come in one of two forms:
/// - A response chunk, which carries a whole response object (the completed
///   response is where the total token usage is read from).
/// - An item chunk, commonly referred to as a delta. In the completions API
///   this would be referred to as the message delta.
///
/// The enum is `untagged`, so serde tries the variants in declaration order:
/// `Response` first, then `Delta`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    /// A response-level event, held behind a `Box`.
    Response(Box<ResponseChunk>),
    /// An item-level event.
    Delta(ItemChunk),
}
35
/// The final streaming response from the OpenAI Responses API.
///
/// This is the payload of the `FinalResponse` choice that closes a decoded
/// stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    /// Token usage reported for the response.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44    id: &str,
45    summary: &[ReasoningSummary],
46    encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48    let mut choices = summary
49        .iter()
50        .map(|reasoning_summary| match reasoning_summary {
51            ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52                id: Some(id.to_owned()),
53                content: ReasoningContent::Summary(text.to_owned()),
54            },
55        })
56        .collect::<Vec<_>>();
57
58    if let Some(encrypted_content) = encrypted_content {
59        choices.push(RawStreamingChoice::Reasoning {
60            id: Some(id.to_owned()),
61            content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62        });
63    }
64
65    choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69    fn token_usage(&self) -> Option<crate::completion::Usage> {
70        self.usage.token_usage()
71    }
72}
73
/// A response chunk from OpenAI's response API.
///
/// Pairs the event type with the full response object the event refers to.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// The response chunk type, (de)serialized as the JSON `type` field.
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// The response itself
    pub response: CompletionResponse,
    /// The sequence number carried by the event.
    /// NOTE(review): presumably the event's position within the stream — confirm against the API reference.
    pub sequence_number: u64,
}
85
/// Response chunk type.
/// Renames are used to ensure that this type gets (de)serialized properly:
/// each variant maps to the dotted event name used on the wire.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    /// The `response.created` event.
    #[serde(rename = "response.created")]
    ResponseCreated,
    /// The `response.in_progress` event.
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    /// The `response.completed` event; the usage on its response becomes the
    /// stream's final usage.
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    /// The `response.failed` event; reported to callers as a provider error.
    #[serde(rename = "response.failed")]
    ResponseFailed,
    /// The `response.incomplete` event; reported to callers as a provider error.
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
101
102fn response_chunk_error_message(
103    kind: &ResponseChunkKind,
104    response: &CompletionResponse,
105    provider_name: &str,
106) -> Option<String> {
107    match kind {
108        ResponseChunkKind::ResponseFailed => Some(response_error_message(
109            response.error.as_ref(),
110            &format!("{provider_name} response stream returned a failed response"),
111        )),
112        ResponseChunkKind::ResponseIncomplete => {
113            let reason = response
114                .incomplete_details
115                .as_ref()
116                .map(|details| details.reason.as_str())
117                .unwrap_or("unknown reason");
118
119            Some(format!(
120                "{provider_name} response stream was incomplete: {reason}"
121            ))
122        }
123        _ => None,
124    }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128    if let Some(error) = error {
129        if error.code.is_empty() {
130            error.message.clone()
131        } else {
132            format!("{}: {}", error.code, error.message)
133        }
134    } else {
135        fallback.to_string()
136    }
137}
138
/// Switches that control how a Responses API stream is decoded.
#[derive(Clone, Copy)]
pub(crate) enum ResponsesStreamOptions {
    /// Completed tool calls are held back and emitted when the stream is finished.
    Strict,
    /// Completed tool calls are emitted as soon as their "output item done"
    /// event is decoded.
    StrictWithImmediateToolCalls,
}
144
145impl ResponsesStreamOptions {
146    pub(crate) const fn strict() -> Self {
147        Self::Strict
148    }
149
150    pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
151        Self::StrictWithImmediateToolCalls
152    }
153
154    const fn errors_on_terminal_response(self) -> bool {
155        true
156    }
157
158    const fn emits_completed_tool_calls_immediately(self) -> bool {
159        matches!(self, Self::StrictWithImmediateToolCalls)
160    }
161}
162
163pub(crate) fn parse_sse_completion_body(
164    body: &str,
165    provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167    let mut completed = None;
168    let mut provider_error = None;
169
170    for line in body.lines() {
171        let data = line
172            .strip_prefix("data:")
173            .map(str::trim)
174            .unwrap_or_default();
175        if data.is_empty() || data == "[DONE]" {
176            continue;
177        }
178
179        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180            if let StreamingCompletionChunk::Response(chunk) = chunk {
181                let ResponseChunk { kind, response, .. } = *chunk;
182                match kind {
183                    ResponseChunkKind::ResponseCompleted => {
184                        completed = Some(response);
185                        break;
186                    }
187                    ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188                        provider_error =
189                            response_chunk_error_message(&kind, &response, provider_name);
190                    }
191                    _ => {}
192                }
193            }
194            continue;
195        }
196
197        let value = match serde_json::from_str::<serde_json::Value>(data) {
198            Ok(value) => value,
199            Err(_) => continue,
200        };
201
202        match value.get("type").and_then(serde_json::Value::as_str) {
203            Some("response.completed") => {
204                if let Some(response) = value.get("response") {
205                    completed = Some(serde_json::from_value(response.clone())?);
206                    break;
207                }
208            }
209            Some("response.failed") | Some("response.incomplete") => {
210                provider_error = value
211                    .get("response")
212                    .cloned()
213                    .and_then(|response| {
214                        serde_json::from_value::<CompletionResponse>(response).ok()
215                    })
216                    .and_then(|response| {
217                        let kind = if value.get("type").and_then(serde_json::Value::as_str)
218                            == Some("response.failed")
219                        {
220                            ResponseChunkKind::ResponseFailed
221                        } else {
222                            ResponseChunkKind::ResponseIncomplete
223                        };
224                        response_chunk_error_message(&kind, &response, provider_name)
225                    })
226                    .or_else(|| {
227                        value
228                            .get("error")
229                            .and_then(|error| error.get("message"))
230                            .and_then(serde_json::Value::as_str)
231                            .map(ToOwned::to_owned)
232                    })
233                    .or_else(|| Some(data.to_string()));
234            }
235            Some("error") => {
236                provider_error = value
237                    .get("error")
238                    .and_then(|error| error.get("message"))
239                    .and_then(serde_json::Value::as_str)
240                    .map(ToOwned::to_owned)
241                    .or_else(|| Some(data.to_string()));
242            }
243            _ => {}
244        }
245    }
246
247    completed.ok_or_else(|| {
248        CompletionError::ProviderError(
249            provider_error.unwrap_or_else(|| {
250                format!("{provider_name} stream did not yield response.completed")
251            }),
252        )
253    })
254}
255
/// Mutable state carried while decoding one Responses API stream.
struct RawChoiceAccumulator {
    /// Usage to report in the final response. Starts as the caller-supplied
    /// value and is replaced when a completed response reports usage.
    final_usage: ResponsesUsage,
    /// Completed tool calls that are held back until the stream is finished.
    tool_calls: Vec<StreamingRawChoice>,
    /// Maps a provider item id to the internal call id generated for it, so
    /// that every event for the same item reuses one id.
    tool_call_internal_ids: std::collections::HashMap<String, String>,
}
261
262impl RawChoiceAccumulator {
263    fn new(initial_usage: ResponsesUsage) -> Self {
264        Self {
265            final_usage: initial_usage,
266            tool_calls: Vec::new(),
267            tool_call_internal_ids: std::collections::HashMap::new(),
268        }
269    }
270
271    fn decode_item_chunk(
272        &mut self,
273        chunk: ItemChunk,
274        options: ResponsesStreamOptions,
275    ) -> Vec<StreamingRawChoice> {
276        let mut immediate = Vec::new();
277
278        let ItemChunk {
279            item_id: outer_item_id,
280            data: item,
281            ..
282        } = chunk;
283
284        match item {
285            ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
286                item: Output::FunctionCall(func),
287                ..
288            }) => {
289                let internal_call_id = self
290                    .tool_call_internal_ids
291                    .entry(func.id.clone())
292                    .or_insert_with(|| nanoid::nanoid!())
293                    .clone();
294                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
295                    id: func.id,
296                    internal_call_id,
297                    content: streaming::ToolCallDeltaContent::Name(func.name),
298                });
299            }
300            ItemChunkKind::OutputItemDone(message) => {
301                self.push_output_item_done(
302                    message.item,
303                    &mut immediate,
304                    options.emits_completed_tool_calls_immediately(),
305                );
306            }
307            ItemChunkKind::OutputTextDelta(delta) => {
308                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
309            }
310            ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
311                immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
312                    id: None,
313                    reasoning: delta.delta,
314                });
315            }
316            ItemChunkKind::RefusalDelta(delta) => {
317                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
318            }
319            ItemChunkKind::FunctionCallArgsDelta(delta) => {
320                if let Some(item_id) = outer_item_id {
321                    let internal_call_id = self
322                        .tool_call_internal_ids
323                        .entry(item_id.clone())
324                        .or_insert_with(|| nanoid::nanoid!())
325                        .clone();
326                    immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
327                        id: item_id,
328                        internal_call_id,
329                        content: streaming::ToolCallDeltaContent::Delta(delta.delta),
330                    });
331                }
332            }
333            _ => {}
334        }
335
336        immediate
337    }
338
339    fn record_response_chunk(
340        &mut self,
341        kind: ResponseChunkKind,
342        response: CompletionResponse,
343        provider_name: &str,
344        options: ResponsesStreamOptions,
345    ) -> Result<(), CompletionError> {
346        match kind {
347            ResponseChunkKind::ResponseCompleted => {
348                if let Some(usage) = response.usage {
349                    self.final_usage = usage;
350                }
351                Ok(())
352            }
353            ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
354                if options.errors_on_terminal_response() =>
355            {
356                let error_message = response_chunk_error_message(&kind, &response, provider_name)
357                    .unwrap_or_else(|| {
358                        format!(
359                            "{provider_name} returned terminal response {:?} without an error message",
360                            kind
361                        )
362                    });
363                Err(CompletionError::ProviderError(error_message))
364            }
365            _ => Ok(()),
366        }
367    }
368
369    fn push_output_item_done(
370        &mut self,
371        item: Output,
372        immediate: &mut Vec<StreamingRawChoice>,
373        emit_completed_tool_calls_immediately: bool,
374    ) {
375        match item {
376            Output::FunctionCall(func) => {
377                let internal_call_id = self
378                    .tool_call_internal_ids
379                    .entry(func.id.clone())
380                    .or_insert_with(|| nanoid::nanoid!())
381                    .clone();
382                let tool_call =
383                    streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
384                        .with_internal_call_id(internal_call_id)
385                        .with_call_id(func.call_id);
386
387                if emit_completed_tool_calls_immediately {
388                    immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
389                } else {
390                    self.tool_calls
391                        .push(streaming::RawStreamingChoice::ToolCall(tool_call));
392                }
393            }
394            Output::Reasoning {
395                id,
396                summary,
397                encrypted_content,
398                ..
399            } => {
400                immediate.extend(reasoning_choices_from_done_item(
401                    &id,
402                    &summary,
403                    encrypted_content.as_deref(),
404                ));
405            }
406            Output::Message(message) => {
407                immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
408            }
409            Output::Unknown => {}
410        }
411    }
412
413    fn finish(mut self) -> Vec<StreamingRawChoice> {
414        let mut choices = Vec::new();
415        choices.append(&mut self.tool_calls);
416        choices.push(RawStreamingChoice::FinalResponse(
417            StreamingCompletionResponse {
418                usage: self.final_usage,
419            },
420        ));
421        choices
422    }
423}
424
425pub(crate) fn raw_choices_from_sse_body(
426    body: &str,
427    initial_usage: ResponsesUsage,
428    provider_name: &str,
429) -> Result<Vec<StreamingRawChoice>, CompletionError> {
430    let mut raw_choices = Vec::new();
431    let mut accumulator = RawChoiceAccumulator::new(initial_usage);
432    let options = ResponsesStreamOptions::strict();
433
434    for line in body.lines() {
435        let data = line
436            .strip_prefix("data:")
437            .map(str::trim)
438            .unwrap_or_default();
439        if data.is_empty() || data == "[DONE]" {
440            continue;
441        }
442
443        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
444            match chunk {
445                StreamingCompletionChunk::Delta(chunk) => {
446                    raw_choices.extend(accumulator.decode_item_chunk(chunk, options));
447                }
448                StreamingCompletionChunk::Response(chunk) => {
449                    let ResponseChunk { kind, response, .. } = *chunk;
450                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
451                }
452            }
453            continue;
454        }
455
456        let value = match serde_json::from_str::<serde_json::Value>(data) {
457            Ok(value) => value,
458            Err(_) => continue,
459        };
460
461        match value.get("type").and_then(serde_json::Value::as_str) {
462            Some("response.output_text.delta") | Some("response.refusal.delta") => {
463                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
464                    raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
465                }
466            }
467            Some("response.reasoning_summary_text.delta") => {
468                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
469                    raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
470                        id: None,
471                        reasoning: delta.to_owned(),
472                    });
473                }
474            }
475            Some("response.output_item.added") => {
476                if let Some(item) = value
477                    .get("item")
478                    .cloned()
479                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
480                    && let Output::FunctionCall(func) = item
481                {
482                    let internal_call_id = accumulator
483                        .tool_call_internal_ids
484                        .entry(func.id.clone())
485                        .or_insert_with(|| nanoid::nanoid!())
486                        .clone();
487                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
488                        id: func.id,
489                        internal_call_id,
490                        content: streaming::ToolCallDeltaContent::Name(func.name),
491                    });
492                }
493            }
494            Some("response.output_item.done") => {
495                if let Some(item) = value
496                    .get("item")
497                    .cloned()
498                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
499                {
500                    accumulator.push_output_item_done(item, &mut raw_choices, false);
501                }
502            }
503            Some("response.function_call_arguments.delta") => {
504                if let (Some(item_id), Some(delta)) = (
505                    value.get("item_id").and_then(serde_json::Value::as_str),
506                    value.get("delta").and_then(serde_json::Value::as_str),
507                ) {
508                    let internal_call_id = accumulator
509                        .tool_call_internal_ids
510                        .entry(item_id.to_owned())
511                        .or_insert_with(|| nanoid::nanoid!())
512                        .clone();
513                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
514                        id: item_id.to_owned(),
515                        internal_call_id,
516                        content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
517                    });
518                }
519            }
520            Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
521                if let Some(response) = value.get("response").cloned() {
522                    let response = serde_json::from_value::<CompletionResponse>(response)?;
523                    let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
524                        Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
525                        Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
526                        Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
527                        _ => None,
528                    }) else {
529                        continue;
530                    };
531                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
532                }
533            }
534            Some("error") => {
535                let message = value
536                    .get("error")
537                    .and_then(|error| error.get("message"))
538                    .and_then(serde_json::Value::as_str)
539                    .unwrap_or(data);
540                return Err(CompletionError::ProviderError(message.to_owned()));
541            }
542            _ => {}
543        }
544    }
545
546    raw_choices.extend(accumulator.finish());
547    Ok(raw_choices)
548}
549
550pub(crate) async fn completion_response_from_sse_body(
551    body: &str,
552    raw_response: CompletionResponse,
553    provider_name: &str,
554) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
555    let raw_choices = raw_choices_from_sse_body(
556        body,
557        raw_response
558            .usage
559            .clone()
560            .unwrap_or_else(ResponsesUsage::new),
561        provider_name,
562    )?;
563    let stream = futures::stream::iter(
564        raw_choices
565            .into_iter()
566            .map(Ok::<_, CompletionError>)
567            .collect::<Vec<_>>(),
568    );
569    let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));
570
571    while let Some(item) = stream.next().await {
572        item?;
573    }
574
575    if choice_is_empty(&stream.choice) {
576        return Err(CompletionError::ResponseError(
577            "Response contained no parts".to_owned(),
578        ));
579    }
580
581    Ok(completion::CompletionResponse {
582        usage: stream
583            .response
584            .as_ref()
585            .and_then(GetTokenUsage::token_usage)
586            .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
587        message_id: stream
588            .message_id
589            .clone()
590            .or_else(|| message_id_from_response(&raw_response)),
591        choice: stream.choice,
592        raw_response,
593    })
594}
595
596fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
597    choice.iter().all(|content| match content {
598        completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
599        completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
600        completion::AssistantContent::Image(_) => false,
601        completion::AssistantContent::ToolCall(_) => false,
602    })
603}
604
605fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
606    response.output.iter().find_map(|item| match item {
607        Output::Message(message) => Some(message.id.clone()),
608        _ => None,
609    })
610}
611
612fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
613    response
614        .usage
615        .as_ref()
616        .and_then(GetTokenUsage::token_usage)
617        .unwrap_or_default()
618}
619
620pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
621    event_source: GenericEventSource<HttpClient, RequestBody>,
622    span: tracing::Span,
623    provider_name: &'static str,
624) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
625where
626    HttpClient: HttpClientExt + Clone + 'static,
627    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
628{
629    stream_from_event_source_with_options(
630        event_source,
631        span,
632        provider_name,
633        ResponsesStreamOptions::strict(),
634    )
635}
636
637pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
638    mut event_source: GenericEventSource<HttpClient, RequestBody>,
639    span: tracing::Span,
640    provider_name: &'static str,
641    options: ResponsesStreamOptions,
642) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
643where
644    HttpClient: HttpClientExt + Clone + 'static,
645    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
646{
647    let stream = stream! {
648        let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
649        let span = tracing::Span::current();
650
651        let mut terminated_with_error = false;
652
653        while let Some(event_result) = event_source.next().await {
654            match event_result {
655                Ok(Event::Open) => {
656                    tracing::trace!("SSE connection opened");
657                    continue;
658                }
659                Ok(Event::Message(evt)) => {
660                    if evt.data.trim().is_empty() || evt.data == "[DONE]" {
661                        continue;
662                    }
663
664                    let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
665
666                    let Ok(data) = data else {
667                        let Err(err) = data else {
668                            continue;
669                        };
670                        debug!(
671                            "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
672                            err
673                        );
674                        continue;
675                    };
676
677                    match data {
678                        StreamingCompletionChunk::Delta(chunk) => {
679                            for choice in accumulator.decode_item_chunk(chunk, options) {
680                                yield Ok(choice);
681                            }
682                        }
683                        StreamingCompletionChunk::Response(chunk) => {
684                            let ResponseChunk { kind, response, .. } = *chunk;
685                            if matches!(kind, ResponseChunkKind::ResponseCompleted) {
686                                span.record("gen_ai.response.id", response.id.as_str());
687                                span.record("gen_ai.response.model", response.model.as_str());
688                            }
689                            if let Err(error) =
690                                accumulator.record_response_chunk(kind, response, provider_name, options)
691                            {
692                                terminated_with_error = true;
693                                yield Err(error);
694                                break;
695                            }
696                        }
697                    }
698                }
699                Err(crate::http_client::Error::StreamEnded) => {
700                    event_source.close();
701                }
702                Err(error) => {
703                    tracing::error!(?error, "SSE error");
704                    terminated_with_error = true;
705                    yield Err(CompletionError::ProviderError(error.to_string()));
706                    break;
707                }
708            }
709        }
710
711        event_source.close();
712
713        if terminated_with_error {
714            return;
715        }
716
717        let final_usage = accumulator.final_usage.clone();
718
719        for tool_call in accumulator.finish() {
720            yield Ok(tool_call)
721        }
722
723        span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
724        span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
725        let cached_tokens = final_usage
726            .input_tokens_details
727            .as_ref()
728            .map(|d| d.cached_tokens)
729            .unwrap_or(0);
730        span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
731
732    }
733    .instrument(span);
734
735    streaming::StreamingCompletionResponse::stream(Box::pin(stream))
736}
737
/// An item message chunk from OpenAI's Responses API.
///
/// The event-specific payload is flattened into this struct, so the JSON
/// `type` field and the payload fields sit next to `item_id` and
/// `output_index` in the same object.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Item ID. Optional.
    pub item_id: Option<String>,
    /// The output index of the item from a given streamed response.
    pub output_index: u64,
    /// The item type chunk, as well as the inner data.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
750
/// The item chunk type from OpenAI's Responses API.
///
/// Internally tagged on the JSON `type` field; each variant is renamed to the
/// dotted event name it represents.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    /// `response.output_item.added`: an output item was added.
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    /// `response.output_item.done`: an output item is complete.
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    /// `response.content_part.added`.
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    /// `response.content_part.done`.
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    /// `response.output_text.delta`: a fragment of output text.
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    /// `response.output_text.done`.
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    /// `response.refusal.delta`: a fragment of refusal text.
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    /// `response.refusal.done`.
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    /// `response.function_call_arguments.delta`: a fragment of function-call arguments.
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    /// `response.function_call_arguments.done`.
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    /// `response.reasoning_summary_part.added`.
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    /// `response.reasoning_summary_part.done`.
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    /// `response.reasoning_summary_text.delta`: a fragment of reasoning summary text.
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    /// `response.reasoning_summary_text.done`.
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
    /// Catch-all for unknown item chunk types (e.g., `web_search_call` events).
    /// This prevents unknown streaming events from breaking deserialization.
    #[serde(other)]
    Unknown,
}
788
/// Payload shared by the `response.output_item.added` and
/// `response.output_item.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    /// The sequence number carried by the event.
    pub sequence_number: u64,
    /// The output item the event refers to.
    pub item: Output,
}
794
/// Payload shared by the `response.content_part.added` and
/// `response.content_part.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    /// NOTE(review): presumably the index of the content part within its item — confirm against the API reference.
    pub content_index: u64,
    /// The sequence number carried by the event.
    pub sequence_number: u64,
    /// The content part itself.
    pub part: ContentPartChunkPart,
}
801
/// A content part carried by a [`ContentPartChunk`].
///
/// Internally tagged on `type`, with snake_case tags (`output_text`,
/// `summary_text`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    /// A part tagged `output_text`.
    OutputText { text: String },
    /// A part tagged `summary_text`.
    SummaryText { text: String },
}
808
/// Payload shared by the `response.output_text.delta` and
/// `response.refusal.delta` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    /// NOTE(review): presumably the index of the content part being extended — confirm against the API reference.
    pub content_index: u64,
    /// The sequence number carried by the event.
    pub sequence_number: u64,
    /// The text fragment carried by this event.
    pub delta: String,
}
815
816#[derive(Debug, Serialize, Deserialize, Clone)]
817pub struct DeltaTextChunkWithItemId {
818    #[serde(default, skip_serializing_if = "Option::is_none")]
819    pub content_index: Option<u64>,
820    pub sequence_number: u64,
821    pub delta: String,
822}
823
824#[derive(Debug, Serialize, Deserialize, Clone)]
825pub struct OutputTextChunk {
826    pub content_index: u64,
827    pub sequence_number: u64,
828    pub text: String,
829}
830
831#[derive(Debug, Serialize, Deserialize, Clone)]
832pub struct RefusalTextChunk {
833    pub content_index: u64,
834    pub sequence_number: u64,
835    pub refusal: String,
836}
837
838#[derive(Debug, Serialize, Deserialize, Clone)]
839pub struct ArgsTextChunk {
840    #[serde(default, skip_serializing_if = "Option::is_none")]
841    pub content_index: Option<u64>,
842    pub sequence_number: u64,
843    pub arguments: serde_json::Value,
844}
845
846#[derive(Debug, Serialize, Deserialize, Clone)]
847pub struct SummaryPartChunk {
848    pub summary_index: u64,
849    pub sequence_number: u64,
850    pub part: SummaryPartChunkPart,
851}
852
853#[derive(Debug, Serialize, Deserialize, Clone)]
854pub struct SummaryTextChunk {
855    pub summary_index: u64,
856    pub sequence_number: u64,
857    pub delta: String,
858}
859
860#[derive(Debug, Serialize, Deserialize, Clone)]
861#[serde(tag = "type", rename_all = "snake_case")]
862pub enum SummaryPartChunkPart {
863    SummaryText { text: String },
864}
865
866impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
867where
868    crate::client::Client<Ext, H>:
869        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
870    Ext: crate::client::Provider + Clone + 'static,
871    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
872{
873    pub(crate) async fn stream(
874        &self,
875        completion_request: crate::completion::CompletionRequest,
876    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
877    {
878        let mut request = self.create_completion_request(completion_request)?;
879        request.stream = Some(true);
880
881        if enabled!(Level::TRACE) {
882            tracing::trace!(
883                target: "rig::completions",
884                "OpenAI Responses streaming completion request: {}",
885                serde_json::to_string_pretty(&request)?
886            );
887        }
888
889        let body = serde_json::to_vec(&request)?;
890
891        let req = self
892            .client
893            .post("/responses")?
894            .body(body)
895            .map_err(|e| CompletionError::HttpError(e.into()))?;
896
897        // let request_builder = self.client.post_reqwest("/responses").json(&request);
898
899        let span = if tracing::Span::current().is_disabled() {
900            info_span!(
901                target: "rig::completions",
902                "chat_streaming",
903                gen_ai.operation.name = "chat_streaming",
904                gen_ai.provider.name = tracing::field::Empty,
905                gen_ai.request.model = tracing::field::Empty,
906                gen_ai.response.id = tracing::field::Empty,
907                gen_ai.response.model = tracing::field::Empty,
908                gen_ai.usage.output_tokens = tracing::field::Empty,
909                gen_ai.usage.input_tokens = tracing::field::Empty,
910                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
911            )
912        } else {
913            tracing::Span::current()
914        };
915        span.record("gen_ai.provider.name", "openai");
916        span.record("gen_ai.request.model", &self.model);
917        let client = self.client.clone();
918        let event_source = GenericEventSource::new(client, req);
919
920        Ok(stream_from_event_source(event_source, span, "OpenAI"))
921    }
922}
923
924#[cfg(test)]
925mod tests {
926    use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
927    use crate::completion::CompletionModel;
928    use crate::message::ReasoningContent;
929    use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
930    use crate::providers::openai::responses_api::{
931        AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
932        ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
933    };
934    use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
935    use crate::test_utils::MockStreamingClient;
936    use futures::StreamExt;
937    use serde_json::{self, json};
938
939    use crate::{
940        client::CompletionClient, completion::Message, providers::openai, streaming::StreamingChat,
941        test_utils::MockExampleTool,
942    };
943
944    fn sample_response(status: ResponseStatus) -> CompletionResponse {
945        CompletionResponse {
946            id: "resp_123".to_string(),
947            object: ResponseObject::Response,
948            created_at: 0,
949            status,
950            error: None,
951            incomplete_details: None,
952            instructions: None,
953            max_output_tokens: None,
954            model: "gpt-5.4".to_string(),
955            provider_reasoning: None,
956            usage: None,
957            output: Vec::new(),
958            tools: Vec::new(),
959            additional_parameters: AdditionalParameters::default(),
960        }
961    }
962
963    async fn first_error_from_event(
964        event: serde_json::Value,
965    ) -> crate::completion::CompletionError {
966        let client = openai::Client::builder()
967            .http_client(MockStreamingClient {
968                sse_bytes: sse_bytes_from_json_events(&[event]),
969            })
970            .api_key("test-key")
971            .build()
972            .expect("client should build");
973        let model = client.completion_model("gpt-5.4");
974        let request = model.completion_request("hello").build();
975        let mut stream = model.stream(request).await.expect("stream should start");
976
977        stream
978            .next()
979            .await
980            .expect("stream should yield an item")
981            .expect_err("stream should surface a provider error")
982    }
983
984    async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
985        let client = openai::Client::builder()
986            .http_client(MockStreamingClient {
987                sse_bytes: sse_bytes_from_json_events(&[event]),
988            })
989            .api_key("test-key")
990            .build()
991            .expect("client should build");
992        let model = client.completion_model("gpt-5.4");
993        let request = model.completion_request("hello").build();
994        let mut stream = model.stream(request).await.expect("stream should start");
995
996        while let Some(item) = stream.next().await {
997            match item.expect("completed stream should not error") {
998                StreamedAssistantContent::Final(res) => return res.usage,
999                _ => continue,
1000            }
1001        }
1002
1003        panic!("stream should yield a final response");
1004    }
1005
1006    #[test]
1007    fn reasoning_done_item_emits_summary_then_encrypted() {
1008        let summary = vec![
1009            ReasoningSummary::SummaryText {
1010                text: "step 1".to_string(),
1011            },
1012            ReasoningSummary::SummaryText {
1013                text: "step 2".to_string(),
1014            },
1015        ];
1016        let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1017
1018        assert_eq!(choices.len(), 3);
1019        assert!(matches!(
1020            choices.first(),
1021            Some(RawStreamingChoice::Reasoning {
1022                id: Some(id),
1023                content: ReasoningContent::Summary(text),
1024            }) if id == "rs_1" && text == "step 1"
1025        ));
1026        assert!(matches!(
1027            choices.get(1),
1028            Some(RawStreamingChoice::Reasoning {
1029                id: Some(id),
1030                content: ReasoningContent::Summary(text),
1031            }) if id == "rs_1" && text == "step 2"
1032        ));
1033        assert!(matches!(
1034            choices.get(2),
1035            Some(RawStreamingChoice::Reasoning {
1036                id: Some(id),
1037                content: ReasoningContent::Encrypted(data),
1038            }) if id == "rs_1" && data == "enc_blob"
1039        ));
1040    }
1041
1042    #[test]
1043    fn reasoning_done_item_without_encrypted_emits_summary_only() {
1044        let summary = vec![ReasoningSummary::SummaryText {
1045            text: "only summary".to_string(),
1046        }];
1047        let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1048
1049        assert_eq!(choices.len(), 1);
1050        assert!(matches!(
1051            choices.first(),
1052            Some(RawStreamingChoice::Reasoning {
1053                id: Some(id),
1054                content: ReasoningContent::Summary(text),
1055            }) if id == "rs_2" && text == "only summary"
1056        ));
1057    }
1058
1059    #[test]
1060    fn content_part_added_deserializes_snake_case_part_type() {
1061        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1062            "type": "response.content_part.added",
1063            "item_id": "msg_1",
1064            "output_index": 0,
1065            "content_index": 0,
1066            "sequence_number": 3,
1067            "part": {
1068                "type": "output_text",
1069                "text": "hello"
1070            }
1071        }))
1072        .expect("content part event should deserialize");
1073
1074        assert!(matches!(
1075            chunk,
1076            StreamingCompletionChunk::Delta(chunk)
1077                if matches!(
1078                    chunk.data,
1079                    ItemChunkKind::ContentPartAdded(_)
1080                )
1081        ));
1082    }
1083
1084    #[test]
1085    fn content_part_done_deserializes_snake_case_part_type() {
1086        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1087            "type": "response.content_part.done",
1088            "item_id": "msg_1",
1089            "output_index": 0,
1090            "content_index": 0,
1091            "sequence_number": 4,
1092            "part": {
1093                "type": "summary_text",
1094                "text": "done"
1095            }
1096        }))
1097        .expect("content part done event should deserialize");
1098
1099        assert!(matches!(
1100            chunk,
1101            StreamingCompletionChunk::Delta(chunk)
1102                if matches!(
1103                    chunk.data,
1104                    ItemChunkKind::ContentPartDone(_)
1105                )
1106        ));
1107    }
1108
1109    #[test]
1110    fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
1111        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1112            "type": "response.reasoning_summary_part.added",
1113            "item_id": "rs_1",
1114            "output_index": 0,
1115            "summary_index": 0,
1116            "sequence_number": 5,
1117            "part": {
1118                "type": "summary_text",
1119                "text": "step 1"
1120            }
1121        }))
1122        .expect("reasoning summary part event should deserialize");
1123
1124        assert!(matches!(
1125            chunk,
1126            StreamingCompletionChunk::Delta(chunk)
1127                if matches!(
1128                    chunk.data,
1129                    ItemChunkKind::ReasoningSummaryPartAdded(_)
1130                )
1131        ));
1132    }
1133
1134    #[test]
1135    fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
1136        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1137            "type": "response.reasoning_summary_part.done",
1138            "item_id": "rs_1",
1139            "output_index": 0,
1140            "summary_index": 0,
1141            "sequence_number": 6,
1142            "part": {
1143                "type": "summary_text",
1144                "text": "step 2"
1145            }
1146        }))
1147        .expect("reasoning summary part done event should deserialize");
1148
1149        assert!(matches!(
1150            chunk,
1151            StreamingCompletionChunk::Delta(chunk)
1152                if matches!(
1153                    chunk.data,
1154                    ItemChunkKind::ReasoningSummaryPartDone(_)
1155                )
1156        ));
1157    }
1158
1159    #[tokio::test]
1160    async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1161        let mut response = sample_response(ResponseStatus::Failed);
1162        response.error = Some(ResponseError {
1163            code: String::new(),
1164            message: "maximum context length exceeded".to_string(),
1165        });
1166
1167        let event = json!({
1168            "type": "response.failed",
1169            "sequence_number": 1,
1170            "response": response,
1171        });
1172
1173        let err = first_error_from_event(event).await;
1174
1175        assert_eq!(
1176            err.to_string(),
1177            "ProviderError: maximum context length exceeded"
1178        );
1179    }
1180
1181    #[tokio::test]
1182    async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1183        let mut response = sample_response(ResponseStatus::Failed);
1184        response.error = Some(ResponseError {
1185            code: "context_length_exceeded".to_string(),
1186            message: "maximum context length exceeded".to_string(),
1187        });
1188
1189        let event = json!({
1190            "type": "response.failed",
1191            "sequence_number": 1,
1192            "response": response,
1193        });
1194
1195        let err = first_error_from_event(event).await;
1196
1197        assert_eq!(
1198            err.to_string(),
1199            "ProviderError: context_length_exceeded: maximum context length exceeded"
1200        );
1201    }
1202
1203    #[tokio::test]
1204    async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1205        let mut response = sample_response(ResponseStatus::Incomplete);
1206        response.incomplete_details = Some(IncompleteDetailsReason {
1207            reason: "max_output_tokens".to_string(),
1208        });
1209
1210        let event = json!({
1211            "type": "response.incomplete",
1212            "sequence_number": 1,
1213            "response": response,
1214        });
1215
1216        let err = first_error_from_event(event).await;
1217
1218        assert_eq!(
1219            err.to_string(),
1220            "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1221        );
1222    }
1223
1224    #[tokio::test]
1225    async fn response_failed_chunk_terminates_stream_without_followup_items() {
1226        let tool_call_done = json!({
1227            "type": "response.output_item.done",
1228            "sequence_number": 1,
1229            "item": {
1230                "type": "function_call",
1231                "id": "fc_123",
1232                "arguments": "{}",
1233                "call_id": "call_123",
1234                "name": "example_tool",
1235                "status": "completed"
1236            }
1237        });
1238
1239        let mut response = sample_response(ResponseStatus::Failed);
1240        response.error = Some(ResponseError {
1241            code: "server_error".to_string(),
1242            message: "response stream failed".to_string(),
1243        });
1244
1245        let failed = json!({
1246            "type": "response.failed",
1247            "sequence_number": 2,
1248            "response": response,
1249        });
1250
1251        let client = openai::Client::builder()
1252            .http_client(MockStreamingClient {
1253                sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1254            })
1255            .api_key("test-key")
1256            .build()
1257            .expect("client should build");
1258        let model = client.completion_model("gpt-5.4");
1259        let request = model.completion_request("hello").build();
1260        let mut stream = model.stream(request).await.expect("stream should start");
1261
1262        let err = stream
1263            .next()
1264            .await
1265            .expect("stream should yield an item")
1266            .expect_err("stream should surface a provider error");
1267        assert_eq!(
1268            err.to_string(),
1269            "ProviderError: server_error: response stream failed"
1270        );
1271        assert!(
1272            stream.next().await.is_none(),
1273            "stream should terminate immediately after the first terminal error"
1274        );
1275    }
1276
1277    #[tokio::test]
1278    async fn response_completed_chunk_populates_final_usage() {
1279        let mut response = sample_response(ResponseStatus::Completed);
1280        response.usage = Some(ResponsesUsage {
1281            input_tokens: 10,
1282            input_tokens_details: None,
1283            output_tokens: 5,
1284            output_tokens_details: Some(OutputTokensDetails {
1285                reasoning_tokens: 0,
1286            }),
1287            total_tokens: 15,
1288        });
1289
1290        let event = json!({
1291            "type": "response.completed",
1292            "sequence_number": 1,
1293            "response": response,
1294        });
1295
1296        let usage = final_usage_from_event(event).await;
1297        assert_eq!(usage.input_tokens, 10);
1298        assert_eq!(usage.output_tokens, 5);
1299        assert_eq!(usage.total_tokens, 15);
1300    }
1301
1302    #[tokio::test]
1303    async fn done_sentinel_is_ignored_without_debug_parse_noise() {
1304        use std::io::{self, Write};
1305        use std::sync::{Arc, Mutex};
1306
1307        #[derive(Clone)]
1308        struct SharedWriter(Arc<Mutex<Vec<u8>>>);
1309
1310        impl Write for SharedWriter {
1311            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1312                self.0
1313                    .lock()
1314                    .expect("log buffer mutex should not be poisoned")
1315                    .extend_from_slice(buf);
1316                Ok(buf.len())
1317            }
1318
1319            fn flush(&mut self) -> io::Result<()> {
1320                Ok(())
1321            }
1322        }
1323
1324        let mut response = sample_response(ResponseStatus::Completed);
1325        response.usage = Some(ResponsesUsage {
1326            input_tokens: 4,
1327            input_tokens_details: None,
1328            output_tokens: 2,
1329            output_tokens_details: Some(OutputTokensDetails {
1330                reasoning_tokens: 0,
1331            }),
1332            total_tokens: 6,
1333        });
1334
1335        let captured = Arc::new(Mutex::new(Vec::new()));
1336        let subscriber = tracing_subscriber::fmt()
1337            .with_max_level(tracing::Level::DEBUG)
1338            .with_ansi(false)
1339            .without_time()
1340            .with_writer({
1341                let captured = captured.clone();
1342                move || SharedWriter(captured.clone())
1343            })
1344            .finish();
1345        let _guard = tracing::subscriber::set_default(subscriber);
1346
1347        let client = openai::Client::builder()
1348            .http_client(MockStreamingClient {
1349                sse_bytes: bytes::Bytes::from(format!(
1350                    "data: {}\n\ndata: [DONE]\n\n",
1351                    serde_json::to_string(&json!({
1352                        "type": "response.completed",
1353                        "sequence_number": 1,
1354                        "response": response,
1355                    }))
1356                    .expect("response event should serialize")
1357                )),
1358            })
1359            .api_key("test-key")
1360            .build()
1361            .expect("client should build");
1362        let model = client.completion_model("gpt-5.4");
1363        let request = model.completion_request("hello").build();
1364        let mut stream = model.stream(request).await.expect("stream should start");
1365
1366        let mut final_usage = None;
1367        while let Some(item) = stream.next().await {
1368            if let StreamedAssistantContent::Final(response) =
1369                item.expect("stream should complete successfully")
1370            {
1371                final_usage = Some(response.usage);
1372            }
1373        }
1374
1375        let usage = final_usage.expect("expected final response");
1376        assert_eq!(usage.input_tokens, 4);
1377        assert_eq!(usage.output_tokens, 2);
1378        assert_eq!(usage.total_tokens, 6);
1379
1380        let logs = String::from_utf8(
1381            captured
1382                .lock()
1383                .expect("log buffer mutex should not be poisoned")
1384                .clone(),
1385        )
1386        .expect("captured logs should be valid UTF-8");
1387        assert!(
1388            !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
1389            "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
1390        );
1391    }
1392
1393    // requires `derive` rig-core feature due to using tool macro
1394    #[tokio::test]
1395    #[ignore = "requires API key"]
1396    async fn test_openai_streaming_tools_reasoning() {
1397        let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1398        let client = openai::Client::new(&api_key).expect("Failed to build client");
1399        let agent = client
1400            .agent("gpt-5.2")
1401            .max_tokens(8192)
1402            .tool(MockExampleTool)
1403            .additional_params(serde_json::json!({
1404                "reasoning": {"effort": "high"}
1405            }))
1406            .build();
1407
1408        let chat_history: Vec<Message> = Vec::new();
1409        let mut stream = agent
1410            .stream_chat("Call my example tool", &chat_history)
1411            .multi_turn(5)
1412            .await;
1413
1414        while let Some(item) = stream.next().await {
1415            println!("Got item: {item:?}");
1416        }
1417    }
1418}