Skip to main content

rig/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
19type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
21// ================================================================
22// OpenAI Responses Streaming API
23// ================================================================
24
25/// A streaming completion chunk.
26/// Streaming chunks can come in one of two forms:
27/// - A response chunk (where the completed response will have the total token usage)
28/// - An item chunk commonly referred to as a delta. In the completions API this would be referred to as the message delta.
29#[derive(Debug, Serialize, Deserialize, Clone)]
30#[serde(untagged)]
31pub enum StreamingCompletionChunk {
32    Response(Box<ResponseChunk>),
33    Delta(ItemChunk),
34}
35
36/// The final streaming response from the OpenAI Responses API.
37#[derive(Debug, Serialize, Deserialize, Clone)]
38pub struct StreamingCompletionResponse {
39    /// Token usage
40    pub usage: ResponsesUsage,
41}
42
43pub(crate) fn reasoning_choices_from_done_item(
44    id: &str,
45    summary: &[ReasoningSummary],
46    encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48    let mut choices = summary
49        .iter()
50        .map(|reasoning_summary| match reasoning_summary {
51            ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52                id: Some(id.to_owned()),
53                content: ReasoningContent::Summary(text.to_owned()),
54            },
55        })
56        .collect::<Vec<_>>();
57
58    if let Some(encrypted_content) = encrypted_content {
59        choices.push(RawStreamingChoice::Reasoning {
60            id: Some(id.to_owned()),
61            content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62        });
63    }
64
65    choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69    fn token_usage(&self) -> Option<crate::completion::Usage> {
70        self.usage.token_usage()
71    }
72}
73
74/// A response chunk from OpenAI's response API.
75#[derive(Debug, Serialize, Deserialize, Clone)]
76pub struct ResponseChunk {
77    /// The response chunk type
78    #[serde(rename = "type")]
79    pub kind: ResponseChunkKind,
80    /// The response itself
81    pub response: CompletionResponse,
82    /// The item sequence
83    pub sequence_number: u64,
84}
85
86/// Response chunk type.
87/// Renames are used to ensure that this type gets (de)serialized properly.
88#[derive(Debug, Serialize, Deserialize, Clone)]
89pub enum ResponseChunkKind {
90    #[serde(rename = "response.created")]
91    ResponseCreated,
92    #[serde(rename = "response.in_progress")]
93    ResponseInProgress,
94    #[serde(rename = "response.completed")]
95    ResponseCompleted,
96    #[serde(rename = "response.failed")]
97    ResponseFailed,
98    #[serde(rename = "response.incomplete")]
99    ResponseIncomplete,
100}
101
102fn response_chunk_error_message(
103    kind: &ResponseChunkKind,
104    response: &CompletionResponse,
105    provider_name: &str,
106) -> Option<String> {
107    match kind {
108        ResponseChunkKind::ResponseFailed => Some(response_error_message(
109            response.error.as_ref(),
110            &format!("{provider_name} response stream returned a failed response"),
111        )),
112        ResponseChunkKind::ResponseIncomplete => {
113            let reason = response
114                .incomplete_details
115                .as_ref()
116                .map(|details| details.reason.as_str())
117                .unwrap_or("unknown reason");
118
119            Some(format!(
120                "{provider_name} response stream was incomplete: {reason}"
121            ))
122        }
123        _ => None,
124    }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128    if let Some(error) = error {
129        if error.code.is_empty() {
130            error.message.clone()
131        } else {
132            format!("{}: {}", error.code, error.message)
133        }
134    } else {
135        fallback.to_string()
136    }
137}
138
139#[derive(Clone, Copy)]
140pub(crate) enum ResponsesStreamOptions {
141    Strict,
142    StrictWithImmediateToolCalls,
143}
144
145impl ResponsesStreamOptions {
146    pub(crate) const fn strict() -> Self {
147        Self::Strict
148    }
149
150    pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
151        Self::StrictWithImmediateToolCalls
152    }
153
154    const fn errors_on_terminal_response(self) -> bool {
155        true
156    }
157
158    const fn emits_completed_tool_calls_immediately(self) -> bool {
159        matches!(self, Self::StrictWithImmediateToolCalls)
160    }
161}
162
163pub(crate) fn parse_sse_completion_body(
164    body: &str,
165    provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167    let mut completed = None;
168    let mut provider_error = None;
169
170    for line in body.lines() {
171        let data = line
172            .strip_prefix("data:")
173            .map(str::trim)
174            .unwrap_or_default();
175        if data.is_empty() || data == "[DONE]" {
176            continue;
177        }
178
179        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180            if let StreamingCompletionChunk::Response(chunk) = chunk {
181                let ResponseChunk { kind, response, .. } = *chunk;
182                match kind {
183                    ResponseChunkKind::ResponseCompleted => {
184                        completed = Some(response);
185                        break;
186                    }
187                    ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188                        provider_error =
189                            response_chunk_error_message(&kind, &response, provider_name);
190                    }
191                    _ => {}
192                }
193            }
194            continue;
195        }
196
197        let value = match serde_json::from_str::<serde_json::Value>(data) {
198            Ok(value) => value,
199            Err(_) => continue,
200        };
201
202        match value.get("type").and_then(serde_json::Value::as_str) {
203            Some("response.completed") => {
204                if let Some(response) = value.get("response") {
205                    completed = Some(serde_json::from_value(response.clone())?);
206                    break;
207                }
208            }
209            Some("response.failed") | Some("response.incomplete") => {
210                provider_error = value
211                    .get("response")
212                    .cloned()
213                    .and_then(|response| {
214                        serde_json::from_value::<CompletionResponse>(response).ok()
215                    })
216                    .and_then(|response| {
217                        let kind = if value.get("type").and_then(serde_json::Value::as_str)
218                            == Some("response.failed")
219                        {
220                            ResponseChunkKind::ResponseFailed
221                        } else {
222                            ResponseChunkKind::ResponseIncomplete
223                        };
224                        response_chunk_error_message(&kind, &response, provider_name)
225                    })
226                    .or_else(|| {
227                        value
228                            .get("error")
229                            .and_then(|error| error.get("message"))
230                            .and_then(serde_json::Value::as_str)
231                            .map(ToOwned::to_owned)
232                    })
233                    .or_else(|| Some(data.to_string()));
234            }
235            Some("error") => {
236                provider_error = value
237                    .get("error")
238                    .and_then(|error| error.get("message"))
239                    .and_then(serde_json::Value::as_str)
240                    .map(ToOwned::to_owned)
241                    .or_else(|| Some(data.to_string()));
242            }
243            _ => {}
244        }
245    }
246
247    completed.ok_or_else(|| {
248        CompletionError::ProviderError(
249            provider_error.unwrap_or_else(|| {
250                format!("{provider_name} stream did not yield response.completed")
251            }),
252        )
253    })
254}
255
256struct RawChoiceAccumulator {
257    final_usage: ResponsesUsage,
258    tool_calls: Vec<StreamingRawChoice>,
259    tool_call_internal_ids: std::collections::HashMap<String, String>,
260}
261
262impl RawChoiceAccumulator {
263    fn new(initial_usage: ResponsesUsage) -> Self {
264        Self {
265            final_usage: initial_usage,
266            tool_calls: Vec::new(),
267            tool_call_internal_ids: std::collections::HashMap::new(),
268        }
269    }
270
271    fn decode_item_chunk(
272        &mut self,
273        item: ItemChunkKind,
274        options: ResponsesStreamOptions,
275    ) -> Vec<StreamingRawChoice> {
276        let mut immediate = Vec::new();
277
278        match item {
279            ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
280                item: Output::FunctionCall(func),
281                ..
282            }) => {
283                let internal_call_id = self
284                    .tool_call_internal_ids
285                    .entry(func.id.clone())
286                    .or_insert_with(|| nanoid::nanoid!())
287                    .clone();
288                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
289                    id: func.id,
290                    internal_call_id,
291                    content: streaming::ToolCallDeltaContent::Name(func.name),
292                });
293            }
294            ItemChunkKind::OutputItemDone(message) => {
295                self.push_output_item_done(
296                    message.item,
297                    &mut immediate,
298                    options.emits_completed_tool_calls_immediately(),
299                );
300            }
301            ItemChunkKind::OutputTextDelta(delta) => {
302                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
303            }
304            ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
305                immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
306                    id: None,
307                    reasoning: delta.delta,
308                });
309            }
310            ItemChunkKind::RefusalDelta(delta) => {
311                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
312            }
313            ItemChunkKind::FunctionCallArgsDelta(delta) => {
314                let internal_call_id = self
315                    .tool_call_internal_ids
316                    .entry(delta.item_id.clone())
317                    .or_insert_with(|| nanoid::nanoid!())
318                    .clone();
319                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
320                    id: delta.item_id,
321                    internal_call_id,
322                    content: streaming::ToolCallDeltaContent::Delta(delta.delta),
323                });
324            }
325            _ => {}
326        }
327
328        immediate
329    }
330
331    fn record_response_chunk(
332        &mut self,
333        kind: ResponseChunkKind,
334        response: CompletionResponse,
335        provider_name: &str,
336        options: ResponsesStreamOptions,
337    ) -> Result<(), CompletionError> {
338        match kind {
339            ResponseChunkKind::ResponseCompleted => {
340                if let Some(usage) = response.usage {
341                    self.final_usage = usage;
342                }
343                Ok(())
344            }
345            ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
346                if options.errors_on_terminal_response() =>
347            {
348                let error_message = response_chunk_error_message(&kind, &response, provider_name)
349                    .unwrap_or_else(|| {
350                        format!(
351                            "{provider_name} returned terminal response {:?} without an error message",
352                            kind
353                        )
354                    });
355                Err(CompletionError::ProviderError(error_message))
356            }
357            _ => Ok(()),
358        }
359    }
360
361    fn push_output_item_done(
362        &mut self,
363        item: Output,
364        immediate: &mut Vec<StreamingRawChoice>,
365        emit_completed_tool_calls_immediately: bool,
366    ) {
367        match item {
368            Output::FunctionCall(func) => {
369                let internal_call_id = self
370                    .tool_call_internal_ids
371                    .entry(func.id.clone())
372                    .or_insert_with(|| nanoid::nanoid!())
373                    .clone();
374                let tool_call =
375                    streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
376                        .with_internal_call_id(internal_call_id)
377                        .with_call_id(func.call_id);
378
379                if emit_completed_tool_calls_immediately {
380                    immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
381                } else {
382                    self.tool_calls
383                        .push(streaming::RawStreamingChoice::ToolCall(tool_call));
384                }
385            }
386            Output::Reasoning {
387                id,
388                summary,
389                encrypted_content,
390                ..
391            } => {
392                immediate.extend(reasoning_choices_from_done_item(
393                    &id,
394                    &summary,
395                    encrypted_content.as_deref(),
396                ));
397            }
398            Output::Message(message) => {
399                immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
400            }
401            Output::Unknown => {}
402        }
403    }
404
405    fn finish(mut self) -> Vec<StreamingRawChoice> {
406        let mut choices = Vec::new();
407        choices.append(&mut self.tool_calls);
408        choices.push(RawStreamingChoice::FinalResponse(
409            StreamingCompletionResponse {
410                usage: self.final_usage,
411            },
412        ));
413        choices
414    }
415}
416
417pub(crate) fn raw_choices_from_sse_body(
418    body: &str,
419    initial_usage: ResponsesUsage,
420    provider_name: &str,
421) -> Result<Vec<StreamingRawChoice>, CompletionError> {
422    let mut raw_choices = Vec::new();
423    let mut accumulator = RawChoiceAccumulator::new(initial_usage);
424    let options = ResponsesStreamOptions::strict();
425
426    for line in body.lines() {
427        let data = line
428            .strip_prefix("data:")
429            .map(str::trim)
430            .unwrap_or_default();
431        if data.is_empty() || data == "[DONE]" {
432            continue;
433        }
434
435        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
436            match chunk {
437                StreamingCompletionChunk::Delta(chunk) => {
438                    raw_choices.extend(accumulator.decode_item_chunk(chunk.data, options));
439                }
440                StreamingCompletionChunk::Response(chunk) => {
441                    let ResponseChunk { kind, response, .. } = *chunk;
442                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
443                }
444            }
445            continue;
446        }
447
448        let value = match serde_json::from_str::<serde_json::Value>(data) {
449            Ok(value) => value,
450            Err(_) => continue,
451        };
452
453        match value.get("type").and_then(serde_json::Value::as_str) {
454            Some("response.output_text.delta") | Some("response.refusal.delta") => {
455                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
456                    raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
457                }
458            }
459            Some("response.reasoning_summary_text.delta") => {
460                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
461                    raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
462                        id: None,
463                        reasoning: delta.to_owned(),
464                    });
465                }
466            }
467            Some("response.output_item.added") => {
468                if let Some(item) = value
469                    .get("item")
470                    .cloned()
471                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
472                    && let Output::FunctionCall(func) = item
473                {
474                    let internal_call_id = accumulator
475                        .tool_call_internal_ids
476                        .entry(func.id.clone())
477                        .or_insert_with(|| nanoid::nanoid!())
478                        .clone();
479                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
480                        id: func.id,
481                        internal_call_id,
482                        content: streaming::ToolCallDeltaContent::Name(func.name),
483                    });
484                }
485            }
486            Some("response.output_item.done") => {
487                if let Some(item) = value
488                    .get("item")
489                    .cloned()
490                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
491                {
492                    accumulator.push_output_item_done(item, &mut raw_choices, false);
493                }
494            }
495            Some("response.function_call_arguments.delta") => {
496                if let (Some(item_id), Some(delta)) = (
497                    value.get("item_id").and_then(serde_json::Value::as_str),
498                    value.get("delta").and_then(serde_json::Value::as_str),
499                ) {
500                    let internal_call_id = accumulator
501                        .tool_call_internal_ids
502                        .entry(item_id.to_owned())
503                        .or_insert_with(|| nanoid::nanoid!())
504                        .clone();
505                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
506                        id: item_id.to_owned(),
507                        internal_call_id,
508                        content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
509                    });
510                }
511            }
512            Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
513                if let Some(response) = value.get("response").cloned() {
514                    let response = serde_json::from_value::<CompletionResponse>(response)?;
515                    let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
516                        Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
517                        Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
518                        Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
519                        _ => None,
520                    }) else {
521                        continue;
522                    };
523                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
524                }
525            }
526            Some("error") => {
527                let message = value
528                    .get("error")
529                    .and_then(|error| error.get("message"))
530                    .and_then(serde_json::Value::as_str)
531                    .unwrap_or(data);
532                return Err(CompletionError::ProviderError(message.to_owned()));
533            }
534            _ => {}
535        }
536    }
537
538    raw_choices.extend(accumulator.finish());
539    Ok(raw_choices)
540}
541
542pub(crate) async fn completion_response_from_sse_body(
543    body: &str,
544    raw_response: CompletionResponse,
545    provider_name: &str,
546) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
547    let raw_choices = raw_choices_from_sse_body(
548        body,
549        raw_response
550            .usage
551            .clone()
552            .unwrap_or_else(ResponsesUsage::new),
553        provider_name,
554    )?;
555    let stream = futures::stream::iter(
556        raw_choices
557            .into_iter()
558            .map(Ok::<_, CompletionError>)
559            .collect::<Vec<_>>(),
560    );
561    let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));
562
563    while let Some(item) = stream.next().await {
564        item?;
565    }
566
567    if choice_is_empty(&stream.choice) {
568        return Err(CompletionError::ResponseError(
569            "Response contained no parts".to_owned(),
570        ));
571    }
572
573    Ok(completion::CompletionResponse {
574        usage: stream
575            .response
576            .as_ref()
577            .and_then(GetTokenUsage::token_usage)
578            .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
579        message_id: stream
580            .message_id
581            .clone()
582            .or_else(|| message_id_from_response(&raw_response)),
583        choice: stream.choice,
584        raw_response,
585    })
586}
587
588fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
589    choice.iter().all(|content| match content {
590        completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
591        completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
592        completion::AssistantContent::Image(_) => false,
593        completion::AssistantContent::ToolCall(_) => false,
594    })
595}
596
597fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
598    response.output.iter().find_map(|item| match item {
599        Output::Message(message) => Some(message.id.clone()),
600        _ => None,
601    })
602}
603
604fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
605    response
606        .usage
607        .as_ref()
608        .and_then(GetTokenUsage::token_usage)
609        .unwrap_or_default()
610}
611
612pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
613    event_source: GenericEventSource<HttpClient, RequestBody>,
614    span: tracing::Span,
615    provider_name: &'static str,
616) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
617where
618    HttpClient: HttpClientExt + Clone + 'static,
619    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
620{
621    stream_from_event_source_with_options(
622        event_source,
623        span,
624        provider_name,
625        ResponsesStreamOptions::strict(),
626    )
627}
628
629pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
630    mut event_source: GenericEventSource<HttpClient, RequestBody>,
631    span: tracing::Span,
632    provider_name: &'static str,
633    options: ResponsesStreamOptions,
634) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
635where
636    HttpClient: HttpClientExt + Clone + 'static,
637    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
638{
639    let stream = stream! {
640        let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
641        let span = tracing::Span::current();
642
643        let mut terminated_with_error = false;
644
645        while let Some(event_result) = event_source.next().await {
646            match event_result {
647                Ok(Event::Open) => {
648                    tracing::trace!("SSE connection opened");
649                    continue;
650                }
651                Ok(Event::Message(evt)) => {
652                    if evt.data.trim().is_empty() || evt.data == "[DONE]" {
653                        continue;
654                    }
655
656                    let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
657
658                    let Ok(data) = data else {
659                        let Err(err) = data else {
660                            continue;
661                        };
662                        debug!(
663                            "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
664                            err
665                        );
666                        continue;
667                    };
668
669                    match data {
670                        StreamingCompletionChunk::Delta(chunk) => {
671                            for choice in accumulator.decode_item_chunk(chunk.data, options) {
672                                yield Ok(choice);
673                            }
674                        }
675                        StreamingCompletionChunk::Response(chunk) => {
676                            let ResponseChunk { kind, response, .. } = *chunk;
677                            if matches!(kind, ResponseChunkKind::ResponseCompleted) {
678                                span.record("gen_ai.response.id", response.id.as_str());
679                                span.record("gen_ai.response.model", response.model.as_str());
680                            }
681                            if let Err(error) =
682                                accumulator.record_response_chunk(kind, response, provider_name, options)
683                            {
684                                terminated_with_error = true;
685                                yield Err(error);
686                                break;
687                            }
688                        }
689                    }
690                }
691                Err(crate::http_client::Error::StreamEnded) => {
692                    event_source.close();
693                }
694                Err(error) => {
695                    tracing::error!(?error, "SSE error");
696                    terminated_with_error = true;
697                    yield Err(CompletionError::ProviderError(error.to_string()));
698                    break;
699                }
700            }
701        }
702
703        event_source.close();
704
705        if terminated_with_error {
706            return;
707        }
708
709        let final_usage = accumulator.final_usage.clone();
710
711        for tool_call in accumulator.finish() {
712            yield Ok(tool_call)
713        }
714
715        span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
716        span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
717        let cached_tokens = final_usage
718            .input_tokens_details
719            .as_ref()
720            .map(|d| d.cached_tokens)
721            .unwrap_or(0);
722        span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
723
724    }
725    .instrument(span);
726
727    streaming::StreamingCompletionResponse::stream(Box::pin(stream))
728}
729
730/// An item message chunk from OpenAI's Responses API.
731/// See
732#[derive(Debug, Serialize, Deserialize, Clone)]
733pub struct ItemChunk {
734    /// Item ID. Optional.
735    pub item_id: Option<String>,
736    /// The output index of the item from a given streamed response.
737    pub output_index: u64,
738    /// The item type chunk, as well as the inner data.
739    #[serde(flatten)]
740    pub data: ItemChunkKind,
741}
742
743/// The item chunk type from OpenAI's Responses API.
744#[derive(Debug, Serialize, Deserialize, Clone)]
745#[serde(tag = "type")]
746pub enum ItemChunkKind {
747    #[serde(rename = "response.output_item.added")]
748    OutputItemAdded(StreamingItemDoneOutput),
749    #[serde(rename = "response.output_item.done")]
750    OutputItemDone(StreamingItemDoneOutput),
751    #[serde(rename = "response.content_part.added")]
752    ContentPartAdded(ContentPartChunk),
753    #[serde(rename = "response.content_part.done")]
754    ContentPartDone(ContentPartChunk),
755    #[serde(rename = "response.output_text.delta")]
756    OutputTextDelta(DeltaTextChunk),
757    #[serde(rename = "response.output_text.done")]
758    OutputTextDone(OutputTextChunk),
759    #[serde(rename = "response.refusal.delta")]
760    RefusalDelta(DeltaTextChunk),
761    #[serde(rename = "response.refusal.done")]
762    RefusalDone(RefusalTextChunk),
763    #[serde(rename = "response.function_call_arguments.delta")]
764    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
765    #[serde(rename = "response.function_call_arguments.done")]
766    FunctionCallArgsDone(ArgsTextChunk),
767    #[serde(rename = "response.reasoning_summary_part.added")]
768    ReasoningSummaryPartAdded(SummaryPartChunk),
769    #[serde(rename = "response.reasoning_summary_part.done")]
770    ReasoningSummaryPartDone(SummaryPartChunk),
771    #[serde(rename = "response.reasoning_summary_text.delta")]
772    ReasoningSummaryTextDelta(SummaryTextChunk),
773    #[serde(rename = "response.reasoning_summary_text.done")]
774    ReasoningSummaryTextDone(SummaryTextChunk),
775    /// Catch-all for unknown item chunk types (e.g., `web_search_call` events).
776    /// This prevents unknown streaming events from breaking deserialization.
777    #[serde(other)]
778    Unknown,
779}
780
781#[derive(Debug, Serialize, Deserialize, Clone)]
782pub struct StreamingItemDoneOutput {
783    pub sequence_number: u64,
784    pub item: Output,
785}
786
787#[derive(Debug, Serialize, Deserialize, Clone)]
788pub struct ContentPartChunk {
789    pub content_index: u64,
790    pub sequence_number: u64,
791    pub part: ContentPartChunkPart,
792}
793
794#[derive(Debug, Serialize, Deserialize, Clone)]
795#[serde(tag = "type", rename_all = "snake_case")]
796pub enum ContentPartChunkPart {
797    OutputText { text: String },
798    SummaryText { text: String },
799}
800
801#[derive(Debug, Serialize, Deserialize, Clone)]
802pub struct DeltaTextChunk {
803    pub content_index: u64,
804    pub sequence_number: u64,
805    pub delta: String,
806}
807
808#[derive(Debug, Serialize, Deserialize, Clone)]
809pub struct DeltaTextChunkWithItemId {
810    pub item_id: String,
811    pub content_index: u64,
812    pub sequence_number: u64,
813    pub delta: String,
814}
815
816#[derive(Debug, Serialize, Deserialize, Clone)]
817pub struct OutputTextChunk {
818    pub content_index: u64,
819    pub sequence_number: u64,
820    pub text: String,
821}
822
823#[derive(Debug, Serialize, Deserialize, Clone)]
824pub struct RefusalTextChunk {
825    pub content_index: u64,
826    pub sequence_number: u64,
827    pub refusal: String,
828}
829
830#[derive(Debug, Serialize, Deserialize, Clone)]
831pub struct ArgsTextChunk {
832    pub content_index: u64,
833    pub sequence_number: u64,
834    pub arguments: serde_json::Value,
835}
836
837#[derive(Debug, Serialize, Deserialize, Clone)]
838pub struct SummaryPartChunk {
839    pub summary_index: u64,
840    pub sequence_number: u64,
841    pub part: SummaryPartChunkPart,
842}
843
844#[derive(Debug, Serialize, Deserialize, Clone)]
845pub struct SummaryTextChunk {
846    pub summary_index: u64,
847    pub sequence_number: u64,
848    pub delta: String,
849}
850
851#[derive(Debug, Serialize, Deserialize, Clone)]
852#[serde(tag = "type", rename_all = "snake_case")]
853pub enum SummaryPartChunkPart {
854    SummaryText { text: String },
855}
856
857impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
858where
859    crate::client::Client<Ext, H>:
860        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
861    Ext: crate::client::Provider + Clone + 'static,
862    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
863{
864    pub(crate) async fn stream(
865        &self,
866        completion_request: crate::completion::CompletionRequest,
867    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
868    {
869        let mut request = self.create_completion_request(completion_request)?;
870        request.stream = Some(true);
871
872        if enabled!(Level::TRACE) {
873            tracing::trace!(
874                target: "rig::completions",
875                "OpenAI Responses streaming completion request: {}",
876                serde_json::to_string_pretty(&request)?
877            );
878        }
879
880        let body = serde_json::to_vec(&request)?;
881
882        let req = self
883            .client
884            .post("/responses")?
885            .body(body)
886            .map_err(|e| CompletionError::HttpError(e.into()))?;
887
888        // let request_builder = self.client.post_reqwest("/responses").json(&request);
889
890        let span = if tracing::Span::current().is_disabled() {
891            info_span!(
892                target: "rig::completions",
893                "chat_streaming",
894                gen_ai.operation.name = "chat_streaming",
895                gen_ai.provider.name = tracing::field::Empty,
896                gen_ai.request.model = tracing::field::Empty,
897                gen_ai.response.id = tracing::field::Empty,
898                gen_ai.response.model = tracing::field::Empty,
899                gen_ai.usage.output_tokens = tracing::field::Empty,
900                gen_ai.usage.input_tokens = tracing::field::Empty,
901                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
902            )
903        } else {
904            tracing::Span::current()
905        };
906        span.record("gen_ai.provider.name", "openai");
907        span.record("gen_ai.request.model", &self.model);
908        let client = self.client.clone();
909        let event_source = GenericEventSource::new(client, req);
910
911        Ok(stream_from_event_source(event_source, span, "OpenAI"))
912    }
913}
914
915#[cfg(test)]
916mod tests {
917    use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
918    use crate::completion::CompletionModel;
919    use crate::http_client::mock::MockStreamingClient;
920    use crate::message::ReasoningContent;
921    use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
922    use crate::providers::openai::responses_api::{
923        AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
924        ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
925    };
926    use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
927    use bytes::Bytes;
928    use futures::StreamExt;
929    use serde_json::{self, json};
930
931    use crate::{
932        client::CompletionClient,
933        completion::{Message, ToolDefinition},
934        providers::openai,
935        streaming::StreamingChat,
936        tool::{Tool, ToolError},
937    };
938
939    struct ExampleTool;
940
941    impl Default for MockStreamingClient {
942        fn default() -> Self {
943            Self {
944                sse_bytes: Bytes::new(),
945            }
946        }
947    }
948
949    impl std::fmt::Debug for MockStreamingClient {
950        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
951            f.debug_struct("MockStreamingClient")
952                .finish_non_exhaustive()
953        }
954    }
955
956    fn sample_response(status: ResponseStatus) -> CompletionResponse {
957        CompletionResponse {
958            id: "resp_123".to_string(),
959            object: ResponseObject::Response,
960            created_at: 0,
961            status,
962            error: None,
963            incomplete_details: None,
964            instructions: None,
965            max_output_tokens: None,
966            model: "gpt-5.4".to_string(),
967            usage: None,
968            output: Vec::new(),
969            tools: Vec::new(),
970            additional_parameters: AdditionalParameters::default(),
971        }
972    }
973
974    async fn first_error_from_event(
975        event: serde_json::Value,
976    ) -> crate::completion::CompletionError {
977        let client = openai::Client::builder()
978            .http_client(MockStreamingClient {
979                sse_bytes: sse_bytes_from_json_events(&[event]),
980            })
981            .api_key("test-key")
982            .build()
983            .expect("client should build");
984        let model = client.completion_model("gpt-5.4");
985        let request = model.completion_request("hello").build();
986        let mut stream = model.stream(request).await.expect("stream should start");
987
988        stream
989            .next()
990            .await
991            .expect("stream should yield an item")
992            .expect_err("stream should surface a provider error")
993    }
994
995    async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
996        let client = openai::Client::builder()
997            .http_client(MockStreamingClient {
998                sse_bytes: sse_bytes_from_json_events(&[event]),
999            })
1000            .api_key("test-key")
1001            .build()
1002            .expect("client should build");
1003        let model = client.completion_model("gpt-5.4");
1004        let request = model.completion_request("hello").build();
1005        let mut stream = model.stream(request).await.expect("stream should start");
1006
1007        while let Some(item) = stream.next().await {
1008            match item.expect("completed stream should not error") {
1009                StreamedAssistantContent::Final(res) => return res.usage,
1010                _ => continue,
1011            }
1012        }
1013
1014        panic!("stream should yield a final response");
1015    }
1016
1017    impl Tool for ExampleTool {
1018        type Args = ();
1019        type Error = ToolError;
1020        type Output = String;
1021        const NAME: &'static str = "example_tool";
1022
1023        async fn definition(&self, _prompt: String) -> ToolDefinition {
1024            ToolDefinition {
1025                name: self.name(),
1026                description: "A tool that returns some example text.".to_string(),
1027                parameters: serde_json::json!({
1028                        "type": "object",
1029                        "properties": {},
1030                        "required": []
1031                }),
1032            }
1033        }
1034
1035        async fn call(&self, _input: Self::Args) -> Result<Self::Output, Self::Error> {
1036            let result = "Example answer".to_string();
1037            Ok(result)
1038        }
1039    }
1040
1041    #[test]
1042    fn reasoning_done_item_emits_summary_then_encrypted() {
1043        let summary = vec![
1044            ReasoningSummary::SummaryText {
1045                text: "step 1".to_string(),
1046            },
1047            ReasoningSummary::SummaryText {
1048                text: "step 2".to_string(),
1049            },
1050        ];
1051        let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1052
1053        assert_eq!(choices.len(), 3);
1054        assert!(matches!(
1055            choices.first(),
1056            Some(RawStreamingChoice::Reasoning {
1057                id: Some(id),
1058                content: ReasoningContent::Summary(text),
1059            }) if id == "rs_1" && text == "step 1"
1060        ));
1061        assert!(matches!(
1062            choices.get(1),
1063            Some(RawStreamingChoice::Reasoning {
1064                id: Some(id),
1065                content: ReasoningContent::Summary(text),
1066            }) if id == "rs_1" && text == "step 2"
1067        ));
1068        assert!(matches!(
1069            choices.get(2),
1070            Some(RawStreamingChoice::Reasoning {
1071                id: Some(id),
1072                content: ReasoningContent::Encrypted(data),
1073            }) if id == "rs_1" && data == "enc_blob"
1074        ));
1075    }
1076
1077    #[test]
1078    fn reasoning_done_item_without_encrypted_emits_summary_only() {
1079        let summary = vec![ReasoningSummary::SummaryText {
1080            text: "only summary".to_string(),
1081        }];
1082        let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1083
1084        assert_eq!(choices.len(), 1);
1085        assert!(matches!(
1086            choices.first(),
1087            Some(RawStreamingChoice::Reasoning {
1088                id: Some(id),
1089                content: ReasoningContent::Summary(text),
1090            }) if id == "rs_2" && text == "only summary"
1091        ));
1092    }
1093
1094    #[test]
1095    fn content_part_added_deserializes_snake_case_part_type() {
1096        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1097            "type": "response.content_part.added",
1098            "item_id": "msg_1",
1099            "output_index": 0,
1100            "content_index": 0,
1101            "sequence_number": 3,
1102            "part": {
1103                "type": "output_text",
1104                "text": "hello"
1105            }
1106        }))
1107        .expect("content part event should deserialize");
1108
1109        assert!(matches!(
1110            chunk,
1111            StreamingCompletionChunk::Delta(chunk)
1112                if matches!(
1113                    chunk.data,
1114                    ItemChunkKind::ContentPartAdded(_)
1115                )
1116        ));
1117    }
1118
1119    #[test]
1120    fn content_part_done_deserializes_snake_case_part_type() {
1121        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1122            "type": "response.content_part.done",
1123            "item_id": "msg_1",
1124            "output_index": 0,
1125            "content_index": 0,
1126            "sequence_number": 4,
1127            "part": {
1128                "type": "summary_text",
1129                "text": "done"
1130            }
1131        }))
1132        .expect("content part done event should deserialize");
1133
1134        assert!(matches!(
1135            chunk,
1136            StreamingCompletionChunk::Delta(chunk)
1137                if matches!(
1138                    chunk.data,
1139                    ItemChunkKind::ContentPartDone(_)
1140                )
1141        ));
1142    }
1143
1144    #[test]
1145    fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
1146        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1147            "type": "response.reasoning_summary_part.added",
1148            "item_id": "rs_1",
1149            "output_index": 0,
1150            "summary_index": 0,
1151            "sequence_number": 5,
1152            "part": {
1153                "type": "summary_text",
1154                "text": "step 1"
1155            }
1156        }))
1157        .expect("reasoning summary part event should deserialize");
1158
1159        assert!(matches!(
1160            chunk,
1161            StreamingCompletionChunk::Delta(chunk)
1162                if matches!(
1163                    chunk.data,
1164                    ItemChunkKind::ReasoningSummaryPartAdded(_)
1165                )
1166        ));
1167    }
1168
1169    #[test]
1170    fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
1171        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1172            "type": "response.reasoning_summary_part.done",
1173            "item_id": "rs_1",
1174            "output_index": 0,
1175            "summary_index": 0,
1176            "sequence_number": 6,
1177            "part": {
1178                "type": "summary_text",
1179                "text": "step 2"
1180            }
1181        }))
1182        .expect("reasoning summary part done event should deserialize");
1183
1184        assert!(matches!(
1185            chunk,
1186            StreamingCompletionChunk::Delta(chunk)
1187                if matches!(
1188                    chunk.data,
1189                    ItemChunkKind::ReasoningSummaryPartDone(_)
1190                )
1191        ));
1192    }
1193
1194    #[tokio::test]
1195    async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1196        let mut response = sample_response(ResponseStatus::Failed);
1197        response.error = Some(ResponseError {
1198            code: String::new(),
1199            message: "maximum context length exceeded".to_string(),
1200        });
1201
1202        let event = json!({
1203            "type": "response.failed",
1204            "sequence_number": 1,
1205            "response": response,
1206        });
1207
1208        let err = first_error_from_event(event).await;
1209
1210        assert_eq!(
1211            err.to_string(),
1212            "ProviderError: maximum context length exceeded"
1213        );
1214    }
1215
1216    #[tokio::test]
1217    async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1218        let mut response = sample_response(ResponseStatus::Failed);
1219        response.error = Some(ResponseError {
1220            code: "context_length_exceeded".to_string(),
1221            message: "maximum context length exceeded".to_string(),
1222        });
1223
1224        let event = json!({
1225            "type": "response.failed",
1226            "sequence_number": 1,
1227            "response": response,
1228        });
1229
1230        let err = first_error_from_event(event).await;
1231
1232        assert_eq!(
1233            err.to_string(),
1234            "ProviderError: context_length_exceeded: maximum context length exceeded"
1235        );
1236    }
1237
1238    #[tokio::test]
1239    async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1240        let mut response = sample_response(ResponseStatus::Incomplete);
1241        response.incomplete_details = Some(IncompleteDetailsReason {
1242            reason: "max_output_tokens".to_string(),
1243        });
1244
1245        let event = json!({
1246            "type": "response.incomplete",
1247            "sequence_number": 1,
1248            "response": response,
1249        });
1250
1251        let err = first_error_from_event(event).await;
1252
1253        assert_eq!(
1254            err.to_string(),
1255            "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1256        );
1257    }
1258
1259    #[tokio::test]
1260    async fn response_failed_chunk_terminates_stream_without_followup_items() {
1261        let tool_call_done = json!({
1262            "type": "response.output_item.done",
1263            "sequence_number": 1,
1264            "item": {
1265                "type": "function_call",
1266                "id": "fc_123",
1267                "arguments": "{}",
1268                "call_id": "call_123",
1269                "name": "example_tool",
1270                "status": "completed"
1271            }
1272        });
1273
1274        let mut response = sample_response(ResponseStatus::Failed);
1275        response.error = Some(ResponseError {
1276            code: "server_error".to_string(),
1277            message: "response stream failed".to_string(),
1278        });
1279
1280        let failed = json!({
1281            "type": "response.failed",
1282            "sequence_number": 2,
1283            "response": response,
1284        });
1285
1286        let client = openai::Client::builder()
1287            .http_client(MockStreamingClient {
1288                sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1289            })
1290            .api_key("test-key")
1291            .build()
1292            .expect("client should build");
1293        let model = client.completion_model("gpt-5.4");
1294        let request = model.completion_request("hello").build();
1295        let mut stream = model.stream(request).await.expect("stream should start");
1296
1297        let err = stream
1298            .next()
1299            .await
1300            .expect("stream should yield an item")
1301            .expect_err("stream should surface a provider error");
1302        assert_eq!(
1303            err.to_string(),
1304            "ProviderError: server_error: response stream failed"
1305        );
1306        assert!(
1307            stream.next().await.is_none(),
1308            "stream should terminate immediately after the first terminal error"
1309        );
1310    }
1311
1312    #[tokio::test]
1313    async fn response_completed_chunk_populates_final_usage() {
1314        let mut response = sample_response(ResponseStatus::Completed);
1315        response.usage = Some(ResponsesUsage {
1316            input_tokens: 10,
1317            input_tokens_details: None,
1318            output_tokens: 5,
1319            output_tokens_details: OutputTokensDetails {
1320                reasoning_tokens: 0,
1321            },
1322            total_tokens: 15,
1323        });
1324
1325        let event = json!({
1326            "type": "response.completed",
1327            "sequence_number": 1,
1328            "response": response,
1329        });
1330
1331        let usage = final_usage_from_event(event).await;
1332        assert_eq!(usage.input_tokens, 10);
1333        assert_eq!(usage.output_tokens, 5);
1334        assert_eq!(usage.total_tokens, 15);
1335    }
1336
1337    #[tokio::test]
1338    async fn done_sentinel_is_ignored_without_debug_parse_noise() {
1339        use std::io::{self, Write};
1340        use std::sync::{Arc, Mutex};
1341
1342        #[derive(Clone)]
1343        struct SharedWriter(Arc<Mutex<Vec<u8>>>);
1344
1345        impl Write for SharedWriter {
1346            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1347                self.0
1348                    .lock()
1349                    .expect("log buffer mutex should not be poisoned")
1350                    .extend_from_slice(buf);
1351                Ok(buf.len())
1352            }
1353
1354            fn flush(&mut self) -> io::Result<()> {
1355                Ok(())
1356            }
1357        }
1358
1359        let mut response = sample_response(ResponseStatus::Completed);
1360        response.usage = Some(ResponsesUsage {
1361            input_tokens: 4,
1362            input_tokens_details: None,
1363            output_tokens: 2,
1364            output_tokens_details: OutputTokensDetails {
1365                reasoning_tokens: 0,
1366            },
1367            total_tokens: 6,
1368        });
1369
1370        let captured = Arc::new(Mutex::new(Vec::new()));
1371        let subscriber = tracing_subscriber::fmt()
1372            .with_max_level(tracing::Level::DEBUG)
1373            .with_ansi(false)
1374            .without_time()
1375            .with_writer({
1376                let captured = captured.clone();
1377                move || SharedWriter(captured.clone())
1378            })
1379            .finish();
1380        let _guard = tracing::subscriber::set_default(subscriber);
1381
1382        let client = openai::Client::builder()
1383            .http_client(MockStreamingClient {
1384                sse_bytes: bytes::Bytes::from(format!(
1385                    "data: {}\n\ndata: [DONE]\n\n",
1386                    serde_json::to_string(&json!({
1387                        "type": "response.completed",
1388                        "sequence_number": 1,
1389                        "response": response,
1390                    }))
1391                    .expect("response event should serialize")
1392                )),
1393            })
1394            .api_key("test-key")
1395            .build()
1396            .expect("client should build");
1397        let model = client.completion_model("gpt-5.4");
1398        let request = model.completion_request("hello").build();
1399        let mut stream = model.stream(request).await.expect("stream should start");
1400
1401        let mut final_usage = None;
1402        while let Some(item) = stream.next().await {
1403            if let StreamedAssistantContent::Final(response) =
1404                item.expect("stream should complete successfully")
1405            {
1406                final_usage = Some(response.usage);
1407            }
1408        }
1409
1410        let usage = final_usage.expect("expected final response");
1411        assert_eq!(usage.input_tokens, 4);
1412        assert_eq!(usage.output_tokens, 2);
1413        assert_eq!(usage.total_tokens, 6);
1414
1415        let logs = String::from_utf8(
1416            captured
1417                .lock()
1418                .expect("log buffer mutex should not be poisoned")
1419                .clone(),
1420        )
1421        .expect("captured logs should be valid UTF-8");
1422        assert!(
1423            !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
1424            "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
1425        );
1426    }
1427
1428    // requires `derive` rig-core feature due to using tool macro
1429    #[tokio::test]
1430    #[ignore = "requires API key"]
1431    async fn test_openai_streaming_tools_reasoning() {
1432        let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1433        let client = openai::Client::new(&api_key).expect("Failed to build client");
1434        let agent = client
1435            .agent("gpt-5.2")
1436            .max_tokens(8192)
1437            .tool(ExampleTool)
1438            .additional_params(serde_json::json!({
1439                "reasoning": {"effort": "high"}
1440            }))
1441            .build();
1442
1443        let chat_history: Vec<Message> = Vec::new();
1444        let mut stream = agent
1445            .stream_chat("Call my example tool", &chat_history)
1446            .multi_turn(5)
1447            .await;
1448
1449        while let Some(item) = stream.next().await {
1450            println!("Got item: {item:?}");
1451        }
1452    }
1453}