Skip to main content

rig/providers/openai/responses_api/
streaming.rs

1//! The streaming module for the OpenAI Responses API.
2//! Please see the `openai_streaming` or `openai_streaming_with_tools` example for more practical usage.
3use crate::completion::{CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{
8    ReasoningSummary, ResponsesCompletionModel, ResponsesUsage,
9};
10use crate::streaming;
11use crate::streaming::RawStreamingChoice;
12use crate::wasm_compat::WasmCompatSend;
13use async_stream::stream;
14use futures::StreamExt;
15use serde::{Deserialize, Serialize};
16use tracing::{Level, debug, enabled, info_span};
17use tracing_futures::Instrument as _;
18
19use super::{CompletionResponse, Output};
20
21// ================================================================
22// OpenAI Responses Streaming API
23// ================================================================
24
/// A streaming completion chunk.
///
/// Streaming chunks can come in one of two forms:
/// - A response chunk (where the completed response will have the total token usage)
/// - An item chunk, commonly referred to as a delta. In the completions API this would be
///   referred to as the message delta.
///
/// The enum is `#[serde(untagged)]`, so deserialization tries the variants in declaration
/// order: [`ResponseChunk`] first, then [`ItemChunk`].
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    /// A response-level event carrying the (boxed) response object.
    Response(Box<ResponseChunk>),
    /// An item-level event such as a text or function-call-argument delta.
    Delta(ItemChunk),
}
35
/// The final streaming response from the OpenAI Responses API.
///
/// The streaming loop in this module yields it last, wrapped in
/// `RawStreamingChoice::FinalResponse`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    /// Token usage. Taken from the `response.completed` chunk when that chunk reports usage;
    /// otherwise it stays at the `ResponsesUsage::new()` value the stream starts with.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44    id: &str,
45    summary: &[ReasoningSummary],
46    encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48    let mut choices = summary
49        .iter()
50        .map(|reasoning_summary| match reasoning_summary {
51            ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52                id: Some(id.to_owned()),
53                content: ReasoningContent::Summary(text.to_owned()),
54            },
55        })
56        .collect::<Vec<_>>();
57
58    if let Some(encrypted_content) = encrypted_content {
59        choices.push(RawStreamingChoice::Reasoning {
60            id: Some(id.to_owned()),
61            content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62        });
63    }
64
65    choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69    fn token_usage(&self) -> Option<crate::completion::Usage> {
70        let mut usage = crate::completion::Usage::new();
71        usage.input_tokens = self.usage.input_tokens;
72        usage.output_tokens = self.usage.output_tokens;
73        usage.total_tokens = self.usage.total_tokens;
74        usage.cached_input_tokens = self
75            .usage
76            .input_tokens_details
77            .as_ref()
78            .map(|d| d.cached_tokens)
79            .unwrap_or(0);
80        Some(usage)
81    }
82}
83
/// A response chunk from OpenAI's response API.
///
/// Carries the event kind together with a snapshot of the response object it refers to.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// The response chunk type (serialized under the JSON key `type`).
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// The response itself
    pub response: CompletionResponse,
    /// The item sequence
    pub sequence_number: u64,
}
95
/// Response chunk type.
/// Renames are used to ensure that this type gets (de)serialized properly.
///
/// The streaming loop in this module reads usage from `ResponseCompleted`, turns
/// `ResponseFailed` and `ResponseIncomplete` into provider errors, and ignores the rest.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    /// `response.created`
    #[serde(rename = "response.created")]
    ResponseCreated,
    /// `response.in_progress`
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    /// `response.completed`
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    /// `response.failed`
    #[serde(rename = "response.failed")]
    ResponseFailed,
    /// `response.incomplete`
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
111
112fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
113    if let Some(error) = error {
114        if error.code.is_empty() {
115            error.message.clone()
116        } else {
117            format!("{}: {}", error.code, error.message)
118        }
119    } else {
120        format!("OpenAI response stream returned a {fallback}")
121    }
122}
123
124fn response_chunk_error_message(
125    kind: &ResponseChunkKind,
126    response: &CompletionResponse,
127) -> Option<String> {
128    match kind {
129        ResponseChunkKind::ResponseFailed => Some(response_error_message(
130            response.error.as_ref(),
131            "failed response",
132        )),
133        ResponseChunkKind::ResponseIncomplete => {
134            let reason = response
135                .incomplete_details
136                .as_ref()
137                .map(|details| details.reason.as_str())
138                .unwrap_or("unknown reason");
139
140            Some(format!("OpenAI response stream was incomplete: {reason}"))
141        }
142        _ => None,
143    }
144}
145
/// An item message chunk from OpenAI's Responses API.
///
/// The event-specific payload is flattened into this struct, so the `type` tag and the
/// payload fields sit at the same JSON level as `item_id` and `output_index`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Item ID. Optional.
    pub item_id: Option<String>,
    /// The output index of the item from a given streamed response.
    pub output_index: u64,
    /// The item type chunk, as well as the inner data.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
158
/// The item chunk type from OpenAI's Responses API.
///
/// Internally tagged on the JSON key `type`; each variant is renamed to the event name it
/// represents. The streaming loop in this module acts on `OutputItemAdded`,
/// `OutputItemDone`, `OutputTextDelta`, `RefusalDelta`, `FunctionCallArgsDelta` and
/// `ReasoningSummaryTextDelta`; the remaining variants are parsed but skipped there.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    /// `response.output_item.added`
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    /// `response.output_item.done`
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    /// `response.content_part.added`
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    /// `response.content_part.done`
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    /// `response.output_text.delta`
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    /// `response.output_text.done`
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    /// `response.refusal.delta`
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    /// `response.refusal.done`
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    /// `response.function_call_arguments.delta`
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    /// `response.function_call_arguments.done`
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    /// `response.reasoning_summary_part.added`
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    /// `response.reasoning_summary_part.done`
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    /// `response.reasoning_summary_text.delta`
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    /// `response.reasoning_summary_text.done`
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
}
192
/// Payload of the `response.output_item.added` and `response.output_item.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The output item the event refers to.
    pub item: Output,
}
198
/// Payload of the `response.content_part.added` and `response.content_part.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The content part carried by the event.
    pub part: ContentPartChunkPart,
}
205
/// The content part inside a [`ContentPartChunk`].
///
/// Internally tagged on `type` with snake_case names, i.e. `output_text` and `summary_text`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    /// A part tagged `output_text`.
    OutputText { text: String },
    /// A part tagged `summary_text`.
    SummaryText { text: String },
}
212
/// Payload of the `response.output_text.delta` and `response.refusal.delta` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The text fragment; the streaming loop forwards it as a message chunk.
    pub delta: String,
}
219
/// Payload of the `response.function_call_arguments.delta` event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    /// Item ID. The streaming loop uses it as the tool-call delta id and as the key for
    /// looking up the generated internal call id.
    pub item_id: String,
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The argument fragment; forwarded as a tool-call delta.
    pub delta: String,
}
227
/// Payload of the `response.output_text.done` event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// Value of the event's `text` field.
    pub text: String,
}
234
/// Payload of the `response.refusal.done` event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// Value of the event's `refusal` field.
    pub refusal: String,
}
241
/// Payload of the `response.function_call_arguments.done` event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    /// Value of the event's `content_index` field.
    pub content_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// Value of the event's `arguments` field, kept as arbitrary JSON.
    pub arguments: serde_json::Value,
}
248
/// Payload of the `response.reasoning_summary_part.added` and
/// `response.reasoning_summary_part.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    /// Value of the event's `summary_index` field.
    pub summary_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The summary part carried by the event.
    pub part: SummaryPartChunkPart,
}
255
/// Payload of the `response.reasoning_summary_text.delta` and
/// `response.reasoning_summary_text.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryTextChunk {
    /// Value of the event's `summary_index` field.
    pub summary_index: u64,
    /// Value of the event's `sequence_number` field.
    pub sequence_number: u64,
    /// The summary text fragment; for delta events the streaming loop forwards it as a
    /// reasoning delta.
    pub delta: String,
}
262
/// The summary part inside a [`SummaryPartChunk`].
///
/// Internally tagged on `type` with snake_case names, i.e. `summary_text`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    /// A part tagged `summary_text`.
    SummaryText { text: String },
}
268
269impl<T> ResponsesCompletionModel<T>
270where
271    T: HttpClientExt + Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
272{
273    pub(crate) async fn stream(
274        &self,
275        completion_request: crate::completion::CompletionRequest,
276    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
277    {
278        let mut request = self.create_completion_request(completion_request)?;
279        request.stream = Some(true);
280
281        if enabled!(Level::TRACE) {
282            tracing::trace!(
283                target: "rig::completions",
284                "OpenAI Responses streaming completion request: {}",
285                serde_json::to_string_pretty(&request)?
286            );
287        }
288
289        let body = serde_json::to_vec(&request)?;
290
291        let req = self
292            .client
293            .post("/responses")?
294            .body(body)
295            .map_err(|e| CompletionError::HttpError(e.into()))?;
296
297        // let request_builder = self.client.post_reqwest("/responses").json(&request);
298
299        let span = if tracing::Span::current().is_disabled() {
300            info_span!(
301                target: "rig::completions",
302                "chat_streaming",
303                gen_ai.operation.name = "chat_streaming",
304                gen_ai.provider.name = tracing::field::Empty,
305                gen_ai.request.model = tracing::field::Empty,
306                gen_ai.response.id = tracing::field::Empty,
307                gen_ai.response.model = tracing::field::Empty,
308                gen_ai.usage.output_tokens = tracing::field::Empty,
309                gen_ai.usage.input_tokens = tracing::field::Empty,
310                gen_ai.usage.cached_tokens = tracing::field::Empty,
311            )
312        } else {
313            tracing::Span::current()
314        };
315        span.record("gen_ai.provider.name", "openai");
316        span.record("gen_ai.request.model", &self.model);
317        // Build the request with proper headers for SSE
318        let client = self.client.clone();
319
320        let mut event_source = GenericEventSource::new(client, req);
321
322        let stream = stream! {
323            let mut final_usage = ResponsesUsage::new();
324
325            let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
326            let mut tool_call_internal_ids: std::collections::HashMap<String, String> = std::collections::HashMap::new();
327            let span = tracing::Span::current();
328
329            while let Some(event_result) = event_source.next().await {
330                match event_result {
331                    Ok(Event::Open) => {
332                        tracing::trace!("SSE connection opened");
333                        tracing::info!("OpenAI stream started");
334                        continue;
335                    }
336                    Ok(Event::Message(evt)) => {
337                        // Skip heartbeat messages or empty data
338                        if evt.data.trim().is_empty() {
339                            continue;
340                        }
341
342                        let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
343
344                        let Ok(data) = data else {
345                            let err = data.unwrap_err();
346                            debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
347                            continue;
348                        };
349
350                        if let StreamingCompletionChunk::Delta(chunk) = &data {
351                            match &chunk.data {
352                                ItemChunkKind::OutputItemAdded(message) => {
353                                    if let StreamingItemDoneOutput { item: Output::FunctionCall(func), .. } = message {
354                                        let internal_call_id = tool_call_internal_ids
355                                            .entry(func.id.clone())
356                                            .or_insert_with(|| nanoid::nanoid!())
357                                            .clone();
358                                        yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
359                                            id: func.id.clone(),
360                                            internal_call_id,
361                                            content: streaming::ToolCallDeltaContent::Name(func.name.clone()),
362                                        });
363                                    }
364                                }
365                                ItemChunkKind::OutputItemDone(message) => {
366                                    match message {
367                                        StreamingItemDoneOutput {  item: Output::FunctionCall(func), .. } => {
368                                            let internal_id = tool_call_internal_ids
369                                                .entry(func.id.clone())
370                                                .or_insert_with(|| nanoid::nanoid!())
371                                                .clone();
372                                            let raw_tool_call = streaming::RawStreamingToolCall::new(
373                                                func.id.clone(),
374                                                func.name.clone(),
375                                                func.arguments.clone(),
376                                            )
377                                                .with_internal_call_id(internal_id)
378                                                .with_call_id(func.call_id.clone());
379                                            tool_calls.push(streaming::RawStreamingChoice::ToolCall(raw_tool_call));
380                                        }
381
382                                        StreamingItemDoneOutput {  item: Output::Reasoning {  summary, id, encrypted_content, .. }, .. } => {
383                                            for reasoning_choice in reasoning_choices_from_done_item(
384                                                id,
385                                                summary,
386                                                encrypted_content.as_deref(),
387                                            ) {
388                                                yield Ok(reasoning_choice);
389                                            }
390                                        }
391                                        StreamingItemDoneOutput { item: Output::Message(msg), .. } => {
392                                            yield Ok(streaming::RawStreamingChoice::MessageId(msg.id.clone()));
393                                        }
394                                    }
395                                }
396                                ItemChunkKind::OutputTextDelta(delta) => {
397                                    yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
398                                }
399                                ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
400                                    yield Ok(streaming::RawStreamingChoice::ReasoningDelta { id: None, reasoning: delta.delta.clone() })
401                                }
402                                ItemChunkKind::RefusalDelta(delta) => {
403                                    yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
404                                }
405                                ItemChunkKind::FunctionCallArgsDelta(delta) => {
406                                    let internal_call_id = tool_call_internal_ids
407                                        .entry(delta.item_id.clone())
408                                        .or_insert_with(|| nanoid::nanoid!())
409                                        .clone();
410                                    yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
411                                        id: delta.item_id.clone(),
412                                        internal_call_id,
413                                        content: streaming::ToolCallDeltaContent::Delta(delta.delta.clone())
414                                    })
415                                }
416
417                                _ => { continue }
418                            }
419                        }
420
421                        if let StreamingCompletionChunk::Response(chunk) = data {
422                            let ResponseChunk { kind, response, .. } = *chunk;
423
424                            match kind {
425                                ResponseChunkKind::ResponseCompleted => {
426                                    span.record("gen_ai.response.id", response.id.as_str());
427                                    span.record("gen_ai.response.model", response.model.as_str());
428                                    if let Some(usage) = response.usage {
429                                        final_usage = usage;
430                                    }
431                                }
432                                ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
433                                    let error_message = response_chunk_error_message(&kind, &response)
434                                        .expect("terminal response should have an error message");
435                                    yield Err(CompletionError::ProviderError(error_message));
436                                    break;
437                                }
438                                _ => continue,
439                            }
440                        }
441                    }
442                    Err(crate::http_client::Error::StreamEnded) => {
443                        event_source.close();
444                    }
445                    Err(error) => {
446                        tracing::error!(?error, "SSE error");
447                        yield Err(CompletionError::ProviderError(error.to_string()));
448                        break;
449                    }
450                }
451            }
452
453            // Ensure event source is closed when stream ends
454            event_source.close();
455
456            for tool_call in &tool_calls {
457                yield Ok(tool_call.to_owned())
458            }
459
460            span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
461            span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
462            span.record(
463                "gen_ai.usage.cached_tokens",
464                final_usage
465                    .input_tokens_details
466                    .as_ref()
467                    .map(|d| d.cached_tokens)
468                    .unwrap_or(0),
469            );
470            tracing::info!("OpenAI stream finished");
471
472            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
473                usage: final_usage
474            }));
475        }.instrument(span);
476
477        Ok(streaming::StreamingCompletionResponse::stream(Box::pin(
478            stream,
479        )))
480    }
481}
482
#[cfg(test)]
mod tests {
    use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
    use crate::completion::CompletionModel;
    use crate::http_client::mock::MockStreamingClient;
    use crate::message::ReasoningContent;
    use crate::providers::openai::responses_api::{
        AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
        ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
    };
    use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
    use bytes::Bytes;
    use futures::StreamExt;
    use serde_json::{self, json};

    use crate::{
        client::CompletionClient,
        completion::{Message, ToolDefinition},
        providers::openai,
        streaming::StreamingChat,
        tool::{Tool, ToolError},
    };

    /// Minimal tool used by the ignored live-API test at the bottom of this module.
    struct ExampleTool;

    // The streaming impl under test bounds its HTTP client on `Default` and `Debug`, so the
    // mock client gets both here.
    impl Default for MockStreamingClient {
        fn default() -> Self {
            Self {
                sse_bytes: Bytes::new(),
            }
        }
    }

    impl std::fmt::Debug for MockStreamingClient {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockStreamingClient")
                .finish_non_exhaustive()
        }
    }

    /// Builds a bare response object with the given status and every optional field empty.
    fn sample_response(status: ResponseStatus) -> CompletionResponse {
        CompletionResponse {
            id: "resp_123".to_string(),
            object: ResponseObject::Response,
            created_at: 0,
            status,
            error: None,
            incomplete_details: None,
            instructions: None,
            max_output_tokens: None,
            model: "gpt-5.4".to_string(),
            usage: None,
            output: Vec::new(),
            tools: Vec::new(),
            additional_parameters: AdditionalParameters::default(),
        }
    }

    /// Encodes `event` as a single SSE `data:` frame terminated by a blank line.
    fn sse_event_bytes(event: serde_json::Value) -> Bytes {
        Bytes::from(format!(
            "data: {}\n\n",
            serde_json::to_string(&event).expect("event should serialize")
        ))
    }

    /// Streams a single mocked SSE `event` and returns the error carried by the first item.
    ///
    /// Panics if the stream yields nothing or if its first item is not an error.
    async fn first_error_from_event(
        event: serde_json::Value,
    ) -> crate::completion::CompletionError {
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: sse_event_bytes(event),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        stream
            .next()
            .await
            .expect("stream should yield an item")
            .expect_err("stream should surface a provider error")
    }

    /// Streams a single mocked SSE `event` and returns the usage from the final response.
    ///
    /// Panics if any item is an error or if the stream ends without a final response.
    async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: sse_event_bytes(event),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        while let Some(item) = stream.next().await {
            match item.expect("completed stream should not error") {
                StreamedAssistantContent::Final(res) => return res.usage,
                _ => continue,
            }
        }

        panic!("stream should yield a final response");
    }

    impl Tool for ExampleTool {
        type Args = ();
        type Error = ToolError;
        type Output = String;
        const NAME: &'static str = "example_tool";

        async fn definition(&self, _prompt: String) -> ToolDefinition {
            ToolDefinition {
                name: self.name(),
                description: "A tool that returns some example text.".to_string(),
                parameters: serde_json::json!({
                        "type": "object",
                        "properties": {},
                        "required": []
                }),
            }
        }

        async fn call(&self, _input: Self::Args) -> Result<Self::Output, Self::Error> {
            let result = "Example answer".to_string();
            Ok(result)
        }
    }

    // Summaries must come out in input order, with the encrypted payload last.
    #[test]
    fn reasoning_done_item_emits_summary_then_encrypted() {
        let summary = vec![
            ReasoningSummary::SummaryText {
                text: "step 1".to_string(),
            },
            ReasoningSummary::SummaryText {
                text: "step 2".to_string(),
            },
        ];
        let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));

        assert_eq!(choices.len(), 3);
        assert!(matches!(
            choices.first(),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_1" && text == "step 1"
        ));
        assert!(matches!(
            choices.get(1),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_1" && text == "step 2"
        ));
        assert!(matches!(
            choices.get(2),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Encrypted(data),
            }) if id == "rs_1" && data == "enc_blob"
        ));
    }

    #[test]
    fn reasoning_done_item_without_encrypted_emits_summary_only() {
        let summary = vec![ReasoningSummary::SummaryText {
            text: "only summary".to_string(),
        }];
        let choices = reasoning_choices_from_done_item("rs_2", &summary, None);

        assert_eq!(choices.len(), 1);
        assert!(matches!(
            choices.first(),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_2" && text == "only summary"
        ));
    }

    // The four tests below check that nested `part.type` tags use snake_case names.
    #[test]
    fn content_part_added_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.content_part.added",
            "item_id": "msg_1",
            "output_index": 0,
            "content_index": 0,
            "sequence_number": 3,
            "part": {
                "type": "output_text",
                "text": "hello"
            }
        }))
        .expect("content part event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ContentPartAdded(_)
                )
        ));
    }

    #[test]
    fn content_part_done_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.content_part.done",
            "item_id": "msg_1",
            "output_index": 0,
            "content_index": 0,
            "sequence_number": 4,
            "part": {
                "type": "summary_text",
                "text": "done"
            }
        }))
        .expect("content part done event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ContentPartDone(_)
                )
        ));
    }

    #[test]
    fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.reasoning_summary_part.added",
            "item_id": "rs_1",
            "output_index": 0,
            "summary_index": 0,
            "sequence_number": 5,
            "part": {
                "type": "summary_text",
                "text": "step 1"
            }
        }))
        .expect("reasoning summary part event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ReasoningSummaryPartAdded(_)
                )
        ));
    }

    #[test]
    fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.reasoning_summary_part.done",
            "item_id": "rs_1",
            "output_index": 0,
            "summary_index": 0,
            "sequence_number": 6,
            "part": {
                "type": "summary_text",
                "text": "step 2"
            }
        }))
        .expect("reasoning summary part done event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ReasoningSummaryPartDone(_)
                )
        ));
    }

    // An empty error code must not leave a dangling ": " prefix in the message.
    #[tokio::test]
    async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
        let mut response = sample_response(ResponseStatus::Failed);
        response.error = Some(ResponseError {
            code: String::new(),
            message: "maximum context length exceeded".to_string(),
        });

        let event = json!({
            "type": "response.failed",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: maximum context length exceeded"
        );
    }

    #[tokio::test]
    async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
        let mut response = sample_response(ResponseStatus::Failed);
        response.error = Some(ResponseError {
            code: "context_length_exceeded".to_string(),
            message: "maximum context length exceeded".to_string(),
        });

        let event = json!({
            "type": "response.failed",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: context_length_exceeded: maximum context length exceeded"
        );
    }

    #[tokio::test]
    async fn response_incomplete_chunk_uses_incomplete_details_reason() {
        let mut response = sample_response(ResponseStatus::Incomplete);
        response.incomplete_details = Some(IncompleteDetailsReason {
            reason: "max_output_tokens".to_string(),
        });

        let event = json!({
            "type": "response.incomplete",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
        );
    }

    #[tokio::test]
    async fn response_completed_chunk_populates_final_usage() {
        let mut response = sample_response(ResponseStatus::Completed);
        response.usage = Some(ResponsesUsage {
            input_tokens: 10,
            input_tokens_details: None,
            output_tokens: 5,
            output_tokens_details: OutputTokensDetails {
                reasoning_tokens: 0,
            },
            total_tokens: 15,
        });

        let event = json!({
            "type": "response.completed",
            "sequence_number": 1,
            "response": response,
        });

        let usage = final_usage_from_event(event).await;
        assert_eq!(usage.input_tokens, 10);
        assert_eq!(usage.output_tokens, 5);
        assert_eq!(usage.total_tokens, 15);
    }

    // Live smoke test against the real API; ignored by default because it needs
    // `OPENAI_API_KEY`.
    // NOTE(review): an earlier comment here said this requires the `derive` rig-core feature
    // "due to using tool macro", but `ExampleTool` above implements `Tool` by hand — confirm
    // whether that requirement still applies.
    #[tokio::test]
    #[ignore = "requires API key"]
    async fn test_openai_streaming_tools_reasoning() {
        let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
        let client = openai::Client::new(&api_key).expect("Failed to build client");
        let agent = client
            .agent("gpt-5.2")
            .max_tokens(8192)
            .tool(ExampleTool)
            .additional_params(serde_json::json!({
                "reasoning": {"effort": "high"}
            }))
            .build();

        let chat_history: Vec<Message> = Vec::new();
        let mut stream = agent
            .stream_chat("Call my example tool", &chat_history)
            .multi_turn(5)
            .await;

        while let Some(item) = stream.next().await {
            println!("Got item: {item:?}");
        }
    }
}