1use crate::completion::{CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{
8 ReasoningSummary, ResponsesCompletionModel, ResponsesUsage,
9};
10use crate::streaming;
11use crate::streaming::RawStreamingChoice;
12use crate::wasm_compat::WasmCompatSend;
13use async_stream::stream;
14use futures::StreamExt;
15use serde::{Deserialize, Serialize};
16use tracing::{Level, debug, enabled, info_span};
17use tracing_futures::Instrument as _;
18
19use super::{CompletionResponse, Output};
20
/// A single chunk parsed from the Responses API SSE stream.
///
/// Events are either response-lifecycle snapshots (`Response`) or incremental
/// per-output-item deltas (`Delta`). `untagged` makes serde try each variant
/// in declaration order when deserializing.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    // Boxed because the full response snapshot is much larger than a delta.
    Response(Box<ResponseChunk>),
    Delta(ItemChunk),
}
35
/// Final payload yielded once the stream completes, carrying the aggregated
/// token usage reported by the provider for the whole response.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44 id: &str,
45 summary: &[ReasoningSummary],
46 encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48 let mut choices = summary
49 .iter()
50 .map(|reasoning_summary| match reasoning_summary {
51 ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52 id: Some(id.to_owned()),
53 content: ReasoningContent::Summary(text.to_owned()),
54 },
55 })
56 .collect::<Vec<_>>();
57
58 if let Some(encrypted_content) = encrypted_content {
59 choices.push(RawStreamingChoice::Reasoning {
60 id: Some(id.to_owned()),
61 content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62 });
63 }
64
65 choices
66}
67
68impl GetTokenUsage for StreamingCompletionResponse {
69 fn token_usage(&self) -> Option<crate::completion::Usage> {
70 let mut usage = crate::completion::Usage::new();
71 usage.input_tokens = self.usage.input_tokens;
72 usage.output_tokens = self.usage.output_tokens;
73 usage.total_tokens = self.usage.total_tokens;
74 usage.cached_input_tokens = self
75 .usage
76 .input_tokens_details
77 .as_ref()
78 .map(|d| d.cached_tokens)
79 .unwrap_or(0);
80 Some(usage)
81 }
82}
83
/// A response-lifecycle SSE event: the event kind plus a full snapshot of
/// the response as of that event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    /// Which lifecycle event this chunk represents (`response.*` type tag).
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    /// Snapshot of the response at the time of this event.
    pub response: CompletionResponse,
    /// Monotonic ordering of events within the stream.
    pub sequence_number: u64,
}
95
/// Lifecycle event kinds for a streamed response, matching the
/// `response.*` SSE event type strings emitted by the Responses API.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    #[serde(rename = "response.created")]
    ResponseCreated,
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    #[serde(rename = "response.failed")]
    ResponseFailed,
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
111
112fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
113 if let Some(error) = error {
114 if error.code.is_empty() {
115 error.message.clone()
116 } else {
117 format!("{}: {}", error.code, error.message)
118 }
119 } else {
120 format!("OpenAI response stream returned a {fallback}")
121 }
122}
123
124fn response_chunk_error_message(
125 kind: &ResponseChunkKind,
126 response: &CompletionResponse,
127) -> Option<String> {
128 match kind {
129 ResponseChunkKind::ResponseFailed => Some(response_error_message(
130 response.error.as_ref(),
131 "failed response",
132 )),
133 ResponseChunkKind::ResponseIncomplete => {
134 let reason = response
135 .incomplete_details
136 .as_ref()
137 .map(|details| details.reason.as_str())
138 .unwrap_or("unknown reason");
139
140 Some(format!("OpenAI response stream was incomplete: {reason}"))
141 }
142 _ => None,
143 }
144}
145
/// An incremental per-output-item SSE event.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    /// Id of the output item this event belongs to, when the API sends one.
    pub item_id: Option<String>,
    /// Index of the item within the response's output array.
    pub output_index: u64,
    /// Event-specific payload; the `type` tag lives inside the flattened enum.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
158
/// Per-item streaming event payloads, tagged by the SSE event `type` string.
///
/// Each variant corresponds to one `response.*` delta/done event kind from
/// the Responses API streaming protocol.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
}
192
/// Payload for `output_item.added` / `output_item.done` events: carries the
/// complete output item as known at that point in the stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    pub sequence_number: u64,
    /// The output item itself (message, function call, reasoning, ...).
    pub item: Output,
}
198
/// Payload for `content_part.added` / `content_part.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    /// Index of this part within the item's content array.
    pub content_index: u64,
    pub sequence_number: u64,
    pub part: ContentPartChunkPart,
}
205
/// The content part carried by a [`ContentPartChunk`], tagged by its
/// snake_case `type` field (`output_text` / `summary_text`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    OutputText { text: String },
    SummaryText { text: String },
}
212
/// An incremental text fragment (used for output-text and refusal deltas).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    /// The newly arrived text fragment.
    pub delta: String,
}
219
/// An incremental text fragment that always carries its item id (used for
/// function-call argument deltas, where the id is required rather than the
/// optional `item_id` on the enclosing [`ItemChunk`]).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    pub item_id: String,
    pub content_index: u64,
    pub sequence_number: u64,
    /// The newly arrived argument-JSON fragment.
    pub delta: String,
}
227
/// Payload for `output_text.done`: the full accumulated text for a part.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub text: String,
}
234
/// Payload for `refusal.done`: the full refusal text for a part.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub refusal: String,
}
241
/// Payload for `function_call_arguments.done`: the complete argument object.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    /// Fully accumulated tool-call arguments as parsed JSON.
    pub arguments: serde_json::Value,
}
248
/// Payload for `reasoning_summary_part.added` / `.done` events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    /// Index of the summary part within the reasoning item.
    pub summary_index: u64,
    pub sequence_number: u64,
    pub part: SummaryPartChunkPart,
}
255
/// Payload for `reasoning_summary_text.delta` / `.done` events.
// NOTE(review): both the `.delta` and `.done` variants deserialize into this
// struct via a `delta` field — confirm the `.done` event's wire payload also
// uses `delta` (not `text`); a mismatch would only be debug-logged and
// skipped by the stream loop.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryTextChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
262
/// The part carried by a [`SummaryPartChunk`]; currently only summary text.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    SummaryText { text: String },
}
268
impl<T> ResponsesCompletionModel<T>
where
    T: HttpClientExt + Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
{
    /// Sends a streaming completion request to the OpenAI Responses API
    /// (`POST /responses`) and adapts the SSE event stream into rig's
    /// provider-agnostic streaming choices.
    ///
    /// Text, reasoning, and tool-call deltas are yielded as they arrive.
    /// Completed tool calls are buffered and re-emitted after the SSE stream
    /// ends, followed by a final chunk carrying the aggregated token usage.
    ///
    /// # Errors
    /// Fails immediately if the request cannot be built or serialized.
    /// Transport and provider failures that occur mid-stream are yielded as
    /// `Err` items on the returned stream instead.
    pub(crate) async fn stream(
        &self,
        completion_request: crate::completion::CompletionRequest,
    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
    {
        let mut request = self.create_completion_request(completion_request)?;
        // Ask the API for an SSE stream rather than a single JSON response.
        request.stream = Some(true);

        if enabled!(Level::TRACE) {
            tracing::trace!(
                target: "rig::completions",
                "OpenAI Responses streaming completion request: {}",
                serde_json::to_string_pretty(&request)?
            );
        }

        let body = serde_json::to_vec(&request)?;

        let req = self
            .client
            .post("/responses")?
            .body(body)
            .map_err(|e| CompletionError::HttpError(e.into()))?;

        // Reuse the caller's active span when there is one; otherwise open a
        // fresh gen_ai-convention span for this streaming chat call.
        let span = if tracing::Span::current().is_disabled() {
            info_span!(
                target: "rig::completions",
                "chat_streaming",
                gen_ai.operation.name = "chat_streaming",
                gen_ai.provider.name = tracing::field::Empty,
                gen_ai.request.model = tracing::field::Empty,
                gen_ai.response.id = tracing::field::Empty,
                gen_ai.response.model = tracing::field::Empty,
                gen_ai.usage.output_tokens = tracing::field::Empty,
                gen_ai.usage.input_tokens = tracing::field::Empty,
                gen_ai.usage.cached_tokens = tracing::field::Empty,
            )
        } else {
            tracing::Span::current()
        };
        span.record("gen_ai.provider.name", "openai");
        span.record("gen_ai.request.model", &self.model);
        let client = self.client.clone();

        let mut event_source = GenericEventSource::new(client, req);

        let stream = stream! {
            let mut final_usage = ResponsesUsage::new();

            // Completed tool calls are buffered here and yielded only after
            // the SSE stream has finished.
            let mut tool_calls: Vec<RawStreamingChoice<StreamingCompletionResponse>> = Vec::new();
            // Maps provider item ids to locally generated internal call ids so
            // argument deltas and the final tool call share one internal id.
            let mut tool_call_internal_ids: std::collections::HashMap<String, String> = std::collections::HashMap::new();
            let span = tracing::Span::current();

            while let Some(event_result) = event_source.next().await {
                match event_result {
                    Ok(Event::Open) => {
                        tracing::trace!("SSE connection opened");
                        tracing::info!("OpenAI stream started");
                        continue;
                    }
                    Ok(Event::Message(evt)) => {
                        // Skip empty keep-alive frames.
                        if evt.data.trim().is_empty() {
                            continue;
                        }

                        let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);

                        // Unrecognized event payloads are logged and skipped
                        // rather than failing the whole stream.
                        let Ok(data) = data else {
                            let err = data.unwrap_err();
                            debug!("Couldn't serialize data as StreamingCompletionResponse: {:?}", err);
                            continue;
                        };

                        if let StreamingCompletionChunk::Delta(chunk) = &data {
                            match &chunk.data {
                                ItemChunkKind::OutputItemAdded(message) => {
                                    // A new function-call item: surface its name
                                    // as the first tool-call delta.
                                    if let StreamingItemDoneOutput { item: Output::FunctionCall(func), .. } = message {
                                        let internal_call_id = tool_call_internal_ids
                                            .entry(func.id.clone())
                                            .or_insert_with(|| nanoid::nanoid!())
                                            .clone();
                                        yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
                                            id: func.id.clone(),
                                            internal_call_id,
                                            content: streaming::ToolCallDeltaContent::Name(func.name.clone()),
                                        });
                                    }
                                }
                                ItemChunkKind::OutputItemDone(message) => {
                                    match message {
                                        StreamingItemDoneOutput { item: Output::FunctionCall(func), .. } => {
                                            let internal_id = tool_call_internal_ids
                                                .entry(func.id.clone())
                                                .or_insert_with(|| nanoid::nanoid!())
                                                .clone();
                                            let raw_tool_call = streaming::RawStreamingToolCall::new(
                                                func.id.clone(),
                                                func.name.clone(),
                                                func.arguments.clone(),
                                            )
                                            .with_internal_call_id(internal_id)
                                            .with_call_id(func.call_id.clone());
                                            // Buffered; emitted after the stream ends.
                                            tool_calls.push(streaming::RawStreamingChoice::ToolCall(raw_tool_call));
                                        }

                                        StreamingItemDoneOutput { item: Output::Reasoning { summary, id, encrypted_content, .. }, .. } => {
                                            for reasoning_choice in reasoning_choices_from_done_item(
                                                id,
                                                summary,
                                                encrypted_content.as_deref(),
                                            ) {
                                                yield Ok(reasoning_choice);
                                            }
                                        }
                                        StreamingItemDoneOutput { item: Output::Message(msg), .. } => {
                                            yield Ok(streaming::RawStreamingChoice::MessageId(msg.id.clone()));
                                        }
                                    }
                                }
                                ItemChunkKind::OutputTextDelta(delta) => {
                                    yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
                                }
                                ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
                                    yield Ok(streaming::RawStreamingChoice::ReasoningDelta { id: None, reasoning: delta.delta.clone() })
                                }
                                // Refusal text is surfaced the same way as
                                // ordinary message text.
                                ItemChunkKind::RefusalDelta(delta) => {
                                    yield Ok(streaming::RawStreamingChoice::Message(delta.delta.clone()))
                                }
                                ItemChunkKind::FunctionCallArgsDelta(delta) => {
                                    let internal_call_id = tool_call_internal_ids
                                        .entry(delta.item_id.clone())
                                        .or_insert_with(|| nanoid::nanoid!())
                                        .clone();
                                    yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
                                        id: delta.item_id.clone(),
                                        internal_call_id,
                                        content: streaming::ToolCallDeltaContent::Delta(delta.delta.clone())
                                    })
                                }

                                // All other delta kinds (content parts, done
                                // events, summary parts) carry no new content.
                                _ => { continue }
                            }
                        }

                        if let StreamingCompletionChunk::Response(chunk) = data {
                            let ResponseChunk { kind, response, .. } = *chunk;

                            match kind {
                                ResponseChunkKind::ResponseCompleted => {
                                    span.record("gen_ai.response.id", response.id.as_str());
                                    span.record("gen_ai.response.model", response.model.as_str());
                                    if let Some(usage) = response.usage {
                                        final_usage = usage;
                                    }
                                }
                                ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
                                    // `response_chunk_error_message` always returns
                                    // Some(..) for these two terminal kinds.
                                    let error_message = response_chunk_error_message(&kind, &response)
                                        .expect("terminal response should have an error message");
                                    yield Err(CompletionError::ProviderError(error_message));
                                    break;
                                }
                                _ => continue,
                            }
                        }
                    }
                    Err(crate::http_client::Error::StreamEnded) => {
                        // Normal end-of-stream; close and let the loop drain.
                        event_source.close();
                    }
                    Err(error) => {
                        tracing::error!(?error, "SSE error");
                        yield Err(CompletionError::ProviderError(error.to_string()));
                        break;
                    }
                }
            }

            event_source.close();

            // Emit the buffered, completed tool calls after streaming is done.
            for tool_call in &tool_calls {
                yield Ok(tool_call.to_owned())
            }

            span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
            span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
            span.record(
                "gen_ai.usage.cached_tokens",
                final_usage
                    .input_tokens_details
                    .as_ref()
                    .map(|d| d.cached_tokens)
                    .unwrap_or(0),
            );
            tracing::info!("OpenAI stream finished");

            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
                usage: final_usage
            }));
        }.instrument(span);

        Ok(streaming::StreamingCompletionResponse::stream(Box::pin(
            stream,
        )))
    }
}
482
#[cfg(test)]
mod tests {
    use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
    use crate::completion::CompletionModel;
    use crate::http_client::mock::MockStreamingClient;
    use crate::message::ReasoningContent;
    use crate::providers::openai::responses_api::{
        AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
        ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
    };
    use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
    use bytes::Bytes;
    use futures::StreamExt;
    use serde_json::{self, json};

    use crate::{
        client::CompletionClient,
        completion::{Message, ToolDefinition},
        providers::openai,
        streaming::StreamingChat,
        tool::{Tool, ToolError},
    };

    // Minimal no-op tool used by the live-API integration test below.
    struct ExampleTool;

    // Default/Debug impls so MockStreamingClient satisfies the bounds the
    // completion model's generic parameter requires.
    impl Default for MockStreamingClient {
        fn default() -> Self {
            Self {
                sse_bytes: Bytes::new(),
            }
        }
    }

    impl std::fmt::Debug for MockStreamingClient {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockStreamingClient")
                .finish_non_exhaustive()
        }
    }

    /// Builds a bare-bones response snapshot with the given status; tests
    /// overwrite the fields they care about (error, usage, ...).
    fn sample_response(status: ResponseStatus) -> CompletionResponse {
        CompletionResponse {
            id: "resp_123".to_string(),
            object: ResponseObject::Response,
            created_at: 0,
            status,
            error: None,
            incomplete_details: None,
            instructions: None,
            max_output_tokens: None,
            model: "gpt-5.4".to_string(),
            usage: None,
            output: Vec::new(),
            tools: Vec::new(),
            additional_parameters: AdditionalParameters::default(),
        }
    }

    /// Serializes a JSON event into a single `data: ...\n\n` SSE frame.
    fn sse_event_bytes(event: serde_json::Value) -> Bytes {
        Bytes::from(format!(
            "data: {}\n\n",
            serde_json::to_string(&event).expect("event should serialize")
        ))
    }

    /// Streams a single mocked SSE event and returns the first error the
    /// stream yields; panics if the stream succeeds instead.
    async fn first_error_from_event(
        event: serde_json::Value,
    ) -> crate::completion::CompletionError {
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: sse_event_bytes(event),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        stream
            .next()
            .await
            .expect("stream should yield an item")
            .expect_err("stream should surface a provider error")
    }

    /// Streams a single mocked SSE event to completion and returns the usage
    /// reported by the final chunk; panics if no final chunk is produced.
    async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: sse_event_bytes(event),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        while let Some(item) = stream.next().await {
            match item.expect("completed stream should not error") {
                StreamedAssistantContent::Final(res) => return res.usage,
                _ => continue,
            }
        }

        panic!("stream should yield a final response");
    }

    impl Tool for ExampleTool {
        type Args = ();
        type Error = ToolError;
        type Output = String;
        const NAME: &'static str = "example_tool";

        async fn definition(&self, _prompt: String) -> ToolDefinition {
            ToolDefinition {
                name: self.name(),
                description: "A tool that returns some example text.".to_string(),
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {},
                    "required": []
                }),
            }
        }

        async fn call(&self, _input: Self::Args) -> Result<Self::Output, Self::Error> {
            let result = "Example answer".to_string();
            Ok(result)
        }
    }

    // Summary choices must be emitted in order, with the encrypted payload
    // appended last.
    #[test]
    fn reasoning_done_item_emits_summary_then_encrypted() {
        let summary = vec![
            ReasoningSummary::SummaryText {
                text: "step 1".to_string(),
            },
            ReasoningSummary::SummaryText {
                text: "step 2".to_string(),
            },
        ];
        let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));

        assert_eq!(choices.len(), 3);
        assert!(matches!(
            choices.first(),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_1" && text == "step 1"
        ));
        assert!(matches!(
            choices.get(1),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_1" && text == "step 2"
        ));
        assert!(matches!(
            choices.get(2),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Encrypted(data),
            }) if id == "rs_1" && data == "enc_blob"
        ));
    }

    #[test]
    fn reasoning_done_item_without_encrypted_emits_summary_only() {
        let summary = vec![ReasoningSummary::SummaryText {
            text: "only summary".to_string(),
        }];
        let choices = reasoning_choices_from_done_item("rs_2", &summary, None);

        assert_eq!(choices.len(), 1);
        assert!(matches!(
            choices.first(),
            Some(RawStreamingChoice::Reasoning {
                id: Some(id),
                content: ReasoningContent::Summary(text),
            }) if id == "rs_2" && text == "only summary"
        ));
    }

    // The next four tests pin the serde wire format: snake_case `type` tags on
    // nested `part` objects must round-trip into the right enum variants.
    #[test]
    fn content_part_added_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.content_part.added",
            "item_id": "msg_1",
            "output_index": 0,
            "content_index": 0,
            "sequence_number": 3,
            "part": {
                "type": "output_text",
                "text": "hello"
            }
        }))
        .expect("content part event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ContentPartAdded(_)
                )
        ));
    }

    #[test]
    fn content_part_done_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.content_part.done",
            "item_id": "msg_1",
            "output_index": 0,
            "content_index": 0,
            "sequence_number": 4,
            "part": {
                "type": "summary_text",
                "text": "done"
            }
        }))
        .expect("content part done event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ContentPartDone(_)
                )
        ));
    }

    #[test]
    fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.reasoning_summary_part.added",
            "item_id": "rs_1",
            "output_index": 0,
            "summary_index": 0,
            "sequence_number": 5,
            "part": {
                "type": "summary_text",
                "text": "step 1"
            }
        }))
        .expect("reasoning summary part event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ReasoningSummaryPartAdded(_)
                )
        ));
    }

    #[test]
    fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
        let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
            "type": "response.reasoning_summary_part.done",
            "item_id": "rs_1",
            "output_index": 0,
            "summary_index": 0,
            "sequence_number": 6,
            "part": {
                "type": "summary_text",
                "text": "step 2"
            }
        }))
        .expect("reasoning summary part done event should deserialize");

        assert!(matches!(
            chunk,
            StreamingCompletionChunk::Delta(chunk)
                if matches!(
                    chunk.data,
                    ItemChunkKind::ReasoningSummaryPartDone(_)
                )
        ));
    }

    // An empty error code must not produce a leading ": " in the message.
    #[tokio::test]
    async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
        let mut response = sample_response(ResponseStatus::Failed);
        response.error = Some(ResponseError {
            code: String::new(),
            message: "maximum context length exceeded".to_string(),
        });

        let event = json!({
            "type": "response.failed",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: maximum context length exceeded"
        );
    }

    #[tokio::test]
    async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
        let mut response = sample_response(ResponseStatus::Failed);
        response.error = Some(ResponseError {
            code: "context_length_exceeded".to_string(),
            message: "maximum context length exceeded".to_string(),
        });

        let event = json!({
            "type": "response.failed",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: context_length_exceeded: maximum context length exceeded"
        );
    }

    #[tokio::test]
    async fn response_incomplete_chunk_uses_incomplete_details_reason() {
        let mut response = sample_response(ResponseStatus::Incomplete);
        response.incomplete_details = Some(IncompleteDetailsReason {
            reason: "max_output_tokens".to_string(),
        });

        let event = json!({
            "type": "response.incomplete",
            "sequence_number": 1,
            "response": response,
        });

        let err = first_error_from_event(event).await;

        assert_eq!(
            err.to_string(),
            "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
        );
    }

    #[tokio::test]
    async fn response_completed_chunk_populates_final_usage() {
        let mut response = sample_response(ResponseStatus::Completed);
        response.usage = Some(ResponsesUsage {
            input_tokens: 10,
            input_tokens_details: None,
            output_tokens: 5,
            output_tokens_details: OutputTokensDetails {
                reasoning_tokens: 0,
            },
            total_tokens: 15,
        });

        let event = json!({
            "type": "response.completed",
            "sequence_number": 1,
            "response": response,
        });

        let usage = final_usage_from_event(event).await;
        assert_eq!(usage.input_tokens, 10);
        assert_eq!(usage.output_tokens, 5);
        assert_eq!(usage.total_tokens, 15);
    }

    // Live integration test against the real API; run manually with a key.
    #[tokio::test]
    #[ignore = "requires API key"]
    async fn test_openai_streaming_tools_reasoning() {
        let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
        let client = openai::Client::new(&api_key).expect("Failed to build client");
        let agent = client
            .agent("gpt-5.2")
            .max_tokens(8192)
            .tool(ExampleTool)
            .additional_params(serde_json::json!({
                "reasoning": {"effort": "high"}
            }))
            .build();

        let chat_history: Vec<Message> = Vec::new();
        let mut stream = agent
            .stream_chat("Call my example tool", &chat_history)
            .multi_turn(5)
            .await;

        while let Some(item) = stream.next().await {
            println!("Got item: {item:?}");
        }
    }
}