1use crate::completion::{self, CompletionError, GetTokenUsage};
4use crate::http_client::HttpClientExt;
5use crate::http_client::sse::{Event, GenericEventSource};
6use crate::message::ReasoningContent;
7use crate::providers::openai::responses_api::{ReasoningSummary, ResponsesUsage};
8use crate::streaming;
9use crate::streaming::RawStreamingChoice;
10use crate::wasm_compat::WasmCompatSend;
11use async_stream::stream;
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use tracing::{Level, debug, enabled, info_span};
15use tracing_futures::Instrument as _;
16
17use super::{CompletionResponse, GenericResponsesCompletionModel, Output};
18
/// Shorthand for the raw streaming choice variant produced throughout this module.
type StreamingRawChoice = RawStreamingChoice<StreamingCompletionResponse>;
20
/// One decoded SSE event from the Responses streaming API.
///
/// Deserialization is untagged: lifecycle events carrying a full response
/// snapshot parse as [`Response`](Self::Response); all per-item events fall
/// through to [`Delta`](Self::Delta).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum StreamingCompletionChunk {
    // Boxed because the full response snapshot is large relative to deltas.
    Response(Box<ResponseChunk>),
    Delta(ItemChunk),
}
35
/// Final payload yielded at the end of a stream; carries only token usage.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingCompletionResponse {
    // Usage reported by the terminal `response.completed` event.
    pub usage: ResponsesUsage,
}
42
43pub(crate) fn reasoning_choices_from_done_item(
44 id: &str,
45 summary: &[ReasoningSummary],
46 encrypted_content: Option<&str>,
47) -> Vec<RawStreamingChoice<StreamingCompletionResponse>> {
48 let mut choices = summary
49 .iter()
50 .map(|reasoning_summary| match reasoning_summary {
51 ReasoningSummary::SummaryText { text } => RawStreamingChoice::Reasoning {
52 id: Some(id.to_owned()),
53 content: ReasoningContent::Summary(text.to_owned()),
54 },
55 })
56 .collect::<Vec<_>>();
57
58 if let Some(encrypted_content) = encrypted_content {
59 choices.push(RawStreamingChoice::Reasoning {
60 id: Some(id.to_owned()),
61 content: ReasoningContent::Encrypted(encrypted_content.to_owned()),
62 });
63 }
64
65 choices
66}
67
// Exposes the streamed response's final token usage to generic consumers.
impl GetTokenUsage for StreamingCompletionResponse {
    /// Delegates to the underlying [`ResponsesUsage`] accounting.
    fn token_usage(&self) -> Option<crate::completion::Usage> {
        self.usage.token_usage()
    }
}
73
/// A response lifecycle SSE event together with a full response snapshot.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseChunk {
    // Which lifecycle event this is (`response.created`, `response.completed`, …).
    #[serde(rename = "type")]
    pub kind: ResponseChunkKind,
    // Snapshot of the response at the moment the event was emitted.
    pub response: CompletionResponse,
    // Monotonic position of this event within the stream.
    pub sequence_number: u64,
}
85
/// Lifecycle event types emitted by the Responses streaming API.
///
/// The serde renames must match the provider's SSE `type` strings exactly.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ResponseChunkKind {
    #[serde(rename = "response.created")]
    ResponseCreated,
    #[serde(rename = "response.in_progress")]
    ResponseInProgress,
    #[serde(rename = "response.completed")]
    ResponseCompleted,
    #[serde(rename = "response.failed")]
    ResponseFailed,
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete,
}
101
102fn response_chunk_error_message(
103 kind: &ResponseChunkKind,
104 response: &CompletionResponse,
105 provider_name: &str,
106) -> Option<String> {
107 match kind {
108 ResponseChunkKind::ResponseFailed => Some(response_error_message(
109 response.error.as_ref(),
110 &format!("{provider_name} response stream returned a failed response"),
111 )),
112 ResponseChunkKind::ResponseIncomplete => {
113 let reason = response
114 .incomplete_details
115 .as_ref()
116 .map(|details| details.reason.as_str())
117 .unwrap_or("unknown reason");
118
119 Some(format!(
120 "{provider_name} response stream was incomplete: {reason}"
121 ))
122 }
123 _ => None,
124 }
125}
126
127fn response_error_message(error: Option<&super::ResponseError>, fallback: &str) -> String {
128 if let Some(error) = error {
129 if error.code.is_empty() {
130 error.message.clone()
131 } else {
132 format!("{}: {}", error.code, error.message)
133 }
134 } else {
135 fallback.to_string()
136 }
137}
138
/// Controls how the stream decoder handles terminal events and completed tool calls.
#[derive(Clone, Copy)]
pub(crate) enum ResponsesStreamOptions {
    // Error on failed/incomplete responses; buffer tool calls until the stream ends.
    Strict,
    // Like `Strict`, but emit completed tool calls as soon as they are done.
    StrictWithImmediateToolCalls,
}
144
145impl ResponsesStreamOptions {
146 pub(crate) const fn strict() -> Self {
147 Self::Strict
148 }
149
150 pub(crate) const fn strict_with_immediate_tool_calls() -> Self {
151 Self::StrictWithImmediateToolCalls
152 }
153
154 const fn errors_on_terminal_response(self) -> bool {
155 true
156 }
157
158 const fn emits_completed_tool_calls_immediately(self) -> bool {
159 matches!(self, Self::StrictWithImmediateToolCalls)
160 }
161}
162
163pub(crate) fn parse_sse_completion_body(
164 body: &str,
165 provider_name: &str,
166) -> Result<CompletionResponse, CompletionError> {
167 let mut completed = None;
168 let mut provider_error = None;
169
170 for line in body.lines() {
171 let data = line
172 .strip_prefix("data:")
173 .map(str::trim)
174 .unwrap_or_default();
175 if data.is_empty() || data == "[DONE]" {
176 continue;
177 }
178
179 if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
180 if let StreamingCompletionChunk::Response(chunk) = chunk {
181 let ResponseChunk { kind, response, .. } = *chunk;
182 match kind {
183 ResponseChunkKind::ResponseCompleted => {
184 completed = Some(response);
185 break;
186 }
187 ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete => {
188 provider_error =
189 response_chunk_error_message(&kind, &response, provider_name);
190 }
191 _ => {}
192 }
193 }
194 continue;
195 }
196
197 let value = match serde_json::from_str::<serde_json::Value>(data) {
198 Ok(value) => value,
199 Err(_) => continue,
200 };
201
202 match value.get("type").and_then(serde_json::Value::as_str) {
203 Some("response.completed") => {
204 if let Some(response) = value.get("response") {
205 completed = Some(serde_json::from_value(response.clone())?);
206 break;
207 }
208 }
209 Some("response.failed") | Some("response.incomplete") => {
210 provider_error = value
211 .get("response")
212 .cloned()
213 .and_then(|response| {
214 serde_json::from_value::<CompletionResponse>(response).ok()
215 })
216 .and_then(|response| {
217 let kind = if value.get("type").and_then(serde_json::Value::as_str)
218 == Some("response.failed")
219 {
220 ResponseChunkKind::ResponseFailed
221 } else {
222 ResponseChunkKind::ResponseIncomplete
223 };
224 response_chunk_error_message(&kind, &response, provider_name)
225 })
226 .or_else(|| {
227 value
228 .get("error")
229 .and_then(|error| error.get("message"))
230 .and_then(serde_json::Value::as_str)
231 .map(ToOwned::to_owned)
232 })
233 .or_else(|| Some(data.to_string()));
234 }
235 Some("error") => {
236 provider_error = value
237 .get("error")
238 .and_then(|error| error.get("message"))
239 .and_then(serde_json::Value::as_str)
240 .map(ToOwned::to_owned)
241 .or_else(|| Some(data.to_string()));
242 }
243 _ => {}
244 }
245 }
246
247 completed.ok_or_else(|| {
248 CompletionError::ProviderError(
249 provider_error.unwrap_or_else(|| {
250 format!("{provider_name} stream did not yield response.completed")
251 }),
252 )
253 })
254}
255
/// Mutable state carried across a Responses stream while decoding events
/// into [`StreamingRawChoice`]s.
struct RawChoiceAccumulator {
    // Usage from the most recent `response.completed` event (or the initial value).
    final_usage: ResponsesUsage,
    // Tool calls buffered until the stream ends (unless emitted immediately).
    tool_calls: Vec<StreamingRawChoice>,
    // Maps provider tool-call item ids to locally generated internal ids so
    // deltas and the final call share the same internal id.
    tool_call_internal_ids: std::collections::HashMap<String, String>,
}
261
impl RawChoiceAccumulator {
    /// Creates an accumulator seeded with `initial_usage`, used if the stream
    /// never reports usage via `response.completed`.
    fn new(initial_usage: ResponsesUsage) -> Self {
        Self {
            final_usage: initial_usage,
            tool_calls: Vec::new(),
            tool_call_internal_ids: std::collections::HashMap::new(),
        }
    }

    /// Decodes one per-item event into zero or more choices to emit right away.
    ///
    /// Unhandled event kinds (content parts, `*.done` text events, `Unknown`)
    /// produce nothing.
    fn decode_item_chunk(
        &mut self,
        item: ItemChunkKind,
        options: ResponsesStreamOptions,
    ) -> Vec<StreamingRawChoice> {
        let mut immediate = Vec::new();

        match item {
            // A function-call item was added: announce its name as a tool-call
            // delta immediately so callers can show the call early.
            ItemChunkKind::OutputItemAdded(StreamingItemDoneOutput {
                item: Output::FunctionCall(func),
                ..
            }) => {
                // Reuse (or mint) a stable internal id for this provider item id.
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(func.id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
                    id: func.id,
                    internal_call_id,
                    content: streaming::ToolCallDeltaContent::Name(func.name),
                });
            }
            ItemChunkKind::OutputItemDone(message) => {
                self.push_output_item_done(
                    message.item,
                    &mut immediate,
                    options.emits_completed_tool_calls_immediately(),
                );
            }
            ItemChunkKind::OutputTextDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
            }
            ItemChunkKind::ReasoningSummaryTextDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::ReasoningDelta {
                    id: None,
                    reasoning: delta.delta,
                });
            }
            // NOTE(review): refusal deltas are surfaced as plain message text;
            // confirm a dedicated refusal variant isn't expected downstream.
            ItemChunkKind::RefusalDelta(delta) => {
                immediate.push(streaming::RawStreamingChoice::Message(delta.delta));
            }
            ItemChunkKind::FunctionCallArgsDelta(delta) => {
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(delta.item_id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                immediate.push(streaming::RawStreamingChoice::ToolCallDelta {
                    id: delta.item_id,
                    internal_call_id,
                    content: streaming::ToolCallDeltaContent::Delta(delta.delta),
                });
            }
            _ => {}
        }

        immediate
    }

    /// Processes a response lifecycle event.
    ///
    /// `response.completed` captures the final usage; failed/incomplete events
    /// become a [`CompletionError::ProviderError`] when the options demand it;
    /// all other kinds are no-ops.
    fn record_response_chunk(
        &mut self,
        kind: ResponseChunkKind,
        response: CompletionResponse,
        provider_name: &str,
        options: ResponsesStreamOptions,
    ) -> Result<(), CompletionError> {
        match kind {
            ResponseChunkKind::ResponseCompleted => {
                if let Some(usage) = response.usage {
                    self.final_usage = usage;
                }
                Ok(())
            }
            ResponseChunkKind::ResponseFailed | ResponseChunkKind::ResponseIncomplete
                if options.errors_on_terminal_response() =>
            {
                let error_message = response_chunk_error_message(&kind, &response, provider_name)
                    .unwrap_or_else(|| {
                        format!(
                            "{provider_name} returned terminal response {:?} without an error message",
                            kind
                        )
                    });
                Err(CompletionError::ProviderError(error_message))
            }
            _ => Ok(()),
        }
    }

    /// Handles a completed output item: tool calls are either emitted now or
    /// buffered for the end of the stream; reasoning items expand into summary
    /// and encrypted choices; messages emit their id.
    fn push_output_item_done(
        &mut self,
        item: Output,
        immediate: &mut Vec<StreamingRawChoice>,
        emit_completed_tool_calls_immediately: bool,
    ) {
        match item {
            Output::FunctionCall(func) => {
                // Keep the internal id consistent with any earlier deltas.
                let internal_call_id = self
                    .tool_call_internal_ids
                    .entry(func.id.clone())
                    .or_insert_with(|| nanoid::nanoid!())
                    .clone();
                let tool_call =
                    streaming::RawStreamingToolCall::new(func.id, func.name, func.arguments)
                        .with_internal_call_id(internal_call_id)
                        .with_call_id(func.call_id);

                if emit_completed_tool_calls_immediately {
                    immediate.push(streaming::RawStreamingChoice::ToolCall(tool_call));
                } else {
                    self.tool_calls
                        .push(streaming::RawStreamingChoice::ToolCall(tool_call));
                }
            }
            Output::Reasoning {
                id,
                summary,
                encrypted_content,
                ..
            } => {
                immediate.extend(reasoning_choices_from_done_item(
                    &id,
                    &summary,
                    encrypted_content.as_deref(),
                ));
            }
            Output::Message(message) => {
                immediate.push(streaming::RawStreamingChoice::MessageId(message.id));
            }
            Output::Unknown => {}
        }
    }

    /// Flushes buffered tool calls followed by the final-usage marker choice.
    fn finish(mut self) -> Vec<StreamingRawChoice> {
        let mut choices = Vec::new();
        choices.append(&mut self.tool_calls);
        choices.push(RawStreamingChoice::FinalResponse(
            StreamingCompletionResponse {
                usage: self.final_usage,
            },
        ));
        choices
    }
}
416
/// Decodes a complete SSE body into the flat list of raw streaming choices,
/// ending with the buffered tool calls and the final-usage marker.
///
/// Each `data:` line is first tried against the strongly-typed
/// [`StreamingCompletionChunk`] model; lines that don't fit are re-parsed as
/// raw JSON and handled by their `type` tag so partially-modeled events still
/// contribute output.
///
/// # Errors
/// Propagates terminal failed/incomplete responses (strict options), explicit
/// `error` events, and deserialization failures of terminal responses.
pub(crate) fn raw_choices_from_sse_body(
    body: &str,
    initial_usage: ResponsesUsage,
    provider_name: &str,
) -> Result<Vec<StreamingRawChoice>, CompletionError> {
    let mut raw_choices = Vec::new();
    let mut accumulator = RawChoiceAccumulator::new(initial_usage);
    let options = ResponsesStreamOptions::strict();

    for line in body.lines() {
        // SSE framing: only `data:` lines carry payloads.
        let data = line
            .strip_prefix("data:")
            .map(str::trim)
            .unwrap_or_default();
        if data.is_empty() || data == "[DONE]" {
            continue;
        }

        // Typed fast path.
        if let Ok(chunk) = serde_json::from_str::<StreamingCompletionChunk>(data) {
            match chunk {
                StreamingCompletionChunk::Delta(chunk) => {
                    raw_choices.extend(accumulator.decode_item_chunk(chunk.data, options));
                }
                StreamingCompletionChunk::Response(chunk) => {
                    let ResponseChunk { kind, response, .. } = *chunk;
                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
                }
            }
            continue;
        }

        // Raw-JSON fallback for events the typed model failed to parse.
        let value = match serde_json::from_str::<serde_json::Value>(data) {
            Ok(value) => value,
            Err(_) => continue,
        };

        match value.get("type").and_then(serde_json::Value::as_str) {
            Some("response.output_text.delta") | Some("response.refusal.delta") => {
                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
                    raw_choices.push(streaming::RawStreamingChoice::Message(delta.to_owned()));
                }
            }
            Some("response.reasoning_summary_text.delta") => {
                if let Some(delta) = value.get("delta").and_then(serde_json::Value::as_str) {
                    raw_choices.push(streaming::RawStreamingChoice::ReasoningDelta {
                        id: None,
                        reasoning: delta.to_owned(),
                    });
                }
            }
            Some("response.output_item.added") => {
                // Only function-call items produce output here: a name delta.
                if let Some(item) = value
                    .get("item")
                    .cloned()
                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
                    && let Output::FunctionCall(func) = item
                {
                    let internal_call_id = accumulator
                        .tool_call_internal_ids
                        .entry(func.id.clone())
                        .or_insert_with(|| nanoid::nanoid!())
                        .clone();
                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
                        id: func.id,
                        internal_call_id,
                        content: streaming::ToolCallDeltaContent::Name(func.name),
                    });
                }
            }
            Some("response.output_item.done") => {
                if let Some(item) = value
                    .get("item")
                    .cloned()
                    .and_then(|item| serde_json::from_value::<Output>(item).ok())
                {
                    // `false`: tool calls are buffered, not emitted immediately.
                    accumulator.push_output_item_done(item, &mut raw_choices, false);
                }
            }
            Some("response.function_call_arguments.delta") => {
                if let (Some(item_id), Some(delta)) = (
                    value.get("item_id").and_then(serde_json::Value::as_str),
                    value.get("delta").and_then(serde_json::Value::as_str),
                ) {
                    let internal_call_id = accumulator
                        .tool_call_internal_ids
                        .entry(item_id.to_owned())
                        .or_insert_with(|| nanoid::nanoid!())
                        .clone();
                    raw_choices.push(streaming::RawStreamingChoice::ToolCallDelta {
                        id: item_id.to_owned(),
                        internal_call_id,
                        content: streaming::ToolCallDeltaContent::Delta(delta.to_owned()),
                    });
                }
            }
            Some("response.completed") | Some("response.failed") | Some("response.incomplete") => {
                if let Some(response) = value.get("response").cloned() {
                    let response = serde_json::from_value::<CompletionResponse>(response)?;
                    let Some(kind) = (match value.get("type").and_then(serde_json::Value::as_str) {
                        Some("response.completed") => Some(ResponseChunkKind::ResponseCompleted),
                        Some("response.failed") => Some(ResponseChunkKind::ResponseFailed),
                        Some("response.incomplete") => Some(ResponseChunkKind::ResponseIncomplete),
                        _ => None,
                    }) else {
                        continue;
                    };
                    accumulator.record_response_chunk(kind, response, provider_name, options)?;
                }
            }
            Some("error") => {
                let message = value
                    .get("error")
                    .and_then(|error| error.get("message"))
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or(data);
                return Err(CompletionError::ProviderError(message.to_owned()));
            }
            _ => {}
        }
    }

    // Append buffered tool calls and the final-usage marker.
    raw_choices.extend(accumulator.finish());
    Ok(raw_choices)
}
541
/// Rebuilds a non-streaming [`completion::CompletionResponse`] by replaying a
/// buffered SSE body through the standard streaming aggregation machinery.
///
/// Usage and message id prefer the streamed final response and fall back to
/// `raw_response` when the stream didn't provide them.
///
/// # Errors
/// Propagates decoding errors from [`raw_choices_from_sse_body`] and returns
/// [`CompletionError::ResponseError`] when the aggregated choice is empty.
pub(crate) async fn completion_response_from_sse_body(
    body: &str,
    raw_response: CompletionResponse,
    provider_name: &str,
) -> Result<completion::CompletionResponse<CompletionResponse>, CompletionError> {
    let raw_choices = raw_choices_from_sse_body(
        body,
        raw_response
            .usage
            .clone()
            .unwrap_or_else(ResponsesUsage::new),
        provider_name,
    )?;
    // Replay the decoded choices through the shared streaming aggregator so
    // assembly logic (text concat, tool-call merging) stays in one place.
    let stream = futures::stream::iter(
        raw_choices
            .into_iter()
            .map(Ok::<_, CompletionError>)
            .collect::<Vec<_>>(),
    );
    let mut stream = crate::streaming::StreamingCompletionResponse::stream(Box::pin(stream));

    // Drain the stream; aggregation happens as a side effect of polling.
    while let Some(item) = stream.next().await {
        item?;
    }

    if choice_is_empty(&stream.choice) {
        return Err(CompletionError::ResponseError(
            "Response contained no parts".to_owned(),
        ));
    }

    Ok(completion::CompletionResponse {
        usage: stream
            .response
            .as_ref()
            .and_then(GetTokenUsage::token_usage)
            .unwrap_or_else(|| usage_from_raw_response(&raw_response)),
        message_id: stream
            .message_id
            .clone()
            .or_else(|| message_id_from_response(&raw_response)),
        choice: stream.choice,
        raw_response,
    })
}
587
588fn choice_is_empty(choice: &crate::OneOrMany<completion::AssistantContent>) -> bool {
589 choice.iter().all(|content| match content {
590 completion::AssistantContent::Text(text) => text.text.trim().is_empty(),
591 completion::AssistantContent::Reasoning(reasoning) => reasoning.content.is_empty(),
592 completion::AssistantContent::Image(_) => false,
593 completion::AssistantContent::ToolCall(_) => false,
594 })
595}
596
597fn message_id_from_response(response: &CompletionResponse) -> Option<String> {
598 response.output.iter().find_map(|item| match item {
599 Output::Message(message) => Some(message.id.clone()),
600 _ => None,
601 })
602}
603
604fn usage_from_raw_response(response: &CompletionResponse) -> completion::Usage {
605 response
606 .usage
607 .as_ref()
608 .and_then(GetTokenUsage::token_usage)
609 .unwrap_or_default()
610}
611
/// Convenience wrapper around [`stream_from_event_source_with_options`] that
/// uses the strict decoding options.
pub(crate) fn stream_from_event_source<HttpClient, RequestBody>(
    event_source: GenericEventSource<HttpClient, RequestBody>,
    span: tracing::Span,
    provider_name: &'static str,
) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
where
    HttpClient: HttpClientExt + Clone + 'static,
    RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
{
    stream_from_event_source_with_options(
        event_source,
        span,
        provider_name,
        ResponsesStreamOptions::strict(),
    )
}
628
629pub(crate) fn stream_from_event_source_with_options<HttpClient, RequestBody>(
630 mut event_source: GenericEventSource<HttpClient, RequestBody>,
631 span: tracing::Span,
632 provider_name: &'static str,
633 options: ResponsesStreamOptions,
634) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse>
635where
636 HttpClient: HttpClientExt + Clone + 'static,
637 RequestBody: Into<bytes::Bytes> + Clone + WasmCompatSend + 'static,
638{
639 let stream = stream! {
640 let mut accumulator = RawChoiceAccumulator::new(ResponsesUsage::new());
641 let span = tracing::Span::current();
642
643 let mut terminated_with_error = false;
644
645 while let Some(event_result) = event_source.next().await {
646 match event_result {
647 Ok(Event::Open) => {
648 tracing::trace!("SSE connection opened");
649 continue;
650 }
651 Ok(Event::Message(evt)) => {
652 if evt.data.trim().is_empty() || evt.data == "[DONE]" {
653 continue;
654 }
655
656 let data = serde_json::from_str::<StreamingCompletionChunk>(&evt.data);
657
658 let Ok(data) = data else {
659 let Err(err) = data else {
660 continue;
661 };
662 debug!(
663 "Couldn't deserialize SSE data as StreamingCompletionChunk: {:?}",
664 err
665 );
666 continue;
667 };
668
669 match data {
670 StreamingCompletionChunk::Delta(chunk) => {
671 for choice in accumulator.decode_item_chunk(chunk.data, options) {
672 yield Ok(choice);
673 }
674 }
675 StreamingCompletionChunk::Response(chunk) => {
676 let ResponseChunk { kind, response, .. } = *chunk;
677 if matches!(kind, ResponseChunkKind::ResponseCompleted) {
678 span.record("gen_ai.response.id", response.id.as_str());
679 span.record("gen_ai.response.model", response.model.as_str());
680 }
681 if let Err(error) =
682 accumulator.record_response_chunk(kind, response, provider_name, options)
683 {
684 terminated_with_error = true;
685 yield Err(error);
686 break;
687 }
688 }
689 }
690 }
691 Err(crate::http_client::Error::StreamEnded) => {
692 event_source.close();
693 }
694 Err(error) => {
695 tracing::error!(?error, "SSE error");
696 terminated_with_error = true;
697 yield Err(CompletionError::ProviderError(error.to_string()));
698 break;
699 }
700 }
701 }
702
703 event_source.close();
704
705 if terminated_with_error {
706 return;
707 }
708
709 let final_usage = accumulator.final_usage.clone();
710
711 for tool_call in accumulator.finish() {
712 yield Ok(tool_call)
713 }
714
715 span.record("gen_ai.usage.input_tokens", final_usage.input_tokens);
716 span.record("gen_ai.usage.output_tokens", final_usage.output_tokens);
717 let cached_tokens = final_usage
718 .input_tokens_details
719 .as_ref()
720 .map(|d| d.cached_tokens)
721 .unwrap_or(0);
722 span.record("gen_ai.usage.cache_read.input_tokens", cached_tokens);
723
724 }
725 .instrument(span);
726
727 streaming::StreamingCompletionResponse::stream(Box::pin(stream))
728}
729
/// A per-item SSE event; the flattened `type` field selects [`ItemChunkKind`].
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ItemChunk {
    // Id of the output item this event belongs to, when the provider sends one.
    pub item_id: Option<String>,
    // Index of the output item within the response.
    pub output_index: u64,
    // Event-specific payload, discriminated by the flattened `type` tag.
    #[serde(flatten)]
    pub data: ItemChunkKind,
}
742
/// Typed payloads for per-item streaming events, tagged by the SSE `type`
/// string. Renames must match the provider's event names exactly.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ItemChunkKind {
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded(StreamingItemDoneOutput),
    #[serde(rename = "response.output_item.done")]
    OutputItemDone(StreamingItemDoneOutput),
    #[serde(rename = "response.content_part.added")]
    ContentPartAdded(ContentPartChunk),
    #[serde(rename = "response.content_part.done")]
    ContentPartDone(ContentPartChunk),
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta(DeltaTextChunk),
    #[serde(rename = "response.output_text.done")]
    OutputTextDone(OutputTextChunk),
    #[serde(rename = "response.refusal.delta")]
    RefusalDelta(DeltaTextChunk),
    #[serde(rename = "response.refusal.done")]
    RefusalDone(RefusalTextChunk),
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgsDelta(DeltaTextChunkWithItemId),
    #[serde(rename = "response.function_call_arguments.done")]
    FunctionCallArgsDone(ArgsTextChunk),
    #[serde(rename = "response.reasoning_summary_part.added")]
    ReasoningSummaryPartAdded(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone(SummaryPartChunk),
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta(SummaryTextChunk),
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone(SummaryTextChunk),
    // Catch-all so unmodeled event types deserialize instead of failing.
    #[serde(other)]
    Unknown,
}
780
/// Payload of `response.output_item.added`/`.done`: a complete output item.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StreamingItemDoneOutput {
    pub sequence_number: u64,
    // The added or completed output item.
    pub item: Output,
}
786
/// Payload of `response.content_part.added`/`.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ContentPartChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub part: ContentPartChunkPart,
}
793
/// Content-part variants carried by [`ContentPartChunk`], tagged by `type`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPartChunkPart {
    OutputText { text: String },
    SummaryText { text: String },
}
800
/// Incremental text payload for output-text and refusal delta events.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
807
/// Incremental text payload that also names its owning item (used by
/// `response.function_call_arguments.delta`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeltaTextChunkWithItemId {
    // Id of the (function-call) item these argument bytes belong to.
    pub item_id: String,
    pub content_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
815
/// Payload of `response.output_text.done`: the full assembled text.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct OutputTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub text: String,
}
822
/// Payload of `response.refusal.done`: the full refusal text.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RefusalTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub refusal: String,
}
829
/// Payload of `response.function_call_arguments.done`: parsed final arguments.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ArgsTextChunk {
    pub content_index: u64,
    pub sequence_number: u64,
    pub arguments: serde_json::Value,
}
836
/// Payload of `response.reasoning_summary_part.added`/`.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryPartChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub part: SummaryPartChunkPart,
}
843
/// Payload of `response.reasoning_summary_text.delta`/`.done`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SummaryTextChunk {
    pub summary_index: u64,
    pub sequence_number: u64,
    pub delta: String,
}
850
/// Summary-part variants carried by [`SummaryPartChunk`], tagged by `type`.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SummaryPartChunkPart {
    SummaryText { text: String },
}
856
impl<Ext, H> GenericResponsesCompletionModel<Ext, H>
where
    crate::client::Client<Ext, H>:
        HttpClientExt + Clone + std::fmt::Debug + WasmCompatSend + 'static,
    Ext: crate::client::Provider + Clone + 'static,
    H: Clone + Default + std::fmt::Debug + WasmCompatSend + 'static,
{
    /// Issues a streaming completion request against the `/responses` endpoint
    /// and returns the decoded event stream.
    ///
    /// # Errors
    /// Returns [`CompletionError`] if the request can't be built or serialized,
    /// or if the HTTP request can't be constructed.
    pub(crate) async fn stream(
        &self,
        completion_request: crate::completion::CompletionRequest,
    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
    {
        let mut request = self.create_completion_request(completion_request)?;
        // Opt in to server-sent events on the Responses API.
        request.stream = Some(true);

        if enabled!(Level::TRACE) {
            tracing::trace!(
                target: "rig::completions",
                "OpenAI Responses streaming completion request: {}",
                serde_json::to_string_pretty(&request)?
            );
        }

        let body = serde_json::to_vec(&request)?;

        let req = self
            .client
            .post("/responses")?
            .body(body)
            .map_err(|e| CompletionError::HttpError(e.into()))?;

        // Reuse the caller's span when one is active; otherwise open a fresh
        // gen_ai-annotated span for this streaming call.
        let span = if tracing::Span::current().is_disabled() {
            info_span!(
                target: "rig::completions",
                "chat_streaming",
                gen_ai.operation.name = "chat_streaming",
                gen_ai.provider.name = tracing::field::Empty,
                gen_ai.request.model = tracing::field::Empty,
                gen_ai.response.id = tracing::field::Empty,
                gen_ai.response.model = tracing::field::Empty,
                gen_ai.usage.output_tokens = tracing::field::Empty,
                gen_ai.usage.input_tokens = tracing::field::Empty,
                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
            )
        } else {
            tracing::Span::current()
        };
        span.record("gen_ai.provider.name", "openai");
        span.record("gen_ai.request.model", &self.model);
        let client = self.client.clone();
        let event_source = GenericEventSource::new(client, req);

        Ok(stream_from_event_source(event_source, span, "OpenAI"))
    }
}
914
915#[cfg(test)]
916mod tests {
917 use super::{ItemChunkKind, StreamingCompletionChunk, reasoning_choices_from_done_item};
918 use crate::completion::CompletionModel;
919 use crate::http_client::mock::MockStreamingClient;
920 use crate::message::ReasoningContent;
921 use crate::providers::internal::openai_chat_completions_compatible::test_support::sse_bytes_from_json_events;
922 use crate::providers::openai::responses_api::{
923 AdditionalParameters, CompletionResponse, IncompleteDetailsReason, OutputTokensDetails,
924 ReasoningSummary, ResponseError, ResponseObject, ResponseStatus, ResponsesUsage,
925 };
926 use crate::streaming::{RawStreamingChoice, StreamedAssistantContent};
927 use bytes::Bytes;
928 use futures::StreamExt;
929 use serde_json::{self, json};
930
931 use crate::{
932 client::CompletionClient,
933 completion::{Message, ToolDefinition},
934 providers::openai,
935 streaming::StreamingChat,
936 tool::{Tool, ToolError},
937 };
938
939 struct ExampleTool;
940
941 impl Default for MockStreamingClient {
942 fn default() -> Self {
943 Self {
944 sse_bytes: Bytes::new(),
945 }
946 }
947 }
948
949 impl std::fmt::Debug for MockStreamingClient {
950 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
951 f.debug_struct("MockStreamingClient")
952 .finish_non_exhaustive()
953 }
954 }
955
956 fn sample_response(status: ResponseStatus) -> CompletionResponse {
957 CompletionResponse {
958 id: "resp_123".to_string(),
959 object: ResponseObject::Response,
960 created_at: 0,
961 status,
962 error: None,
963 incomplete_details: None,
964 instructions: None,
965 max_output_tokens: None,
966 model: "gpt-5.4".to_string(),
967 usage: None,
968 output: Vec::new(),
969 tools: Vec::new(),
970 additional_parameters: AdditionalParameters::default(),
971 }
972 }
973
974 async fn first_error_from_event(
975 event: serde_json::Value,
976 ) -> crate::completion::CompletionError {
977 let client = openai::Client::builder()
978 .http_client(MockStreamingClient {
979 sse_bytes: sse_bytes_from_json_events(&[event]),
980 })
981 .api_key("test-key")
982 .build()
983 .expect("client should build");
984 let model = client.completion_model("gpt-5.4");
985 let request = model.completion_request("hello").build();
986 let mut stream = model.stream(request).await.expect("stream should start");
987
988 stream
989 .next()
990 .await
991 .expect("stream should yield an item")
992 .expect_err("stream should surface a provider error")
993 }
994
995 async fn final_usage_from_event(event: serde_json::Value) -> ResponsesUsage {
996 let client = openai::Client::builder()
997 .http_client(MockStreamingClient {
998 sse_bytes: sse_bytes_from_json_events(&[event]),
999 })
1000 .api_key("test-key")
1001 .build()
1002 .expect("client should build");
1003 let model = client.completion_model("gpt-5.4");
1004 let request = model.completion_request("hello").build();
1005 let mut stream = model.stream(request).await.expect("stream should start");
1006
1007 while let Some(item) = stream.next().await {
1008 match item.expect("completed stream should not error") {
1009 StreamedAssistantContent::Final(res) => return res.usage,
1010 _ => continue,
1011 }
1012 }
1013
1014 panic!("stream should yield a final response");
1015 }
1016
1017 impl Tool for ExampleTool {
1018 type Args = ();
1019 type Error = ToolError;
1020 type Output = String;
1021 const NAME: &'static str = "example_tool";
1022
1023 async fn definition(&self, _prompt: String) -> ToolDefinition {
1024 ToolDefinition {
1025 name: self.name(),
1026 description: "A tool that returns some example text.".to_string(),
1027 parameters: serde_json::json!({
1028 "type": "object",
1029 "properties": {},
1030 "required": []
1031 }),
1032 }
1033 }
1034
1035 async fn call(&self, _input: Self::Args) -> Result<Self::Output, Self::Error> {
1036 let result = "Example answer".to_string();
1037 Ok(result)
1038 }
1039 }
1040
1041 #[test]
1042 fn reasoning_done_item_emits_summary_then_encrypted() {
1043 let summary = vec![
1044 ReasoningSummary::SummaryText {
1045 text: "step 1".to_string(),
1046 },
1047 ReasoningSummary::SummaryText {
1048 text: "step 2".to_string(),
1049 },
1050 ];
1051 let choices = reasoning_choices_from_done_item("rs_1", &summary, Some("enc_blob"));
1052
1053 assert_eq!(choices.len(), 3);
1054 assert!(matches!(
1055 choices.first(),
1056 Some(RawStreamingChoice::Reasoning {
1057 id: Some(id),
1058 content: ReasoningContent::Summary(text),
1059 }) if id == "rs_1" && text == "step 1"
1060 ));
1061 assert!(matches!(
1062 choices.get(1),
1063 Some(RawStreamingChoice::Reasoning {
1064 id: Some(id),
1065 content: ReasoningContent::Summary(text),
1066 }) if id == "rs_1" && text == "step 2"
1067 ));
1068 assert!(matches!(
1069 choices.get(2),
1070 Some(RawStreamingChoice::Reasoning {
1071 id: Some(id),
1072 content: ReasoningContent::Encrypted(data),
1073 }) if id == "rs_1" && data == "enc_blob"
1074 ));
1075 }
1076
1077 #[test]
1078 fn reasoning_done_item_without_encrypted_emits_summary_only() {
1079 let summary = vec![ReasoningSummary::SummaryText {
1080 text: "only summary".to_string(),
1081 }];
1082 let choices = reasoning_choices_from_done_item("rs_2", &summary, None);
1083
1084 assert_eq!(choices.len(), 1);
1085 assert!(matches!(
1086 choices.first(),
1087 Some(RawStreamingChoice::Reasoning {
1088 id: Some(id),
1089 content: ReasoningContent::Summary(text),
1090 }) if id == "rs_2" && text == "only summary"
1091 ));
1092 }
1093
1094 #[test]
1095 fn content_part_added_deserializes_snake_case_part_type() {
1096 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1097 "type": "response.content_part.added",
1098 "item_id": "msg_1",
1099 "output_index": 0,
1100 "content_index": 0,
1101 "sequence_number": 3,
1102 "part": {
1103 "type": "output_text",
1104 "text": "hello"
1105 }
1106 }))
1107 .expect("content part event should deserialize");
1108
1109 assert!(matches!(
1110 chunk,
1111 StreamingCompletionChunk::Delta(chunk)
1112 if matches!(
1113 chunk.data,
1114 ItemChunkKind::ContentPartAdded(_)
1115 )
1116 ));
1117 }
1118
1119 #[test]
1120 fn content_part_done_deserializes_snake_case_part_type() {
1121 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1122 "type": "response.content_part.done",
1123 "item_id": "msg_1",
1124 "output_index": 0,
1125 "content_index": 0,
1126 "sequence_number": 4,
1127 "part": {
1128 "type": "summary_text",
1129 "text": "done"
1130 }
1131 }))
1132 .expect("content part done event should deserialize");
1133
1134 assert!(matches!(
1135 chunk,
1136 StreamingCompletionChunk::Delta(chunk)
1137 if matches!(
1138 chunk.data,
1139 ItemChunkKind::ContentPartDone(_)
1140 )
1141 ));
1142 }
1143
1144 #[test]
1145 fn reasoning_summary_part_added_deserializes_snake_case_part_type() {
1146 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1147 "type": "response.reasoning_summary_part.added",
1148 "item_id": "rs_1",
1149 "output_index": 0,
1150 "summary_index": 0,
1151 "sequence_number": 5,
1152 "part": {
1153 "type": "summary_text",
1154 "text": "step 1"
1155 }
1156 }))
1157 .expect("reasoning summary part event should deserialize");
1158
1159 assert!(matches!(
1160 chunk,
1161 StreamingCompletionChunk::Delta(chunk)
1162 if matches!(
1163 chunk.data,
1164 ItemChunkKind::ReasoningSummaryPartAdded(_)
1165 )
1166 ));
1167 }
1168
1169 #[test]
1170 fn reasoning_summary_part_done_deserializes_snake_case_part_type() {
1171 let chunk: StreamingCompletionChunk = serde_json::from_value(json!({
1172 "type": "response.reasoning_summary_part.done",
1173 "item_id": "rs_1",
1174 "output_index": 0,
1175 "summary_index": 0,
1176 "sequence_number": 6,
1177 "part": {
1178 "type": "summary_text",
1179 "text": "step 2"
1180 }
1181 }))
1182 .expect("reasoning summary part done event should deserialize");
1183
1184 assert!(matches!(
1185 chunk,
1186 StreamingCompletionChunk::Delta(chunk)
1187 if matches!(
1188 chunk.data,
1189 ItemChunkKind::ReasoningSummaryPartDone(_)
1190 )
1191 ));
1192 }
1193
1194 #[tokio::test]
1195 async fn response_failed_chunk_surfaces_provider_error_without_empty_code_prefix() {
1196 let mut response = sample_response(ResponseStatus::Failed);
1197 response.error = Some(ResponseError {
1198 code: String::new(),
1199 message: "maximum context length exceeded".to_string(),
1200 });
1201
1202 let event = json!({
1203 "type": "response.failed",
1204 "sequence_number": 1,
1205 "response": response,
1206 });
1207
1208 let err = first_error_from_event(event).await;
1209
1210 assert_eq!(
1211 err.to_string(),
1212 "ProviderError: maximum context length exceeded"
1213 );
1214 }
1215
1216 #[tokio::test]
1217 async fn response_failed_chunk_surfaces_provider_error_with_code_prefix() {
1218 let mut response = sample_response(ResponseStatus::Failed);
1219 response.error = Some(ResponseError {
1220 code: "context_length_exceeded".to_string(),
1221 message: "maximum context length exceeded".to_string(),
1222 });
1223
1224 let event = json!({
1225 "type": "response.failed",
1226 "sequence_number": 1,
1227 "response": response,
1228 });
1229
1230 let err = first_error_from_event(event).await;
1231
1232 assert_eq!(
1233 err.to_string(),
1234 "ProviderError: context_length_exceeded: maximum context length exceeded"
1235 );
1236 }
1237
1238 #[tokio::test]
1239 async fn response_incomplete_chunk_uses_incomplete_details_reason() {
1240 let mut response = sample_response(ResponseStatus::Incomplete);
1241 response.incomplete_details = Some(IncompleteDetailsReason {
1242 reason: "max_output_tokens".to_string(),
1243 });
1244
1245 let event = json!({
1246 "type": "response.incomplete",
1247 "sequence_number": 1,
1248 "response": response,
1249 });
1250
1251 let err = first_error_from_event(event).await;
1252
1253 assert_eq!(
1254 err.to_string(),
1255 "ProviderError: OpenAI response stream was incomplete: max_output_tokens"
1256 );
1257 }
1258
1259 #[tokio::test]
1260 async fn response_failed_chunk_terminates_stream_without_followup_items() {
1261 let tool_call_done = json!({
1262 "type": "response.output_item.done",
1263 "sequence_number": 1,
1264 "item": {
1265 "type": "function_call",
1266 "id": "fc_123",
1267 "arguments": "{}",
1268 "call_id": "call_123",
1269 "name": "example_tool",
1270 "status": "completed"
1271 }
1272 });
1273
1274 let mut response = sample_response(ResponseStatus::Failed);
1275 response.error = Some(ResponseError {
1276 code: "server_error".to_string(),
1277 message: "response stream failed".to_string(),
1278 });
1279
1280 let failed = json!({
1281 "type": "response.failed",
1282 "sequence_number": 2,
1283 "response": response,
1284 });
1285
1286 let client = openai::Client::builder()
1287 .http_client(MockStreamingClient {
1288 sse_bytes: sse_bytes_from_json_events(&[tool_call_done, failed]),
1289 })
1290 .api_key("test-key")
1291 .build()
1292 .expect("client should build");
1293 let model = client.completion_model("gpt-5.4");
1294 let request = model.completion_request("hello").build();
1295 let mut stream = model.stream(request).await.expect("stream should start");
1296
1297 let err = stream
1298 .next()
1299 .await
1300 .expect("stream should yield an item")
1301 .expect_err("stream should surface a provider error");
1302 assert_eq!(
1303 err.to_string(),
1304 "ProviderError: server_error: response stream failed"
1305 );
1306 assert!(
1307 stream.next().await.is_none(),
1308 "stream should terminate immediately after the first terminal error"
1309 );
1310 }
1311
1312 #[tokio::test]
1313 async fn response_completed_chunk_populates_final_usage() {
1314 let mut response = sample_response(ResponseStatus::Completed);
1315 response.usage = Some(ResponsesUsage {
1316 input_tokens: 10,
1317 input_tokens_details: None,
1318 output_tokens: 5,
1319 output_tokens_details: OutputTokensDetails {
1320 reasoning_tokens: 0,
1321 },
1322 total_tokens: 15,
1323 });
1324
1325 let event = json!({
1326 "type": "response.completed",
1327 "sequence_number": 1,
1328 "response": response,
1329 });
1330
1331 let usage = final_usage_from_event(event).await;
1332 assert_eq!(usage.input_tokens, 10);
1333 assert_eq!(usage.output_tokens, 5);
1334 assert_eq!(usage.total_tokens, 15);
1335 }
1336
    // Checks two things in one pass: (1) the `data: [DONE]` SSE sentinel is
    // skipped without being handed to the JSON parser, so the parse-failure
    // debug log is never emitted; and (2) the preceding `response.completed`
    // event still produces the final usage totals.
    #[tokio::test]
    async fn done_sentinel_is_ignored_without_debug_parse_noise() {
        use std::io::{self, Write};
        use std::sync::{Arc, Mutex};

        // Cloneable sink that appends all tracing output into one shared byte
        // buffer; the subscriber builds a fresh writer per event, so state
        // lives behind Arc<Mutex<...>>.
        #[derive(Clone)]
        struct SharedWriter(Arc<Mutex<Vec<u8>>>);

        impl Write for SharedWriter {
            fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
                self.0
                    .lock()
                    .expect("log buffer mutex should not be poisoned")
                    .extend_from_slice(buf);
                Ok(buf.len())
            }

            fn flush(&mut self) -> io::Result<()> {
                // Nothing buffered locally; writes land in the Vec immediately.
                Ok(())
            }
        }

        let mut response = sample_response(ResponseStatus::Completed);
        response.usage = Some(ResponsesUsage {
            input_tokens: 4,
            input_tokens_details: None,
            output_tokens: 2,
            output_tokens_details: OutputTokensDetails {
                reasoning_tokens: 0,
            },
            total_tokens: 6,
        });

        // Capture DEBUG-level logs for this test only; set_default scopes the
        // subscriber to this thread/task via the returned guard, so the
        // subscriber must be installed before the stream is driven.
        let captured = Arc::new(Mutex::new(Vec::new()));
        let subscriber = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .with_ansi(false)
            .without_time()
            .with_writer({
                let captured = captured.clone();
                move || SharedWriter(captured.clone())
            })
            .finish();
        let _guard = tracing::subscriber::set_default(subscriber);

        // SSE body: one real `response.completed` event, then the `[DONE]`
        // sentinel that must be ignored rather than parsed.
        let client = openai::Client::builder()
            .http_client(MockStreamingClient {
                sse_bytes: bytes::Bytes::from(format!(
                    "data: {}\n\ndata: [DONE]\n\n",
                    serde_json::to_string(&json!({
                        "type": "response.completed",
                        "sequence_number": 1,
                        "response": response,
                    }))
                    .expect("response event should serialize")
                )),
            })
            .api_key("test-key")
            .build()
            .expect("client should build");
        let model = client.completion_model("gpt-5.4");
        let request = model.completion_request("hello").build();
        let mut stream = model.stream(request).await.expect("stream should start");

        // Drain the stream fully, keeping only the terminal Final item's usage.
        let mut final_usage = None;
        while let Some(item) = stream.next().await {
            if let StreamedAssistantContent::Final(response) =
                item.expect("stream should complete successfully")
            {
                final_usage = Some(response.usage);
            }
        }

        let usage = final_usage.expect("expected final response");
        assert_eq!(usage.input_tokens, 4);
        assert_eq!(usage.output_tokens, 2);
        assert_eq!(usage.total_tokens, 6);

        // Inspect the captured logs only after the stream finished, so every
        // event has been written.
        let logs = String::from_utf8(
            captured
                .lock()
                .expect("log buffer mutex should not be poisoned")
                .clone(),
        )
        .expect("captured logs should be valid UTF-8");
        assert!(
            !logs.contains("Couldn't deserialize SSE data as StreamingCompletionChunk"),
            "expected [DONE] to bypass the parse-failure debug path, logs were: {logs}"
        );
    }
1427
1428 #[tokio::test]
1430 #[ignore = "requires API key"]
1431 async fn test_openai_streaming_tools_reasoning() {
1432 let api_key = std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var should exist");
1433 let client = openai::Client::new(&api_key).expect("Failed to build client");
1434 let agent = client
1435 .agent("gpt-5.2")
1436 .max_tokens(8192)
1437 .tool(ExampleTool)
1438 .additional_params(serde_json::json!({
1439 "reasoning": {"effort": "high"}
1440 }))
1441 .build();
1442
1443 let chat_history: Vec<Message> = Vec::new();
1444 let mut stream = agent
1445 .stream_chat("Call my example tool", &chat_history)
1446 .multi_turn(5)
1447 .await;
1448
1449 while let Some(item) = stream.next().await {
1450 println!("Got item: {item:?}");
1451 }
1452 }
1453}