Skip to main content

rig_core/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
19type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
21// ================================================================
22// OpenAI Responses Streaming API
23// ================================================================
24
25/// A streaming completion chunk.
26/// Streaming chunks can come in one of two forms:
27/// - A response chunk (where the completed response will have the total token usage)
28/// - An item chunk commonly referred to as a delta. In the completions API this would be referred to as the message delta.
29#[derive(Debug, Serialize, Deserialize, Clone)]
30#[serde(untagged)]
31pub enum StreamingCompletionChunk {
32    Response(Box<ResponseChunk>),
33    Delta(ItemChunk),
34}
35
36/// The final streaming response from the OpenAI Responses API.
37#[derive(Debug, Serialize, Deserialize, Clone)]
38pub struct StreamingCompletionResponse {
39    /// Token usage
40    pub usage: ResponsesUsage,
41}
42
43pub(crate) fn reasoning_choices_from_done_item(
44    id: &str,
45    summary: &[ReasoningSummary],
46    encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48    let mut choices = summary
49        .iter()
50        .map(|reasoning_summary| match reasoning_summary {
51            ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52                id: Some(id.to_owned()),
53                content: ReasoningContent::Summary(text.to_owned()),
54            },
55        })
56        .collect::<Vec<_>>();
57
58    if let Some(encrypted_content) = encrypted_content {
59        choices.push(RawStreamingChoice::Reasoning {
60            id: Some(id.to_owned()),
61            content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62        });
63    }
64
65    choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69    fn token_usage(&self) -> Option<crate::completion::Usage> {
70        self.usage.token_usage()
71    }
72}
73
74/// A response chunk from OpenAI's response API.
75#[derive(Debug, Serialize, Deserialize, Clone)]
76pub struct ResponseChunk {
77    /// The response chunk type
78    #[serde(rename = "type")]
79    pub kind: ResponseChunkKind,
80    /// The response itself
81    pub response: CompletionResponse,
82    /// The item sequence
83    pub sequence_number: u64,
84}
85
86/// Response chunk type.
87/// Renames are used to ensure that this type gets (de)serialized properly.
88#[derive(Debug, Serialize, Deserialize, Clone)]
89pub enum ResponseChunkKind {
90    #[serde(rename = "response.created")]
91    ResponseCreated,
92    #[serde(rename = "response.in_progress")]
93    ResponseInProgress,
94    #[serde(rename = "response.completed")]
95    ResponseCompleted,
96    #[serde(rename = "response.failed")]
97    ResponseFailed,
98    #[serde(rename = "response.incomplete")]
99    ResponseIncomplete,
100}
101
102fn response_chunk_error_message(
103    kind: &ResponseChunkKind,
104    response: &CompletionResponse,
105    provider_name: &str,
106) -> Option<String> {
107    match kind {
108        ResponseChunkKind::ResponseFailed => Some(response_error_message(
109            response.error.as_ref(),
110            &format!("{provider_name} response stream returned a failed response"),
111        )),
112        ResponseChunkKind::ResponseIncomplete => {
113            let reason = response
114                .incomplete_details
115                .as_ref()
116                .map(|details| details.reason.as_str())
117                .unwrap_or("unknown reason");
118
119            Some(format!(
120                "{provider_name} response stream was incomplete: {reason}"
121            ))
122        }
123        _ => None,
124    }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128    if let Some(error) = error {
129        if error.code.is_empty() {
130            error.message.clone()
131        } else {
132            format!("{}: {}", error.code, error.message)
133        }
134    } else {
135        fallback.to_string()
136    }
137}
138
139#[derive(Clone, Copy)]
140pub(crate) enum ResponsesStreamOptions {
141    Strict,
142    StrictWithImmediateToolCalls,
143}
144
145impl ResponsesStreamOptions {
146    pub(crate) const fn strict() -> Self {
147        Self::Strict
148    }
149
150    pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
151        Self::StrictWithImmediateToolCalls
152    }
153
154    const fn errors_on_terminal_response(self) -> bool {
155        true
156    }
157
158    const fn emits_completed_tool_calls_immediately(self) -> bool {
159        matches!(self, Self::StrictWithImmediateToolCalls)
160    }
161}
162
163pub(crate) fn parse_sse_completion_body(
164    body: &str,
165    provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167    let mut completed = None;
168    let mut provider_error = None;
169
170    for line in body.lines() {
171        let data = line
172            .strip_prefix("data:")
173            .map(str::trim)
174            .unwrap_or_default();
175        if data.is_empty() || data == "[DONE]" {
176            continue;
177        }
178
179        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180            if let StreamingCompletionChunk::Response(chunk) = chunk {
181                let ResponseChunk { kind, response, .. } = *chunk;
182                match kind {
183                    ResponseChunkKind::ResponseCompleted => {
184                        completed = Some(response);
185                        break;
186                    }
187                    ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188                        provider_error =
189                            response_chunk_error_message(&kind, &response, provider_name);
190                    }
191                    _ => {}
192                }
193            }
194            continue;
195        }
196
197        let value = match serde_json::from_str::<serde_json::Value>(data) {
198            Ok(value) => value,
199            Err(_) => continue,
200        };
201
202        match value.get("type").and_then(serde_json::Value::as_str) {
203            Some("response.completed") => {
204                if let Some(response) = value.get("response") {
205                    completed = Some(serde_json::from_value(response.clone())?);
206                    break;
207                }
208            }
209            Some("response.failed") | Some("response.incomplete") => {
210                provider_error = value
211                    .get("response")
212                    .cloned()
213                    .and_then(|response| {
214                        serde_json::from_value::<CompletionResponse>(response).ok()
215                    })
216                    .and_then(|response| {
217                        let kind = if value.get("type").and_then(serde_json::Value::as_str)
218                            == Some("response.failed")
219                        {
220                            ResponseChunkKind::ResponseFailed
221                        } else {
222                            ResponseChunkKind::ResponseIncomplete
223                        };
224                        response_chunk_error_message(&kind, &response, provider_name)
225                    })
226                    .or_else(|| {
227                        value
228                            .get("error")
229                            .and_then(|error| error.get("message"))
230                            .and_then(serde_json::Value::as_str)
231                            .map(ToOwned::to_owned)
232                    })
233                    .or_else(|| Some(data.to_string()));
234            }
235            Some("error") => {
236                provider_error = value
237                    .get("error")
238                    .and_then(|error| error.get("message"))
239                    .and_then(serde_json::Value::as_str)
240                    .map(ToOwned::to_owned)
241                    .or_else(|| Some(data.to_string()));
242            }
243            _ => {}
244        }
245    }
246
247    completed.ok_or_else(|| {
248        CompletionError::ProviderError(
249            provider_error.unwrap_or_else(|| {
250                format!("{provider_name} stream did not yield response.completed")
251            }),
252        )
253    })
254}
255
256struct RawChoiceAccumulator {
257    final_usage: ResponsesUsage,
258    tool_calls: Vec<StreamingRawChoice>,
259    tool_call_internal_ids: std::collections::HashMap<String, String>,
260}
261
262impl RawChoiceAccumulator {
263    fn new(initial_usage: ResponsesUsage) -> Self {
264        Self {
265            final_usage: initial_usage,
266            tool_calls: Vec::new(),
267            tool_call_internal_ids: std::collections::HashMap::new(),
268        }
269    }
270
271    fn decode_item_chunk(
272        &mut self,
273        chunk: ItemChunk,
274        options: ResponsesStreamOptions,
275    ) -> Vec<StreamingRawChoice> {
276        let mut immediate = Vec::new();
277
278        let ItemChunk {
279            item_id: outer_item_id,
280            data: item,
281            ..
282        } = chunk;
283
284        match item {
285            ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
286                item: Output::FunctionCall(func),
287                ..
288            }) => {
289                let internal_call_id = self
290                    .tool_call_internal_ids
291                    .entry(func.id.clone())
292                    .or_insert_with(|| nanoid::nanoid!())
293                    .clone();
294                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
295                    id: func.id,
296                    internal_call_id,
297                    content: streaming::ToolCallDeltaContent::Name(func.name),
298                });
299            }
300            ItemChunkKind::OutputItemDone(message) => {
301                self.push_output_item_done(
302                    message.item,
303                    &mut immediate,
304                    options.emits_completed_tool_calls_immediately(),
305                );
306            }
307            ItemChunkKind::OutputTextDelta(delta) => {
308                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
309            }
310            ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
311                immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
312                    id: None,
313                    reasoning: delta.delta,
314                });
315            }
316            ItemChunkKind::RefusalDelta(delta) => {
317                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
318            }
319            ItemChunkKind::FunctionCallArgsDelta(delta) => {
320                if let Some(item_id) = outer_item_id {
321                    let internal_call_id = self
322                        .tool_call_internal_ids
323                        .entry(item_id.clone())
324                        .or_insert_with(|| nanoid::nanoid!())
325                        .clone();
326                    immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
327                        id: item_id,
328                        internal_call_id,
329                        content: streaming::ToolCallDeltaContent::Delta(delta.delta),
330                    });
331                }
332            }
333            _ => {}
334        }
335
336        immediate
337    }
338
339    fn record_response_chunk(
340        &mut self,
341        kind: ResponseChunkKind,
342        response: CompletionResponse,
343        provider_name: &str,
344        options: ResponsesStreamOptions,
345    ) -> Result<(), CompletionError> {
346        match kind {
347            ResponseChunkKind::ResponseCompleted => {
348                if let Some(usage) = response.usage {
349                    self.final_usage = usage;
350                }
351                Ok(())
352            }
353            ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
354                if options.errors_on_terminal_response() =>
355            {
356                let error_message = response_chunk_error_message(&kind, &response, provider_name)
357                    .unwrap_or_else(|| {
358                        format!(
359                            "{provider_name} returned terminal response {:?} without an error message",
360                            kind
361                        )
362                    });
363                Err(CompletionError::ProviderError(error_message))
364            }
365            _ => Ok(()),
366        }
367    }
368
369    fn push_output_item_done(
370        &mut self,
371        item: Output,
372        immediate: &mut Vec<StreamingRawChoice>,
373        emit_completed_tool_calls_immediately: bool,
374    ) {
375        match item {
376            Output::FunctionCall(func) => {
377                let internal_call_id = self
378                    .tool_call_internal_ids
379                    .entry(func.id.clone())
380                    .or_insert_with(|| nanoid::nanoid!())
381                    .clone();
382                let tool_call =
383                    streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
384                        .with_internal_call_id(internal_call_id)
385                        .with_call_id(func.call_id);
386
387                if emit_completed_tool_calls_immediately {
388                    immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
389                } else {
390                    self.tool_calls
391                        .push(streaming::RawStreamingChoice::ToolCall(tool_call));
392                }
393            }
394            Output::Reasoning {
395                id,
396                summary,
397                encrypted_content,
398                ..
399            } => {
400                immediate.extend(reasoning_choices_from_done_item(
401                    &id,
402                    &summary,
403                    encrypted_content.as_deref(),
404                ));
405            }
406            Output::Message(message) => {
407                immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
408            }
409            Output::Unknown => {}
410        }
411    }
412
413    fn finish(mut self) -> Vec<StreamingRawChoice> {
414        let mut choices = Vec::new();
415        choices.append(&mut self.tool_calls);
416        choices.push(RawStreamingChoice::FinalResponse(
417            StreamingCompletionResponse {
418                usage: self.final_usage,
419            },
420        ));
421        choices
422    }
423}
424
425pub(crate) fn raw_choices_from_sse_body(
426    body: &str,
427    initial_usage: ResponsesUsage,
428    provider_name: &str,
429) -> Result<Vec<StreamingRawChoice>, CompletionError> {
430    let mut raw_choices = Vec::new();
431    let mut accumulator = RawChoiceAccumulator::new(initial_usage);
432    let options = ResponsesStreamOptions::strict();
433
434    for line in body.lines() {
435        let data = line
436            .strip_prefix("data:")
437            .map(str::trim)
438            .unwrap_or_default();
439        if data.is_empty() || data == "[DONE]" {
440            continue;
441        }
442
443        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
444            match chunk {
445                StreamingCompletionChunk::Delta(chunk) => {
446                    raw_choices.extend(accumulator.decode_item_chunk(chunk, options));
447                }
448                StreamingCompletionChunk::Response(chunk) => {
449                    let ResponseChunk { kind, response, .. } = *chunk;
450                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
451                }
452            }
453            continue;
454        }
455
456        let value = match serde_json::from_str::<serde_json::Value>(data) {
457            Ok(value) => value,
458            Err(_) => continue,
459        };
460
461        match value.get("type").and_then(serde_json::Value::as_str) {
462            Some("response.output_text.delta") | Some("response.refusal.delta") => {
463                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
464                    raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
465                }
466            }
467            Some("response.reasoning_summary_text.delta") => {
468                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
469                    raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
470                        id: None,
471                        reasoning: delta.to_owned(),
472                    });
473                }
474            }
475            Some("response.output_item.added") => {
476                if let Some(item) = value
477                    .get("item")
478                    .cloned()
479                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
480                    && let Output::FunctionCall(func) = item
481                {
482                    let internal_call_id = accumulator
483                        .tool_call_internal_ids
484                        .entry(func.id.clone())
485                        .or_insert_with(|| nanoid::nanoid!())
486                        .clone();
487                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
488                        id: func.id,
489                        internal_call_id,
490                        content: streaming::ToolCallDeltaContent::Name(func.name),
491                    });
492                }
493            }
494            Some("response.output_item.done") => {
495                if let Some(item) = value
496                    .get("item")
497                    .cloned()
498                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
499                {
500                    accumulator.push_output_item_done(item, &mut raw_choices, false);
501                }
502            }
503            Some("response.function_call_arguments.delta") => {
504                if let (Some(item_id), Some(delta)) = (
505                    value.get("item_id").and_then(serde_json::Value::as_str),
506                    value.get("delta").and_then(serde_json::Value::as_str),
507                ) {
508                    let internal_call_id = accumulator
509                        .tool_call_internal_ids
510                        .entry(item_id.to_owned())
511                        .or_insert_with(|| nanoid::nanoid!())
512                        .clone();
513                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
514                        id: item_id.to_owned(),
515                        internal_call_id,
516                        content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
517                    });
518                }
519            }
520            Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
521                if let Some(response) = value.get("response").cloned() {
522                    let response = serde_json::from_value::<CompletionResponse>(response)?;
523                    let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
524                        Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
525                        Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
526                        Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
527                        _ => None,
528                    }) else {
529                        continue;
530                    };
531                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
532                }
533            }
534            Some("error") => {
535                let message = value
536                    .get("error")
537                    .and_then(|error| error.get("message"))
538                    .and_then(serde_json::Value::as_str)
539                    .unwrap_or(data);
540                return Err(CompletionError::ProviderError(message.to_owned()));
541            }
542            _ => {}
543        }
544    }
545
546    raw_choices.extend(accumulator.finish());
547    Ok(raw_choices)
548}
549
550pub(crate) async fn completion_response_from_sse_body(
551    body: &str,
552    raw_response: CompletionResponse,
553    provider_name: &str,
554) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
555    let raw_choices = raw_choices_from_sse_body(
556        body,
557        raw_response
558            .usage
559            .clone()
560            .unwrap_or_else(ResponsesUsage::new),
561        provider_name,
562    )?;
563    let stream = futures::stream::iter(
564        raw_choices
565            .into_iter()
566            .map(Ok::<_, CompletionError>)
567            .collect::<Vec<_>>(),
568    );
569    let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));
570
571    while let Some(item) = stream.next().await {
572        item?;
573    }
574
575    if choice_is_empty(&stream.choice) {
576        return Err(CompletionError::ResponseError(
577            "Response contained no parts".to_owned(),
578        ));
579    }
580
581    Ok(completion::CompletionResponse {
582        usage: stream
583            .response
584            .as_ref()
585            .and_then(GetTokenUsage::token_usage)
586            .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
587        message_id: stream
588            .message_id
589            .clone()
590            .or_else(|| message_id_from_response(&raw_response)),
591        choice: stream.choice,
592        raw_response,
593    })
594}
595
596fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
597    choice.iter().all(|content| match content {
598        completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
599        completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
600        completion::AssistantContent::Image(_) => false,
601        completion::AssistantContent::ToolCall(_) => false,
602    })
603}
604
605fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
606    response.output.iter().find_map(|item| match item {
607        Output::Message(message) => Some(message.id.clone()),
608        _ => None,
609    })
610}
611
612fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
613    response
614        .usage
615        .as_ref()
616        .and_then(GetTokenUsage::token_usage)
617        .unwrap_or_default()
618}
619
620pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
621    event_source: GenericEventSource<HttpClient, RequestBody>,
622    span: tracing::Span,
623    provider_name: &'static str,
624) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
625where
626    HttpClient: HttpClientExt + Clone + 'static,
627    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
628{
629    stream_from_event_source_with_options(
630        event_source,
631        span,
632        provider_name,
633        ResponsesStreamOptions::strict(),
634    )
635}
636
637pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
638    mut event_source: GenericEventSource<HttpClient, RequestBody>,
639    span: tracing::Span,
640    provider_name: &'static str,
641    options: ResponsesStreamOptions,
642) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
643where
644    HttpClient: HttpClientExt + Clone + 'static,
645    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
646{
647    let stream = stream! {
648        let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
649        let span = tracing::Span::current();
650
651        let mut terminated_with_error = false;
652
653        while let Some(event_result) = event_source.next().await {
654            match event_result {
655                Ok(Event::Open) => {
656                    tracing::trace!("SSE connection opened");
657                    continue;
658                }
659                Ok(Event::Message(evt)) => {
660                    if evt.data.trim().is_empty() || evt.data == "[DONE]" {
661                        continue;
662                    }
663
664                    let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
665
666                    let Ok(data) = data else {
667                        let Err(err) = data else {
668                            continue;
669                        };
670                        debug!(
671                            "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
672                            err
673                        );
674                        continue;
675                    };
676
677                    match data {
678                        StreamingCompletionChunk::Delta(chunk) => {
679                            for choice in accumulator.decode_item_chunk(chunk, options) {
680                                yield Ok(choice);
681                            }
682                        }
683                        StreamingCompletionChunk::Response(chunk) => {
684                            let ResponseChunk { kind, response, .. } = *chunk;
685                            if matches!(kind, ResponseChunkKind::ResponseCompleted) {
686                                span.record("gen_ai.response.id", response.id.as_str());
687                                span.record("gen_ai.response.model", response.model.as_str());
688                            }
689                            if let Err(error) =
690                                accumulator.record_response_chunk(kind, response, provider_name, options)
691                            {
692                                terminated_with_error = true;
693                                yield Err(error);
694                                break;
695                            }
696                        }
697                    }
698                }
699                Err(crate::http_client::Error::StreamEnded) => {
700                    event_source.close();
701                }
702                Err(error) => {
703                    tracing::error!(?error, "SSE error");
704                    terminated_with_error = true;
705                    yield Err(CompletionError::ProviderError(error.to_string()));
706                    break;
707                }
708            }
709        }
710
711        event_source.close();
712
713        if terminated_with_error {
714            return;
715        }
716
717        let final_usage = accumulator.final_usage.clone();
718
719        for tool_call in accumulator.finish() {
720            yield Ok(tool_call)
721        }
722
723        span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
724        span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
725        let cached_tokens = final_usage
726            .input_tokens_details
727            .as_ref()
728            .map(|d| d.cached_tokens)
729            .unwrap_or(0);
730        span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
731
732    }
733    .instrument(span);
734
735    streaming::StreamingCompletionResponse::stream(Box::pin(stream))
736}
737
738/// An item message chunk from OpenAI's Responses API.
739/// See
740#[derive(Debug, Serialize, Deserialize, Clone)]
741pub struct ItemChunk {
742    /// Item ID. Optional.
743    pub item_id: Option<String>,
744    /// The output index of the item from a given streamed response.
745    pub output_index: u64,
746    /// The item type chunk, as well as the inner data.
747    #[serde(flatten)]
748    pub data: ItemChunkKind,
749}
750
751/// The item chunk type from OpenAI's Responses API.
752#[derive(Debug, Serialize, Deserialize, Clone)]
753#[serde(tag = "type")]
754pub enum ItemChunkKind {
755    #[serde(rename = "response.output_item.added")]
756    OutputItemAdded(StreamingItemDoneOutput),
757    #[serde(rename = "response.output_item.done")]
758    OutputItemDone(StreamingItemDoneOutput),
759    #[serde(rename = "response.content_part.added")]
760    ContentPartAdded(ContentPartChunk),
761    #[serde(rename = "response.content_part.done")]
762    ContentPartDone(ContentPartChunk),
763    #[serde(rename = "response.output_text.delta")]
764    OutputTextDelta(DeltaTextChunk),
765    #[serde(rename = "response.output_text.done")]
766    OutputTextDone(OutputTextChunk),
767    #[serde(rename = "response.refusal.delta")]
768    RefusalDelta(DeltaTextChunk),
769    #[serde(rename = "response.refusal.done")]
770    RefusalDone(RefusalTextChunk),
771    #[serde(rename = "response.function_call_arguments.delta")]
772    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
773    #[serde(rename = "response.function_call_arguments.done")]
774    FunctionCallArgsDone(ArgsTextChunk),
775    #[serde(rename = "response.reasoning_summary_part.added")]
776    ReasoningSummaryPartAdded(SummaryPartChunk),
777    #[serde(rename = "response.reasoning_summary_part.done")]
778    ReasoningSummaryPartDone(SummaryPartChunk),
779    #[serde(rename = "response.reasoning_summary_text.delta")]
780    ReasoningSummaryTextDelta(SummaryTextChunk),
781    #[serde(rename = "response.reasoning_summary_text.done")]
782    ReasoningSummaryTextDone(SummaryTextChunk),
783    /// Catch-all for unknown item chunk types (e.g., `web_search_call` events).
784    /// This prevents unknown streaming events from breaking deserialization.
785    #[serde(other)]
786    Unknown,
787}
788
789#[derive(Debug, Serialize, Deserialize, Clone)]
790pub struct StreamingItemDoneOutput {
791    pub sequence_number: u64,
792    pub item: Output,
793}
794
795#[derive(Debug, Serialize, Deserialize, Clone)]
796pub struct ContentPartChunk {
797    pub content_index: u64,
798    pub sequence_number: u64,
799    pub part: ContentPartChunkPart,
800}
801
802#[derive(Debug, Serialize, Deserialize, Clone)]
803#[serde(tag = "type", rename_all = "snake_case")]
804pub enum ContentPartChunkPart {
805    OutputText { text: String },
806    SummaryText { text: String },
807}
808
809#[derive(Debug, Serialize, Deserialize, Clone)]
810pub struct DeltaTextChunk {
811    pub content_index: u64,
812    pub sequence_number: u64,
813    pub delta: String,
814}
815
816#[derive(Debug, Serialize, Deserialize, Clone)]
817pub struct DeltaTextChunkWithItemId {
818    #[serde(default, skip_serializing_if = "Option::is_none")]
819    pub content_index: Option<u64>,
820    pub sequence_number: u64,
821    pub delta: String,
822}
823
824#[derive(Debug, Serialize, Deserialize, Clone)]
825pub struct OutputTextChunk {
826    pub content_index: u64,
827    pub sequence_number: u64,
828    pub text: String,
829}
830
831#[derive(Debug, Serialize, Deserialize, Clone)]
832pub struct RefusalTextChunk {
833    pub content_index: u64,
834    pub sequence_number: u64,
835    pub refusal: String,
836}
837
838#[derive(Debug, Serialize, Deserialize, Clone)]
839pub struct ArgsTextChunk {
840    #[serde(default, skip_serializing_if = "Option::is_none")]
841    pub content_index: Option<u64>,
842    pub sequence_number: u64,
843    pub arguments: serde_json::Value,
844}
845
846#[derive(Debug, Serialize, Deserialize, Clone)]
847pub struct SummaryPartChunk {
848    pub summary_index: u64,
849    pub sequence_number: u64,
850    pub part: SummaryPartChunkPart,
851}
852
853#[derive(Debug, Serialize, Deserialize, Clone)]
854pub struct SummaryTextChunk {
855    pub summary_index: u64,
856    pub sequence_number: u64,
857    pub delta: String,
858}
859
860#[derive(Debug, Serialize, Deserialize, Clone)]
861#[serde(tag = "type", rename_all = "snake_case")]
862pub enum SummaryPartChunkPart {
863    SummaryText { text: String },
864}
865
866impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
867where
868    crate::client::Client<Ext, H>:
869        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
870    Ext: crate::client::Provider + Clone + 'static,
871    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
872{
873    pub(crate) async fn stream(
874        &self,
875        completion_request: crate::completion::CompletionRequest,
876    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
877    {
878        let mut request = self.create_completion_request(completion_request)?;
879        request.stream = Some(true);
880
881        if enabled!(Level::TRACE) {
882            tracing::trace!(
883                target: "rig::completions",
884                "OpenAI Responses streaming completion request: {}",
885                serde_json::to_string_pretty(&request)?
886            );
887        }
888
889        let body = serde_json::to_vec(&request)?;
890
891        let req = self
892            .client
893            .post("/responses")?
894            .body(body)
895            .map_err(|e| CompletionError::HttpError(e.into()))?;
896
897        // let request_builder = self.client.post_reqwest("/responses").json(&request);
898
899        let span = if tracing::Span::current().is_disabled() {
900            info_span!(
901                target: "rig::completions",
902                "chat_streaming",
903                gen_ai.operation.name = "chat_streaming",
904                gen_ai.provider.name = tracing::field::Empty,
905                gen_ai.request.model = tracing::field::Empty,
906                gen_ai.response.id = tracing::field::Empty,
907                gen_ai.response.model = tracing::field::Empty,
908                gen_ai.usage.output_tokens = tracing::field::Empty,
909                gen_ai.usage.input_tokens = tracing::field::Empty,
910                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
911            )
912        } else {
913            tracing::Span::current()
914        };
915        span.record("gen_ai.provider.name", "openai");
916        span.record("gen_ai.request.model", &self.model);
917        let client = self.client.clone();
918        let event_source = GenericEventSource::new(client, req);
919
920        Ok(stream_from_event_source(event_source, span, "OpenAI"))
921    }
922}
923
924#[cfg(test)]
925mod tests {
926    use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
927    use crate::completion::CompletionModel;
928    use crate::message::ReasoningContent;
929    use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
930    use crate::providers::openai::responses_api::{
931        AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
932        ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
933    };
934    use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
935    use crate::test_utils::MockStreamingClient;
936    use futures::StreamExt;
937    use serde_json::{self, json};
938
939    use crate::{
940        client::CompletionClient, completion::Message, providers::openai, streaming::StreamingChat,
941        test_utils::MockExampleTool,
942    };
943
944    fn sample_response(status: ResponseStatus) -> CompletionResponse {
945        CompletionResponse {
946            id: "resp_123".to_string(),
947            object: ResponseObject::Response,
948            created_at: 0,
949            status,
950            error: None,
951            incomplete_details: None,
952            instructions: None,
953            max_output_tokens: None,
954            model: "gpt-5.4".to_string(),
955            usage: None,
956            output: Vec::new(),
957            tools: Vec::new(),
958            additional_parameters: AdditionalParameters::default(),
959        }
960    }
961
962    async fn first_error_from_event(
963        event: serde_json::Value,
964    ) -> crate::completion::CompletionError {
965        let client = openai::Client::builder()
966            .http_client(MockStreamingClient {
967                sse_bytes: sse_bytes_from_json_events(&[event]),
968            })
969            .api_key("test-key")
970            .build()
971            .expect("client should build");
972        let model = client.completion_model("gpt-5.4");
973        let request = model.completion_request("hello").build();
974        let mut stream = model.stream(request).await.expect("stream should start");
975
976        stream
977            .next()
978            .await
979            .expect("stream should yield an item")
980            .expect_err("stream should surface a provider error")
981    }
982
983    async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
984        let client = openai::Client::builder()
985            .http_client(MockStreamingClient {
986                sse_bytes: sse_bytes_from_json_events(&[event]),
987            })
988            .api_key("test-key")
989            .build()
990            .expect("client should build");
991        let model = client.completion_model("gpt-5.4");
992        let request = model.completion_request("hello").build();
993        let mut stream = model.stream(request).await.expect("stream should start");
994
995        while let Some(item) = stream.next().await {
996            match item.expect("completed stream should not error") {
997                StreamedAssistantContent::Final(res) => return res.usage,
998                _ => continue,
999            }
1000        }
1001
1002        panic!("stream should yield a final response");
1003    }
1004
1005    #[test]
1006    fn reasoning_done_item_emits_summary_then_encrypted() {
1007        let summary = vec![
1008            ReasoningSummary::SummaryText {
1009                text: "step 1".to_string(),
1010            },
1011            ReasoningSummary::SummaryText {
1012                text: "step 2".to_string(),
1013            },
1014        ];
1015        let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1016
1017        assert_eq!(choices.len(), 3);
1018        assert!(matches!(
1019            choices.first(),
1020            Some(RawStreamingChoice::Reasoning {
1021                id: Some(id),
1022                content: ReasoningContent::Summary(text),
1023            }) if id == "rs_1" && text == "step 1"
1024        ));
1025        assert!(matches!(
1026            choices.get(1),
1027            Some(RawStreamingChoice::Reasoning {
1028                id: Some(id),
1029                content: ReasoningContent::Summary(text),
1030            }) if id == "rs_1" && text == "step 2"
1031        ));
1032        assert!(matches!(
1033            choices.get(2),
1034            Some(RawStreamingChoice::Reasoning {
1035                id: Some(id),
1036                content: ReasoningContent::Encrypted(data),
1037            }) if id == "rs_1" && data == "enc_blob"
1038        ));
1039    }
1040
1041    #[test]
1042    fn reasoning_done_item_without_encrypted_emits_summary_only() {
1043        let summary = vec![ReasoningSummary::SummaryText {
1044            text: "only summary".to_string(),
1045        }];
1046        let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1047
1048        assert_eq!(choices.len(), 1);
1049        assert!(matches!(
1050            choices.first(),
1051            Some(RawStreamingChoice::Reasoning {
1052                id: Some(id),
1053                content: ReasoningContent::Summary(text),
1054            }) if id == "rs_2" && text == "only summary"
1055        ));
1056    }
1057
1058    #[test]
1059    fn content_part_added_deserializes_snake_case_part_type() {
1060        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1061            "type": "response.content_part.added",
1062            "item_id": "msg_1",
1063            "output_index": 0,
1064            "content_index": 0,
1065            "sequence_number": 3,
1066            "part": {
1067                "type": "output_text",
1068                "text": "hello"
1069            }
1070        }))
1071        .expect("content part event should deserialize");
1072
1073        assert!(matches!(
1074            chunk,
1075            StreamingCompletionChunk::Delta(chunk)
1076                if matches!(
1077                    chunk.data,
1078                    ItemChunkKind::ContentPartAdded(_)
1079                )
1080        ));
1081    }
1082
1083    #[test]
1084    fn content_part_done_deserializes_snake_case_part_type() {
1085        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1086            "type": "response.content_part.done",
1087            "item_id": "msg_1",
1088            "output_index": 0,
1089            "content_index": 0,
1090            "sequence_number": 4,
1091            "part": {
1092                "type": "summary_text",
1093                "text": "done"
1094            }
1095        }))
1096        .expect("content part done event should deserialize");
1097
1098        assert!(matches!(
1099            chunk,
1100            StreamingCompletionChunk::Delta(chunk)
1101                if matches!(
1102                    chunk.data,
1103                    ItemChunkKind::ContentPartDone(_)
1104                )
1105        ));
1106    }
1107
1108    #[test]
1109    fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
1110        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1111            "type": "response.reasoning_summary_part.added",
1112            "item_id": "rs_1",
1113            "output_index": 0,
1114            "summary_index": 0,
1115            "sequence_number": 5,
1116            "part": {
1117                "type": "summary_text",
1118                "text": "step 1"
1119            }
1120        }))
1121        .expect("reasoning summary part event should deserialize");
1122
1123        assert!(matches!(
1124            chunk,
1125            StreamingCompletionChunk::Delta(chunk)
1126                if matches!(
1127                    chunk.data,
1128                    ItemChunkKind::ReasoningSummaryPartAdded(_)
1129                )
1130        ));
1131    }
1132
1133    #[test]
1134    fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
1135        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1136            "type": "response.reasoning_summary_part.done",
1137            "item_id": "rs_1",
1138            "output_index": 0,
1139            "summary_index": 0,
1140            "sequence_number": 6,
1141            "part": {
1142                "type": "summary_text",
1143                "text": "step 2"
1144            }
1145        }))
1146        .expect("reasoning summary part done event should deserialize");
1147
1148        assert!(matches!(
1149            chunk,
1150            StreamingCompletionChunk::Delta(chunk)
1151                if matches!(
1152                    chunk.data,
1153                    ItemChunkKind::ReasoningSummaryPartDone(_)
1154                )
1155        ));
1156    }
1157
1158    #[tokio::test]
1159    async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1160        let mut response = sample_response(ResponseStatus::Failed);
1161        response.error = Some(ResponseError {
1162            code: String::new(),
1163            message: "maximum context length exceeded".to_string(),
1164        });
1165
1166        let event = json!({
1167            "type": "response.failed",
1168            "sequence_number": 1,
1169            "response": response,
1170        });
1171
1172        let err = first_error_from_event(event).await;
1173
1174        assert_eq!(
1175            err.to_string(),
1176            "ProviderError: maximum context length exceeded"
1177        );
1178    }
1179
1180    #[tokio::test]
1181    async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1182        let mut response = sample_response(ResponseStatus::Failed);
1183        response.error = Some(ResponseError {
1184            code: "context_length_exceeded".to_string(),
1185            message: "maximum context length exceeded".to_string(),
1186        });
1187
1188        let event = json!({
1189            "type": "response.failed",
1190            "sequence_number": 1,
1191            "response": response,
1192        });
1193
1194        let err = first_error_from_event(event).await;
1195
1196        assert_eq!(
1197            err.to_string(),
1198            "ProviderError: context_length_exceeded: maximum context length exceeded"
1199        );
1200    }
1201
1202    #[tokio::test]
1203    async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1204        let mut response = sample_response(ResponseStatus::Incomplete);
1205        response.incomplete_details = Some(IncompleteDetailsReason {
1206            reason: "max_output_tokens".to_string(),
1207        });
1208
1209        let event = json!({
1210            "type": "response.incomplete",
1211            "sequence_number": 1,
1212            "response": response,
1213        });
1214
1215        let err = first_error_from_event(event).await;
1216
1217        assert_eq!(
1218            err.to_string(),
1219            "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1220        );
1221    }
1222
1223    #[tokio::test]
1224    async fn response_failed_chunk_terminates_stream_without_followup_items() {
1225        let tool_call_done = json!({
1226            "type": "response.output_item.done",
1227            "sequence_number": 1,
1228            "item": {
1229                "type": "function_call",
1230                "id": "fc_123",
1231                "arguments": "{}",
1232                "call_id": "call_123",
1233                "name": "example_tool",
1234                "status": "completed"
1235            }
1236        });
1237
1238        let mut response = sample_response(ResponseStatus::Failed);
1239        response.error = Some(ResponseError {
1240            code: "server_error".to_string(),
1241            message: "response stream failed".to_string(),
1242        });
1243
1244        let failed = json!({
1245            "type": "response.failed",
1246            "sequence_number": 2,
1247            "response": response,
1248        });
1249
1250        let client = openai::Client::builder()
1251            .http_client(MockStreamingClient {
1252                sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1253            })
1254            .api_key("test-key")
1255            .build()
1256            .expect("client should build");
1257        let model = client.completion_model("gpt-5.4");
1258        let request = model.completion_request("hello").build();
1259        let mut stream = model.stream(request).await.expect("stream should start");
1260
1261        let err = stream
1262            .next()
1263            .await
1264            .expect("stream should yield an item")
1265            .expect_err("stream should surface a provider error");
1266        assert_eq!(
1267            err.to_string(),
1268            "ProviderError: server_error: response stream failed"
1269        );
1270        assert!(
1271            stream.next().await.is_none(),
1272            "stream should terminate immediately after the first terminal error"
1273        );
1274    }
1275
1276    #[tokio::test]
1277    async fn response_completed_chunk_populates_final_usage() {
1278        let mut response = sample_response(ResponseStatus::Completed);
1279        response.usage = Some(ResponsesUsage {
1280            input_tokens: 10,
1281            input_tokens_details: None,
1282            output_tokens: 5,
1283            output_tokens_details: OutputTokensDetails {
1284                reasoning_tokens: 0,
1285            },
1286            total_tokens: 15,
1287        });
1288
1289        let event = json!({
1290            "type": "response.completed",
1291            "sequence_number": 1,
1292            "response": response,
1293        });
1294
1295        let usage = final_usage_from_event(event).await;
1296        assert_eq!(usage.input_tokens, 10);
1297        assert_eq!(usage.output_tokens, 5);
1298        assert_eq!(usage.total_tokens, 15);
1299    }
1300
1301    #[tokio::test]
1302    async fn done_sentinel_is_ignored_without_debug_parse_noise() {
1303        use std::io::{self, Write};
1304        use std::sync::{Arc, Mutex};
1305
1306        #[derive(Clone)]
1307        struct SharedWriter(Arc<Mutex<Vec<u8>>>);
1308
1309        impl Write for SharedWriter {
1310            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1311                self.0
1312                    .lock()
1313                    .expect("log buffer mutex should not be poisoned")
1314                    .extend_from_slice(buf);
1315                Ok(buf.len())
1316            }
1317
1318            fn flush(&mut self) -> io::Result<()> {
1319                Ok(())
1320            }
1321        }
1322
1323        let mut response = sample_response(ResponseStatus::Completed);
1324        response.usage = Some(ResponsesUsage {
1325            input_tokens: 4,
1326            input_tokens_details: None,
1327            output_tokens: 2,
1328            output_tokens_details: OutputTokensDetails {
1329                reasoning_tokens: 0,
1330            },
1331            total_tokens: 6,
1332        });
1333
1334        let captured = Arc::new(Mutex::new(Vec::new()));
1335        let subscriber = tracing_subscriber::fmt()
1336            .with_max_level(tracing::Level::DEBUG)
1337            .with_ansi(false)
1338            .without_time()
1339            .with_writer({
1340                let captured = captured.clone();
1341                move || SharedWriter(captured.clone())
1342            })
1343            .finish();
1344        let _guard = tracing::subscriber::set_default(subscriber);
1345
1346        let client = openai::Client::builder()
1347            .http_client(MockStreamingClient {
1348                sse_bytes: bytes::Bytes::from(format!(
1349                    "data: {}\n\ndata: [DONE]\n\n",
1350                    serde_json::to_string(&json!({
1351                        "type": "response.completed",
1352                        "sequence_number": 1,
1353                        "response": response,
1354                    }))
1355                    .expect("response event should serialize")
1356                )),
1357            })
1358            .api_key("test-key")
1359            .build()
1360            .expect("client should build");
1361        let model = client.completion_model("gpt-5.4");
1362        let request = model.completion_request("hello").build();
1363        let mut stream = model.stream(request).await.expect("stream should start");
1364
1365        let mut final_usage = None;
1366        while let Some(item) = stream.next().await {
1367            if let StreamedAssistantContent::Final(response) =
1368                item.expect("stream should complete successfully")
1369            {
1370                final_usage = Some(response.usage);
1371            }
1372        }
1373
1374        let usage = final_usage.expect("expected final response");
1375        assert_eq!(usage.input_tokens, 4);
1376        assert_eq!(usage.output_tokens, 2);
1377        assert_eq!(usage.total_tokens, 6);
1378
1379        let logs = String::from_utf8(
1380            captured
1381                .lock()
1382                .expect("log buffer mutex should not be poisoned")
1383                .clone(),
1384        )
1385        .expect("captured logs should be valid UTF-8");
1386        assert!(
1387            !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
1388            "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
1389        );
1390    }
1391
1392    // requires `derive` rig-core feature due to using tool macro
1393    #[tokio::test]
1394    #[ignore = "requires API key"]
1395    async fn test_openai_streaming_tools_reasoning() {
1396        let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1397        let client = openai::Client::new(&api_key).expect("Failed to build client");
1398        let agent = client
1399            .agent("gpt-5.2")
1400            .max_tokens(8192)
1401            .tool(MockExampleTool)
1402            .additional_params(serde_json::json!({
1403                "reasoning": {"effort": "high"}
1404            }))
1405            .build();
1406
1407        let chat_history: Vec<Message> = Vec::new();
1408        let mut stream = agent
1409            .stream_chat("Call my example tool", &chat_history)
1410            .multi_turn(5)
1411            .await;
1412
1413        while let Some(item) = stream.next().await {
1414            println!("Got item: {item:?}");
1415        }
1416    }
1417}