1use crate::error::{Error, Result};
7use crate::http::client::Client;
8use crate::model::{
9 AssistantMessage, ContentBlock, Message, StopReason, StreamEvent, TextContent, ThinkingContent,
10 ToolCall, Usage, UserContent,
11};
12use crate::models::CompatConfig;
13use crate::provider::{Context, Provider, StreamOptions, ToolDef};
14use crate::sse::{SseEvent, SseStream};
15use async_trait::async_trait;
16use base64::Engine;
17use futures::StreamExt;
18use futures::stream::{self, Stream};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, VecDeque};
21use std::pin::Pin;
22use std::task::{Context as TaskContext, Poll};
23
/// Default endpoint for the public OpenAI Responses API.
const OPENAI_RESPONSES_API_URL: &str = "https://api.openai.com/v1/responses";
/// Endpoint for the ChatGPT Codex backend variant of the Responses API.
pub(crate) const CODEX_RESPONSES_API_URL: &str = "https://chatgpt.com/backend-api/codex/responses";
/// Fallback for `max_output_tokens` when the caller does not provide one
/// (applied in non-codex mode only; see `build_request`).
const DEFAULT_MAX_OUTPUT_TOKENS: u32 = 4096;
31
/// Provider for the OpenAI Responses API, including the ChatGPT Codex
/// backend variant (enabled via [`OpenAIResponsesProvider::with_codex_mode`]).
pub struct OpenAIResponsesProvider {
    /// HTTP client used to issue the streaming request.
    client: Client,
    /// Model identifier sent in the request body.
    model: String,
    /// Endpoint URL; defaults to the public Responses API.
    base_url: String,
    /// Name reported by `Provider::name` (defaults to "openai").
    provider: String,
    /// Name reported by `Provider::api` (defaults to "openai-responses").
    api: String,
    /// When true, requests are shaped for the Codex backend: reasoning
    /// config, encrypted reasoning content, and extra request headers.
    codex_mode: bool,
    /// Optional compatibility overrides (e.g. custom headers).
    compat: Option<CompatConfig>,
}
46
impl OpenAIResponsesProvider {
    /// Creates a provider for `model` with default endpoint, names, and
    /// no compat overrides.
    pub fn new(model: impl Into<String>) -> Self {
        Self {
            client: Client::new(),
            model: model.into(),
            base_url: OPENAI_RESPONSES_API_URL.to_string(),
            provider: "openai".to_string(),
            api: "openai-responses".to_string(),
            codex_mode: false,
            compat: None,
        }
    }

    /// Overrides the provider name reported by `Provider::name`.
    #[must_use]
    pub fn with_provider_name(mut self, provider: impl Into<String>) -> Self {
        self.provider = provider.into();
        self
    }

    /// Overrides the API name reported by `Provider::api`.
    #[must_use]
    pub fn with_api_name(mut self, api: impl Into<String>) -> Self {
        self.api = api.into();
        self
    }

    /// Enables or disables ChatGPT Codex backend mode.
    #[must_use]
    pub const fn with_codex_mode(mut self, enabled: bool) -> Self {
        self.codex_mode = enabled;
        self
    }

    /// Overrides the request endpoint URL.
    #[must_use]
    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
        self.base_url = base_url.into();
        self
    }

    /// Replaces the HTTP client (e.g. for testing or custom transport).
    #[must_use]
    pub fn with_client(mut self, client: Client) -> Self {
        self.client = client;
        self
    }

    /// Attaches (or clears) compatibility overrides.
    #[must_use]
    pub fn with_compat(mut self, compat: Option<CompatConfig>) -> Self {
        self.compat = compat;
        self
    }

    /// Builds the JSON request body for a streaming Responses API call.
    ///
    /// In codex mode the request additionally pins `tool_choice`,
    /// parallel tool calls, text verbosity, encrypted reasoning content,
    /// and a reasoning effort/summary config, and omits
    /// `max_output_tokens`; otherwise those fields stay unset and
    /// `max_output_tokens` falls back to `DEFAULT_MAX_OUTPUT_TOKENS`.
    pub fn build_request(
        &self,
        context: &Context<'_>,
        options: &StreamOptions,
    ) -> OpenAIResponsesRequest {
        // Message history conversion is delegated to a helper defined
        // elsewhere in this module.
        let input = build_openai_responses_input(context);
        let tools: Option<Vec<OpenAIResponsesTool>> = if context.tools.is_empty() {
            None
        } else {
            Some(
                context
                    .tools
                    .iter()
                    .map(convert_tool_to_openai_responses)
                    .collect(),
            )
        };

        // The system prompt maps to the top-level `instructions` field.
        let instructions = context.system_prompt.as_deref().map(ToString::to_string);

        let (tool_choice, parallel_tool_calls, text, include, reasoning) = if self.codex_mode {
            // Codex defaults to "high" reasoning effort unless the caller
            // picked a thinking level.
            let effort = options
                .thinking_level
                .as_ref()
                .map_or_else(|| "high".to_string(), ToString::to_string);
            (
                Some("auto"),
                Some(true),
                Some(OpenAIResponsesTextConfig {
                    verbosity: "medium",
                }),
                Some(vec!["reasoning.encrypted_content"]),
                Some(OpenAIResponsesReasoning {
                    effort,
                    summary: Some("auto"),
                }),
            )
        } else {
            (None, None, None, None, None)
        };

        OpenAIResponsesRequest {
            model: self.model.clone(),
            input,
            instructions,
            temperature: options.temperature,
            max_output_tokens: if self.codex_mode {
                None
            } else {
                options.max_tokens.or(Some(DEFAULT_MAX_OUTPUT_TOKENS))
            },
            tools,
            stream: true,
            store: false,
            tool_choice,
            parallel_tool_calls,
            text,
            include,
            reasoning,
        }
    }
}
167
/// Extracts the token from an `Authorization` header value of the form
/// `Bearer <token>` (scheme matched case-insensitively).
///
/// Returns `None` when the scheme is not `Bearer`, when the token is
/// missing or blank, or when the value contains more than two
/// whitespace-separated parts.
fn bearer_token_from_authorization_header(value: &str) -> Option<String> {
    let mut words = value.split_whitespace();
    match (words.next(), words.next(), words.next()) {
        (Some(scheme), Some(token), None)
            if scheme.eq_ignore_ascii_case("bearer") && !token.trim().is_empty() =>
        {
            Some(token.trim().to_string())
        }
        _ => None,
    }
}
181
182fn authorization_override(
183 options: &StreamOptions,
184 compat: Option<&CompatConfig>,
185) -> Option<String> {
186 super::first_non_empty_header_value_case_insensitive(&options.headers, &["authorization"])
187 .or_else(|| {
188 compat
189 .and_then(|compat| compat.custom_headers.as_ref())
190 .and_then(|headers| {
191 super::first_non_empty_header_value_case_insensitive(
192 headers,
193 &["authorization"],
194 )
195 })
196 })
197}
198
#[async_trait]
impl Provider for OpenAIResponsesProvider {
    fn name(&self) -> &str {
        &self.provider
    }

    fn api(&self) -> &str {
        &self.api
    }

    fn model_id(&self) -> &str {
        &self.model
    }

    /// Issues a streaming Responses API request and adapts the wire
    /// events (SSE or NDJSON) into [`StreamEvent`]s.
    ///
    /// Auth resolution: an explicit `Authorization` header override (from
    /// options or compat config) wins; otherwise the API key comes from
    /// `options.api_key` or the `OPENAI_API_KEY` env var. Codex mode
    /// additionally requires a Bearer token whose JWT payload carries a
    /// `chatgpt_account_id` claim, and adds Codex-specific headers.
    ///
    /// # Errors
    /// Returns a provider error for missing/invalid credentials, an API
    /// error for non-2xx responses or unexpected content types, and
    /// surfaces parse/transport failures through the returned stream.
    #[allow(clippy::too_many_lines)]
    async fn stream(
        &self,
        context: &Context<'_>,
        options: &StreamOptions,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>>> {
        let authorization_header_value = authorization_override(options, self.compat.as_ref());

        // Only resolve an API key when no explicit Authorization header
        // override is present.
        let auth_value = if authorization_header_value.is_some() {
            None
        } else {
            Some(
                options
                    .api_key
                    .clone()
                    .or_else(|| std::env::var("OPENAI_API_KEY").ok())
                    .ok_or_else(|| {
                        Error::provider(
                            self.name(),
                            "Missing API key for provider. Configure credentials with /login <provider> or set the provider's API key env var.",
                        )
                    })?,
            )
        };

        let request_body = self.build_request(context, options);

        // Accept both SSE and NDJSON; the server picks via Content-Type.
        let mut request = self.client.post(&self.base_url).header(
            "Accept",
            "text/event-stream, application/x-ndjson, application/ndjson",
        );

        if let Some(ref auth_value) = auth_value {
            request = request.header("Authorization", format!("Bearer {auth_value}"));
        }

        if self.codex_mode {
            // Codex needs the raw bearer token: either parsed out of the
            // header override or the resolved API key.
            let codex_bearer = authorization_header_value
                .as_deref()
                .and_then(bearer_token_from_authorization_header)
                .or_else(|| auth_value.clone())
                .ok_or_else(|| {
                    Error::provider(
                        self.name(),
                        "OpenAI Codex mode requires a Bearer token. Provide one via /login openai-codex or an Authorization: Bearer <token> header.",
                    )
                })?;
            // The account id lives inside the OAuth JWT payload.
            let account_id = extract_chatgpt_account_id(&codex_bearer).ok_or_else(|| {
                Error::provider(
                    self.name(),
                    "Invalid OpenAI Codex OAuth token (missing chatgpt_account_id claim). Run /login openai-codex again.",
                )
            })?;
            request = request
                .header("chatgpt-account-id", account_id)
                .header("OpenAI-Beta", "responses=experimental")
                .header("originator", "pi")
                .header("User-Agent", "pi_agent_rust");
            if let Some(session_id) = &options.session_id {
                request = request.header("session_id", session_id);
            }
        }

        // Compat headers first, then per-request headers, so request
        // options take final precedence; blank auth overrides are ignored.
        if let Some(compat) = &self.compat {
            if let Some(custom_headers) = &compat.custom_headers {
                request = super::apply_headers_ignoring_blank_auth_overrides(
                    request,
                    custom_headers,
                    &["authorization"],
                );
            }
        }

        request = super::apply_headers_ignoring_blank_auth_overrides(
            request,
            &options.headers,
            &["authorization"],
        );

        let request = request.json(&request_body)?;

        let response = Box::pin(request.send()).await?;
        let status = response.status();
        if !(200..300).contains(&status) {
            let body = response
                .text()
                .await
                .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
            return Err(Error::provider(
                self.name(),
                format!("OpenAI API error (HTTP {status}): {body}"),
            ));
        }

        // Sniff the Content-Type to choose the wire format. Codex mode
        // tolerates a missing header (defaults to SSE parsing).
        let content_type = response
            .headers()
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case("content-type"))
            .map(|(_, value)| value.to_ascii_lowercase());
        if content_type.is_none() && !self.codex_mode {
            return Err(Error::api(format!(
                "OpenAI API protocol error (HTTP {status}): missing content-type header (expected text/event-stream or application/x-ndjson)"
            )));
        }
        let mut use_ndjson = false;
        if let Some(ref ct) = content_type {
            if ct.contains("application/x-ndjson") || ct.contains("application/ndjson") {
                use_ndjson = true;
            } else if !ct.contains("text/event-stream") {
                return Err(Error::api(format!(
                    "OpenAI API protocol error (HTTP {status}): unexpected Content-Type {ct} (expected text/event-stream or application/x-ndjson)"
                )));
            }
        }

        let byte_stream = response.bytes_stream();
        let event_source = if use_ndjson {
            ResponsesEventStream::Ndjson(NdjsonStream::new(byte_stream))
        } else {
            ResponsesEventStream::Sse(SseStream::new(byte_stream))
        };

        let model = self.model.clone();
        let api = self.api().to_string();
        let provider = self.name().to_string();

        // Drive the wire events through StreamState, which accumulates
        // the partial message and queues outgoing StreamEvents.
        let stream = stream::unfold(
            StreamState::new(event_source, model, api, provider),
            |mut state| async move {
                loop {
                    // Drain queued events before reading more input.
                    if let Some(event) = state.pending_events.pop_front() {
                        return Some((Ok(event), state));
                    }

                    if state.finished {
                        return None;
                    }

                    match state.event_source.next().await {
                        Some(Ok(msg)) => {
                            // Any successful read resets the transient
                            // error counter.
                            state.transient_error_count = 0;
                            if msg.data == "[DONE]" {
                                // Prefer terminal-response bookkeeping;
                                // otherwise finish without a reason.
                                if !state.finish_terminal_response() {
                                    state.finish(None);
                                }
                                continue;
                            }

                            if let Err(e) = state.process_event(&msg.data) {
                                return Some((Err(e), state));
                            }
                        }
                        Some(Err(e)) => {
                            const MAX_CONSECUTIVE_TRANSIENT_ERRORS: usize = 5;
                            // Retry a bounded number of consecutive
                            // transient I/O errors before giving up.
                            if e.kind() == std::io::ErrorKind::WriteZero
                                || e.kind() == std::io::ErrorKind::WouldBlock
                                || e.kind() == std::io::ErrorKind::TimedOut
                            {
                                state.transient_error_count += 1;
                                if state.transient_error_count <= MAX_CONSECUTIVE_TRANSIENT_ERRORS {
                                    tracing::warn!(
                                        kind = ?e.kind(),
                                        count = state.transient_error_count,
                                        "Transient error in SSE stream, continuing"
                                    );
                                    continue;
                                }
                                tracing::warn!(
                                    kind = ?e.kind(),
                                    "Error persisted after {MAX_CONSECUTIVE_TRANSIENT_ERRORS} \
                                     consecutive attempts, treating as fatal"
                                );
                            }
                            let err = Error::api(format!("SSE error: {e}"));
                            return Some((Err(err), state));
                        }
                        None => {
                            // EOF without [DONE]: acceptable only if a
                            // terminal response event was already seen.
                            if state.finish_terminal_response() {
                                continue;
                            }

                            return Some((
                                Err(Error::api("Stream ended without Done event")),
                                state,
                            ));
                        }
                    }
                }
            },
        );

        Ok(Box::pin(stream))
    }
}
428
/// Upper bound (100 MiB) on a single NDJSON line, capping memory use on
/// malformed or hostile streams.
const NDJSON_MAX_EVENT_BYTES: usize = 100 * 1024 * 1024;

/// Adapts a raw byte stream of newline-delimited JSON into `SseEvent`s,
/// one event per non-blank line.
struct NdjsonStream<S> {
    /// Underlying chunked byte stream.
    inner: S,
    /// Bytes received but not yet terminated by a newline.
    buffer: Vec<u8>,
    /// Complete lines parsed but not yet yielded.
    pending: VecDeque<String>,
    /// Set once the inner stream returns `None`.
    finished: bool,
    /// Per-line size cap (`NDJSON_MAX_EVENT_BYTES` outside tests).
    max_event_bytes: usize,
}
442
443impl<S> NdjsonStream<S> {
444 const fn new(inner: S) -> Self {
445 Self {
446 inner,
447 buffer: Vec::new(),
448 pending: VecDeque::new(),
449 finished: false,
450 max_event_bytes: NDJSON_MAX_EVENT_BYTES,
451 }
452 }
453
454 #[cfg(test)]
455 fn with_max_event_bytes(inner: S, max_event_bytes: usize) -> Self {
456 Self {
457 inner,
458 buffer: Vec::new(),
459 pending: VecDeque::new(),
460 finished: false,
461 max_event_bytes,
462 }
463 }
464
465 fn invalid_utf8_error() -> std::io::Error {
466 std::io::Error::new(
467 std::io::ErrorKind::InvalidData,
468 "Invalid UTF-8 in NDJSON stream",
469 )
470 }
471
472 fn event_too_large_error(&self) -> std::io::Error {
473 std::io::Error::new(
474 std::io::ErrorKind::InvalidData,
475 format!("NDJSON event exceeds {} bytes", self.max_event_bytes),
476 )
477 }
478
479 fn push_line(&mut self, line_bytes: &[u8]) -> std::io::Result<()> {
480 if line_bytes.len() > self.max_event_bytes {
481 return Err(self.event_too_large_error());
482 }
483 let line = std::str::from_utf8(line_bytes).map_err(|_| Self::invalid_utf8_error())?;
484 let line = line.strip_suffix('\r').unwrap_or(line);
485 if line.trim().is_empty() {
486 return Ok(());
487 }
488 self.pending.push_back(line.to_string());
489 Ok(())
490 }
491
492 fn drain_buffer(&mut self) -> std::io::Result<()> {
493 if self.buffer.len() > self.max_event_bytes && memchr::memchr(b'\n', &self.buffer).is_none()
494 {
495 return Err(self.event_too_large_error());
496 }
497 while let Some(pos) = memchr::memchr(b'\n', &self.buffer) {
498 if pos > self.max_event_bytes {
499 return Err(self.event_too_large_error());
500 }
501 let mut line = self.buffer.drain(..=pos).collect::<Vec<u8>>();
502 if line.last() == Some(&b'\n') {
503 line.pop();
504 }
505 if !line.is_empty() {
506 self.push_line(&line)?;
507 }
508 }
509 Ok(())
510 }
511
512 fn finish_buffer(&mut self) -> std::io::Result<()> {
513 if self.buffer.is_empty() {
514 return Ok(());
515 }
516 let line = std::mem::take(&mut self.buffer);
517 self.push_line(&line)?;
518 Ok(())
519 }
520
521 fn event_from_line(line: String) -> SseEvent {
522 SseEvent {
523 data: line,
524 ..SseEvent::default()
525 }
526 }
527}
528
impl<S> Stream for NdjsonStream<S>
where
    S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
{
    type Item = std::io::Result<SseEvent>;

    /// Yields one parsed line per poll, pulling more bytes from the
    /// inner stream only when no parsed line is queued.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        // Serve already-parsed lines before touching the inner stream.
        if let Some(line) = self.pending.pop_front() {
            return Poll::Ready(Some(Ok(Self::event_from_line(line))));
        }

        if self.finished {
            return Poll::Ready(None);
        }

        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    self.buffer.extend_from_slice(&chunk);
                    if let Err(err) = self.drain_buffer() {
                        return Poll::Ready(Some(Err(err)));
                    }
                    // A chunk may contain zero complete lines; loop to
                    // poll for more bytes in that case.
                    if let Some(line) = self.pending.pop_front() {
                        return Poll::Ready(Some(Ok(Self::event_from_line(line))));
                    }
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
                Poll::Ready(None) => {
                    self.finished = true;
                    // Flush any unterminated trailing line at EOF.
                    if let Err(err) = self.finish_buffer() {
                        return Poll::Ready(Some(Err(err)));
                    }
                    if let Some(line) = self.pending.pop_front() {
                        return Poll::Ready(Some(Ok(Self::event_from_line(line))));
                    }
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
571
/// Event source abstraction over the two wire formats the Responses API
/// may stream in: NDJSON lines or server-sent events.
enum ResponsesEventStream<S>
where
    S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
{
    /// Newline-delimited JSON (`application/x-ndjson`).
    Ndjson(NdjsonStream<S>),
    /// Server-sent events (`text/event-stream`).
    Sse(SseStream<S>),
}
579
580impl<S> Stream for ResponsesEventStream<S>
581where
582 S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
583{
584 type Item = std::io::Result<SseEvent>;
585
586 fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
587 match self.as_mut().get_mut() {
588 Self::Ndjson(inner) => Pin::new(inner).poll_next(cx),
589 Self::Sse(inner) => Pin::new(inner).poll_next(cx),
590 }
591 }
592}
593
/// Identifies a text content part within a streamed response: the output
/// item it belongs to plus its content index within that item.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TextKey {
    item_id: String,
    content_index: u32,
}

/// Identifies a reasoning part (summary or full text) within an item.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ReasoningKey {
    item_id: String,
    kind: ReasoningKind,
    // Summary index or content index, depending on `kind`.
    index: u32,
}

/// Which reasoning channel a delta/snapshot belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ReasoningKind {
    /// `reasoning_summary_text.*` events.
    Summary,
    /// `reasoning_text.*` events.
    Text,
}

/// Accumulated state for a function call that has started streaming but
/// has not yet been finalized.
struct ToolCallState {
    /// Index of the corresponding block in `partial.content`.
    content_index: usize,
    call_id: String,
    name: String,
    /// Raw (still-JSON-text) argument fragments concatenated so far.
    arguments: String,
}

/// Snapshot of a finished content block, used when emitting terminal
/// (`*End`) events in content order.
enum TerminalContentSnapshot {
    Text {
        content_index: usize,
        content: String,
    },
    Thinking {
        content_index: usize,
        content: String,
    },
    ToolCall {
        content_index: usize,
    },
}
637
/// Accumulator that converts wire events from the Responses API into
/// [`StreamEvent`]s while building up the final assistant message.
struct StreamState<S>
where
    S: Stream<Item = std::result::Result<SseEvent, std::io::Error>> + Unpin,
{
    /// Source of parsed wire events (SSE or NDJSON).
    event_source: S,
    /// The assistant message being assembled incrementally.
    partial: AssistantMessage,
    /// Outgoing events queued for the consumer.
    pending_events: VecDeque<StreamEvent>,
    /// Whether a `Start` event has been emitted.
    started: bool,
    /// Whether a terminal (`Done`/`Error`) event has been queued.
    finished: bool,
    /// Maps text part keys to indices in `partial.content`.
    text_blocks: HashMap<TextKey, usize>,
    /// Maps reasoning part keys to indices in `partial.content`.
    reasoning_blocks: HashMap<ReasoningKey, usize>,
    /// In-flight tool calls keyed by output item id.
    tool_calls_by_item_id: HashMap<String, ToolCallState>,
    /// Argument deltas that arrived before their item was announced.
    orphan_tool_call_arguments: HashMap<String, String>,
    /// Set when a response.completed/done/incomplete event was seen.
    terminal_response_seen: bool,
    /// Incomplete reason captured from the terminal response, if any.
    terminal_incomplete_reason: Option<String>,
    /// Consecutive transient I/O errors seen (reset on success).
    transient_error_count: usize,
}
656
impl<S> StreamState<S>
where
    S: Stream<Item = std::result::Result<SseEvent, std::io::Error>> + Unpin,
{
    /// Creates a fresh state with an empty partial assistant message.
    fn new(event_source: S, model: String, api: String, provider: String) -> Self {
        Self {
            event_source,
            partial: AssistantMessage {
                content: Vec::new(),
                api,
                provider,
                model,
                usage: Usage::default(),
                stop_reason: StopReason::Stop,
                error_message: None,
                timestamp: chrono::Utc::now().timestamp_millis(),
            },
            pending_events: VecDeque::new(),
            started: false,
            finished: false,
            text_blocks: HashMap::new(),
            reasoning_blocks: HashMap::new(),
            tool_calls_by_item_id: HashMap::new(),
            orphan_tool_call_arguments: HashMap::new(),
            terminal_response_seen: false,
            terminal_incomplete_reason: None,
            transient_error_count: 0,
        }
    }

    /// Emits the `Start` event exactly once, lazily, on first content.
    fn ensure_started(&mut self) {
        if !self.started {
            self.started = true;
            self.pending_events.push_back(StreamEvent::Start {
                partial: self.partial.clone(),
            });
        }
    }

    /// Records that a terminal response event was seen; actual finishing
    /// is deferred to `finish_terminal_response`.
    fn record_terminal_response(&mut self, incomplete_reason: Option<String>) {
        self.terminal_response_seen = true;
        self.terminal_incomplete_reason = incomplete_reason;
    }

    /// Finishes the stream if a terminal response was recorded.
    /// Returns false when no terminal response has been seen yet.
    fn finish_terminal_response(&mut self) -> bool {
        if !self.terminal_response_seen {
            return false;
        }

        let incomplete_reason = self.terminal_incomplete_reason.take();
        self.terminal_response_seen = false;
        self.finish(incomplete_reason);
        true
    }

    /// Returns the `partial.content` index for a text part, creating the
    /// block (and emitting `TextStart`) on first sight.
    fn text_block_for(&mut self, item_id: String, content_index: u32) -> usize {
        let key = TextKey {
            item_id,
            content_index,
        };
        if let Some(idx) = self.text_blocks.get(&key) {
            return *idx;
        }

        let idx = self.partial.content.len();
        self.partial
            .content
            .push(ContentBlock::Text(TextContent::new("")));
        self.text_blocks.insert(key, idx);
        self.pending_events
            .push_back(StreamEvent::TextStart { content_index: idx });
        idx
    }

    /// Returns the `partial.content` index for a reasoning part, creating
    /// the block (and emitting `ThinkingStart`) on first sight.
    fn reasoning_block_for(&mut self, kind: ReasoningKind, item_id: String, index: u32) -> usize {
        let key = ReasoningKey {
            item_id,
            kind,
            index,
        };
        if let Some(idx) = self.reasoning_blocks.get(&key) {
            return *idx;
        }

        let idx = self.partial.content.len();
        self.partial
            .content
            .push(ContentBlock::Thinking(ThinkingContent {
                thinking: String::new(),
                thinking_signature: None,
            }));
        self.reasoning_blocks.insert(key, idx);
        self.pending_events
            .push_back(StreamEvent::ThinkingStart { content_index: idx });
        idx
    }

    /// Appends a reasoning delta to its block and queues `ThinkingDelta`.
    fn append_reasoning_delta(
        &mut self,
        kind: ReasoningKind,
        item_id: String,
        index: u32,
        delta: String,
    ) {
        self.ensure_started();
        let idx = self.reasoning_block_for(kind, item_id, index);
        if let Some(ContentBlock::Thinking(block)) = self.partial.content.get_mut(idx) {
            block.thinking.push_str(&delta);
        }
        self.pending_events.push_back(StreamEvent::ThinkingDelta {
            content_index: idx,
            delta,
        });
    }

    /// Reconciles a full-text snapshot with the accumulated text: emits
    /// only the missing suffix as a delta, or replaces (with a warning,
    /// and without emitting a delta) if the snapshot diverges.
    fn apply_text_snapshot(&mut self, item_id: String, content_index: u32, text: String) {
        self.ensure_started();
        let idx = self.text_block_for(item_id, content_index);
        let Some(ContentBlock::Text(block)) = self.partial.content.get_mut(idx) else {
            return;
        };

        if block.text == text {
            return;
        }

        if let Some(suffix) = text.strip_prefix(&block.text) {
            if suffix.is_empty() {
                return;
            }
            block.text.push_str(suffix);
            self.pending_events.push_back(StreamEvent::TextDelta {
                content_index: idx,
                delta: suffix.to_string(),
            });
            return;
        }

        tracing::warn!(
            content_index = idx,
            old_len = block.text.len(),
            new_len = text.len(),
            "text snapshot does not extend accumulated text; replacing"
        );
        block.text = text;
    }

    /// Same reconciliation as `apply_text_snapshot`, for reasoning blocks.
    fn apply_reasoning_snapshot(
        &mut self,
        kind: ReasoningKind,
        item_id: String,
        index: u32,
        text: String,
    ) {
        self.ensure_started();
        let idx = self.reasoning_block_for(kind, item_id, index);
        let Some(ContentBlock::Thinking(block)) = self.partial.content.get_mut(idx) else {
            return;
        };

        if block.thinking == text {
            return;
        }

        if let Some(suffix) = text.strip_prefix(&block.thinking) {
            if suffix.is_empty() {
                return;
            }
            block.thinking.push_str(suffix);
            self.pending_events.push_back(StreamEvent::ThinkingDelta {
                content_index: idx,
                delta: suffix.to_string(),
            });
            return;
        }

        tracing::warn!(
            content_index = idx,
            old_len = block.thinking.len(),
            new_len = text.len(),
            "reasoning snapshot does not extend accumulated thinking; replacing"
        );
        block.thinking = text;
    }

    /// Parses one wire event (JSON) and updates state / queues events.
    ///
    /// # Errors
    /// Returns an API error when the payload is not valid JSON for a
    /// known chunk shape.
    #[allow(clippy::too_many_lines)]
    fn process_event(&mut self, data: &str) -> Result<()> {
        let chunk: OpenAIResponsesChunk = serde_json::from_str(data)
            .map_err(|e| Error::api(format!("JSON parse error: {e}\nData: {data}")))?;

        match chunk {
            OpenAIResponsesChunk::OutputTextDelta {
                item_id,
                content_index,
                delta,
            } => {
                self.ensure_started();
                let idx = self.text_block_for(item_id, content_index);
                if let Some(ContentBlock::Text(t)) = self.partial.content.get_mut(idx) {
                    t.text.push_str(&delta);
                }
                self.pending_events.push_back(StreamEvent::TextDelta {
                    content_index: idx,
                    delta,
                });
            }
            OpenAIResponsesChunk::ReasoningSummaryTextDelta {
                item_id,
                summary_index,
                delta,
            } => {
                self.append_reasoning_delta(ReasoningKind::Summary, item_id, summary_index, delta);
            }
            OpenAIResponsesChunk::ReasoningTextDelta {
                item_id,
                content_index,
                delta,
            } => {
                self.append_reasoning_delta(ReasoningKind::Text, item_id, content_index, delta);
            }
            OpenAIResponsesChunk::OutputTextDone {
                item_id,
                content_index,
                text,
            } => {
                self.apply_text_snapshot(item_id, content_index, text);
            }
            OpenAIResponsesChunk::ContentPartDone {
                item_id,
                content_index,
                part,
            } => match part {
                OpenAIResponsesContentPartDone::OutputText { text } => {
                    self.apply_text_snapshot(item_id, content_index, text);
                }
                OpenAIResponsesContentPartDone::ReasoningText { text } => {
                    self.apply_reasoning_snapshot(
                        ReasoningKind::Text,
                        item_id,
                        content_index,
                        text,
                    );
                }
                OpenAIResponsesContentPartDone::Unknown => {}
            },
            OpenAIResponsesChunk::ReasoningTextDone {
                item_id,
                content_index,
                text,
            } => {
                self.apply_reasoning_snapshot(ReasoningKind::Text, item_id, content_index, text);
            }
            OpenAIResponsesChunk::ReasoningSummaryTextDone {
                item_id,
                summary_index,
                text,
            } => {
                self.apply_reasoning_snapshot(ReasoningKind::Summary, item_id, summary_index, text);
            }
            OpenAIResponsesChunk::ReasoningSummaryPartDone {
                item_id,
                summary_index,
                part,
            } => match part {
                OpenAIResponsesReasoningSummaryPartDone::SummaryText { text } => {
                    self.apply_reasoning_snapshot(
                        ReasoningKind::Summary,
                        item_id,
                        summary_index,
                        text,
                    );
                }
                OpenAIResponsesReasoningSummaryPartDone::Unknown => {}
            },
            OpenAIResponsesChunk::OutputItemAdded { item } => {
                if let OpenAIResponsesOutputItem::FunctionCall {
                    id,
                    call_id,
                    name,
                    arguments,
                } = item
                {
                    self.ensure_started();

                    // Deltas may have arrived before this item was
                    // announced; prepend the item's initial arguments to
                    // any buffered orphan fragments.
                    let mut buffered_arguments = self
                        .orphan_tool_call_arguments
                        .remove(&id)
                        .unwrap_or_default();
                    buffered_arguments.insert_str(0, &arguments);

                    let content_index = self.partial.content.len();
                    self.partial.content.push(ContentBlock::ToolCall(ToolCall {
                        id: call_id.clone(),
                        name: name.clone(),
                        // Arguments stay Null until finalized in
                        // `end_tool_call`.
                        arguments: serde_json::Value::Null,
                        thought_signature: None,
                    }));

                    self.tool_calls_by_item_id.insert(
                        id,
                        ToolCallState {
                            content_index,
                            call_id,
                            name,
                            arguments: buffered_arguments.clone(),
                        },
                    );

                    self.pending_events
                        .push_back(StreamEvent::ToolCallStart { content_index });

                    if !buffered_arguments.is_empty() {
                        self.pending_events.push_back(StreamEvent::ToolCallDelta {
                            content_index,
                            delta: buffered_arguments,
                        });
                    }
                }
            }
            OpenAIResponsesChunk::FunctionCallArgumentsDelta { item_id, delta } => {
                self.ensure_started();
                if let Some(tc) = self.tool_calls_by_item_id.get_mut(&item_id) {
                    tc.arguments.push_str(&delta);
                    self.pending_events.push_back(StreamEvent::ToolCallDelta {
                        content_index: tc.content_index,
                        delta,
                    });
                } else {
                    // Item not announced yet; buffer until it is.
                    self.orphan_tool_call_arguments
                        .entry(item_id)
                        .or_default()
                        .push_str(&delta);
                }
            }
            OpenAIResponsesChunk::OutputItemDone { item } => {
                if let OpenAIResponsesOutputItemDone::FunctionCall {
                    id,
                    call_id,
                    name,
                    arguments,
                } = item
                {
                    self.ensure_started();
                    self.end_tool_call(&id, &call_id, &name, &arguments);
                }
            }
            OpenAIResponsesChunk::ResponseCompleted { response }
            | OpenAIResponsesChunk::ResponseDone { response }
            | OpenAIResponsesChunk::ResponseIncomplete { response } => {
                self.ensure_started();
                self.partial.usage.input = response.usage.input_tokens;
                self.partial.usage.output = response.usage.output_tokens;
                self.partial.usage.total_tokens = response
                    .usage
                    .total_tokens
                    .unwrap_or(response.usage.input_tokens + response.usage.output_tokens);
                // Finishing is deferred until [DONE] / EOF.
                self.record_terminal_response(response.incomplete_reason());
            }
            OpenAIResponsesChunk::ResponseFailed { response } => {
                self.ensure_started();
                self.partial.stop_reason = StopReason::Error;
                self.partial.error_message = Some(
                    response
                        .error
                        .and_then(|error| error.message)
                        .unwrap_or_else(|| "Codex response failed".to_string()),
                );
                self.pending_events.push_back(StreamEvent::Error {
                    reason: StopReason::Error,
                    error: std::mem::take(&mut self.partial),
                });
                self.finished = true;
            }
            OpenAIResponsesChunk::Error { message } => {
                self.ensure_started();
                self.partial.stop_reason = StopReason::Error;
                self.partial.error_message = Some(message);
                self.pending_events.push_back(StreamEvent::Error {
                    reason: StopReason::Error,
                    error: std::mem::take(&mut self.partial),
                });
                self.finished = true;
            }
            // Unrecognized chunk types are ignored for forward compat.
            OpenAIResponsesChunk::Unknown => {}
        }

        Ok(())
    }

    /// True when the partial message contains at least one tool call.
    fn partial_has_tool_call(&self) -> bool {
        self.partial
            .content
            .iter()
            .any(|b| matches!(b, ContentBlock::ToolCall(_)))
    }

    /// Finalizes a tool call: parses its arguments, emits `ToolCallEnd`,
    /// and writes the parsed call back into `partial.content`.
    ///
    /// If the item was never announced (no `OutputItemAdded`), a block
    /// plus Start/Delta events are synthesized here.
    fn end_tool_call(&mut self, item_id: &str, call_id: &str, name: &str, arguments: &str) {
        let (mut tc, synthesized_start) = self.tool_calls_by_item_id.remove(item_id).map_or_else(
            || {
                let content_index = self.partial.content.len();
                self.partial.content.push(ContentBlock::ToolCall(ToolCall {
                    id: call_id.to_string(),
                    name: name.to_string(),
                    arguments: serde_json::Value::Null,
                    thought_signature: None,
                }));
                (
                    ToolCallState {
                        content_index,
                        call_id: call_id.to_string(),
                        name: name.to_string(),
                        arguments: self
                            .orphan_tool_call_arguments
                            .remove(item_id)
                            .unwrap_or_default(),
                    },
                    true,
                )
            },
            |state| (state, false),
        );

        if synthesized_start {
            self.pending_events.push_back(StreamEvent::ToolCallStart {
                content_index: tc.content_index,
            });
        }

        // The done-event arguments, when present, are authoritative over
        // whatever was accumulated from deltas.
        if !arguments.is_empty() {
            tc.arguments = arguments.to_string();
        }

        if synthesized_start && !tc.arguments.is_empty() {
            self.pending_events.push_back(StreamEvent::ToolCallDelta {
                content_index: tc.content_index,
                delta: tc.arguments.clone(),
            });
        }

        // Unparseable arguments degrade to Null rather than failing the
        // whole stream.
        let parsed_args: serde_json::Value =
            serde_json::from_str(&tc.arguments).unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    raw = %tc.arguments,
                    "Failed to parse tool arguments as JSON"
                );
                serde_json::Value::Null
            });

        self.partial.stop_reason = StopReason::ToolUse;
        self.pending_events.push_back(StreamEvent::ToolCallEnd {
            content_index: tc.content_index,
            tool_call: ToolCall {
                id: tc.call_id.clone(),
                name: tc.name.clone(),
                arguments: parsed_args.clone(),
                thought_signature: None,
            },
        });

        if let Some(ContentBlock::ToolCall(block)) = self.partial.content.get_mut(tc.content_index)
        {
            block.id = tc.call_id;
            block.name = tc.name;
            block.arguments = parsed_args;
        }
    }

    /// Snapshots all content blocks (in order) for terminal-event
    /// emission; image blocks have no terminal event and are skipped.
    fn terminal_content_snapshots(&self) -> Vec<TerminalContentSnapshot> {
        self.partial
            .content
            .iter()
            .enumerate()
            .filter_map(|(content_index, block)| match block {
                ContentBlock::Text(t) => Some(TerminalContentSnapshot::Text {
                    content_index,
                    content: t.text.clone(),
                }),
                ContentBlock::Thinking(t) => Some(TerminalContentSnapshot::Thinking {
                    content_index,
                    content: t.thinking.clone(),
                }),
                ContentBlock::ToolCall(_) => {
                    Some(TerminalContentSnapshot::ToolCall { content_index })
                }
                ContentBlock::Image(_) => None,
            })
            .collect()
    }

    /// Returns still-open tool calls as
    /// `(content_index, item_id, call_id, name, arguments)` tuples,
    /// sorted by content index so finalization follows content order.
    fn open_tool_call_snapshots_in_content_order(
        &self,
    ) -> Vec<(usize, String, String, String, String)> {
        let mut open_tool_calls: Vec<(usize, String, String, String, String)> = self
            .tool_calls_by_item_id
            .iter()
            .map(|(item_id, tc)| {
                (
                    tc.content_index,
                    item_id.clone(),
                    tc.call_id.clone(),
                    tc.name.clone(),
                    tc.arguments.clone(),
                )
            })
            .collect();
        open_tool_calls.sort_by_key(|(content_index, ..)| *content_index);
        open_tool_calls
    }

    /// Emits terminal events for every content block, closes any
    /// still-open tool calls, derives the stop reason (from the server's
    /// incomplete reason when provided, else ToolUse if any tool call
    /// exists), and queues the final `Done` event. Idempotent.
    fn finish(&mut self, incomplete_reason: Option<String>) {
        if self.finished {
            return;
        }

        let mut open_tool_calls = self
            .open_tool_call_snapshots_in_content_order()
            .into_iter()
            .peekable();

        for block in self.terminal_content_snapshots() {
            match block {
                TerminalContentSnapshot::Text {
                    content_index,
                    content,
                } => self.pending_events.push_back(StreamEvent::TextEnd {
                    content_index,
                    content,
                }),
                TerminalContentSnapshot::Thinking {
                    content_index,
                    content,
                } => self.pending_events.push_back(StreamEvent::ThinkingEnd {
                    content_index,
                    content,
                }),
                TerminalContentSnapshot::ToolCall { content_index } => {
                    // Close open tool calls at this position in order.
                    while let Some((_, item_id, call_id, name, arguments)) = open_tool_calls
                        .next_if(|(tool_content_index, ..)| *tool_content_index == content_index)
                    {
                        self.end_tool_call(&item_id, &call_id, &name, &arguments);
                    }
                }
            }
        }

        // Safety net: close any open tool calls not matched above.
        for (_, item_id, call_id, name, arguments) in open_tool_calls {
            self.end_tool_call(&item_id, &call_id, &name, &arguments);
        }

        if let Some(reason) = incomplete_reason {
            let reason_lower = reason.to_ascii_lowercase();
            if reason_lower.contains("max_output") || reason_lower.contains("length") {
                self.partial.stop_reason = StopReason::Length;
            } else if reason_lower.contains("tool") {
                self.partial.stop_reason = StopReason::ToolUse;
            } else if reason_lower.contains("content_filter") || reason_lower.contains("error") {
                self.partial.stop_reason = StopReason::Error;
            }
        } else if self.partial_has_tool_call() {
            self.partial.stop_reason = StopReason::ToolUse;
        }

        let reason = self.partial.stop_reason;
        self.pending_events.push_back(StreamEvent::Done {
            reason,
            message: self.partial.clone(),
        });
        self.finished = true;
    }
}
1233
1234fn extract_chatgpt_account_id(token: &str) -> Option<String> {
1235 let mut parts = token.split('.');
1236 let _header = parts.next()?;
1237 let payload = parts.next()?;
1238 let _signature = parts.next()?;
1239 if parts.next().is_some() {
1240 return None;
1241 }
1242
1243 let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
1244 .decode(payload)
1245 .or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(payload))
1246 .ok()?;
1247 let payload_json: serde_json::Value = serde_json::from_slice(&decoded).ok()?;
1248 payload_json
1249 .get("https://api.openai.com/auth")
1250 .and_then(|auth| auth.get("chatgpt_account_id"))
1251 .and_then(serde_json::Value::as_str)
1252 .map(str::trim)
1253 .filter(|value| !value.is_empty())
1254 .map(ToString::to_string)
1255}
1256
/// Request body for the OpenAI Responses API.
///
/// Optional fields are skipped during serialization when `None`, so the
/// provider's own server-side defaults apply.
#[derive(Debug, Serialize)]
pub struct OpenAIResponsesRequest {
    /// Model identifier (e.g. `gpt-4o`).
    model: String,
    /// Conversation history flattened into Responses API input items.
    input: Vec<OpenAIResponsesInputItem>,
    /// System prompt; the Responses API carries it out-of-band rather
    /// than as a message in `input`.
    #[serde(skip_serializing_if = "Option::is_none")]
    instructions: Option<String>,
    /// Sampling temperature, when the caller sets one.
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    /// Upper bound on generated tokens.
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    /// Function tools the model may call.
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<OpenAIResponsesTool>>,
    /// Whether the response is streamed as SSE events.
    stream: bool,
    /// Whether the provider should persist the response server-side.
    store: bool,
    /// Tool-choice policy; a static string per the API's enum values.
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<&'static str>,
    /// Whether the model may emit multiple tool calls in parallel.
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
    /// Text output configuration (verbosity).
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<OpenAIResponsesTextConfig>,
    /// Extra response fields to include, as static API identifiers.
    #[serde(skip_serializing_if = "Option::is_none")]
    include: Option<Vec<&'static str>>,
    /// Reasoning configuration (effort / summary) for reasoning models.
    #[serde(skip_serializing_if = "Option::is_none")]
    reasoning: Option<OpenAIResponsesReasoning>,
}
1286
/// `text` configuration object of a Responses API request.
#[derive(Debug, Serialize)]
struct OpenAIResponsesTextConfig {
    /// Output verbosity level; a static string per the API's enum values.
    verbosity: &'static str,
}
1291
/// `reasoning` configuration object of a Responses API request.
#[derive(Debug, Serialize)]
struct OpenAIResponsesReasoning {
    /// Reasoning effort level requested from the model.
    effort: String,
    /// Reasoning-summary mode; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    summary: Option<&'static str>,
}
1298
/// One element of the Responses API `input` array.
///
/// Serialized untagged: message items carry a `role` field while
/// function-call items carry a `type` field, and the API distinguishes
/// them by shape.
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum OpenAIResponsesInputItem {
    /// Plain-text message with an explicit role.
    System {
        role: &'static str,
        content: String,
    },
    /// User message composed of text and/or image parts.
    User {
        role: &'static str,
        content: Vec<OpenAIResponsesUserContentPart>,
    },
    /// Assistant message composed of output-text parts.
    Assistant {
        role: &'static str,
        content: Vec<OpenAIResponsesAssistantContentPart>,
    },
    /// A tool call the assistant made earlier in the conversation.
    FunctionCall {
        // Always "function_call"; serialized as the item's `type` field.
        #[serde(rename = "type")]
        r#type: &'static str,
        call_id: String,
        name: String,
        /// JSON-encoded tool arguments.
        arguments: String,
    },
    /// The result produced for a prior tool call, matched by `call_id`.
    FunctionCallOutput {
        // Always "function_call_output"; serialized as the item's `type` field.
        #[serde(rename = "type")]
        r#type: &'static str,
        call_id: String,
        output: String,
    },
}
1328
/// A single content part of a user input item.
// NOTE(review): the per-variant renames are redundant with
// `rename_all = "snake_case"` (which already yields `input_text` /
// `input_image`) but are kept as explicit wire-format documentation.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesUserContentPart {
    /// Plain text supplied by the user.
    #[serde(rename = "input_text")]
    InputText { text: String },
    /// Image supplied as a URL (here always a `data:` URL).
    #[serde(rename = "input_image")]
    InputImage { image_url: String },
}
1337
/// A single content part of an assistant input item.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesAssistantContentPart {
    /// Text the assistant produced in a previous turn.
    #[serde(rename = "output_text")]
    OutputText { text: String },
}
1344
/// Wire representation of one function tool in a Responses API request.
#[derive(Debug, Serialize)]
struct OpenAIResponsesTool {
    // Always "function"; serialized as the tool's `type` field.
    #[serde(rename = "type")]
    r#type: &'static str,
    name: String,
    /// Human-readable description; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
    /// JSON Schema describing the tool's arguments.
    parameters: serde_json::Value,
}
1354
1355fn convert_tool_to_openai_responses(tool: &ToolDef) -> OpenAIResponsesTool {
1356 OpenAIResponsesTool {
1357 r#type: "function",
1358 name: tool.name.clone(),
1359 description: if tool.description.trim().is_empty() {
1360 None
1361 } else {
1362 Some(tool.description.clone())
1363 },
1364 parameters: tool.parameters.clone(),
1365 }
1366}
1367
1368fn build_openai_responses_input(context: &Context<'_>) -> Vec<OpenAIResponsesInputItem> {
1369 let mut input = Vec::with_capacity(context.messages.len());
1370
1371 for message in context.messages.iter() {
1376 match message {
1377 Message::User(user) => input.push(convert_user_message_to_responses(&user.content)),
1378 Message::Custom(custom) => input.push(OpenAIResponsesInputItem::User {
1379 role: "user",
1380 content: vec![OpenAIResponsesUserContentPart::InputText {
1381 text: custom.content.clone(),
1382 }],
1383 }),
1384 Message::Assistant(assistant) => {
1385 let mut pending_text = String::new();
1387
1388 for block in &assistant.content {
1389 match block {
1390 ContentBlock::Text(t) => pending_text.push_str(&t.text),
1391 ContentBlock::ToolCall(tc) => {
1392 if !pending_text.is_empty() {
1393 input.push(OpenAIResponsesInputItem::Assistant {
1394 role: "assistant",
1395 content: vec![
1396 OpenAIResponsesAssistantContentPart::OutputText {
1397 text: std::mem::take(&mut pending_text),
1398 },
1399 ],
1400 });
1401 }
1402 input.push(OpenAIResponsesInputItem::FunctionCall {
1403 r#type: "function_call",
1404 call_id: tc.id.clone(),
1405 name: tc.name.clone(),
1406 arguments: tc.arguments.to_string(),
1407 });
1408 }
1409 _ => {}
1410 }
1411 }
1412
1413 if !pending_text.is_empty() {
1414 input.push(OpenAIResponsesInputItem::Assistant {
1415 role: "assistant",
1416 content: vec![OpenAIResponsesAssistantContentPart::OutputText {
1417 text: pending_text,
1418 }],
1419 });
1420 }
1421 }
1422 Message::ToolResult(result) => {
1423 let mut out = String::new();
1424 for (i, block) in result.content.iter().enumerate() {
1425 if i > 0 {
1426 out.push('\n');
1427 }
1428 if let ContentBlock::Text(t) = block {
1429 out.push_str(&t.text);
1430 }
1431 }
1432 input.push(OpenAIResponsesInputItem::FunctionCallOutput {
1433 r#type: "function_call_output",
1434 call_id: result.tool_call_id.clone(),
1435 output: out,
1436 });
1437 }
1438 }
1439 }
1440
1441 input
1442}
1443
1444fn convert_user_message_to_responses(content: &UserContent) -> OpenAIResponsesInputItem {
1445 match content {
1446 UserContent::Text(text) => OpenAIResponsesInputItem::User {
1447 role: "user",
1448 content: vec![OpenAIResponsesUserContentPart::InputText { text: text.clone() }],
1449 },
1450 UserContent::Blocks(blocks) => {
1451 let mut parts = Vec::new();
1452 for block in blocks {
1453 match block {
1454 ContentBlock::Text(t) => {
1455 parts.push(OpenAIResponsesUserContentPart::InputText {
1456 text: t.text.clone(),
1457 });
1458 }
1459 ContentBlock::Image(img) => {
1460 let url = format!("data:{};base64,{}", img.mime_type, img.data);
1461 parts.push(OpenAIResponsesUserContentPart::InputImage { image_url: url });
1462 }
1463 _ => {}
1464 }
1465 }
1466 if parts.is_empty() {
1467 parts.push(OpenAIResponsesUserContentPart::InputText {
1468 text: String::new(),
1469 });
1470 }
1471 OpenAIResponsesInputItem::User {
1472 role: "user",
1473 content: parts,
1474 }
1475 }
1476 }
1477}
1478
/// One SSE event from the Responses API stream, discriminated by its
/// `type` field.
///
/// Unrecognized event types fall into `Unknown` (via `#[serde(other)]`)
/// so new server-side events never break deserialization.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesChunk {
    /// Incremental text output for one content part.
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta {
        item_id: String,
        content_index: u32,
        delta: String,
    },
    /// Final, complete text for one content part.
    #[serde(rename = "response.output_text.done")]
    OutputTextDone {
        item_id: String,
        content_index: u32,
        text: String,
    },
    /// A new output item (e.g. a function call) was opened.
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded { item: OpenAIResponsesOutputItem },
    /// An output item finished.
    #[serde(rename = "response.output_item.done")]
    OutputItemDone { item: OpenAIResponsesOutputItemDone },
    /// Incremental JSON for a function call's arguments.
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgumentsDelta { item_id: String, delta: String },
    /// A content part finished, with its final payload.
    #[serde(rename = "response.content_part.done")]
    ContentPartDone {
        item_id: String,
        content_index: u32,
        part: OpenAIResponsesContentPartDone,
    },
    /// Incremental raw reasoning text.
    #[serde(rename = "response.reasoning_text.delta")]
    ReasoningTextDelta {
        item_id: String,
        content_index: u32,
        delta: String,
    },
    /// Final raw reasoning text for one content part.
    #[serde(rename = "response.reasoning_text.done")]
    ReasoningTextDone {
        item_id: String,
        content_index: u32,
        text: String,
    },
    /// Incremental reasoning-summary text (indexed by summary part).
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta {
        item_id: String,
        summary_index: u32,
        delta: String,
    },
    /// Final reasoning-summary text for one summary part.
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone {
        item_id: String,
        summary_index: u32,
        text: String,
    },
    /// A reasoning-summary part finished, with its final payload.
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone {
        item_id: String,
        summary_index: u32,
        part: OpenAIResponsesReasoningSummaryPartDone,
    },
    /// The response completed normally.
    #[serde(rename = "response.completed")]
    ResponseCompleted {
        response: OpenAIResponsesDonePayload,
    },
    /// Alternate terminal event with the same payload shape as
    /// `response.completed`.
    #[serde(rename = "response.done")]
    ResponseDone {
        response: OpenAIResponsesDonePayload,
    },
    /// The response ended early; `incomplete_details` carries the reason.
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete {
        response: OpenAIResponsesDonePayload,
    },
    /// The response failed server-side.
    #[serde(rename = "response.failed")]
    ResponseFailed {
        response: OpenAIResponsesFailedPayload,
    },
    /// A stream-level error event.
    #[serde(rename = "error")]
    Error { message: String },
    /// Any event type this client does not handle.
    #[serde(other)]
    Unknown,
}
1561
/// Final payload of a `response.content_part.done` event.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesContentPartDone {
    /// Completed visible output text.
    #[serde(rename = "output_text")]
    OutputText { text: String },
    /// Completed raw reasoning text.
    #[serde(rename = "reasoning_text")]
    ReasoningText { text: String },
    /// Any part type this client does not handle.
    #[serde(other)]
    Unknown,
}
1572
/// Final payload of a `response.reasoning_summary_part.done` event.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesReasoningSummaryPartDone {
    /// Completed reasoning-summary text.
    #[serde(rename = "summary_text")]
    SummaryText { text: String },
    /// Any part type this client does not handle.
    #[serde(other)]
    Unknown,
}
1581
/// Item payload of a `response.output_item.added` event.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItem {
    /// A function call was opened; `arguments` may still be empty at
    /// this point and defaults to `""` when absent.
    #[serde(rename = "function_call")]
    FunctionCall {
        id: String,
        call_id: String,
        name: String,
        #[serde(default)]
        arguments: String,
    },
    /// Any item type this client does not handle.
    #[serde(other)]
    Unknown,
}
1596
/// Item payload of a `response.output_item.done` event.
// NOTE(review): structurally identical to `OpenAIResponsesOutputItem`;
// kept separate so the two event payloads can diverge independently.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItemDone {
    /// A function call finished; `arguments` defaults to `""` when absent
    /// (the full argument string may have arrived via deltas instead).
    #[serde(rename = "function_call")]
    FunctionCall {
        id: String,
        call_id: String,
        name: String,
        #[serde(default)]
        arguments: String,
    },
    /// Any item type this client does not handle.
    #[serde(other)]
    Unknown,
}
1611
/// `response` payload of the terminal `completed`/`done`/`incomplete`
/// stream events.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesDonePayload {
    /// Present (with a reason) only when the response ended early.
    #[serde(default)]
    incomplete_details: Option<OpenAIResponsesIncompleteDetails>,
    /// Token accounting reported by the provider.
    usage: OpenAIResponsesUsage,
}
1618
/// `response` payload of the `response.failed` stream event.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedPayload {
    /// Error details; tolerated as absent.
    #[serde(default)]
    error: Option<OpenAIResponsesFailedError>,
}
1624
/// Error object nested inside a failed-response payload.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedError {
    /// Human-readable failure message; tolerated as absent.
    #[serde(default)]
    message: Option<String>,
}
1630
/// Reason object explaining why a response ended before completion.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesIncompleteDetails {
    /// Provider-reported reason string (e.g. matched against
    /// "max_output"/"length"/"tool"/"content_filter" by the stream state).
    reason: String,
}
1635
/// Token usage reported in the terminal response payload.
#[derive(Debug, Deserialize)]
#[allow(clippy::struct_field_names)]
struct OpenAIResponsesUsage {
    input_tokens: u64,
    output_tokens: u64,
    /// Total as reported by the provider; tolerated as absent.
    #[serde(default)]
    total_tokens: Option<u64>,
}
1644
impl OpenAIResponsesDonePayload {
    /// Returns a clone of the provider-reported incompletion reason, or
    /// `None` when the response completed normally.
    fn incomplete_reason(&self) -> Option<String> {
        self.incomplete_details.as_ref().map(|d| d.reason.clone())
    }
}
1650
1651#[cfg(test)]
1656mod tests {
1657 use super::*;
1658 use asupersync::runtime::RuntimeBuilder;
1659 use futures::stream;
1660 use proptest::prelude::*;
1661 use proptest::string::string_regex;
1662 use serde_json::{Value, json};
1663 use std::collections::HashMap;
1664 use std::io::{Read, Write};
1665 use std::net::TcpListener;
1666 use std::sync::mpsc;
1667 use std::time::Duration;
1668
1669 #[test]
1670 fn test_provider_info() {
1671 let provider = OpenAIResponsesProvider::new("gpt-4o");
1672 assert_eq!(provider.name(), "openai");
1673 assert_eq!(provider.api(), "openai-responses");
1674 }
1675
1676 #[test]
1677 fn test_build_request_includes_system_tools_and_defaults() {
1678 let provider = OpenAIResponsesProvider::new("gpt-4o");
1679 let context = Context::owned(
1680 Some("System guidance".to_string()),
1681 vec![Message::User(crate::model::UserMessage {
1682 content: UserContent::Text("Ping".to_string()),
1683 timestamp: 0,
1684 })],
1685 vec![
1686 ToolDef {
1687 name: "search".to_string(),
1688 description: "Search docs".to_string(),
1689 parameters: json!({
1690 "type": "object",
1691 "properties": { "q": { "type": "string" } },
1692 "required": ["q"]
1693 }),
1694 },
1695 ToolDef {
1696 name: "blank_desc".to_string(),
1697 description: " ".to_string(),
1698 parameters: json!({ "type": "object" }),
1699 },
1700 ],
1701 );
1702 let options = StreamOptions {
1703 temperature: Some(0.3),
1704 ..Default::default()
1705 };
1706
1707 let request = provider.build_request(&context, &options);
1708 let value = serde_json::to_value(&request).expect("serialize request");
1709 assert_eq!(value["model"], "gpt-4o");
1710 let temperature = value["temperature"]
1711 .as_f64()
1712 .expect("temperature should serialize as number");
1713 assert!((temperature - 0.3).abs() < 1e-6);
1714 assert_eq!(value["max_output_tokens"], DEFAULT_MAX_OUTPUT_TOKENS);
1715 assert_eq!(value["stream"], true);
1716 assert_eq!(value["instructions"], "System guidance");
1717 assert_eq!(value["input"][0]["role"], "user");
1718 assert_eq!(value["input"][0]["content"][0]["type"], "input_text");
1719 assert_eq!(value["input"][0]["content"][0]["text"], "Ping");
1720 assert_eq!(value["tools"][0]["type"], "function");
1721 assert_eq!(value["tools"][0]["name"], "search");
1722 assert_eq!(value["tools"][0]["description"], "Search docs");
1723 assert_eq!(
1724 value["tools"][0]["parameters"],
1725 json!({
1726 "type": "object",
1727 "properties": { "q": { "type": "string" } },
1728 "required": ["q"]
1729 })
1730 );
1731 assert!(value["tools"][1].get("description").is_none());
1732 }
1733
1734 #[test]
1735 fn test_stream_parses_text_and_tool_call() {
1736 let events = vec![
1737 json!({
1738 "type": "response.output_text.delta",
1739 "item_id": "msg_1",
1740 "content_index": 0,
1741 "delta": "Hello"
1742 }),
1743 json!({
1744 "type": "response.output_item.added",
1745 "output_index": 1,
1746 "item": {
1747 "type": "function_call",
1748 "id": "fc_1",
1749 "call_id": "call_1",
1750 "name": "echo",
1751 "arguments": ""
1752 }
1753 }),
1754 json!({
1755 "type": "response.function_call_arguments.delta",
1756 "item_id": "fc_1",
1757 "output_index": 1,
1758 "delta": "{\"text\":\"hi\"}"
1759 }),
1760 json!({
1761 "type": "response.output_item.done",
1762 "output_index": 1,
1763 "item": {
1764 "type": "function_call",
1765 "id": "fc_1",
1766 "call_id": "call_1",
1767 "name": "echo",
1768 "arguments": "{\"text\":\"hi\"}",
1769 "status": "completed"
1770 }
1771 }),
1772 json!({
1773 "type": "response.completed",
1774 "response": {
1775 "incomplete_details": null,
1776 "usage": {
1777 "input_tokens": 1,
1778 "output_tokens": 2,
1779 "total_tokens": 3
1780 }
1781 }
1782 }),
1783 ];
1784
1785 let out = collect_events(&events);
1786 assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
1787 assert!(
1788 out.iter()
1789 .any(|e| matches!(e, StreamEvent::TextDelta { delta, .. } if delta == "Hello"))
1790 );
1791 assert!(out.iter().any(
1792 |e| matches!(e, StreamEvent::ToolCallEnd { tool_call, .. } if tool_call.name == "echo")
1793 ));
1794 assert!(out.iter().any(|e| matches!(
1795 e,
1796 StreamEvent::Done {
1797 reason: StopReason::ToolUse,
1798 ..
1799 }
1800 )));
1801 }
1802
1803 #[test]
1804 fn test_stream_accumulates_function_call_arguments_deltas() {
1805 let events = vec![
1806 json!({
1807 "type": "response.output_item.added",
1808 "item": {
1809 "type": "function_call",
1810 "id": "fc_2",
1811 "call_id": "call_2",
1812 "name": "search",
1813 "arguments": "{\"q\":\"ru"
1814 }
1815 }),
1816 json!({
1817 "type": "response.function_call_arguments.delta",
1818 "item_id": "fc_2",
1819 "delta": "st\"}"
1820 }),
1821 json!({
1822 "type": "response.output_item.done",
1823 "item": {
1824 "type": "function_call",
1825 "id": "fc_2",
1826 "call_id": "call_2",
1827 "name": "search",
1828 "arguments": ""
1829 }
1830 }),
1831 json!({
1832 "type": "response.completed",
1833 "response": {
1834 "incomplete_details": null,
1835 "usage": {
1836 "input_tokens": 1,
1837 "output_tokens": 1,
1838 "total_tokens": 2
1839 }
1840 }
1841 }),
1842 ];
1843
1844 let out = collect_events(&events);
1845 let tool_end = out
1846 .iter()
1847 .find_map(|event| match event {
1848 StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
1849 _ => None,
1850 })
1851 .expect("tool call end");
1852 assert_eq!(tool_end.id, "call_2");
1853 assert_eq!(tool_end.name, "search");
1854 assert_eq!(tool_end.arguments, json!({ "q": "rust" }));
1855 }
1856
1857 #[test]
1858 fn test_stream_synthesizes_tool_call_start_when_done_arrives_first() {
1859 let events = vec![
1860 json!({
1861 "type": "response.output_item.done",
1862 "item": {
1863 "type": "function_call",
1864 "id": "fc_3",
1865 "call_id": "call_3",
1866 "name": "echo",
1867 "arguments": "{\"text\":\"late start\"}",
1868 "status": "completed"
1869 }
1870 }),
1871 json!({
1872 "type": "response.completed",
1873 "response": {
1874 "incomplete_details": null,
1875 "usage": {
1876 "input_tokens": 1,
1877 "output_tokens": 1,
1878 "total_tokens": 2
1879 }
1880 }
1881 }),
1882 ];
1883
1884 let out = collect_events(&events);
1885 let start_idx = out
1886 .iter()
1887 .position(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
1888 .expect("tool call start");
1889 let delta_idx = out
1890 .iter()
1891 .position(|event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"late start\"}"))
1892 .expect("tool call delta");
1893 let end_idx = out
1894 .iter()
1895 .position(|event| matches!(event, StreamEvent::ToolCallEnd { .. }))
1896 .expect("tool call end");
1897
1898 assert!(start_idx < delta_idx);
1899 assert!(delta_idx < end_idx);
1900
1901 let tool_end = out
1902 .iter()
1903 .find_map(|event| match event {
1904 StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
1905 _ => None,
1906 })
1907 .expect("tool call end");
1908 assert_eq!(tool_end.id, "call_3");
1909 assert_eq!(tool_end.name, "echo");
1910 assert_eq!(tool_end.arguments, json!({ "text": "late start" }));
1911 assert!(matches!(
1912 out.last(),
1913 Some(StreamEvent::Done {
1914 reason: StopReason::ToolUse,
1915 ..
1916 })
1917 ));
1918 }
1919
1920 #[test]
1921 fn test_stream_preserves_orphan_tool_call_deltas_until_done() {
1922 let events = vec![
1923 json!({
1924 "type": "response.function_call_arguments.delta",
1925 "item_id": "fc_4",
1926 "delta": "{\"text\":\"buffered\"}"
1927 }),
1928 json!({
1929 "type": "response.output_item.done",
1930 "item": {
1931 "type": "function_call",
1932 "id": "fc_4",
1933 "call_id": "call_4",
1934 "name": "echo",
1935 "arguments": "",
1936 "status": "completed"
1937 }
1938 }),
1939 json!({
1940 "type": "response.completed",
1941 "response": {
1942 "incomplete_details": null,
1943 "usage": {
1944 "input_tokens": 1,
1945 "output_tokens": 1,
1946 "total_tokens": 2
1947 }
1948 }
1949 }),
1950 ];
1951
1952 let out = collect_events(&events);
1953 assert!(
1954 out.iter()
1955 .any(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
1956 );
1957 assert!(out.iter().any(
1958 |event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"buffered\"}")
1959 ));
1960
1961 let tool_end = out
1962 .iter()
1963 .find_map(|event| match event {
1964 StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
1965 _ => None,
1966 })
1967 .expect("tool call end");
1968 assert_eq!(tool_end.id, "call_4");
1969 assert_eq!(tool_end.arguments, json!({ "text": "buffered" }));
1970 }
1971
1972 #[test]
1973 fn test_stream_reads_late_tool_call_events_after_response_completed() {
1974 let body = [
1975 r#"data: {"type":"response.function_call_arguments.delta","item_id":"fc_5","delta":"{\"text\":\"late tool\"}"}"#,
1976 "",
1977 r#"data: {"type":"response.completed","response":{"incomplete_details":null,"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}"#,
1978 "",
1979 r#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_5","call_id":"call_5","name":"echo","arguments":"","status":"completed"}}"#,
1980 "",
1981 "data: [DONE]",
1982 "",
1983 ]
1984 .join("\n");
1985
1986 let out = collect_stream_events_from_body(&body);
1987 let start_idx = out
1988 .iter()
1989 .position(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
1990 .expect("tool call start");
1991 let delta_idx = out
1992 .iter()
1993 .position(|event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"late tool\"}"))
1994 .expect("tool call delta");
1995 let end_idx = out
1996 .iter()
1997 .position(|event| matches!(event, StreamEvent::ToolCallEnd { .. }))
1998 .expect("tool call end");
1999 let done_idx = out
2000 .iter()
2001 .position(|event| matches!(event, StreamEvent::Done { .. }))
2002 .expect("done event");
2003
2004 assert!(start_idx < delta_idx);
2005 assert!(delta_idx < end_idx);
2006 assert!(end_idx < done_idx);
2007
2008 let tool_end = out
2009 .iter()
2010 .find_map(|event| match event {
2011 StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
2012 _ => None,
2013 })
2014 .expect("tool call end");
2015 assert_eq!(tool_end.id, "call_5");
2016 assert_eq!(tool_end.arguments, json!({ "text": "late tool" }));
2017 }
2018
2019 #[test]
2020 fn test_stream_materializes_output_text_done_without_deltas() {
2021 let events = vec![
2022 json!({
2023 "type": "response.output_text.done",
2024 "item_id": "msg_done",
2025 "content_index": 0,
2026 "text": "done-only text"
2027 }),
2028 json!({
2029 "type": "response.completed",
2030 "response": {
2031 "incomplete_details": null,
2032 "usage": {
2033 "input_tokens": 1,
2034 "output_tokens": 1,
2035 "total_tokens": 2
2036 }
2037 }
2038 }),
2039 ];
2040
2041 let out = collect_events(&events);
2042 assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
2043 assert!(matches!(
2044 out.get(1),
2045 Some(StreamEvent::TextStart { content_index: 0 })
2046 ));
2047 assert!(matches!(
2048 out.get(2),
2049 Some(StreamEvent::TextDelta {
2050 content_index: 0,
2051 delta,
2052 }) if delta == "done-only text"
2053 ));
2054 assert!(matches!(
2055 out.iter().find(|event| matches!(event, StreamEvent::TextEnd { .. })),
2056 Some(StreamEvent::TextEnd {
2057 content_index: 0,
2058 content,
2059 }) if content == "done-only text"
2060 ));
2061 assert!(matches!(
2062 out.last(),
2063 Some(StreamEvent::Done {
2064 reason: StopReason::Stop,
2065 ..
2066 })
2067 ));
2068 }
2069
2070 #[test]
2071 fn test_stream_backfills_missing_output_text_suffix_from_done_event() {
2072 let events = vec![
2073 json!({
2074 "type": "response.output_text.delta",
2075 "item_id": "msg_suffix",
2076 "content_index": 0,
2077 "delta": "Hello"
2078 }),
2079 json!({
2080 "type": "response.output_text.done",
2081 "item_id": "msg_suffix",
2082 "content_index": 0,
2083 "text": "Hello world"
2084 }),
2085 json!({
2086 "type": "response.completed",
2087 "response": {
2088 "incomplete_details": null,
2089 "usage": {
2090 "input_tokens": 1,
2091 "output_tokens": 2,
2092 "total_tokens": 3
2093 }
2094 }
2095 }),
2096 ];
2097
2098 let out = collect_events(&events);
2099 let text_deltas: Vec<(usize, String)> = out
2100 .iter()
2101 .filter_map(|event| match event {
2102 StreamEvent::TextDelta {
2103 content_index,
2104 delta,
2105 } => Some((*content_index, delta.clone())),
2106 _ => None,
2107 })
2108 .collect();
2109 assert_eq!(
2110 text_deltas,
2111 vec![(0, "Hello".to_string()), (0, " world".to_string())]
2112 );
2113 assert!(matches!(
2114 out.iter().find(|event| matches!(event, StreamEvent::TextEnd { .. })),
2115 Some(StreamEvent::TextEnd {
2116 content_index: 0,
2117 content,
2118 }) if content == "Hello world"
2119 ));
2120 }
2121
2122 #[test]
2123 fn test_stream_materializes_reasoning_summary_part_done_without_deltas() {
2124 let events = vec![
2125 json!({
2126 "type": "response.reasoning_summary_part.done",
2127 "item_id": "rs_done",
2128 "summary_index": 0,
2129 "part": {
2130 "type": "summary_text",
2131 "text": "Plan first, answer second."
2132 }
2133 }),
2134 json!({
2135 "type": "response.completed",
2136 "response": {
2137 "incomplete_details": null,
2138 "usage": {
2139 "input_tokens": 1,
2140 "output_tokens": 1,
2141 "total_tokens": 2
2142 }
2143 }
2144 }),
2145 ];
2146
2147 let out = collect_events(&events);
2148 assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
2149 assert!(matches!(
2150 out.get(1),
2151 Some(StreamEvent::ThinkingStart { content_index: 0 })
2152 ));
2153 assert!(matches!(
2154 out.get(2),
2155 Some(StreamEvent::ThinkingDelta {
2156 content_index: 0,
2157 delta,
2158 }) if delta == "Plan first, answer second."
2159 ));
2160 assert!(matches!(
2161 out.iter()
2162 .find(|event| matches!(event, StreamEvent::ThinkingEnd { .. })),
2163 Some(StreamEvent::ThinkingEnd {
2164 content_index: 0,
2165 content,
2166 }) if content == "Plan first, answer second."
2167 ));
2168 }
2169
2170 #[test]
2171 fn test_stream_materializes_reasoning_text_done_without_deltas() {
2172 let events = vec![
2173 json!({
2174 "type": "response.reasoning_text.done",
2175 "item_id": "rt_done",
2176 "content_index": 0,
2177 "text": "Private chain of thought."
2178 }),
2179 json!({
2180 "type": "response.completed",
2181 "response": {
2182 "incomplete_details": null,
2183 "usage": {
2184 "input_tokens": 1,
2185 "output_tokens": 1,
2186 "total_tokens": 2
2187 }
2188 }
2189 }),
2190 ];
2191
2192 let out = collect_events(&events);
2193 assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
2194 assert!(matches!(
2195 out.get(1),
2196 Some(StreamEvent::ThinkingStart { content_index: 0 })
2197 ));
2198 assert!(matches!(
2199 out.get(2),
2200 Some(StreamEvent::ThinkingDelta {
2201 content_index: 0,
2202 delta,
2203 }) if delta == "Private chain of thought."
2204 ));
2205 assert!(matches!(
2206 out.iter()
2207 .find(|event| matches!(event, StreamEvent::ThinkingEnd { .. })),
2208 Some(StreamEvent::ThinkingEnd {
2209 content_index: 0,
2210 content,
2211 }) if content == "Private chain of thought."
2212 ));
2213 }
2214
2215 #[test]
2216 fn test_stream_separates_reasoning_summary_and_reasoning_text_for_same_item() {
2217 let events = vec![
2218 json!({
2219 "type": "response.reasoning_summary_text.delta",
2220 "item_id": "rs_same",
2221 "summary_index": 0,
2222 "delta": "summary"
2223 }),
2224 json!({
2225 "type": "response.reasoning_text.delta",
2226 "item_id": "rs_same",
2227 "content_index": 0,
2228 "delta": "full reasoning"
2229 }),
2230 json!({
2231 "type": "response.completed",
2232 "response": {
2233 "incomplete_details": null,
2234 "usage": {
2235 "input_tokens": 1,
2236 "output_tokens": 1,
2237 "total_tokens": 2
2238 }
2239 }
2240 }),
2241 ];
2242
2243 let out = collect_events(&events);
2244 let thinking_starts: Vec<usize> = out
2245 .iter()
2246 .filter_map(|event| match event {
2247 StreamEvent::ThinkingStart { content_index } => Some(*content_index),
2248 _ => None,
2249 })
2250 .collect();
2251 assert_eq!(thinking_starts, vec![0, 1]);
2252
2253 let thinking_deltas: Vec<(usize, String)> = out
2254 .iter()
2255 .filter_map(|event| match event {
2256 StreamEvent::ThinkingDelta {
2257 content_index,
2258 delta,
2259 } => Some((*content_index, delta.clone())),
2260 _ => None,
2261 })
2262 .collect();
2263 assert_eq!(
2264 thinking_deltas,
2265 vec![
2266 (0, "summary".to_string()),
2267 (1, "full reasoning".to_string())
2268 ]
2269 );
2270
2271 let thinking_ends: Vec<(usize, String)> = out
2272 .iter()
2273 .filter_map(|event| match event {
2274 StreamEvent::ThinkingEnd {
2275 content_index,
2276 content,
2277 } => Some((*content_index, content.clone())),
2278 _ => None,
2279 })
2280 .collect();
2281 assert_eq!(
2282 thinking_ends,
2283 vec![
2284 (0, "summary".to_string()),
2285 (1, "full reasoning".to_string())
2286 ]
2287 );
2288 }
2289
2290 #[test]
2291 fn test_finish_emits_terminal_block_end_events_in_content_order() {
2292 let runtime = RuntimeBuilder::current_thread()
2293 .build()
2294 .expect("runtime build");
2295 runtime.block_on(async move {
2296 let byte_stream = stream::iter(vec![Ok(Vec::new())]);
2297 let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
2298 let mut state = StreamState::new(
2299 event_source,
2300 "gpt-test".to_string(),
2301 "openai-responses".to_string(),
2302 "openai".to_string(),
2303 );
2304
2305 state
2306 .process_event(
2307 &json!({
2308 "type": "response.output_text.delta",
2309 "item_id": "msg_1",
2310 "content_index": 0,
2311 "delta": "hello",
2312 })
2313 .to_string(),
2314 )
2315 .expect("text delta");
2316 state
2317 .process_event(
2318 &json!({
2319 "type": "response.reasoning_summary_text.delta",
2320 "item_id": "rs_1",
2321 "summary_index": 0,
2322 "delta": "think",
2323 })
2324 .to_string(),
2325 )
2326 .expect("reasoning delta");
2327 state
2328 .process_event(
2329 &json!({
2330 "type": "response.output_text.delta",
2331 "item_id": "msg_2",
2332 "content_index": 0,
2333 "delta": "world",
2334 })
2335 .to_string(),
2336 )
2337 .expect("second text delta");
2338
2339 state.finish(None);
2340
2341 let terminal_end_kinds: Vec<(&'static str, usize)> = state
2342 .pending_events
2343 .iter()
2344 .filter_map(|event| match event {
2345 StreamEvent::TextEnd { content_index, .. } => Some(("text", *content_index)),
2346 StreamEvent::ThinkingEnd { content_index, .. } => {
2347 Some(("thinking", *content_index))
2348 }
2349 _ => None,
2350 })
2351 .collect();
2352
2353 assert_eq!(
2354 terminal_end_kinds,
2355 vec![("text", 0), ("thinking", 1), ("text", 2)]
2356 );
2357 });
2358 }
2359
2360 #[test]
2361 fn test_open_tool_call_snapshots_sort_by_content_index() {
2362 let runtime = RuntimeBuilder::current_thread()
2363 .build()
2364 .expect("runtime build");
2365 runtime.block_on(async move {
2366 let byte_stream = stream::iter(vec![Ok(Vec::new())]);
2367 let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
2368 let mut state = StreamState::new(
2369 event_source,
2370 "gpt-test".to_string(),
2371 "openai-responses".to_string(),
2372 "openai".to_string(),
2373 );
2374
2375 state.tool_calls_by_item_id.insert(
2376 "late".to_string(),
2377 ToolCallState {
2378 content_index: 4,
2379 call_id: "call-late".to_string(),
2380 name: "later".to_string(),
2381 arguments: "{\"b\":2}".to_string(),
2382 },
2383 );
2384 state.tool_calls_by_item_id.insert(
2385 "early".to_string(),
2386 ToolCallState {
2387 content_index: 1,
2388 call_id: "call-early".to_string(),
2389 name: "earlier".to_string(),
2390 arguments: "{\"a\":1}".to_string(),
2391 },
2392 );
2393
2394 let ordered_ids: Vec<String> = state
2395 .open_tool_call_snapshots_in_content_order()
2396 .into_iter()
2397 .map(|(_, item_id, ..)| item_id)
2398 .collect();
2399
2400 assert_eq!(ordered_ids, vec!["early".to_string(), "late".to_string()]);
2401 });
2402 }
2403
    #[test]
    fn test_finish_keeps_open_tool_call_end_in_content_order() {
        // A tool call that is still open when the stream finishes must emit
        // its ToolCallEnd interleaved by content index between the text
        // blocks, not tacked onto the end.
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            // The byte stream is a placeholder; events are injected directly
            // via process_event below.
            let byte_stream = stream::iter(vec![Ok(Vec::new())]);
            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            // Text from msg_1, then a function call, then text from msg_2.
            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_1",
                        "content_index": 0,
                        "delta": "before",
                    })
                    .to_string(),
                )
                .expect("first text delta");
            state
                .process_event(
                    &json!({
                        "type": "response.output_item.added",
                        "item": {
                            "type": "function_call",
                            "id": "fc_7",
                            "call_id": "call_7",
                            "name": "echo",
                            "arguments": "{\"text\":\"mid\"}"
                        }
                    })
                    .to_string(),
                )
                .expect("tool call added");
            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_2",
                        "content_index": 0,
                        "delta": "after",
                    })
                    .to_string(),
                )
                .expect("second text delta");

            // Finish without a terminal response event; the open tool call
            // must still be closed at its assigned position.
            state.finish(None);

            // Project only the terminal (TextEnd/ToolCallEnd) events with
            // their content indices; these carry the ordering under test.
            let terminal_event_order: Vec<(&'static str, usize)> = state
                .pending_events
                .iter()
                .filter_map(|event| match event {
                    StreamEvent::TextEnd { content_index, .. } => Some(("text", *content_index)),
                    StreamEvent::ToolCallEnd { content_index, .. } => {
                        Some(("tool", *content_index))
                    }
                    _ => None,
                })
                .collect();

            assert_eq!(
                terminal_event_order,
                vec![("text", 0), ("tool", 1), ("text", 2)]
            );
        });
    }
2477
2478 #[test]
2479 fn test_stream_sets_bearer_auth_header() {
2480 let captured = run_stream_and_capture_headers().expect("captured request");
2481 assert_eq!(
2482 captured.headers.get("authorization").map(String::as_str),
2483 Some("Bearer test-openai-key")
2484 );
2485 assert_eq!(
2486 captured.headers.get("accept").map(String::as_str),
2487 Some("text/event-stream, application/x-ndjson, application/ndjson")
2488 );
2489
2490 let body: Value = serde_json::from_str(&captured.body).expect("request body json");
2491 assert_eq!(body["stream"], true);
2492 assert_eq!(body["input"][0]["role"], "user");
2493 assert_eq!(body["input"][0]["content"][0]["type"], "input_text");
2494 }
2495
2496 fn build_test_jwt(account_id: &str) -> String {
2497 let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
2498 .encode(br#"{"alg":"none","typ":"JWT"}"#);
2499 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(
2500 serde_json::to_vec(&json!({
2501 "https://api.openai.com/auth": {
2502 "chatgpt_account_id": account_id
2503 }
2504 }))
2505 .expect("payload json"),
2506 );
2507 format!("{header}.{payload}.sig")
2508 }
2509
2510 #[test]
2511 fn test_bearer_token_parser_accepts_case_insensitive_scheme() {
2512 let parsed_bearer = super::bearer_token_from_authorization_header("bEaReR abc.def.ghi");
2513 assert_eq!(parsed_bearer.as_deref(), Some("abc.def.ghi"));
2514 assert!(super::bearer_token_from_authorization_header("Basic abc").is_none());
2515 assert!(super::bearer_token_from_authorization_header("Bearer").is_none());
2516 }
2517
    #[test]
    fn test_codex_mode_adds_required_headers_with_authorization_override() {
        // Codex mode must forward the caller-supplied Authorization header
        // verbatim (exactly once), derive chatgpt-account-id from the JWT
        // claim, and attach the codex-specific user-agent/beta/session
        // headers.
        let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
        let provider = OpenAIResponsesProvider::new("gpt-4o")
            .with_provider_name("openai-codex")
            .with_api_name("openai-codex-responses")
            .with_codex_mode(true)
            .with_base_url(base_url);
        let context = Context::owned(
            None,
            vec![Message::User(crate::model::UserMessage {
                content: UserContent::Text("ping".to_string()),
                timestamp: 0,
            })],
            Vec::new(),
        );
        // Authorization arrives via per-request StreamOptions headers here
        // (the compat-config variant is covered by the next test).
        let test_jwt = build_test_jwt("acct_test_123");
        let mut headers = HashMap::new();
        headers.insert("Authorization".to_string(), format!("Bearer {test_jwt}"));
        let options = StreamOptions {
            headers,
            session_id: Some("session-abc".to_string()),
            ..Default::default()
        };

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async {
            let mut stream = provider.stream(&context, &options).await.expect("stream");
            while let Some(event) = stream.next().await {
                if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
                    break;
                }
            }
        });

        let captured = rx.recv_timeout(Duration::from_secs(2)).expect("captured");
        // The override must win and must not be duplicated on the wire.
        let expected_auth = format!("Bearer {test_jwt}");
        assert_eq!(
            captured.headers.get("authorization").map(String::as_str),
            Some(expected_auth.as_str())
        );
        assert_eq!(captured.header_count("authorization"), 1);
        assert_eq!(
            captured.headers.get("user-agent").map(String::as_str),
            Some("pi_agent_rust")
        );
        assert_eq!(captured.header_count("user-agent"), 1);
        // Account id is extracted from the JWT payload claim.
        assert_eq!(
            captured
                .headers
                .get("chatgpt-account-id")
                .map(String::as_str),
            Some("acct_test_123")
        );
        assert_eq!(
            captured.headers.get("openai-beta").map(String::as_str),
            Some("responses=experimental")
        );
        assert_eq!(
            captured.headers.get("session_id").map(String::as_str),
            Some("session-abc")
        );
    }
2583
    #[test]
    fn test_codex_mode_accepts_compat_authorization_header_without_api_key() {
        // With no api_key in StreamOptions, codex mode must fall back to the
        // Authorization header supplied via CompatConfig custom_headers and
        // still derive chatgpt-account-id from the JWT claim.
        let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
        let test_jwt = build_test_jwt("acct_compat_456");
        let mut custom_headers = HashMap::new();
        custom_headers.insert("Authorization".to_string(), format!("Bearer {test_jwt}"));
        let provider = OpenAIResponsesProvider::new("gpt-4o")
            .with_provider_name("openai-codex")
            .with_api_name("openai-codex-responses")
            .with_codex_mode(true)
            .with_base_url(base_url)
            .with_compat(Some(CompatConfig {
                custom_headers: Some(custom_headers),
                ..Default::default()
            }));
        let context = Context::owned(
            None,
            vec![Message::User(crate::model::UserMessage {
                content: UserContent::Text("ping".to_string()),
                timestamp: 0,
            })],
            Vec::new(),
        );

        // Note: StreamOptions::default() — deliberately no api_key.
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async {
            let mut stream = provider
                .stream(&context, &StreamOptions::default())
                .await
                .expect("stream");
            while let Some(event) = stream.next().await {
                if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
                    break;
                }
            }
        });

        let captured = rx.recv_timeout(Duration::from_secs(2)).expect("captured");
        // Exactly one Authorization header, taken verbatim from the compat
        // config, plus the account id parsed out of the JWT.
        assert_eq!(captured.header_count("authorization"), 1);
        assert_eq!(
            captured.headers.get("authorization").map(String::as_str),
            Some(format!("Bearer {test_jwt}").as_str())
        );
        assert_eq!(
            captured
                .headers
                .get("chatgpt-account-id")
                .map(String::as_str),
            Some("acct_compat_456")
        );
    }
2637
    #[test]
    fn test_stream_accepts_ndjson_content_type() {
        // Servers may reply with NDJSON (one JSON event per line) instead of
        // SSE; the provider must handle an application/x-ndjson Content-Type
        // and still surface text deltas and the final usage totals.
        let body = concat!(
            "{\"type\":\"response.output_text.delta\",\"item_id\":\"msg_1\",\"content_index\":0,\"delta\":\"ok\"}\n",
            "{\"type\":\"response.completed\",\"response\":{\"incomplete_details\":null,\"usage\":{\"input_tokens\":1,\"output_tokens\":1,\"total_tokens\":2}}}\n"
        );
        let (base_url, _rx) = spawn_test_server(200, "application/x-ndjson", body);
        let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
        let context = Context::owned(
            None,
            vec![Message::User(crate::model::UserMessage {
                content: UserContent::Text("ping".to_string()),
                timestamp: 0,
            })],
            Vec::new(),
        );
        let options = StreamOptions {
            api_key: Some("test-openai-key".to_string()),
            ..Default::default()
        };

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async {
            let mut stream = provider.stream(&context, &options).await.expect("stream");
            let mut saw_delta = false;
            let mut done_usage: Option<Usage> = None;

            while let Some(event) = stream.next().await {
                let event = event.expect("stream event");
                match event {
                    StreamEvent::TextDelta { delta, .. } if delta == "ok" => {
                        saw_delta = true;
                    }
                    StreamEvent::Done { message, .. } => {
                        let usage = message.usage;
                        done_usage = Some(usage);
                        break;
                    }
                    StreamEvent::Error { .. } => {
                        panic!("unexpected error while reading NDJSON stream");
                    }
                    _ => {}
                }
            }

            assert!(saw_delta, "expected NDJSON text delta");
            // Usage must match the totals from the response.completed event.
            let usage = done_usage.expect("missing Done usage");
            assert_eq!(usage.input, 1);
            assert_eq!(usage.output, 1);
            assert_eq!(usage.total_tokens, 2);
        });
    }
2692
2693 fn ndjson_body_from_events(events: &[Value]) -> String {
2694 let mut lines = Vec::with_capacity(events.len());
2695 for event in events {
2696 let data = serde_json::to_string(event).expect("serialize event");
2697 lines.push(data);
2698 }
2699 let mut body = lines.join("\n");
2700 body.push('\n');
2701 body
2702 }
2703
    /// Feeds the given JSON events through the SSE path (each framed as a
    /// `data:` line) and drains StreamState until it reports finished,
    /// returning every StreamEvent produced.
    fn collect_events(events: &[Value]) -> Vec<StreamEvent> {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            let byte_stream = stream::iter(events.iter().map(|event| {
                let data = serde_json::to_string(event).expect("serialize event");
                Ok(format!("data: {data}\n\n").into_bytes())
            }));

            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            let mut out = Vec::new();
            loop {
                // Drain queued output before pulling more from the source.
                if let Some(event) = state.pending_events.pop_front() {
                    out.push(event);
                    continue;
                }

                if state.finished {
                    break;
                }

                match state.event_source.next().await {
                    Some(item) => {
                        let msg = item.expect("SSE event");
                        // "[DONE]" is the SSE termination sentinel, not a
                        // JSON event; close the stream instead of parsing.
                        if msg.data == "[DONE]" {
                            if !state.finish_terminal_response() {
                                state.finish(None);
                            }
                            continue;
                        }
                        state.process_event(&msg.data).expect("process_event");
                    }
                    None => {
                        // Source exhausted: a well-formed fixture must have
                        // delivered a terminal response by now.
                        assert!(
                            state.finish_terminal_response(),
                            "stream ended without Done event"
                        );
                    }
                }
            }

            out
        })
    }
2756
2757 fn collect_stream_events_from_body(body: &str) -> Vec<StreamEvent> {
2758 let (base_url, _rx) = spawn_test_server(200, "text/event-stream", body);
2759 let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2760 let context = Context::owned(
2761 None,
2762 vec![Message::User(crate::model::UserMessage {
2763 content: UserContent::Text("ping".to_string()),
2764 timestamp: 0,
2765 })],
2766 Vec::new(),
2767 );
2768 let options = StreamOptions {
2769 api_key: Some("test-openai-key".to_string()),
2770 ..Default::default()
2771 };
2772
2773 let runtime = RuntimeBuilder::current_thread()
2774 .build()
2775 .expect("runtime build");
2776 runtime.block_on(async {
2777 let mut stream = provider.stream(&context, &options).await.expect("stream");
2778 let mut out = Vec::new();
2779 while let Some(event) = stream.next().await {
2780 let event = event.expect("stream event");
2781 let is_terminal =
2782 matches!(event, StreamEvent::Done { .. } | StreamEvent::Error { .. });
2783 out.push(event);
2784 if is_terminal {
2785 break;
2786 }
2787 }
2788 out
2789 })
2790 }
2791
2792 fn collect_stream_events_from_ndjson_body(body: &str) -> Vec<StreamEvent> {
2793 let (base_url, _rx) = spawn_test_server(200, "application/x-ndjson", body);
2794 let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2795 let context = Context::owned(
2796 None,
2797 vec![Message::User(crate::model::UserMessage {
2798 content: UserContent::Text("ping".to_string()),
2799 timestamp: 0,
2800 })],
2801 Vec::new(),
2802 );
2803 let options = StreamOptions {
2804 api_key: Some("test-openai-key".to_string()),
2805 ..Default::default()
2806 };
2807
2808 let runtime = RuntimeBuilder::current_thread()
2809 .build()
2810 .expect("runtime build");
2811 runtime.block_on(async {
2812 let mut stream = provider.stream(&context, &options).await.expect("stream");
2813 let mut out = Vec::new();
2814 while let Some(event) = stream.next().await {
2815 let event = event.expect("stream event");
2816 let is_terminal =
2817 matches!(event, StreamEvent::Done { .. } | StreamEvent::Error { .. });
2818 out.push(event);
2819 if is_terminal {
2820 break;
2821 }
2822 }
2823 out
2824 })
2825 }
2826
    /// Request recorded by the local test server.
    #[derive(Debug)]
    struct CapturedRequest {
        /// Headers collapsed into a map: names lowercased, duplicates keep
        /// the last value.
        headers: HashMap<String, String>,
        /// Every header line in wire order, preserving duplicates.
        header_lines: Vec<(String, String)>,
        /// Request body decoded lossily as UTF-8.
        body: String,
    }
2833
2834 impl CapturedRequest {
2835 fn header_count(&self, name: &str) -> usize {
2836 self.header_lines
2837 .iter()
2838 .filter(|(key, _)| key.eq_ignore_ascii_case(name))
2839 .count()
2840 }
2841 }
2842
2843 fn run_stream_and_capture_headers() -> Option<CapturedRequest> {
2844 let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
2845 let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2846 let context = Context::owned(
2847 None,
2848 vec![Message::User(crate::model::UserMessage {
2849 content: UserContent::Text("ping".to_string()),
2850 timestamp: 0,
2851 })],
2852 Vec::new(),
2853 );
2854 let options = StreamOptions {
2855 api_key: Some("test-openai-key".to_string()),
2856 ..Default::default()
2857 };
2858
2859 let runtime = RuntimeBuilder::current_thread()
2860 .build()
2861 .expect("runtime build");
2862 runtime.block_on(async {
2863 let mut stream = provider.stream(&context, &options).await.expect("stream");
2864 while let Some(event) = stream.next().await {
2865 if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
2866 break;
2867 }
2868 }
2869 });
2870
2871 rx.recv_timeout(Duration::from_secs(2)).ok()
2872 }
2873
2874 fn success_sse_body() -> String {
2875 [
2876 r#"data: {"type":"response.output_text.delta","item_id":"msg_1","content_index":0,"delta":"ok"}"#,
2877 "",
2878 r#"data: {"type":"response.completed","response":{"incomplete_details":null,"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}"#,
2879 "",
2880 ]
2881 .join("\n")
2882 }
2883
2884 fn spawn_test_server(
2885 status_code: u16,
2886 content_type: &str,
2887 body: &str,
2888 ) -> (String, mpsc::Receiver<CapturedRequest>) {
2889 let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
2890 let addr = listener.local_addr().expect("local addr");
2891 let (tx, rx) = mpsc::channel();
2892 let body = body.to_string();
2893 let content_type = content_type.to_string();
2894
2895 std::thread::spawn(move || {
2896 let (mut socket, _) = listener.accept().expect("accept");
2897 socket
2898 .set_read_timeout(Some(Duration::from_secs(2)))
2899 .expect("set read timeout");
2900
2901 let mut bytes = Vec::new();
2902 let mut chunk = [0_u8; 4096];
2903 loop {
2904 match socket.read(&mut chunk) {
2905 Ok(0) => break,
2906 Ok(n) => {
2907 bytes.extend_from_slice(&chunk[..n]);
2908 if bytes.windows(4).any(|window| window == b"\r\n\r\n") {
2909 break;
2910 }
2911 }
2912 Err(err)
2913 if err.kind() == std::io::ErrorKind::WouldBlock
2914 || err.kind() == std::io::ErrorKind::TimedOut =>
2915 {
2916 break;
2917 }
2918 Err(err) => assert!(false, "request header read failed: {err}"),
2919 }
2920 }
2921
2922 let header_end = bytes
2923 .windows(4)
2924 .position(|window| window == b"\r\n\r\n")
2925 .expect("request header boundary");
2926 let header_text = String::from_utf8_lossy(&bytes[..header_end]).to_string();
2927 let (headers, header_lines) = parse_headers(&header_text);
2928 let mut request_body = bytes[header_end + 4..].to_vec();
2929
2930 let content_length = headers
2931 .get("content-length")
2932 .and_then(|value| value.parse::<usize>().ok())
2933 .unwrap_or(0);
2934 while request_body.len() < content_length {
2935 match socket.read(&mut chunk) {
2936 Ok(0) => break,
2937 Ok(n) => request_body.extend_from_slice(&chunk[..n]),
2938 Err(err)
2939 if err.kind() == std::io::ErrorKind::WouldBlock
2940 || err.kind() == std::io::ErrorKind::TimedOut =>
2941 {
2942 break;
2943 }
2944 Err(err) => assert!(false, "request body read failed: {err}"),
2945 }
2946 }
2947
2948 let captured = CapturedRequest {
2949 headers,
2950 header_lines,
2951 body: String::from_utf8_lossy(&request_body).to_string(),
2952 };
2953 tx.send(captured).expect("send captured request");
2954
2955 let reason = match status_code {
2956 401 => "Unauthorized",
2957 500 => "Internal Server Error",
2958 _ => "OK",
2959 };
2960 let response = format!(
2961 "HTTP/1.1 {status_code} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
2962 body.len()
2963 );
2964 socket
2965 .write_all(response.as_bytes())
2966 .expect("write response");
2967 socket.flush().expect("flush response");
2968 });
2969
2970 (format!("http://{addr}/responses"), rx)
2971 }
2972
2973 fn parse_headers(header_text: &str) -> (HashMap<String, String>, Vec<(String, String)>) {
2974 let mut headers = HashMap::new();
2975 let mut header_lines = Vec::new();
2976 for line in header_text.lines().skip(1) {
2977 if let Some((name, value)) = line.split_once(':') {
2978 let normalized_name = name.trim().to_ascii_lowercase();
2979 let normalized_value = value.trim().to_string();
2980 header_lines.push((normalized_name.clone(), normalized_value.clone()));
2981 headers.insert(normalized_name, normalized_value);
2982 }
2983 }
2984 (headers, header_lines)
2985 }
2986
    /// Root of a streaming fixture file: a list of named cases.
    #[derive(Debug, Deserialize)]
    struct ProviderFixture {
        cases: Vec<ProviderCase>,
    }
2995
    /// One fixture case: raw wire events plus the expected event summaries.
    #[derive(Debug, Deserialize)]
    struct ProviderCase {
        name: String,
        events: Vec<Value>,
        expected: Vec<EventSummary>,
    }
3002
    /// Comparable projection of a `StreamEvent` used in fixture assertions.
    /// Optional fields are populated only for the event kinds that carry
    /// them (see `summarize_event`).
    #[derive(Debug, Deserialize, Serialize, PartialEq)]
    struct EventSummary {
        kind: String,
        #[serde(default)]
        content_index: Option<usize>,
        #[serde(default)]
        delta: Option<String>,
        #[serde(default)]
        content: Option<String>,
        #[serde(default)]
        reason: Option<String>,
    }
3015
3016 fn load_fixture(file_name: &str) -> ProviderFixture {
3017 let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3018 .join("tests/fixtures/provider_responses")
3019 .join(file_name);
3020 let data = std::fs::read_to_string(&path).expect("read fixture file");
3021 serde_json::from_str(&data).expect("parse fixture JSON")
3022 }
3023
3024 fn summarize_event(event: &StreamEvent) -> EventSummary {
3025 match event {
3026 StreamEvent::Start { .. } => EventSummary {
3027 kind: "start".to_string(),
3028 content_index: None,
3029 delta: None,
3030 content: None,
3031 reason: None,
3032 },
3033 StreamEvent::TextStart { content_index, .. } => EventSummary {
3034 kind: "text_start".to_string(),
3035 content_index: Some(*content_index),
3036 delta: None,
3037 content: None,
3038 reason: None,
3039 },
3040 StreamEvent::TextDelta {
3041 content_index,
3042 delta,
3043 ..
3044 } => EventSummary {
3045 kind: "text_delta".to_string(),
3046 content_index: Some(*content_index),
3047 delta: Some(delta.clone()),
3048 content: None,
3049 reason: None,
3050 },
3051 StreamEvent::TextEnd {
3052 content_index,
3053 content,
3054 ..
3055 } => EventSummary {
3056 kind: "text_end".to_string(),
3057 content_index: Some(*content_index),
3058 delta: None,
3059 content: Some(content.clone()),
3060 reason: None,
3061 },
3062 StreamEvent::Done { reason, .. } => EventSummary {
3063 kind: "done".to_string(),
3064 content_index: None,
3065 delta: None,
3066 content: None,
3067 reason: Some(reason_to_string(*reason)),
3068 },
3069 StreamEvent::Error { reason, .. } => EventSummary {
3070 kind: "error".to_string(),
3071 content_index: None,
3072 delta: None,
3073 content: None,
3074 reason: Some(reason_to_string(*reason)),
3075 },
3076 _ => EventSummary {
3077 kind: "other".to_string(),
3078 content_index: None,
3079 delta: None,
3080 content: None,
3081 reason: None,
3082 },
3083 }
3084 }
3085
3086 fn reason_to_string(reason: StopReason) -> String {
3087 match reason {
3088 StopReason::Stop => "stop".to_string(),
3089 StopReason::ToolUse => "tool_use".to_string(),
3090 StopReason::Length => "length".to_string(),
3091 StopReason::Error => "error".to_string(),
3092 StopReason::Aborted => "aborted".to_string(),
3093 }
3094 }
3095
3096 #[test]
3097 fn test_stream_fixtures() {
3098 let fixture = load_fixture("openai_responses_stream.json");
3099 for case in fixture.cases {
3100 let events = collect_events(&case.events);
3101 let summaries: Vec<EventSummary> = events.iter().map(summarize_event).collect();
3102 assert_eq!(summaries, case.expected, "case: {}", case.name);
3103 }
3104 }
3105
3106 #[test]
3107 fn test_stream_ndjson_fixtures() {
3108 let fixture = load_fixture("openai_responses_stream.json");
3109 for case in fixture.cases {
3110 let body = ndjson_body_from_events(&case.events);
3111 let events = collect_stream_events_from_ndjson_body(&body);
3112 let summaries: Vec<EventSummary> = events.iter().map(summarize_event).collect();
3113 assert_eq!(summaries, case.expected, "case: {}", case.name);
3114 }
3115 }
3116
    #[test]
    fn ndjson_rejects_oversized_event() {
        // A single NDJSON line longer than the configured byte cap must
        // surface as a stream error instead of being buffered indefinitely.
        let delta = "x".repeat(128);
        let event = json!({
            "type": "response.output_text.delta",
            "item_id": "msg_1",
            "content_index": 0,
            "delta": delta,
        });
        let line = serde_json::to_string(&event).expect("serialize event");
        // Cap one byte below the serialized event (clamped to at least 1)
        // so this exact line is guaranteed to exceed it.
        let max = line.len().saturating_sub(1).max(1);
        let body = format!("{line}\n");

        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        let err = runtime.block_on(async move {
            let byte_stream = stream::iter(vec![Ok(body.into_bytes())]);
            let mut event_source = NdjsonStream::with_max_event_bytes(byte_stream, max);
            match event_source.next().await {
                Some(Err(e)) => Some(e),
                _ => None,
            }
        });

        let err = err.expect("expected oversized NDJSON error");
        assert!(
            err.to_string().contains("NDJSON event exceeds"),
            "unexpected error: {err}"
        );
    }
3148
3149 fn ndjson_body_for_deltas(deltas: &[String]) -> String {
3150 let mut lines = Vec::with_capacity(deltas.len() + 1);
3151 for delta in deltas {
3152 let event = json!({
3153 "type": "response.output_text.delta",
3154 "item_id": "msg_1",
3155 "content_index": 0,
3156 "delta": delta,
3157 });
3158 lines.push(serde_json::to_string(&event).expect("serialize event"));
3159 }
3160 let done = json!({
3161 "type": "response.completed",
3162 "response": {
3163 "incomplete_details": null,
3164 "usage": { "input_tokens": 1, "output_tokens": 1, "total_tokens": 2 }
3165 }
3166 });
3167 lines.push(serde_json::to_string(&done).expect("serialize event"));
3168 let mut body = lines.join("\n");
3169 body.push('\n');
3170 body
3171 }
3172
3173 fn chunk_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
3174 if chunk_sizes.is_empty() {
3175 return vec![input.to_vec()];
3176 }
3177 let mut chunks = Vec::new();
3178 let mut offset = 0usize;
3179 let mut idx = 0usize;
3180 while offset < input.len() {
3181 let size = chunk_sizes[idx % chunk_sizes.len()].max(1);
3182 let end = (offset + size).min(input.len());
3183 chunks.push(input[offset..end].to_vec());
3184 offset = end;
3185 idx += 1;
3186 }
3187 chunks
3188 }
3189
    /// Delivers `body` to the NDJSON parser split into the given chunk sizes
    /// (cycled via `chunk_bytes`) and drains StreamState until finished,
    /// returning every StreamEvent produced.
    fn collect_ndjson_events_from_chunks(body: &str, chunk_sizes: &[usize]) -> Vec<StreamEvent> {
        let bytes = body.as_bytes();
        let chunks = chunk_bytes(bytes, chunk_sizes);
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            let byte_stream = stream::iter(chunks.into_iter().map(Ok));
            let event_source = NdjsonStream::new(byte_stream);
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            let mut out = Vec::new();
            loop {
                // Drain queued output before pulling more from the source.
                if let Some(event) = state.pending_events.pop_front() {
                    out.push(event);
                    continue;
                }

                if state.finished {
                    break;
                }

                match state.event_source.next().await {
                    Some(item) => {
                        let msg = item.expect("NDJSON event");
                        state.process_event(&msg.data).expect("process_event");
                    }
                    None => {
                        // Source exhausted: the body must have carried a
                        // terminal response event.
                        assert!(
                            state.finish_terminal_response(),
                            "stream ended without Done event"
                        );
                    }
                }
            }

            out
        })
    }
3234
3235 fn summarize_events(events: &[StreamEvent]) -> Vec<EventSummary> {
3236 events.iter().map(summarize_event).collect()
3237 }
3238
    /// Proptest strategy producing short delta strings (1–8 chars of
    /// lowercase alphanumerics and spaces) for the chunking invariant test.
    fn delta_strategy() -> impl Strategy<Value = String> {
        string_regex("[a-z0-9 ]{1,8}").expect("delta regex")
    }
3242
    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 64,
            max_shrink_iters: 64,
            .. ProptestConfig::default()
        })]

        // Invariant: the parsed event sequence must be independent of how
        // the NDJSON byte stream is chunked at the transport layer.
        #[test]
        fn ndjson_chunking_invariant(
            deltas in prop::collection::vec(delta_strategy(), 1..6),
            chunk_sizes in prop::collection::vec(1usize..16, 0..8),
        ) {
            let body = ndjson_body_for_deltas(&deltas);
            // Baseline: the whole body delivered as a single chunk.
            let expected = summarize_events(&collect_ndjson_events_from_chunks(&body, &[]));
            let actual = summarize_events(&collect_ndjson_events_from_chunks(&body, &chunk_sizes));
            prop_assert_eq!(actual, expected);
        }
    }
3261}
3262
#[cfg(feature = "fuzzing")]
pub mod fuzz {
    //! Fuzzing entry point: wraps `StreamState` so fuzz targets can push
    //! arbitrary event payloads through the parser without a live stream.
    use super::*;
    use futures::stream;
    use std::pin::Pin;

    // The fuzz harness never polls the underlying byte source, so an empty
    // stream satisfies the type parameter.
    type FuzzByteStream =
        Pin<Box<futures::stream::Empty<std::result::Result<Vec<u8>, std::io::Error>>>>;
    type FuzzStream = crate::sse::SseStream<FuzzByteStream>;

    /// Thin wrapper exposing `StreamState::process_event` to fuzz targets.
    pub struct Processor(StreamState<FuzzStream>);

    impl Default for Processor {
        fn default() -> Self {
            Self::new()
        }
    }

    impl Processor {
        /// Builds a processor over an empty byte stream with fixed
        /// model/api/provider identifiers.
        pub fn new() -> Self {
            let empty = stream::empty::<std::result::Result<Vec<u8>, std::io::Error>>();
            let event_source = crate::sse::SseStream::new(Box::pin(empty));
            Self(StreamState::new(
                event_source,
                "gpt-responses-fuzz".into(),
                "openai-responses".into(),
                "openai".into(),
            ))
        }

        /// Feeds one raw event payload into the parser and returns the
        /// stream events it produced, draining the pending queue.
        pub fn process_event(&mut self, data: &str) -> crate::error::Result<Vec<StreamEvent>> {
            self.0.process_event(data)?;
            Ok(self.0.pending_events.drain(..).collect())
        }
    }
}