// pi/providers/openai_responses.rs
1//! OpenAI Responses API provider implementation.
2//!
3//! This module implements the Provider trait for the OpenAI `responses` endpoint,
4//! supporting streaming output text and function tool calls.
5
6use crate::error::{Error, Result};
7use crate::http::client::Client;
8use crate::model::{
9    AssistantMessage, ContentBlock, Message, StopReason, StreamEvent, TextContent, ThinkingContent,
10    ToolCall, Usage, UserContent,
11};
12use crate::models::CompatConfig;
13use crate::provider::{Context, Provider, StreamOptions, ToolDef};
14use crate::sse::{SseEvent, SseStream};
15use async_trait::async_trait;
16use base64::Engine;
17use futures::StreamExt;
18use futures::stream::{self, Stream};
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, VecDeque};
21use std::pin::Pin;
22use std::task::{Context as TaskContext, Poll};
23
24// ============================================================================
25// Constants
26// ============================================================================
27
/// Default public OpenAI Responses endpoint.
const OPENAI_RESPONSES_API_URL: &str = "https://api.openai.com/v1/responses";
/// ChatGPT-backed Codex endpoint used when codex mode is enabled.
pub(crate) const CODEX_RESPONSES_API_URL: &str = "https://chatgpt.com/backend-api/codex/responses";
/// Fallback `max_output_tokens` applied when the caller does not set one
/// (non-codex mode only; codex requests omit the field entirely).
const DEFAULT_MAX_OUTPUT_TOKENS: u32 = 4096;
31
32// ============================================================================
33// OpenAI Responses Provider
34// ============================================================================
35
/// OpenAI Responses API provider.
pub struct OpenAIResponsesProvider {
    /// HTTP client used for all requests (swappable for VCR/test harness).
    client: Client,
    /// Model identifier sent in the request body.
    model: String,
    /// Endpoint URL; defaults to the public Responses endpoint.
    base_url: String,
    /// Provider name reported in streamed events (default "openai").
    provider: String,
    /// API identifier reported in streamed events (default "openai-responses").
    api: String,
    /// When true, use the ChatGPT Codex endpoint conventions (OAuth headers,
    /// reasoning fields, no max_output_tokens).
    codex_mode: bool,
    /// Optional provider-specific compatibility overrides (custom headers).
    compat: Option<CompatConfig>,
}
46
47impl OpenAIResponsesProvider {
48    /// Create a new OpenAI Responses provider.
49    pub fn new(model: impl Into<String>) -> Self {
50        Self {
51            client: Client::new(),
52            model: model.into(),
53            base_url: OPENAI_RESPONSES_API_URL.to_string(),
54            provider: "openai".to_string(),
55            api: "openai-responses".to_string(),
56            codex_mode: false,
57            compat: None,
58        }
59    }
60
61    /// Override the provider name reported in streamed events.
62    #[must_use]
63    pub fn with_provider_name(mut self, provider: impl Into<String>) -> Self {
64        self.provider = provider.into();
65        self
66    }
67
68    /// Override API identifier reported in streamed events.
69    #[must_use]
70    pub fn with_api_name(mut self, api: impl Into<String>) -> Self {
71        self.api = api.into();
72        self
73    }
74
75    /// Enable OpenAI Codex Responses mode (ChatGPT OAuth endpoint + headers).
76    #[must_use]
77    pub const fn with_codex_mode(mut self, enabled: bool) -> Self {
78        self.codex_mode = enabled;
79        self
80    }
81
82    /// Create with a custom base URL.
83    #[must_use]
84    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
85        self.base_url = base_url.into();
86        self
87    }
88
89    /// Create with a custom HTTP client (VCR, test harness, etc.).
90    #[must_use]
91    pub fn with_client(mut self, client: Client) -> Self {
92        self.client = client;
93        self
94    }
95
96    /// Attach provider-specific compatibility overrides.
97    #[must_use]
98    pub fn with_compat(mut self, compat: Option<CompatConfig>) -> Self {
99        self.compat = compat;
100        self
101    }
102
103    pub fn build_request(
104        &self,
105        context: &Context<'_>,
106        options: &StreamOptions,
107    ) -> OpenAIResponsesRequest {
108        let input = build_openai_responses_input(context);
109        let tools: Option<Vec<OpenAIResponsesTool>> = if context.tools.is_empty() {
110            None
111        } else {
112            Some(
113                context
114                    .tools
115                    .iter()
116                    .map(convert_tool_to_openai_responses)
117                    .collect(),
118            )
119        };
120
121        let instructions = context.system_prompt.as_deref().map(ToString::to_string);
122
123        // Codex mode requires additional fields per the TS reference implementation.
124        // tool_choice and parallel_tool_calls are always sent (not conditional on tools).
125        let (tool_choice, parallel_tool_calls, text, include, reasoning) = if self.codex_mode {
126            let effort = options
127                .thinking_level
128                .as_ref()
129                .map_or_else(|| "high".to_string(), ToString::to_string);
130            (
131                Some("auto"),
132                Some(true),
133                Some(OpenAIResponsesTextConfig {
134                    verbosity: "medium",
135                }),
136                Some(vec!["reasoning.encrypted_content"]),
137                Some(OpenAIResponsesReasoning {
138                    effort,
139                    summary: Some("auto"),
140                }),
141            )
142        } else {
143            (None, None, None, None, None)
144        };
145
146        OpenAIResponsesRequest {
147            model: self.model.clone(),
148            input,
149            instructions,
150            temperature: options.temperature,
151            max_output_tokens: if self.codex_mode {
152                None
153            } else {
154                options.max_tokens.or(Some(DEFAULT_MAX_OUTPUT_TOKENS))
155            },
156            tools,
157            stream: true,
158            store: false,
159            tool_choice,
160            parallel_tool_calls,
161            text,
162            include,
163            reasoning,
164        }
165    }
166}
167
/// Extract the token from an `Authorization` header value of the exact form
/// `Bearer <token>` (scheme matched case-insensitively, any amount of
/// whitespace between the two words). Returns `None` for any other scheme,
/// a missing token, or trailing extra words.
fn bearer_token_from_authorization_header(value: &str) -> Option<String> {
    let mut words = value.split_whitespace();
    match (words.next(), words.next(), words.next()) {
        // Exactly two whitespace-separated words: scheme + token.
        (Some(scheme), Some(token), None)
            if scheme.eq_ignore_ascii_case("bearer") && !token.trim().is_empty() =>
        {
            Some(token.trim().to_string())
        }
        _ => None,
    }
}
181
182fn authorization_override(
183    options: &StreamOptions,
184    compat: Option<&CompatConfig>,
185) -> Option<String> {
186    super::first_non_empty_header_value_case_insensitive(&options.headers, &["authorization"])
187        .or_else(|| {
188            compat
189                .and_then(|compat| compat.custom_headers.as_ref())
190                .and_then(|headers| {
191                    super::first_non_empty_header_value_case_insensitive(
192                        headers,
193                        &["authorization"],
194                    )
195                })
196        })
197}
198
#[async_trait]
impl Provider for OpenAIResponsesProvider {
    /// Provider name reported in streamed events (default "openai").
    fn name(&self) -> &str {
        &self.provider
    }

    /// API identifier reported in streamed events (default "openai-responses").
    fn api(&self) -> &str {
        &self.api
    }

    /// Model identifier this provider was constructed with.
    fn model_id(&self) -> &str {
        &self.model
    }

    /// Open a streaming request against the Responses endpoint and translate
    /// the wire events (SSE or NDJSON) into `StreamEvent`s.
    ///
    /// Credential precedence: an explicit `Authorization` header (from
    /// `options.headers` or compat custom headers) wins; otherwise
    /// `options.api_key`, then the `OPENAI_API_KEY` env var, is used as a
    /// Bearer token. Errors if no credential can be resolved.
    #[allow(clippy::too_many_lines)]
    async fn stream(
        &self,
        context: &Context<'_>,
        options: &StreamOptions,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>>> {
        let authorization_header_value = authorization_override(options, self.compat.as_ref());

        // Only construct a Bearer token ourselves when no explicit
        // Authorization header override is present.
        let auth_value = if authorization_header_value.is_some() {
            None
        } else {
            Some(
                options
                    .api_key
                    .clone()
                    .or_else(|| std::env::var("OPENAI_API_KEY").ok())
                    .ok_or_else(|| {
                        Error::provider(
                            self.name(),
                            "Missing API key for provider. Configure credentials with /login <provider> or set the provider's API key env var.",
                        )
                    })?,
            )
        };

        let request_body = self.build_request(context, options);

        // Note: Content-Type is set by .json() below; setting it here too
        // produces a duplicate header that OpenAI's server rejects.
        let mut request = self.client.post(&self.base_url).header(
            "Accept",
            "text/event-stream, application/x-ndjson, application/ndjson",
        );

        if let Some(ref auth_value) = auth_value {
            request = request.header("Authorization", format!("Bearer {auth_value}"));
        }

        if self.codex_mode {
            // Codex needs a ChatGPT OAuth Bearer token: prefer the explicit
            // Authorization override, then fall back to the derived auth value.
            let codex_bearer = authorization_header_value
                .as_deref()
                .and_then(bearer_token_from_authorization_header)
                .or_else(|| auth_value.clone())
                .ok_or_else(|| {
                    Error::provider(
                        self.name(),
                        "OpenAI Codex mode requires a Bearer token. Provide one via /login openai-codex or an Authorization: Bearer <token> header.",
                    )
                })?;
            // The ChatGPT backend routes by the account id claim embedded in
            // the OAuth token.
            let account_id = extract_chatgpt_account_id(&codex_bearer).ok_or_else(|| {
                Error::provider(
                    self.name(),
                    "Invalid OpenAI Codex OAuth token (missing chatgpt_account_id claim). Run /login openai-codex again.",
                )
            })?;
            request = request
                .header("chatgpt-account-id", account_id)
                .header("OpenAI-Beta", "responses=experimental")
                .header("originator", "pi")
                .header("User-Agent", "pi_agent_rust");
            if let Some(session_id) = &options.session_id {
                request = request.header("session_id", session_id);
            }
        }

        // Apply provider-specific custom headers from compat config.
        if let Some(compat) = &self.compat {
            if let Some(custom_headers) = &compat.custom_headers {
                request = super::apply_headers_ignoring_blank_auth_overrides(
                    request,
                    custom_headers,
                    &["authorization"],
                );
            }
        }

        // Per-request headers from StreamOptions (highest priority).
        request = super::apply_headers_ignoring_blank_auth_overrides(
            request,
            &options.headers,
            &["authorization"],
        );

        let request = request.json(&request_body)?;

        let response = Box::pin(request.send()).await?;
        let status = response.status();
        if !(200..300).contains(&status) {
            // Best-effort read of the error body so the surfaced error is
            // actionable; a failed read is reported inline instead of hidden.
            let body = response
                .text()
                .await
                .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
            return Err(Error::provider(
                self.name(),
                format!("OpenAI API error (HTTP {status}): {body}"),
            ));
        }

        // Validate Content-Type when present. For standard OpenAI Responses
        // endpoints we fail closed if the header is missing; for Codex we allow
        // it because some endpoints omit the header while still streaming SSE.
        // If the header IS present and indicates a non-SSE type, reject early
        // so we fail closed instead of silently dropping events in the SSE parser.
        let content_type = response
            .headers()
            .iter()
            .find(|(name, _)| name.eq_ignore_ascii_case("content-type"))
            .map(|(_, value)| value.to_ascii_lowercase());
        if content_type.is_none() && !self.codex_mode {
            return Err(Error::api(format!(
                "OpenAI API protocol error (HTTP {status}): missing content-type header (expected text/event-stream or application/x-ndjson)"
            )));
        }
        let mut use_ndjson = false;
        if let Some(ref ct) = content_type {
            if ct.contains("application/x-ndjson") || ct.contains("application/ndjson") {
                use_ndjson = true;
            } else if !ct.contains("text/event-stream") {
                return Err(Error::api(format!(
                    "OpenAI API protocol error (HTTP {status}): unexpected Content-Type {ct} (expected text/event-stream or application/x-ndjson)"
                )));
            }
        }

        // Pick the wire decoder matching the negotiated framing.
        let byte_stream = response.bytes_stream();
        let event_source = if use_ndjson {
            ResponsesEventStream::Ndjson(NdjsonStream::new(byte_stream))
        } else {
            ResponsesEventStream::Sse(SseStream::new(byte_stream))
        };

        let model = self.model.clone();
        let api = self.api().to_string();
        let provider = self.name().to_string();

        // Drive the decoder with an unfold: each turn either flushes a queued
        // StreamEvent or pulls/parses the next wire chunk.
        let stream = stream::unfold(
            StreamState::new(event_source, model, api, provider),
            |mut state| async move {
                loop {
                    if let Some(event) = state.pending_events.pop_front() {
                        return Some((Ok(event), state));
                    }

                    // We may have queued terminal events (ToolCallEnd, Done, etc) after
                    // consuming the final wire chunk. Only stop once the queue is empty.
                    if state.finished {
                        return None;
                    }

                    match state.event_source.next().await {
                        Some(Ok(msg)) => {
                            // A successful chunk resets the consecutive error counter.
                            state.transient_error_count = 0;
                            if msg.data == "[DONE]" {
                                // Response terminal metadata can arrive before trailing item
                                // events, so only emit Done once the wire stream itself ends.
                                if !state.finish_terminal_response() {
                                    // Best-effort fallback: if we didn't see a completed or
                                    // incomplete chunk, emit Done using current state.
                                    state.finish(None);
                                }
                                continue;
                            }

                            if let Err(e) = state.process_event(&msg.data) {
                                return Some((Err(e), state));
                            }
                        }
                        Some(Err(e)) => {
                            // WriteZero, WouldBlock, and TimedOut errors are treated as transient.
                            // Skip them and keep reading the stream, but cap
                            // consecutive occurrences to avoid infinite loops.
                            const MAX_CONSECUTIVE_TRANSIENT_ERRORS: usize = 5;
                            if e.kind() == std::io::ErrorKind::WriteZero
                                || e.kind() == std::io::ErrorKind::WouldBlock
                                || e.kind() == std::io::ErrorKind::TimedOut
                            {
                                state.transient_error_count += 1;
                                if state.transient_error_count <= MAX_CONSECUTIVE_TRANSIENT_ERRORS {
                                    tracing::warn!(
                                        kind = ?e.kind(),
                                        count = state.transient_error_count,
                                        "Transient error in SSE stream, continuing"
                                    );
                                    continue;
                                }
                                tracing::warn!(
                                    kind = ?e.kind(),
                                    "Error persisted after {MAX_CONSECUTIVE_TRANSIENT_ERRORS} \
                                     consecutive attempts, treating as fatal"
                                );
                            }
                            let err = Error::api(format!("SSE error: {e}"));
                            return Some((Err(err), state));
                        }
                        None => {
                            if state.finish_terminal_response() {
                                continue;
                            }

                            // If the stream ends unexpectedly, surface an error. This matches the
                            // agent loop expectation that providers emit Done/Error explicitly.
                            return Some((
                                Err(Error::api("Stream ended without Done event")),
                                state,
                            ));
                        }
                    }
                }
            },
        );

        Ok(Box::pin(stream))
    }
}
428
429// ============================================================================
430// NDJSON Stream
431// ============================================================================
432
433const NDJSON_MAX_EVENT_BYTES: usize = 100 * 1024 * 1024;
434
/// Incremental decoder that splits a raw byte stream into NDJSON lines,
/// enforcing a per-line size cap.
struct NdjsonStream<S> {
    /// Upstream byte-chunk stream.
    inner: S,
    /// Bytes received but not yet terminated by `\n`.
    buffer: Vec<u8>,
    /// Complete lines waiting to be yielded.
    pending: VecDeque<String>,
    /// True once `inner` has ended and the tail buffer was flushed.
    finished: bool,
    /// Maximum allowed bytes for a single line/event.
    max_event_bytes: usize,
}
442
impl<S> NdjsonStream<S> {
    /// Wrap a byte stream with the default per-event size cap.
    const fn new(inner: S) -> Self {
        Self {
            inner,
            buffer: Vec::new(),
            pending: VecDeque::new(),
            finished: false,
            max_event_bytes: NDJSON_MAX_EVENT_BYTES,
        }
    }

    /// Test-only constructor with a configurable size cap.
    #[cfg(test)]
    fn with_max_event_bytes(inner: S, max_event_bytes: usize) -> Self {
        Self {
            inner,
            buffer: Vec::new(),
            pending: VecDeque::new(),
            finished: false,
            max_event_bytes,
        }
    }

    /// I/O error used when a line is not valid UTF-8.
    fn invalid_utf8_error() -> std::io::Error {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Invalid UTF-8 in NDJSON stream",
        )
    }

    /// I/O error used when a single line exceeds `max_event_bytes`.
    fn event_too_large_error(&self) -> std::io::Error {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("NDJSON event exceeds {} bytes", self.max_event_bytes),
        )
    }

    /// Validate one raw line (size cap, UTF-8), strip a trailing `\r`, and
    /// queue it unless it is blank.
    fn push_line(&mut self, line_bytes: &[u8]) -> std::io::Result<()> {
        if line_bytes.len() > self.max_event_bytes {
            return Err(self.event_too_large_error());
        }
        let line = std::str::from_utf8(line_bytes).map_err(|_| Self::invalid_utf8_error())?;
        // Tolerate CRLF line endings.
        let line = line.strip_suffix('\r').unwrap_or(line);
        if line.trim().is_empty() {
            return Ok(());
        }
        self.pending.push_back(line.to_string());
        Ok(())
    }

    /// Split the accumulated buffer on `\n` and queue each complete line.
    /// Fails early when an unterminated run of bytes already exceeds the cap,
    /// so a newline-free stream cannot grow the buffer without bound.
    fn drain_buffer(&mut self) -> std::io::Result<()> {
        if self.buffer.len() > self.max_event_bytes && memchr::memchr(b'\n', &self.buffer).is_none()
        {
            return Err(self.event_too_large_error());
        }
        while let Some(pos) = memchr::memchr(b'\n', &self.buffer) {
            // `pos` is the line length (newline excluded), so this bounds the
            // line before we copy it out.
            if pos > self.max_event_bytes {
                return Err(self.event_too_large_error());
            }
            let mut line = self.buffer.drain(..=pos).collect::<Vec<u8>>();
            if line.last() == Some(&b'\n') {
                line.pop();
            }
            if !line.is_empty() {
                self.push_line(&line)?;
            }
        }
        Ok(())
    }

    /// Flush any final unterminated line once the byte stream has ended.
    fn finish_buffer(&mut self) -> std::io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let line = std::mem::take(&mut self.buffer);
        self.push_line(&line)?;
        Ok(())
    }

    /// Adapt a queued NDJSON line into the `SseEvent` shape the shared
    /// stream-state machinery consumes (data only, default event/name fields).
    fn event_from_line(line: String) -> SseEvent {
        SseEvent {
            data: line,
            ..SseEvent::default()
        }
    }
}
528
impl<S> Stream for NdjsonStream<S>
where
    S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
{
    type Item = std::io::Result<SseEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        // Serve already-decoded lines before touching the inner stream.
        if let Some(line) = self.pending.pop_front() {
            return Poll::Ready(Some(Ok(Self::event_from_line(line))));
        }

        if self.finished {
            return Poll::Ready(None);
        }

        // Pull chunks until a complete line is available, an error occurs,
        // the inner stream ends, or it returns Pending.
        loop {
            match Pin::new(&mut self.inner).poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => {
                    self.buffer.extend_from_slice(&chunk);
                    if let Err(err) = self.drain_buffer() {
                        return Poll::Ready(Some(Err(err)));
                    }
                    if let Some(line) = self.pending.pop_front() {
                        return Poll::Ready(Some(Ok(Self::event_from_line(line))));
                    }
                    // No complete line yet; keep polling for more bytes.
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
                Poll::Ready(None) => {
                    // Inner stream ended: flush any unterminated final line.
                    self.finished = true;
                    if let Err(err) = self.finish_buffer() {
                        return Poll::Ready(Some(Err(err)));
                    }
                    if let Some(line) = self.pending.pop_front() {
                        return Poll::Ready(Some(Ok(Self::event_from_line(line))));
                    }
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
571
/// Wire-format adapter: the Responses endpoint may stream either SSE
/// (`text/event-stream`) or NDJSON; both framings are normalized to
/// `SseEvent` items for the shared stream-state machinery.
enum ResponsesEventStream<S>
where
    S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
{
    /// Newline-delimited JSON framing.
    Ndjson(NdjsonStream<S>),
    /// Server-sent events framing.
    Sse(SseStream<S>),
}
579
580impl<S> Stream for ResponsesEventStream<S>
581where
582    S: Stream<Item = std::io::Result<Vec<u8>>> + Unpin,
583{
584    type Item = std::io::Result<SseEvent>;
585
586    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
587        match self.as_mut().get_mut() {
588            Self::Ndjson(inner) => Pin::new(inner).poll_next(cx),
589            Self::Sse(inner) => Pin::new(inner).poll_next(cx),
590        }
591    }
592}
593
594// ============================================================================
595// Stream State
596// ============================================================================
597
/// Identifies one output-text content part: (response item id, content index).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TextKey {
    /// Id of the response output item the text belongs to.
    item_id: String,
    /// Index of the content part within that item.
    content_index: u32,
}
603
/// Identifies one reasoning part: (item id, summary-vs-text channel, index).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ReasoningKey {
    /// Id of the response output item the reasoning belongs to.
    item_id: String,
    /// Which reasoning channel (summary or full text).
    kind: ReasoningKind,
    /// Summary index or content index, depending on `kind`.
    index: u32,
}
610
/// Which reasoning channel a delta belongs to; keyed separately so summary
/// and full-text reasoning accumulate into distinct thinking blocks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ReasoningKind {
    /// Reasoning summary deltas (`ReasoningSummaryTextDelta`/`...Done`).
    Summary,
    /// Full reasoning text deltas (`ReasoningTextDelta`/`...Done`).
    Text,
}
616
/// Accumulator for an in-flight function tool call while it streams in.
struct ToolCallState {
    /// Index of the corresponding `ContentBlock` in the partial message.
    content_index: usize,
    /// Provider-assigned call id.
    call_id: String,
    /// Tool (function) name.
    name: String,
    /// Arguments accumulated so far (presumably streamed JSON fragments —
    /// the consumer, `process_event`, continues past this chunk).
    arguments: String,
}
623
/// Snapshot of a finalized content block keyed by its position in the
/// partial message. NOTE(review): no consumer is visible in this chunk;
/// presumably used by terminal-response reconciliation later in the file.
enum TerminalContentSnapshot {
    /// Final contents of a text block.
    Text {
        content_index: usize,
        content: String,
    },
    /// Final contents of a thinking (reasoning) block.
    Thinking {
        content_index: usize,
        content: String,
    },
    /// Marker for a tool-call block (arguments tracked elsewhere).
    ToolCall {
        content_index: usize,
    },
}
637
/// Incremental decoder state: accumulates the assistant message and queues
/// `StreamEvent`s as Responses wire chunks arrive.
struct StreamState<S>
where
    S: Stream<Item = std::result::Result<SseEvent, std::io::Error>> + Unpin,
{
    /// Underlying SSE/NDJSON event source.
    event_source: S,
    /// Message assembled so far; cloned into the `Start` event.
    partial: AssistantMessage,
    /// Events ready to be yielded to the consumer, in order.
    pending_events: VecDeque<StreamEvent>,
    /// Whether the `Start` event has been queued yet.
    started: bool,
    /// Set once the message is finalized; the unfold loop stops when this is
    /// true and `pending_events` has drained.
    finished: bool,
    /// Maps (item, content part) -> index into `partial.content` for text.
    text_blocks: HashMap<TextKey, usize>,
    /// Maps (item, kind, index) -> index into `partial.content` for thinking.
    reasoning_blocks: HashMap<ReasoningKey, usize>,
    /// In-flight tool calls keyed by response item id.
    tool_calls_by_item_id: HashMap<String, ToolCallState>,
    /// Argument text keyed by call id (inferred from name: buffered for calls
    /// not yet registered; consumer is past this chunk — TODO confirm).
    orphan_tool_call_arguments: HashMap<String, String>,
    /// True after the response-level terminal chunk was recorded; `Done` is
    /// deferred until the wire stream itself ends.
    terminal_response_seen: bool,
    /// Incomplete reason (if any) captured from the terminal chunk.
    terminal_incomplete_reason: Option<String>,
    /// Consecutive transient I/O errors (WriteZero/WouldBlock/TimedOut) seen
    /// without a successful event in between.
    transient_error_count: usize,
}
656
657impl<S> StreamState<S>
658where
659    S: Stream<Item = std::result::Result<SseEvent, std::io::Error>> + Unpin,
660{
661    fn new(event_source: S, model: String, api: String, provider: String) -> Self {
662        Self {
663            event_source,
664            partial: AssistantMessage {
665                content: Vec::new(),
666                api,
667                provider,
668                model,
669                usage: Usage::default(),
670                stop_reason: StopReason::Stop,
671                error_message: None,
672                timestamp: chrono::Utc::now().timestamp_millis(),
673            },
674            pending_events: VecDeque::new(),
675            started: false,
676            finished: false,
677            text_blocks: HashMap::new(),
678            reasoning_blocks: HashMap::new(),
679            tool_calls_by_item_id: HashMap::new(),
680            orphan_tool_call_arguments: HashMap::new(),
681            terminal_response_seen: false,
682            terminal_incomplete_reason: None,
683            transient_error_count: 0,
684        }
685    }
686
687    fn ensure_started(&mut self) {
688        if !self.started {
689            self.started = true;
690            self.pending_events.push_back(StreamEvent::Start {
691                partial: self.partial.clone(),
692            });
693        }
694    }
695
696    fn record_terminal_response(&mut self, incomplete_reason: Option<String>) {
697        self.terminal_response_seen = true;
698        self.terminal_incomplete_reason = incomplete_reason;
699    }
700
701    fn finish_terminal_response(&mut self) -> bool {
702        if !self.terminal_response_seen {
703            return false;
704        }
705
706        let incomplete_reason = self.terminal_incomplete_reason.take();
707        self.terminal_response_seen = false;
708        self.finish(incomplete_reason);
709        true
710    }
711
712    fn text_block_for(&mut self, item_id: String, content_index: u32) -> usize {
713        let key = TextKey {
714            item_id,
715            content_index,
716        };
717        if let Some(idx) = self.text_blocks.get(&key) {
718            return *idx;
719        }
720
721        let idx = self.partial.content.len();
722        self.partial
723            .content
724            .push(ContentBlock::Text(TextContent::new("")));
725        self.text_blocks.insert(key, idx);
726        self.pending_events
727            .push_back(StreamEvent::TextStart { content_index: idx });
728        idx
729    }
730
731    fn reasoning_block_for(&mut self, kind: ReasoningKind, item_id: String, index: u32) -> usize {
732        let key = ReasoningKey {
733            item_id,
734            kind,
735            index,
736        };
737        if let Some(idx) = self.reasoning_blocks.get(&key) {
738            return *idx;
739        }
740
741        let idx = self.partial.content.len();
742        self.partial
743            .content
744            .push(ContentBlock::Thinking(ThinkingContent {
745                thinking: String::new(),
746                thinking_signature: None,
747            }));
748        self.reasoning_blocks.insert(key, idx);
749        self.pending_events
750            .push_back(StreamEvent::ThinkingStart { content_index: idx });
751        idx
752    }
753
754    fn append_reasoning_delta(
755        &mut self,
756        kind: ReasoningKind,
757        item_id: String,
758        index: u32,
759        delta: String,
760    ) {
761        self.ensure_started();
762        let idx = self.reasoning_block_for(kind, item_id, index);
763        if let Some(ContentBlock::Thinking(block)) = self.partial.content.get_mut(idx) {
764            block.thinking.push_str(&delta);
765        }
766        self.pending_events.push_back(StreamEvent::ThinkingDelta {
767            content_index: idx,
768            delta,
769        });
770    }
771
772    fn apply_text_snapshot(&mut self, item_id: String, content_index: u32, text: String) {
773        self.ensure_started();
774        let idx = self.text_block_for(item_id, content_index);
775        let Some(ContentBlock::Text(block)) = self.partial.content.get_mut(idx) else {
776            return;
777        };
778
779        if block.text == text {
780            return;
781        }
782
783        if let Some(suffix) = text.strip_prefix(&block.text) {
784            if suffix.is_empty() {
785                return;
786            }
787            block.text.push_str(suffix);
788            self.pending_events.push_back(StreamEvent::TextDelta {
789                content_index: idx,
790                delta: suffix.to_string(),
791            });
792            return;
793        }
794
795        tracing::warn!(
796            content_index = idx,
797            old_len = block.text.len(),
798            new_len = text.len(),
799            "text snapshot does not extend accumulated text; replacing"
800        );
801        block.text = text;
802    }
803
804    fn apply_reasoning_snapshot(
805        &mut self,
806        kind: ReasoningKind,
807        item_id: String,
808        index: u32,
809        text: String,
810    ) {
811        self.ensure_started();
812        let idx = self.reasoning_block_for(kind, item_id, index);
813        let Some(ContentBlock::Thinking(block)) = self.partial.content.get_mut(idx) else {
814            return;
815        };
816
817        if block.thinking == text {
818            return;
819        }
820
821        if let Some(suffix) = text.strip_prefix(&block.thinking) {
822            if suffix.is_empty() {
823                return;
824            }
825            block.thinking.push_str(suffix);
826            self.pending_events.push_back(StreamEvent::ThinkingDelta {
827                content_index: idx,
828                delta: suffix.to_string(),
829            });
830            return;
831        }
832
833        tracing::warn!(
834            content_index = idx,
835            old_len = block.thinking.len(),
836            new_len = text.len(),
837            "reasoning snapshot does not extend accumulated thinking; replacing"
838        );
839        block.thinking = text;
840    }
841
    /// Dispatch one parsed SSE `data:` payload from the Responses stream.
    ///
    /// Deserializes the chunk, updates the accumulating partial assistant
    /// message, and queues `StreamEvent`s for the consumer. Unrecognized
    /// chunk types deserialize to `Unknown` and are ignored, keeping the
    /// parser forward-compatible with new server events.
    ///
    /// # Errors
    /// Returns an API error if `data` is not valid JSON; the raw payload is
    /// included in the message for debugging.
    #[allow(clippy::too_many_lines)]
    fn process_event(&mut self, data: &str) -> Result<()> {
        let chunk: OpenAIResponsesChunk = serde_json::from_str(data)
            .map_err(|e| Error::api(format!("JSON parse error: {e}\nData: {data}")))?;

        match chunk {
            OpenAIResponsesChunk::OutputTextDelta {
                item_id,
                content_index,
                delta,
            } => {
                self.ensure_started();
                // Map the (item, content) pair to our flat content index and
                // append the delta to the corresponding text block.
                let idx = self.text_block_for(item_id, content_index);
                if let Some(ContentBlock::Text(t)) = self.partial.content.get_mut(idx) {
                    t.text.push_str(&delta);
                }
                self.pending_events.push_back(StreamEvent::TextDelta {
                    content_index: idx,
                    delta,
                });
            }
            OpenAIResponsesChunk::ReasoningSummaryTextDelta {
                item_id,
                summary_index,
                delta,
            } => {
                self.append_reasoning_delta(ReasoningKind::Summary, item_id, summary_index, delta);
            }
            OpenAIResponsesChunk::ReasoningTextDelta {
                item_id,
                content_index,
                delta,
            } => {
                self.append_reasoning_delta(ReasoningKind::Text, item_id, content_index, delta);
            }
            // `*.done` events carry full-text snapshots; the snapshot helpers
            // reconcile them against whatever deltas we already accumulated.
            OpenAIResponsesChunk::OutputTextDone {
                item_id,
                content_index,
                text,
            } => {
                self.apply_text_snapshot(item_id, content_index, text);
            }
            OpenAIResponsesChunk::ContentPartDone {
                item_id,
                content_index,
                part,
            } => match part {
                OpenAIResponsesContentPartDone::OutputText { text } => {
                    self.apply_text_snapshot(item_id, content_index, text);
                }
                OpenAIResponsesContentPartDone::ReasoningText { text } => {
                    self.apply_reasoning_snapshot(
                        ReasoningKind::Text,
                        item_id,
                        content_index,
                        text,
                    );
                }
                OpenAIResponsesContentPartDone::Unknown => {}
            },
            OpenAIResponsesChunk::ReasoningTextDone {
                item_id,
                content_index,
                text,
            } => {
                self.apply_reasoning_snapshot(ReasoningKind::Text, item_id, content_index, text);
            }
            OpenAIResponsesChunk::ReasoningSummaryTextDone {
                item_id,
                summary_index,
                text,
            } => {
                self.apply_reasoning_snapshot(ReasoningKind::Summary, item_id, summary_index, text);
            }
            OpenAIResponsesChunk::ReasoningSummaryPartDone {
                item_id,
                summary_index,
                part,
            } => match part {
                OpenAIResponsesReasoningSummaryPartDone::SummaryText { text } => {
                    self.apply_reasoning_snapshot(
                        ReasoningKind::Summary,
                        item_id,
                        summary_index,
                        text,
                    );
                }
                OpenAIResponsesReasoningSummaryPartDone::Unknown => {}
            },
            OpenAIResponsesChunk::OutputItemAdded { item } => {
                if let OpenAIResponsesOutputItem::FunctionCall {
                    id,
                    call_id,
                    name,
                    arguments,
                } = item
                {
                    self.ensure_started();

                    // Argument deltas may have raced ahead of this `added`
                    // event; recover them, keeping the event's own initial
                    // `arguments` first.
                    let mut buffered_arguments = self
                        .orphan_tool_call_arguments
                        .remove(&id)
                        .unwrap_or_default();
                    buffered_arguments.insert_str(0, &arguments);

                    // Arguments stay Null until the call is finalized in
                    // `end_tool_call`.
                    let content_index = self.partial.content.len();
                    self.partial.content.push(ContentBlock::ToolCall(ToolCall {
                        id: call_id.clone(),
                        name: name.clone(),
                        arguments: serde_json::Value::Null,
                        thought_signature: None,
                    }));

                    self.tool_calls_by_item_id.insert(
                        id,
                        ToolCallState {
                            content_index,
                            call_id,
                            name,
                            arguments: buffered_arguments.clone(),
                        },
                    );

                    self.pending_events
                        .push_back(StreamEvent::ToolCallStart { content_index });

                    // Replay any recovered argument text as a single delta so
                    // consumers never miss buffered bytes.
                    if !buffered_arguments.is_empty() {
                        self.pending_events.push_back(StreamEvent::ToolCallDelta {
                            content_index,
                            delta: buffered_arguments,
                        });
                    }
                }
            }
            OpenAIResponsesChunk::FunctionCallArgumentsDelta { item_id, delta } => {
                self.ensure_started();
                if let Some(tc) = self.tool_calls_by_item_id.get_mut(&item_id) {
                    tc.arguments.push_str(&delta);
                    self.pending_events.push_back(StreamEvent::ToolCallDelta {
                        content_index: tc.content_index,
                        delta,
                    });
                } else {
                    // Delta arrived before the item's `added` event — buffer
                    // it until the item is registered (or synthesized).
                    self.orphan_tool_call_arguments
                        .entry(item_id)
                        .or_default()
                        .push_str(&delta);
                }
            }
            OpenAIResponsesChunk::OutputItemDone { item } => {
                if let OpenAIResponsesOutputItemDone::FunctionCall {
                    id,
                    call_id,
                    name,
                    arguments,
                } = item
                {
                    self.ensure_started();
                    self.end_tool_call(&id, &call_id, &name, &arguments);
                }
            }
            // All three terminal variants carry the same payload shape;
            // record usage and let `record_terminal_response` finish up.
            OpenAIResponsesChunk::ResponseCompleted { response }
            | OpenAIResponsesChunk::ResponseDone { response }
            | OpenAIResponsesChunk::ResponseIncomplete { response } => {
                self.ensure_started();
                self.partial.usage.input = response.usage.input_tokens;
                self.partial.usage.output = response.usage.output_tokens;
                // Servers may omit `total_tokens`; derive it when absent.
                self.partial.usage.total_tokens = response
                    .usage
                    .total_tokens
                    .unwrap_or(response.usage.input_tokens + response.usage.output_tokens);
                self.record_terminal_response(response.incomplete_reason());
            }
            OpenAIResponsesChunk::ResponseFailed { response } => {
                self.ensure_started();
                self.partial.stop_reason = StopReason::Error;
                self.partial.error_message = Some(
                    response
                        .error
                        .and_then(|error| error.message)
                        .unwrap_or_else(|| "Codex response failed".to_string()),
                );
                // The partial message is moved into the Error event; `take`
                // leaves a fresh default behind.
                self.pending_events.push_back(StreamEvent::Error {
                    reason: StopReason::Error,
                    error: std::mem::take(&mut self.partial),
                });
                self.finished = true;
            }
            OpenAIResponsesChunk::Error { message } => {
                self.ensure_started();
                self.partial.stop_reason = StopReason::Error;
                self.partial.error_message = Some(message);
                self.pending_events.push_back(StreamEvent::Error {
                    reason: StopReason::Error,
                    error: std::mem::take(&mut self.partial),
                });
                self.finished = true;
            }
            OpenAIResponsesChunk::Unknown => {}
        }

        Ok(())
    }
1045
1046    fn partial_has_tool_call(&self) -> bool {
1047        self.partial
1048            .content
1049            .iter()
1050            .any(|b| matches!(b, ContentBlock::ToolCall(_)))
1051    }
1052
    /// Finalize a tool call: ensure Start (and a catch-up Delta) were
    /// emitted, parse the accumulated argument text as JSON, emit
    /// `ToolCallEnd`, and write the final call back into its content block.
    ///
    /// Tolerates a missed `output_item.added` event by synthesizing the
    /// content block here, so consumers always observe a valid
    /// Start → Delta? → End sequence.
    fn end_tool_call(&mut self, item_id: &str, call_id: &str, name: &str, arguments: &str) {
        let (mut tc, synthesized_start) = self.tool_calls_by_item_id.remove(item_id).map_or_else(
            || {
                // If we missed the added event, synthesize the full tool-call block now
                // so downstream consumers still see a valid Start → Delta? → End sequence.
                let content_index = self.partial.content.len();
                self.partial.content.push(ContentBlock::ToolCall(ToolCall {
                    id: call_id.to_string(),
                    name: name.to_string(),
                    arguments: serde_json::Value::Null,
                    thought_signature: None,
                }));
                (
                    ToolCallState {
                        content_index,
                        call_id: call_id.to_string(),
                        name: name.to_string(),
                        // Recover any argument deltas buffered before this
                        // item was known.
                        arguments: self
                            .orphan_tool_call_arguments
                            .remove(item_id)
                            .unwrap_or_default(),
                    },
                    true,
                )
            },
            |state| (state, false),
        );

        if synthesized_start {
            self.pending_events.push_back(StreamEvent::ToolCallStart {
                content_index: tc.content_index,
            });
        }

        // Prefer the final arguments field when present.
        if !arguments.is_empty() {
            tc.arguments = arguments.to_string();
        }

        // A synthesized call never emitted incremental deltas, so send its
        // arguments as one catch-up delta before the End event.
        if synthesized_start && !tc.arguments.is_empty() {
            self.pending_events.push_back(StreamEvent::ToolCallDelta {
                content_index: tc.content_index,
                delta: tc.arguments.clone(),
            });
        }

        // Unparseable argument text degrades to Null (with a warning) rather
        // than aborting the stream.
        let parsed_args: serde_json::Value =
            serde_json::from_str(&tc.arguments).unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    raw = %tc.arguments,
                    "Failed to parse tool arguments as JSON"
                );
                serde_json::Value::Null
            });

        self.partial.stop_reason = StopReason::ToolUse;
        self.pending_events.push_back(StreamEvent::ToolCallEnd {
            content_index: tc.content_index,
            tool_call: ToolCall {
                id: tc.call_id.clone(),
                name: tc.name.clone(),
                arguments: parsed_args.clone(),
                thought_signature: None,
            },
        });

        // Sync the stored content block with the finalized call data.
        if let Some(ContentBlock::ToolCall(block)) = self.partial.content.get_mut(tc.content_index)
        {
            block.id = tc.call_id;
            block.name = tc.name;
            block.arguments = parsed_args;
        }
    }
1127
1128    fn terminal_content_snapshots(&self) -> Vec<TerminalContentSnapshot> {
1129        self.partial
1130            .content
1131            .iter()
1132            .enumerate()
1133            .filter_map(|(content_index, block)| match block {
1134                ContentBlock::Text(t) => Some(TerminalContentSnapshot::Text {
1135                    content_index,
1136                    content: t.text.clone(),
1137                }),
1138                ContentBlock::Thinking(t) => Some(TerminalContentSnapshot::Thinking {
1139                    content_index,
1140                    content: t.thinking.clone(),
1141                }),
1142                ContentBlock::ToolCall(_) => {
1143                    Some(TerminalContentSnapshot::ToolCall { content_index })
1144                }
1145                ContentBlock::Image(_) => None,
1146            })
1147            .collect()
1148    }
1149
1150    fn open_tool_call_snapshots_in_content_order(
1151        &self,
1152    ) -> Vec<(usize, String, String, String, String)> {
1153        let mut open_tool_calls: Vec<(usize, String, String, String, String)> = self
1154            .tool_calls_by_item_id
1155            .iter()
1156            .map(|(item_id, tc)| {
1157                (
1158                    tc.content_index,
1159                    item_id.clone(),
1160                    tc.call_id.clone(),
1161                    tc.name.clone(),
1162                    tc.arguments.clone(),
1163                )
1164            })
1165            .collect();
1166        open_tool_calls.sort_by_key(|(content_index, ..)| *content_index);
1167        open_tool_calls
1168    }
1169
    /// Close out the stream: emit terminal events for every content block,
    /// finalize any still-open tool calls, infer the final stop reason, and
    /// queue the `Done` event.
    ///
    /// Idempotent — returns immediately if the stream already finished.
    /// `incomplete_reason` is the server-provided reason string from an
    /// `incomplete` terminal response, if any.
    fn finish(&mut self, incomplete_reason: Option<String>) {
        if self.finished {
            return;
        }

        let mut open_tool_calls = self
            .open_tool_call_snapshots_in_content_order()
            .into_iter()
            .peekable();

        // Walk blocks in content order, interleaving tool-call finalization
        // with TextEnd/ThinkingEnd so events mirror block order.
        for block in self.terminal_content_snapshots() {
            match block {
                TerminalContentSnapshot::Text {
                    content_index,
                    content,
                } => self.pending_events.push_back(StreamEvent::TextEnd {
                    content_index,
                    content,
                }),
                TerminalContentSnapshot::Thinking {
                    content_index,
                    content,
                } => self.pending_events.push_back(StreamEvent::ThinkingEnd {
                    content_index,
                    content,
                }),
                TerminalContentSnapshot::ToolCall { content_index } => {
                    // Both snapshot lists are sorted by content index, so a
                    // peekable cursor pairs each open call with its block.
                    while let Some((_, item_id, call_id, name, arguments)) = open_tool_calls
                        .next_if(|(tool_content_index, ..)| *tool_content_index == content_index)
                    {
                        self.end_tool_call(&item_id, &call_id, &name, &arguments);
                    }
                }
            }
        }

        // Best-effort: close any tool calls whose content block disappeared unexpectedly.
        for (_, item_id, call_id, name, arguments) in open_tool_calls {
            self.end_tool_call(&item_id, &call_id, &name, &arguments);
        }

        // Infer stop reason.
        if let Some(reason) = incomplete_reason {
            // Loose substring matching on the server's free-form reason text.
            let reason_lower = reason.to_ascii_lowercase();
            if reason_lower.contains("max_output") || reason_lower.contains("length") {
                self.partial.stop_reason = StopReason::Length;
            } else if reason_lower.contains("tool") {
                self.partial.stop_reason = StopReason::ToolUse;
            } else if reason_lower.contains("content_filter") || reason_lower.contains("error") {
                self.partial.stop_reason = StopReason::Error;
            }
        } else if self.partial_has_tool_call() {
            self.partial.stop_reason = StopReason::ToolUse;
        }

        let reason = self.partial.stop_reason;
        self.pending_events.push_back(StreamEvent::Done {
            reason,
            message: self.partial.clone(),
        });
        self.finished = true;
    }
1232}
1233
1234fn extract_chatgpt_account_id(token: &str) -> Option<String> {
1235    let mut parts = token.split('.');
1236    let _header = parts.next()?;
1237    let payload = parts.next()?;
1238    let _signature = parts.next()?;
1239    if parts.next().is_some() {
1240        return None;
1241    }
1242
1243    let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
1244        .decode(payload)
1245        .or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(payload))
1246        .ok()?;
1247    let payload_json: serde_json::Value = serde_json::from_slice(&decoded).ok()?;
1248    payload_json
1249        .get("https://api.openai.com/auth")
1250        .and_then(|auth| auth.get("chatgpt_account_id"))
1251        .and_then(serde_json::Value::as_str)
1252        .map(str::trim)
1253        .filter(|value| !value.is_empty())
1254        .map(ToString::to_string)
1255}
1256
1257// ============================================================================
1258// OpenAI Responses API Types (minimal)
1259// ============================================================================
1260
/// Request body for the Responses endpoint.
///
/// Only the fields this provider sets are modeled; `None` optionals are
/// omitted from the serialized JSON via `skip_serializing_if`.
#[derive(Debug, Serialize)]
pub struct OpenAIResponsesRequest {
    model: String,
    /// Conversation history: user/assistant turns plus function calls and
    /// their outputs.
    input: Vec<OpenAIResponsesInputItem>,
    /// System prompt, sent top-level rather than as a system message in
    /// `input` (some backends reject system messages in the input array).
    #[serde(skip_serializing_if = "Option::is_none")]
    instructions: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<OpenAIResponsesTool>>,
    /// When true the response is delivered as an SSE stream.
    stream: bool,
    /// Whether the server should persist the response object.
    store: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<&'static str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<OpenAIResponsesTextConfig>,
    /// Extra stream content to request (server-defined include keys).
    #[serde(skip_serializing_if = "Option::is_none")]
    include: Option<Vec<&'static str>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    reasoning: Option<OpenAIResponsesReasoning>,
}
1286
/// `text` sub-object of the request: output verbosity setting.
#[derive(Debug, Serialize)]
struct OpenAIResponsesTextConfig {
    verbosity: &'static str,
}

/// `reasoning` sub-object of the request.
#[derive(Debug, Serialize)]
struct OpenAIResponsesReasoning {
    /// Reasoning effort level requested from the model.
    effort: String,
    /// Reasoning summary mode; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    summary: Option<&'static str>,
}
1298
/// One item in the request `input` array.
///
/// Serialized `untagged`: the wire shape is determined by each variant's
/// field set — role-based message variants carry a `role` field, while the
/// function-call variants carry an explicit `type` field.
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum OpenAIResponsesInputItem {
    /// Role-tagged plain-text message (role is always a static string).
    System {
        role: &'static str,
        content: String,
    },
    /// User turn with text and/or image parts.
    User {
        role: &'static str,
        content: Vec<OpenAIResponsesUserContentPart>,
    },
    /// Prior assistant output text.
    Assistant {
        role: &'static str,
        content: Vec<OpenAIResponsesAssistantContentPart>,
    },
    /// A function call the assistant made earlier in the conversation;
    /// `arguments` is the raw JSON argument string.
    FunctionCall {
        #[serde(rename = "type")]
        r#type: &'static str,
        call_id: String,
        name: String,
        arguments: String,
    },
    /// The tool's result for a prior function call, matched by `call_id`.
    FunctionCallOutput {
        #[serde(rename = "type")]
        r#type: &'static str,
        call_id: String,
        output: String,
    },
}
1328
/// One part of a user message's `content` array, tagged by `type`.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesUserContentPart {
    /// Plain text input.
    #[serde(rename = "input_text")]
    InputText { text: String },
    /// Image input; `image_url` is a `data:` URL with base64 payload.
    #[serde(rename = "input_image")]
    InputImage { image_url: String },
}

/// One part of an assistant message's `content` array, tagged by `type`.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesAssistantContentPart {
    /// Text the assistant produced in a prior turn.
    #[serde(rename = "output_text")]
    OutputText { text: String },
}
1344
/// A function tool definition in Responses API wire format.
#[derive(Debug, Serialize)]
struct OpenAIResponsesTool {
    /// Always "function" for this provider.
    #[serde(rename = "type")]
    r#type: &'static str,
    name: String,
    /// Omitted when the source description is blank.
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
    /// JSON Schema describing the tool's parameters.
    parameters: serde_json::Value,
}
1354
1355fn convert_tool_to_openai_responses(tool: &ToolDef) -> OpenAIResponsesTool {
1356    OpenAIResponsesTool {
1357        r#type: "function",
1358        name: tool.name.clone(),
1359        description: if tool.description.trim().is_empty() {
1360            None
1361        } else {
1362            Some(tool.description.clone())
1363        },
1364        parameters: tool.parameters.clone(),
1365    }
1366}
1367
/// Flatten the conversation context into the Responses API `input` array.
///
/// User and custom messages become user items, assistant turns are split so
/// text and function calls keep their original interleaving, and tool
/// results become `function_call_output` items keyed by `call_id`.
fn build_openai_responses_input(context: &Context<'_>) -> Vec<OpenAIResponsesInputItem> {
    let mut input = Vec::with_capacity(context.messages.len());

    // System prompt is sent as top-level `instructions` field, not in input array.
    // Some providers (e.g. OpenAI Codex) require `instructions` and reject system
    // messages in the input array.

    for message in context.messages.iter() {
        match message {
            Message::User(user) => input.push(convert_user_message_to_responses(&user.content)),
            // Custom messages are treated as plain user text.
            Message::Custom(custom) => input.push(OpenAIResponsesInputItem::User {
                role: "user",
                content: vec![OpenAIResponsesUserContentPart::InputText {
                    text: custom.content.clone(),
                }],
            }),
            Message::Assistant(assistant) => {
                // Preserve ordering between text and tool calls.
                let mut pending_text = String::new();

                for block in &assistant.content {
                    match block {
                        ContentBlock::Text(t) => pending_text.push_str(&t.text),
                        ContentBlock::ToolCall(tc) => {
                            // Flush accumulated text before the call so the
                            // relative order of text and calls survives.
                            if !pending_text.is_empty() {
                                input.push(OpenAIResponsesInputItem::Assistant {
                                    role: "assistant",
                                    content: vec![
                                        OpenAIResponsesAssistantContentPart::OutputText {
                                            text: std::mem::take(&mut pending_text),
                                        },
                                    ],
                                });
                            }
                            input.push(OpenAIResponsesInputItem::FunctionCall {
                                r#type: "function_call",
                                call_id: tc.id.clone(),
                                name: tc.name.clone(),
                                arguments: tc.arguments.to_string(),
                            });
                        }
                        // Thinking/image blocks have no assistant-input
                        // representation on this API and are dropped.
                        _ => {}
                    }
                }

                // Flush any trailing assistant text after the last tool call.
                if !pending_text.is_empty() {
                    input.push(OpenAIResponsesInputItem::Assistant {
                        role: "assistant",
                        content: vec![OpenAIResponsesAssistantContentPart::OutputText {
                            text: pending_text,
                        }],
                    });
                }
            }
            Message::ToolResult(result) => {
                // Tool results flatten to newline-joined text.
                // NOTE(review): the '\n' separator is emitted between *all*
                // blocks, so a skipped non-text block still contributes a
                // newline — confirm this is intended.
                let mut out = String::new();
                for (i, block) in result.content.iter().enumerate() {
                    if i > 0 {
                        out.push('\n');
                    }
                    if let ContentBlock::Text(t) = block {
                        out.push_str(&t.text);
                    }
                }
                input.push(OpenAIResponsesInputItem::FunctionCallOutput {
                    r#type: "function_call_output",
                    call_id: result.tool_call_id.clone(),
                    output: out,
                });
            }
        }
    }

    input
}
1443
1444fn convert_user_message_to_responses(content: &UserContent) -> OpenAIResponsesInputItem {
1445    match content {
1446        UserContent::Text(text) => OpenAIResponsesInputItem::User {
1447            role: "user",
1448            content: vec![OpenAIResponsesUserContentPart::InputText { text: text.clone() }],
1449        },
1450        UserContent::Blocks(blocks) => {
1451            let mut parts = Vec::new();
1452            for block in blocks {
1453                match block {
1454                    ContentBlock::Text(t) => {
1455                        parts.push(OpenAIResponsesUserContentPart::InputText {
1456                            text: t.text.clone(),
1457                        });
1458                    }
1459                    ContentBlock::Image(img) => {
1460                        let url = format!("data:{};base64,{}", img.mime_type, img.data);
1461                        parts.push(OpenAIResponsesUserContentPart::InputImage { image_url: url });
1462                    }
1463                    _ => {}
1464                }
1465            }
1466            if parts.is_empty() {
1467                parts.push(OpenAIResponsesUserContentPart::InputText {
1468                    text: String::new(),
1469                });
1470            }
1471            OpenAIResponsesInputItem::User {
1472                role: "user",
1473                content: parts,
1474            }
1475        }
1476    }
1477}
1478
1479// ============================================================================
1480// Streaming Chunk Types (minimal, forward-compatible)
1481// ============================================================================
1482
/// One SSE event from the Responses stream, discriminated by its `type`
/// field.
///
/// Only events this provider consumes are modeled; anything else
/// deserializes to `Unknown` via `#[serde(other)]` and is ignored, keeping
/// the parser forward-compatible with new server event types.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesChunk {
    /// Incremental output text for an item's content part.
    #[serde(rename = "response.output_text.delta")]
    OutputTextDelta {
        item_id: String,
        content_index: u32,
        delta: String,
    },
    /// Full-text snapshot of a finished output-text part.
    #[serde(rename = "response.output_text.done")]
    OutputTextDone {
        item_id: String,
        content_index: u32,
        text: String,
    },
    /// A new output item (e.g. a function call) started.
    #[serde(rename = "response.output_item.added")]
    OutputItemAdded { item: OpenAIResponsesOutputItem },
    /// An output item finished.
    #[serde(rename = "response.output_item.done")]
    OutputItemDone { item: OpenAIResponsesOutputItemDone },
    /// Incremental function-call argument text.
    #[serde(rename = "response.function_call_arguments.delta")]
    FunctionCallArgumentsDelta { item_id: String, delta: String },
    /// A content part finished; carries the part's final payload.
    #[serde(rename = "response.content_part.done")]
    ContentPartDone {
        item_id: String,
        content_index: u32,
        part: OpenAIResponsesContentPartDone,
    },
    /// Incremental reasoning text.
    #[serde(rename = "response.reasoning_text.delta")]
    ReasoningTextDelta {
        item_id: String,
        content_index: u32,
        delta: String,
    },
    /// Full-text snapshot of finished reasoning text.
    #[serde(rename = "response.reasoning_text.done")]
    ReasoningTextDone {
        item_id: String,
        content_index: u32,
        text: String,
    },
    /// Incremental reasoning-summary text.
    #[serde(rename = "response.reasoning_summary_text.delta")]
    ReasoningSummaryTextDelta {
        item_id: String,
        summary_index: u32,
        delta: String,
    },
    /// Full-text snapshot of finished reasoning-summary text.
    #[serde(rename = "response.reasoning_summary_text.done")]
    ReasoningSummaryTextDone {
        item_id: String,
        summary_index: u32,
        text: String,
    },
    /// A reasoning-summary part finished.
    #[serde(rename = "response.reasoning_summary_part.done")]
    ReasoningSummaryPartDone {
        item_id: String,
        summary_index: u32,
        part: OpenAIResponsesReasoningSummaryPartDone,
    },
    /// Terminal: the response completed normally.
    #[serde(rename = "response.completed")]
    ResponseCompleted {
        response: OpenAIResponsesDonePayload,
    },
    /// Terminal: alternate completion event emitted by some backends.
    #[serde(rename = "response.done")]
    ResponseDone {
        response: OpenAIResponsesDonePayload,
    },
    /// Terminal: the response ended early (see `incomplete_details`).
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete {
        response: OpenAIResponsesDonePayload,
    },
    /// Terminal: the response failed server-side.
    #[serde(rename = "response.failed")]
    ResponseFailed {
        response: OpenAIResponsesFailedPayload,
    },
    /// Stream-level error event.
    #[serde(rename = "error")]
    Error { message: String },
    /// Any event type not modeled above; silently ignored.
    #[serde(other)]
    Unknown,
}
1561
/// Final payload of a `response.content_part.done` event, tagged by `type`.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesContentPartDone {
    /// Completed output text for the part.
    #[serde(rename = "output_text")]
    OutputText { text: String },
    /// Completed reasoning text for the part.
    #[serde(rename = "reasoning_text")]
    ReasoningText { text: String },
    /// Unmodeled part type; ignored.
    #[serde(other)]
    Unknown,
}

/// Final payload of a `response.reasoning_summary_part.done` event.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesReasoningSummaryPartDone {
    /// Completed summary text for the part.
    #[serde(rename = "summary_text")]
    SummaryText { text: String },
    /// Unmodeled part type; ignored.
    #[serde(other)]
    Unknown,
}
1581
/// Item payload of a `response.output_item.added` event.
///
/// Only function calls are acted on; other item types are ignored.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItem {
    #[serde(rename = "function_call")]
    FunctionCall {
        /// Server-side item id (keys the in-flight tool-call state).
        id: String,
        /// Correlates the call with its eventual `function_call_output`.
        call_id: String,
        name: String,
        /// Initial argument text; may be empty on the `added` event.
        #[serde(default)]
        arguments: String,
    },
    #[serde(other)]
    Unknown,
}

/// Item payload of a `response.output_item.done` event.
///
/// Mirrors `OpenAIResponsesOutputItem`; kept as a separate type so the two
/// events can diverge in fields without affecting each other.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItemDone {
    #[serde(rename = "function_call")]
    FunctionCall {
        id: String,
        call_id: String,
        name: String,
        /// Final, complete argument text; may be empty if deltas carried it.
        #[serde(default)]
        arguments: String,
    },
    #[serde(other)]
    Unknown,
}
1611
/// `response` payload of the completed/done/incomplete terminal events.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesDonePayload {
    /// Present only when the response ended early.
    #[serde(default)]
    incomplete_details: Option<OpenAIResponsesIncompleteDetails>,
    usage: OpenAIResponsesUsage,
}

/// `response` payload of the `response.failed` terminal event.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedPayload {
    #[serde(default)]
    error: Option<OpenAIResponsesFailedError>,
}

/// Error detail inside a failed-response payload.
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedError {
    #[serde(default)]
    message: Option<String>,
}

/// Why an incomplete response stopped (server-provided free-form string).
#[derive(Debug, Deserialize)]
struct OpenAIResponsesIncompleteDetails {
    reason: String,
}

/// Token accounting reported on terminal events.
#[derive(Debug, Deserialize)]
#[allow(clippy::struct_field_names)]
struct OpenAIResponsesUsage {
    input_tokens: u64,
    output_tokens: u64,
    /// May be absent; callers derive input + output when missing.
    #[serde(default)]
    total_tokens: Option<u64>,
}
1644
1645impl OpenAIResponsesDonePayload {
1646    fn incomplete_reason(&self) -> Option<String> {
1647        self.incomplete_details.as_ref().map(|d| d.reason.clone())
1648    }
1649}
1650
1651// ============================================================================
1652// Tests
1653// ============================================================================
1654
1655#[cfg(test)]
1656mod tests {
1657    use super::*;
1658    use asupersync::runtime::RuntimeBuilder;
1659    use futures::stream;
1660    use proptest::prelude::*;
1661    use proptest::string::string_regex;
1662    use serde_json::{Value, json};
1663    use std::collections::HashMap;
1664    use std::io::{Read, Write};
1665    use std::net::TcpListener;
1666    use std::sync::mpsc;
1667    use std::time::Duration;
1668
1669    #[test]
1670    fn test_provider_info() {
1671        let provider = OpenAIResponsesProvider::new("gpt-4o");
1672        assert_eq!(provider.name(), "openai");
1673        assert_eq!(provider.api(), "openai-responses");
1674    }
1675
    /// The serialized request must carry the model, sampling options, the
    /// default `max_output_tokens`, `stream: true`, the system prompt as
    /// `instructions`, the user message as `input_text`, and the tool
    /// definitions — with a whitespace-only tool description omitted entirely.
    #[test]
    fn test_build_request_includes_system_tools_and_defaults() {
        let provider = OpenAIResponsesProvider::new("gpt-4o");
        let context = Context::owned(
            Some("System guidance".to_string()),
            vec![Message::User(crate::model::UserMessage {
                content: UserContent::Text("Ping".to_string()),
                timestamp: 0,
            })],
            vec![
                ToolDef {
                    name: "search".to_string(),
                    description: "Search docs".to_string(),
                    parameters: json!({
                        "type": "object",
                        "properties": { "q": { "type": "string" } },
                        "required": ["q"]
                    }),
                },
                // Blank description: should be dropped from the wire format.
                ToolDef {
                    name: "blank_desc".to_string(),
                    description: "   ".to_string(),
                    parameters: json!({ "type": "object" }),
                },
            ],
        );
        let options = StreamOptions {
            temperature: Some(0.3),
            ..Default::default()
        };

        let request = provider.build_request(&context, &options);
        let value = serde_json::to_value(&request).expect("serialize request");
        assert_eq!(value["model"], "gpt-4o");
        // Compare with a tolerance: 0.3 is not exactly representable as f64.
        let temperature = value["temperature"]
            .as_f64()
            .expect("temperature should serialize as number");
        assert!((temperature - 0.3).abs() < 1e-6);
        assert_eq!(value["max_output_tokens"], DEFAULT_MAX_OUTPUT_TOKENS);
        assert_eq!(value["stream"], true);
        assert_eq!(value["instructions"], "System guidance");
        assert_eq!(value["input"][0]["role"], "user");
        assert_eq!(value["input"][0]["content"][0]["type"], "input_text");
        assert_eq!(value["input"][0]["content"][0]["text"], "Ping");
        assert_eq!(value["tools"][0]["type"], "function");
        assert_eq!(value["tools"][0]["name"], "search");
        assert_eq!(value["tools"][0]["description"], "Search docs");
        assert_eq!(
            value["tools"][0]["parameters"],
            json!({
                "type": "object",
                "properties": { "q": { "type": "string" } },
                "required": ["q"]
            })
        );
        assert!(value["tools"][1].get("description").is_none());
    }
1733
    /// A text delta followed by a full function-call lifecycle
    /// (output_item.added → arguments delta → output_item.done) should yield a
    /// Start event, the "Hello" TextDelta, a ToolCallEnd for `echo`, and a
    /// terminal Done carrying `StopReason::ToolUse`.
    #[test]
    fn test_stream_parses_text_and_tool_call() {
        let events = vec![
            json!({
                "type": "response.output_text.delta",
                "item_id": "msg_1",
                "content_index": 0,
                "delta": "Hello"
            }),
            json!({
                "type": "response.output_item.added",
                "output_index": 1,
                "item": {
                    "type": "function_call",
                    "id": "fc_1",
                    "call_id": "call_1",
                    "name": "echo",
                    "arguments": ""
                }
            }),
            json!({
                "type": "response.function_call_arguments.delta",
                "item_id": "fc_1",
                "output_index": 1,
                "delta": "{\"text\":\"hi\"}"
            }),
            json!({
                "type": "response.output_item.done",
                "output_index": 1,
                "item": {
                    "type": "function_call",
                    "id": "fc_1",
                    "call_id": "call_1",
                    "name": "echo",
                    "arguments": "{\"text\":\"hi\"}",
                    "status": "completed"
                }
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 2,
                        "total_tokens": 3
                    }
                }
            }),
        ];

        // collect_events (test helper elsewhere in this module) feeds the JSON
        // events through the provider's stream machinery.
        let out = collect_events(&events);
        assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
        assert!(
            out.iter()
                .any(|e| matches!(e, StreamEvent::TextDelta { delta, .. } if delta == "Hello"))
        );
        assert!(out.iter().any(
            |e| matches!(e, StreamEvent::ToolCallEnd { tool_call, .. } if tool_call.name == "echo")
        ));
        assert!(out.iter().any(|e| matches!(
            e,
            StreamEvent::Done {
                reason: StopReason::ToolUse,
                ..
            }
        )));
    }
1802
    /// Arguments split across the initial `output_item.added` fragment and a
    /// later arguments delta must be concatenated into one valid JSON value;
    /// the empty `arguments` on `output_item.done` must not clobber them.
    #[test]
    fn test_stream_accumulates_function_call_arguments_deltas() {
        let events = vec![
            json!({
                "type": "response.output_item.added",
                "item": {
                    "type": "function_call",
                    "id": "fc_2",
                    "call_id": "call_2",
                    "name": "search",
                    "arguments": "{\"q\":\"ru"
                }
            }),
            json!({
                "type": "response.function_call_arguments.delta",
                "item_id": "fc_2",
                "delta": "st\"}"
            }),
            json!({
                "type": "response.output_item.done",
                "item": {
                    "type": "function_call",
                    "id": "fc_2",
                    "call_id": "call_2",
                    "name": "search",
                    "arguments": ""
                }
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        let tool_end = out
            .iter()
            .find_map(|event| match event {
                StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
                _ => None,
            })
            .expect("tool call end");
        // "{\"q\":\"ru" + "st\"}" must reassemble into {"q":"rust"}.
        assert_eq!(tool_end.id, "call_2");
        assert_eq!(tool_end.name, "search");
        assert_eq!(tool_end.arguments, json!({ "q": "rust" }));
    }
1856
    /// When `output_item.done` is the first sign of a tool call (no prior
    /// `output_item.added`), the stream must synthesize the Start and Delta
    /// events before the End, in that order, and still finish with ToolUse.
    #[test]
    fn test_stream_synthesizes_tool_call_start_when_done_arrives_first() {
        let events = vec![
            json!({
                "type": "response.output_item.done",
                "item": {
                    "type": "function_call",
                    "id": "fc_3",
                    "call_id": "call_3",
                    "name": "echo",
                    "arguments": "{\"text\":\"late start\"}",
                    "status": "completed"
                }
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        let start_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
            .expect("tool call start");
        let delta_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"late start\"}"))
            .expect("tool call delta");
        let end_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallEnd { .. }))
            .expect("tool call end");

        // Synthesized lifecycle must keep the natural Start → Delta → End order.
        assert!(start_idx < delta_idx);
        assert!(delta_idx < end_idx);

        let tool_end = out
            .iter()
            .find_map(|event| match event {
                StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
                _ => None,
            })
            .expect("tool call end");
        assert_eq!(tool_end.id, "call_3");
        assert_eq!(tool_end.name, "echo");
        assert_eq!(tool_end.arguments, json!({ "text": "late start" }));
        assert!(matches!(
            out.last(),
            Some(StreamEvent::Done {
                reason: StopReason::ToolUse,
                ..
            })
        ));
    }
1919
    /// An arguments delta that arrives before any `output_item.added`/`done`
    /// for its item id must be buffered and attached once the item completes,
    /// not dropped.
    #[test]
    fn test_stream_preserves_orphan_tool_call_deltas_until_done() {
        let events = vec![
            // Orphan delta: no matching tool call is known yet.
            json!({
                "type": "response.function_call_arguments.delta",
                "item_id": "fc_4",
                "delta": "{\"text\":\"buffered\"}"
            }),
            json!({
                "type": "response.output_item.done",
                "item": {
                    "type": "function_call",
                    "id": "fc_4",
                    "call_id": "call_4",
                    "name": "echo",
                    "arguments": "",
                    "status": "completed"
                }
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        assert!(
            out.iter()
                .any(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
        );
        assert!(out.iter().any(
            |event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"buffered\"}")
        ));

        let tool_end = out
            .iter()
            .find_map(|event| match event {
                StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
                _ => None,
            })
            .expect("tool call end");
        assert_eq!(tool_end.id, "call_4");
        assert_eq!(tool_end.arguments, json!({ "text": "buffered" }));
    }
1971
    /// Uses a raw SSE body (rather than pre-parsed events) to prove the stream
    /// keeps reading after `response.completed`: the `output_item.done` that
    /// arrives afterwards must still produce the full tool-call lifecycle,
    /// and all of it must precede the Done event.
    #[test]
    fn test_stream_reads_late_tool_call_events_after_response_completed() {
        let body = [
            r#"data: {"type":"response.function_call_arguments.delta","item_id":"fc_5","delta":"{\"text\":\"late tool\"}"}"#,
            "",
            r#"data: {"type":"response.completed","response":{"incomplete_details":null,"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}"#,
            "",
            r#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_5","call_id":"call_5","name":"echo","arguments":"","status":"completed"}}"#,
            "",
            "data: [DONE]",
            "",
        ]
        .join("\n");

        let out = collect_stream_events_from_body(&body);
        let start_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallStart { .. }))
            .expect("tool call start");
        let delta_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallDelta { delta, .. } if delta == "{\"text\":\"late tool\"}"))
            .expect("tool call delta");
        let end_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::ToolCallEnd { .. }))
            .expect("tool call end");
        let done_idx = out
            .iter()
            .position(|event| matches!(event, StreamEvent::Done { .. }))
            .expect("done event");

        assert!(start_idx < delta_idx);
        assert!(delta_idx < end_idx);
        // Done must be emitted only after the late tool call has been closed.
        assert!(end_idx < done_idx);

        let tool_end = out
            .iter()
            .find_map(|event| match event {
                StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
                _ => None,
            })
            .expect("tool call end");
        assert_eq!(tool_end.id, "call_5");
        assert_eq!(tool_end.arguments, json!({ "text": "late tool" }));
    }
2018
    /// An `output_text.done` with no preceding deltas must be materialized as
    /// a full TextStart → TextDelta → TextEnd sequence carrying the final
    /// text, followed by a normal Stop.
    #[test]
    fn test_stream_materializes_output_text_done_without_deltas() {
        let events = vec![
            json!({
                "type": "response.output_text.done",
                "item_id": "msg_done",
                "content_index": 0,
                "text": "done-only text"
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
        assert!(matches!(
            out.get(1),
            Some(StreamEvent::TextStart { content_index: 0 })
        ));
        assert!(matches!(
            out.get(2),
            Some(StreamEvent::TextDelta {
                content_index: 0,
                delta,
            }) if delta == "done-only text"
        ));
        assert!(matches!(
            out.iter().find(|event| matches!(event, StreamEvent::TextEnd { .. })),
            Some(StreamEvent::TextEnd {
                content_index: 0,
                content,
            }) if content == "done-only text"
        ));
        assert!(matches!(
            out.last(),
            Some(StreamEvent::Done {
                reason: StopReason::Stop,
                ..
            })
        ));
    }
2069
    /// If `output_text.done` reports more text than the deltas delivered
    /// ("Hello world" vs only "Hello"), the stream must emit the missing
    /// suffix (" world") as an extra TextDelta and end with the full text.
    #[test]
    fn test_stream_backfills_missing_output_text_suffix_from_done_event() {
        let events = vec![
            json!({
                "type": "response.output_text.delta",
                "item_id": "msg_suffix",
                "content_index": 0,
                "delta": "Hello"
            }),
            json!({
                "type": "response.output_text.done",
                "item_id": "msg_suffix",
                "content_index": 0,
                "text": "Hello world"
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 2,
                        "total_tokens": 3
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        let text_deltas: Vec<(usize, String)> = out
            .iter()
            .filter_map(|event| match event {
                StreamEvent::TextDelta {
                    content_index,
                    delta,
                } => Some((*content_index, delta.clone())),
                _ => None,
            })
            .collect();
        assert_eq!(
            text_deltas,
            vec![(0, "Hello".to_string()), (0, " world".to_string())]
        );
        assert!(matches!(
            out.iter().find(|event| matches!(event, StreamEvent::TextEnd { .. })),
            Some(StreamEvent::TextEnd {
                content_index: 0,
                content,
            }) if content == "Hello world"
        ));
    }
2121
    /// A `reasoning_summary_part.done` with no prior summary deltas must be
    /// materialized as ThinkingStart → ThinkingDelta → ThinkingEnd carrying
    /// the summary text.
    #[test]
    fn test_stream_materializes_reasoning_summary_part_done_without_deltas() {
        let events = vec![
            json!({
                "type": "response.reasoning_summary_part.done",
                "item_id": "rs_done",
                "summary_index": 0,
                "part": {
                    "type": "summary_text",
                    "text": "Plan first, answer second."
                }
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
        assert!(matches!(
            out.get(1),
            Some(StreamEvent::ThinkingStart { content_index: 0 })
        ));
        assert!(matches!(
            out.get(2),
            Some(StreamEvent::ThinkingDelta {
                content_index: 0,
                delta,
            }) if delta == "Plan first, answer second."
        ));
        assert!(matches!(
            out.iter()
                .find(|event| matches!(event, StreamEvent::ThinkingEnd { .. })),
            Some(StreamEvent::ThinkingEnd {
                content_index: 0,
                content,
            }) if content == "Plan first, answer second."
        ));
    }
2169
    /// A `reasoning_text.done` with no prior deltas must likewise be
    /// materialized as a full Thinking block (Start → Delta → End).
    #[test]
    fn test_stream_materializes_reasoning_text_done_without_deltas() {
        let events = vec![
            json!({
                "type": "response.reasoning_text.done",
                "item_id": "rt_done",
                "content_index": 0,
                "text": "Private chain of thought."
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
        assert!(matches!(
            out.get(1),
            Some(StreamEvent::ThinkingStart { content_index: 0 })
        ));
        assert!(matches!(
            out.get(2),
            Some(StreamEvent::ThinkingDelta {
                content_index: 0,
                delta,
            }) if delta == "Private chain of thought."
        ));
        assert!(matches!(
            out.iter()
                .find(|event| matches!(event, StreamEvent::ThinkingEnd { .. })),
            Some(StreamEvent::ThinkingEnd {
                content_index: 0,
                content,
            }) if content == "Private chain of thought."
        ));
    }
2214
    /// Summary deltas and full reasoning-text deltas for the SAME item id must
    /// be tracked as two separate thinking blocks (content indices 0 and 1),
    /// each with its own Start/Delta/End.
    #[test]
    fn test_stream_separates_reasoning_summary_and_reasoning_text_for_same_item() {
        let events = vec![
            json!({
                "type": "response.reasoning_summary_text.delta",
                "item_id": "rs_same",
                "summary_index": 0,
                "delta": "summary"
            }),
            // Same item_id, but the full-reasoning channel.
            json!({
                "type": "response.reasoning_text.delta",
                "item_id": "rs_same",
                "content_index": 0,
                "delta": "full reasoning"
            }),
            json!({
                "type": "response.completed",
                "response": {
                    "incomplete_details": null,
                    "usage": {
                        "input_tokens": 1,
                        "output_tokens": 1,
                        "total_tokens": 2
                    }
                }
            }),
        ];

        let out = collect_events(&events);
        let thinking_starts: Vec<usize> = out
            .iter()
            .filter_map(|event| match event {
                StreamEvent::ThinkingStart { content_index } => Some(*content_index),
                _ => None,
            })
            .collect();
        assert_eq!(thinking_starts, vec![0, 1]);

        let thinking_deltas: Vec<(usize, String)> = out
            .iter()
            .filter_map(|event| match event {
                StreamEvent::ThinkingDelta {
                    content_index,
                    delta,
                } => Some((*content_index, delta.clone())),
                _ => None,
            })
            .collect();
        assert_eq!(
            thinking_deltas,
            vec![
                (0, "summary".to_string()),
                (1, "full reasoning".to_string())
            ]
        );

        let thinking_ends: Vec<(usize, String)> = out
            .iter()
            .filter_map(|event| match event {
                StreamEvent::ThinkingEnd {
                    content_index,
                    content,
                } => Some((*content_index, content.clone())),
                _ => None,
            })
            .collect();
        assert_eq!(
            thinking_ends,
            vec![
                (0, "summary".to_string()),
                (1, "full reasoning".to_string())
            ]
        );
    }
2289
    /// Drives `StreamState` directly: after deltas for text(0), thinking(1)
    /// and text(2), `finish` must queue the terminal End events sorted by
    /// content index, interleaving kinds in the order the blocks appeared.
    #[test]
    fn test_finish_emits_terminal_block_end_events_in_content_order() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            // Empty byte stream: no SSE input is read; events are injected
            // manually via process_event below.
            let byte_stream = stream::iter(vec![Ok(Vec::new())]);
            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_1",
                        "content_index": 0,
                        "delta": "hello",
                    })
                    .to_string(),
                )
                .expect("text delta");
            state
                .process_event(
                    &json!({
                        "type": "response.reasoning_summary_text.delta",
                        "item_id": "rs_1",
                        "summary_index": 0,
                        "delta": "think",
                    })
                    .to_string(),
                )
                .expect("reasoning delta");
            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_2",
                        "content_index": 0,
                        "delta": "world",
                    })
                    .to_string(),
                )
                .expect("second text delta");

            state.finish(None);

            let terminal_end_kinds: Vec<(&'static str, usize)> = state
                .pending_events
                .iter()
                .filter_map(|event| match event {
                    StreamEvent::TextEnd { content_index, .. } => Some(("text", *content_index)),
                    StreamEvent::ThinkingEnd { content_index, .. } => {
                        Some(("thinking", *content_index))
                    }
                    _ => None,
                })
                .collect();

            assert_eq!(
                terminal_end_kinds,
                vec![("text", 0), ("thinking", 1), ("text", 2)]
            );
        });
    }
2359
    /// Snapshots of still-open tool calls must come back ordered by their
    /// `content_index`, regardless of hash-map insertion order.
    #[test]
    fn test_open_tool_call_snapshots_sort_by_content_index() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            let byte_stream = stream::iter(vec![Ok(Vec::new())]);
            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            // Insert "late" (index 4) before "early" (index 1) so map order
            // alone cannot produce the expected result.
            state.tool_calls_by_item_id.insert(
                "late".to_string(),
                ToolCallState {
                    content_index: 4,
                    call_id: "call-late".to_string(),
                    name: "later".to_string(),
                    arguments: "{\"b\":2}".to_string(),
                },
            );
            state.tool_calls_by_item_id.insert(
                "early".to_string(),
                ToolCallState {
                    content_index: 1,
                    call_id: "call-early".to_string(),
                    name: "earlier".to_string(),
                    arguments: "{\"a\":1}".to_string(),
                },
            );

            let ordered_ids: Vec<String> = state
                .open_tool_call_snapshots_in_content_order()
                .into_iter()
                .map(|(_, item_id, ..)| item_id)
                .collect();

            assert_eq!(ordered_ids, vec!["early".to_string(), "late".to_string()]);
        });
    }
2403
    /// A tool call that is still open when the stream finishes must have its
    /// ToolCallEnd emitted at its content position (between the surrounding
    /// text blocks), not appended at the end.
    #[test]
    fn test_finish_keeps_open_tool_call_end_in_content_order() {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            let byte_stream = stream::iter(vec![Ok(Vec::new())]);
            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_1",
                        "content_index": 0,
                        "delta": "before",
                    })
                    .to_string(),
                )
                .expect("first text delta");
            // Tool call is added but never receives an output_item.done,
            // so it is still open when finish() runs.
            state
                .process_event(
                    &json!({
                        "type": "response.output_item.added",
                        "item": {
                            "type": "function_call",
                            "id": "fc_7",
                            "call_id": "call_7",
                            "name": "echo",
                            "arguments": "{\"text\":\"mid\"}"
                        }
                    })
                    .to_string(),
                )
                .expect("tool call added");
            state
                .process_event(
                    &json!({
                        "type": "response.output_text.delta",
                        "item_id": "msg_2",
                        "content_index": 0,
                        "delta": "after",
                    })
                    .to_string(),
                )
                .expect("second text delta");

            state.finish(None);

            let terminal_event_order: Vec<(&'static str, usize)> = state
                .pending_events
                .iter()
                .filter_map(|event| match event {
                    StreamEvent::TextEnd { content_index, .. } => Some(("text", *content_index)),
                    StreamEvent::ToolCallEnd { content_index, .. } => {
                        Some(("tool", *content_index))
                    }
                    _ => None,
                })
                .collect();

            assert_eq!(
                terminal_event_order,
                vec![("text", 0), ("tool", 1), ("text", 2)]
            );
        });
    }
2477
    /// The outgoing request must carry the bearer Authorization header and the
    /// streaming Accept header, and its body must be the expected streaming
    /// request shape. `run_stream_and_capture_headers` (test helper elsewhere
    /// in this module) stands up a local server and records the request.
    #[test]
    fn test_stream_sets_bearer_auth_header() {
        let captured = run_stream_and_capture_headers().expect("captured request");
        assert_eq!(
            captured.headers.get("authorization").map(String::as_str),
            Some("Bearer test-openai-key")
        );
        assert_eq!(
            captured.headers.get("accept").map(String::as_str),
            Some("text/event-stream, application/x-ndjson, application/ndjson")
        );

        let body: Value = serde_json::from_str(&captured.body).expect("request body json");
        assert_eq!(body["stream"], true);
        assert_eq!(body["input"][0]["role"], "user");
        assert_eq!(body["input"][0]["content"][0]["type"], "input_text");
    }
2495
2496    fn build_test_jwt(account_id: &str) -> String {
2497        let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
2498            .encode(br#"{"alg":"none","typ":"JWT"}"#);
2499        let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(
2500            serde_json::to_vec(&json!({
2501                "https://api.openai.com/auth": {
2502                    "chatgpt_account_id": account_id
2503                }
2504            }))
2505            .expect("payload json"),
2506        );
2507        format!("{header}.{payload}.sig")
2508    }
2509
2510    #[test]
2511    fn test_bearer_token_parser_accepts_case_insensitive_scheme() {
2512        let parsed_bearer = super::bearer_token_from_authorization_header("bEaReR abc.def.ghi");
2513        assert_eq!(parsed_bearer.as_deref(), Some("abc.def.ghi"));
2514        assert!(super::bearer_token_from_authorization_header("Basic abc").is_none());
2515        assert!(super::bearer_token_from_authorization_header("Bearer").is_none());
2516    }
2517
2518    #[test]
2519    fn test_codex_mode_adds_required_headers_with_authorization_override() {
2520        let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
2521        let provider = OpenAIResponsesProvider::new("gpt-4o")
2522            .with_provider_name("openai-codex")
2523            .with_api_name("openai-codex-responses")
2524            .with_codex_mode(true)
2525            .with_base_url(base_url);
2526        let context = Context::owned(
2527            None,
2528            vec![Message::User(crate::model::UserMessage {
2529                content: UserContent::Text("ping".to_string()),
2530                timestamp: 0,
2531            })],
2532            Vec::new(),
2533        );
2534        let test_jwt = build_test_jwt("acct_test_123");
2535        let mut headers = HashMap::new();
2536        headers.insert("Authorization".to_string(), format!("Bearer {test_jwt}"));
2537        let options = StreamOptions {
2538            headers,
2539            session_id: Some("session-abc".to_string()),
2540            ..Default::default()
2541        };
2542
2543        let runtime = RuntimeBuilder::current_thread()
2544            .build()
2545            .expect("runtime build");
2546        runtime.block_on(async {
2547            let mut stream = provider.stream(&context, &options).await.expect("stream");
2548            while let Some(event) = stream.next().await {
2549                if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
2550                    break;
2551                }
2552            }
2553        });
2554
2555        let captured = rx.recv_timeout(Duration::from_secs(2)).expect("captured");
2556        let expected_auth = format!("Bearer {test_jwt}");
2557        assert_eq!(
2558            captured.headers.get("authorization").map(String::as_str),
2559            Some(expected_auth.as_str())
2560        );
2561        assert_eq!(captured.header_count("authorization"), 1);
2562        assert_eq!(
2563            captured.headers.get("user-agent").map(String::as_str),
2564            Some("pi_agent_rust")
2565        );
2566        assert_eq!(captured.header_count("user-agent"), 1);
2567        assert_eq!(
2568            captured
2569                .headers
2570                .get("chatgpt-account-id")
2571                .map(String::as_str),
2572            Some("acct_test_123")
2573        );
2574        assert_eq!(
2575            captured.headers.get("openai-beta").map(String::as_str),
2576            Some("responses=experimental")
2577        );
2578        assert_eq!(
2579            captured.headers.get("session_id").map(String::as_str),
2580            Some("session-abc")
2581        );
2582    }
2583
2584    #[test]
2585    fn test_codex_mode_accepts_compat_authorization_header_without_api_key() {
2586        let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
2587        let test_jwt = build_test_jwt("acct_compat_456");
2588        let mut custom_headers = HashMap::new();
2589        custom_headers.insert("Authorization".to_string(), format!("Bearer {test_jwt}"));
2590        let provider = OpenAIResponsesProvider::new("gpt-4o")
2591            .with_provider_name("openai-codex")
2592            .with_api_name("openai-codex-responses")
2593            .with_codex_mode(true)
2594            .with_base_url(base_url)
2595            .with_compat(Some(CompatConfig {
2596                custom_headers: Some(custom_headers),
2597                ..Default::default()
2598            }));
2599        let context = Context::owned(
2600            None,
2601            vec![Message::User(crate::model::UserMessage {
2602                content: UserContent::Text("ping".to_string()),
2603                timestamp: 0,
2604            })],
2605            Vec::new(),
2606        );
2607
2608        let runtime = RuntimeBuilder::current_thread()
2609            .build()
2610            .expect("runtime build");
2611        runtime.block_on(async {
2612            let mut stream = provider
2613                .stream(&context, &StreamOptions::default())
2614                .await
2615                .expect("stream");
2616            while let Some(event) = stream.next().await {
2617                if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
2618                    break;
2619                }
2620            }
2621        });
2622
2623        let captured = rx.recv_timeout(Duration::from_secs(2)).expect("captured");
2624        assert_eq!(captured.header_count("authorization"), 1);
2625        assert_eq!(
2626            captured.headers.get("authorization").map(String::as_str),
2627            Some(format!("Bearer {test_jwt}").as_str())
2628        );
2629        assert_eq!(
2630            captured
2631                .headers
2632                .get("chatgpt-account-id")
2633                .map(String::as_str),
2634            Some("acct_compat_456")
2635        );
2636    }
2637
2638    #[test]
2639    fn test_stream_accepts_ndjson_content_type() {
2640        let body = concat!(
2641            "{\"type\":\"response.output_text.delta\",\"item_id\":\"msg_1\",\"content_index\":0,\"delta\":\"ok\"}\n",
2642            "{\"type\":\"response.completed\",\"response\":{\"incomplete_details\":null,\"usage\":{\"input_tokens\":1,\"output_tokens\":1,\"total_tokens\":2}}}\n"
2643        );
2644        let (base_url, _rx) = spawn_test_server(200, "application/x-ndjson", body);
2645        let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2646        let context = Context::owned(
2647            None,
2648            vec![Message::User(crate::model::UserMessage {
2649                content: UserContent::Text("ping".to_string()),
2650                timestamp: 0,
2651            })],
2652            Vec::new(),
2653        );
2654        let options = StreamOptions {
2655            api_key: Some("test-openai-key".to_string()),
2656            ..Default::default()
2657        };
2658
2659        let runtime = RuntimeBuilder::current_thread()
2660            .build()
2661            .expect("runtime build");
2662        runtime.block_on(async {
2663            let mut stream = provider.stream(&context, &options).await.expect("stream");
2664            let mut saw_delta = false;
2665            let mut done_usage: Option<Usage> = None;
2666
2667            while let Some(event) = stream.next().await {
2668                let event = event.expect("stream event");
2669                match event {
2670                    StreamEvent::TextDelta { delta, .. } if delta == "ok" => {
2671                        saw_delta = true;
2672                    }
2673                    StreamEvent::Done { message, .. } => {
2674                        let usage = message.usage;
2675                        done_usage = Some(usage);
2676                        break;
2677                    }
2678                    StreamEvent::Error { .. } => {
2679                        panic!("unexpected error while reading NDJSON stream");
2680                    }
2681                    _ => {}
2682                }
2683            }
2684
2685            assert!(saw_delta, "expected NDJSON text delta");
2686            let usage = done_usage.expect("missing Done usage");
2687            assert_eq!(usage.input, 1);
2688            assert_eq!(usage.output, 1);
2689            assert_eq!(usage.total_tokens, 2);
2690        });
2691    }
2692
2693    fn ndjson_body_from_events(events: &[Value]) -> String {
2694        let mut lines = Vec::with_capacity(events.len());
2695        for event in events {
2696            let data = serde_json::to_string(event).expect("serialize event");
2697            lines.push(data);
2698        }
2699        let mut body = lines.join("\n");
2700        body.push('\n');
2701        body
2702    }
2703
    /// Feed fixture events through the SSE path of `StreamState` and collect
    /// every emitted `StreamEvent` until the stream reports completion.
    fn collect_events(events: &[Value]) -> Vec<StreamEvent> {
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            // Encode each fixture event as a standalone `data:` SSE frame.
            let byte_stream = stream::iter(events.iter().map(|event| {
                let data = serde_json::to_string(event).expect("serialize event");
                Ok(format!("data: {data}\n\n").into_bytes())
            }));

            let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            let mut out = Vec::new();
            loop {
                // Drain queued events before checking `finished`, so output
                // produced alongside completion is never lost.
                if let Some(event) = state.pending_events.pop_front() {
                    out.push(event);
                    continue;
                }

                if state.finished {
                    break;
                }

                match state.event_source.next().await {
                    Some(item) => {
                        let msg = item.expect("SSE event");
                        if msg.data == "[DONE]" {
                            // Explicit terminator: fall back to a plain finish
                            // when no terminal response was recorded.
                            if !state.finish_terminal_response() {
                                state.finish(None);
                            }
                            continue;
                        }
                        state.process_event(&msg.data).expect("process_event");
                    }
                    None => {
                        // Source exhausted without `[DONE]`; the recorded
                        // terminal response must still produce a Done event.
                        assert!(
                            state.finish_terminal_response(),
                            "stream ended without Done event"
                        );
                    }
                }
            }

            out
        })
    }
2756
2757    fn collect_stream_events_from_body(body: &str) -> Vec<StreamEvent> {
2758        let (base_url, _rx) = spawn_test_server(200, "text/event-stream", body);
2759        let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2760        let context = Context::owned(
2761            None,
2762            vec![Message::User(crate::model::UserMessage {
2763                content: UserContent::Text("ping".to_string()),
2764                timestamp: 0,
2765            })],
2766            Vec::new(),
2767        );
2768        let options = StreamOptions {
2769            api_key: Some("test-openai-key".to_string()),
2770            ..Default::default()
2771        };
2772
2773        let runtime = RuntimeBuilder::current_thread()
2774            .build()
2775            .expect("runtime build");
2776        runtime.block_on(async {
2777            let mut stream = provider.stream(&context, &options).await.expect("stream");
2778            let mut out = Vec::new();
2779            while let Some(event) = stream.next().await {
2780                let event = event.expect("stream event");
2781                let is_terminal =
2782                    matches!(event, StreamEvent::Done { .. } | StreamEvent::Error { .. });
2783                out.push(event);
2784                if is_terminal {
2785                    break;
2786                }
2787            }
2788            out
2789        })
2790    }
2791
2792    fn collect_stream_events_from_ndjson_body(body: &str) -> Vec<StreamEvent> {
2793        let (base_url, _rx) = spawn_test_server(200, "application/x-ndjson", body);
2794        let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2795        let context = Context::owned(
2796            None,
2797            vec![Message::User(crate::model::UserMessage {
2798                content: UserContent::Text("ping".to_string()),
2799                timestamp: 0,
2800            })],
2801            Vec::new(),
2802        );
2803        let options = StreamOptions {
2804            api_key: Some("test-openai-key".to_string()),
2805            ..Default::default()
2806        };
2807
2808        let runtime = RuntimeBuilder::current_thread()
2809            .build()
2810            .expect("runtime build");
2811        runtime.block_on(async {
2812            let mut stream = provider.stream(&context, &options).await.expect("stream");
2813            let mut out = Vec::new();
2814            while let Some(event) = stream.next().await {
2815                let event = event.expect("stream event");
2816                let is_terminal =
2817                    matches!(event, StreamEvent::Done { .. } | StreamEvent::Error { .. });
2818                out.push(event);
2819                if is_terminal {
2820                    break;
2821                }
2822            }
2823            out
2824        })
2825    }
2826
    /// One HTTP request captured by `spawn_test_server`.
    #[derive(Debug)]
    struct CapturedRequest {
        // Headers keyed by lowercased name; duplicates keep the last value.
        headers: HashMap<String, String>,
        // Every header line in arrival order, duplicates preserved.
        header_lines: Vec<(String, String)>,
        // Request body, lossily decoded as UTF-8.
        body: String,
    }
2833
2834    impl CapturedRequest {
2835        fn header_count(&self, name: &str) -> usize {
2836            self.header_lines
2837                .iter()
2838                .filter(|(key, _)| key.eq_ignore_ascii_case(name))
2839                .count()
2840        }
2841    }
2842
2843    fn run_stream_and_capture_headers() -> Option<CapturedRequest> {
2844        let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
2845        let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
2846        let context = Context::owned(
2847            None,
2848            vec![Message::User(crate::model::UserMessage {
2849                content: UserContent::Text("ping".to_string()),
2850                timestamp: 0,
2851            })],
2852            Vec::new(),
2853        );
2854        let options = StreamOptions {
2855            api_key: Some("test-openai-key".to_string()),
2856            ..Default::default()
2857        };
2858
2859        let runtime = RuntimeBuilder::current_thread()
2860            .build()
2861            .expect("runtime build");
2862        runtime.block_on(async {
2863            let mut stream = provider.stream(&context, &options).await.expect("stream");
2864            while let Some(event) = stream.next().await {
2865                if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
2866                    break;
2867                }
2868            }
2869        });
2870
2871        rx.recv_timeout(Duration::from_secs(2)).ok()
2872    }
2873
2874    fn success_sse_body() -> String {
2875        [
2876            r#"data: {"type":"response.output_text.delta","item_id":"msg_1","content_index":0,"delta":"ok"}"#,
2877            "",
2878            r#"data: {"type":"response.completed","response":{"incomplete_details":null,"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}"#,
2879            "",
2880        ]
2881        .join("\n")
2882    }
2883
2884    fn spawn_test_server(
2885        status_code: u16,
2886        content_type: &str,
2887        body: &str,
2888    ) -> (String, mpsc::Receiver<CapturedRequest>) {
2889        let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
2890        let addr = listener.local_addr().expect("local addr");
2891        let (tx, rx) = mpsc::channel();
2892        let body = body.to_string();
2893        let content_type = content_type.to_string();
2894
2895        std::thread::spawn(move || {
2896            let (mut socket, _) = listener.accept().expect("accept");
2897            socket
2898                .set_read_timeout(Some(Duration::from_secs(2)))
2899                .expect("set read timeout");
2900
2901            let mut bytes = Vec::new();
2902            let mut chunk = [0_u8; 4096];
2903            loop {
2904                match socket.read(&mut chunk) {
2905                    Ok(0) => break,
2906                    Ok(n) => {
2907                        bytes.extend_from_slice(&chunk[..n]);
2908                        if bytes.windows(4).any(|window| window == b"\r\n\r\n") {
2909                            break;
2910                        }
2911                    }
2912                    Err(err)
2913                        if err.kind() == std::io::ErrorKind::WouldBlock
2914                            || err.kind() == std::io::ErrorKind::TimedOut =>
2915                    {
2916                        break;
2917                    }
2918                    Err(err) => assert!(false, "request header read failed: {err}"),
2919                }
2920            }
2921
2922            let header_end = bytes
2923                .windows(4)
2924                .position(|window| window == b"\r\n\r\n")
2925                .expect("request header boundary");
2926            let header_text = String::from_utf8_lossy(&bytes[..header_end]).to_string();
2927            let (headers, header_lines) = parse_headers(&header_text);
2928            let mut request_body = bytes[header_end + 4..].to_vec();
2929
2930            let content_length = headers
2931                .get("content-length")
2932                .and_then(|value| value.parse::<usize>().ok())
2933                .unwrap_or(0);
2934            while request_body.len() < content_length {
2935                match socket.read(&mut chunk) {
2936                    Ok(0) => break,
2937                    Ok(n) => request_body.extend_from_slice(&chunk[..n]),
2938                    Err(err)
2939                        if err.kind() == std::io::ErrorKind::WouldBlock
2940                            || err.kind() == std::io::ErrorKind::TimedOut =>
2941                    {
2942                        break;
2943                    }
2944                    Err(err) => assert!(false, "request body read failed: {err}"),
2945                }
2946            }
2947
2948            let captured = CapturedRequest {
2949                headers,
2950                header_lines,
2951                body: String::from_utf8_lossy(&request_body).to_string(),
2952            };
2953            tx.send(captured).expect("send captured request");
2954
2955            let reason = match status_code {
2956                401 => "Unauthorized",
2957                500 => "Internal Server Error",
2958                _ => "OK",
2959            };
2960            let response = format!(
2961                "HTTP/1.1 {status_code} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
2962                body.len()
2963            );
2964            socket
2965                .write_all(response.as_bytes())
2966                .expect("write response");
2967            socket.flush().expect("flush response");
2968        });
2969
2970        (format!("http://{addr}/responses"), rx)
2971    }
2972
2973    fn parse_headers(header_text: &str) -> (HashMap<String, String>, Vec<(String, String)>) {
2974        let mut headers = HashMap::new();
2975        let mut header_lines = Vec::new();
2976        for line in header_text.lines().skip(1) {
2977            if let Some((name, value)) = line.split_once(':') {
2978                let normalized_name = name.trim().to_ascii_lowercase();
2979                let normalized_value = value.trim().to_string();
2980                header_lines.push((normalized_name.clone(), normalized_value.clone()));
2981                headers.insert(normalized_name, normalized_value);
2982            }
2983        }
2984        (headers, header_lines)
2985    }
2986
2987    // ========================================================================
2988    // Fixture-based stream parsing tests
2989    // ========================================================================
2990
    /// Top-level shape of a provider stream fixture file.
    #[derive(Debug, Deserialize)]
    struct ProviderFixture {
        // All test cases contained in the fixture.
        cases: Vec<ProviderCase>,
    }
2995
    /// One fixture case: raw provider events plus the expected summaries.
    #[derive(Debug, Deserialize)]
    struct ProviderCase {
        // Case label used in assertion messages.
        name: String,
        // Raw provider events fed into the stream parser.
        events: Vec<Value>,
        // Expected event summaries, in emission order.
        expected: Vec<EventSummary>,
    }
3002
    /// Comparable projection of a `StreamEvent`; fields absent from a given
    /// event kind stay `None` (and default to `None` when deserialized).
    #[derive(Debug, Deserialize, Serialize, PartialEq)]
    struct EventSummary {
        kind: String,
        #[serde(default)]
        content_index: Option<usize>,
        #[serde(default)]
        delta: Option<String>,
        #[serde(default)]
        content: Option<String>,
        #[serde(default)]
        reason: Option<String>,
    }
3015
3016    fn load_fixture(file_name: &str) -> ProviderFixture {
3017        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
3018            .join("tests/fixtures/provider_responses")
3019            .join(file_name);
3020        let data = std::fs::read_to_string(&path).expect("read fixture file");
3021        serde_json::from_str(&data).expect("parse fixture JSON")
3022    }
3023
3024    fn summarize_event(event: &StreamEvent) -> EventSummary {
3025        match event {
3026            StreamEvent::Start { .. } => EventSummary {
3027                kind: "start".to_string(),
3028                content_index: None,
3029                delta: None,
3030                content: None,
3031                reason: None,
3032            },
3033            StreamEvent::TextStart { content_index, .. } => EventSummary {
3034                kind: "text_start".to_string(),
3035                content_index: Some(*content_index),
3036                delta: None,
3037                content: None,
3038                reason: None,
3039            },
3040            StreamEvent::TextDelta {
3041                content_index,
3042                delta,
3043                ..
3044            } => EventSummary {
3045                kind: "text_delta".to_string(),
3046                content_index: Some(*content_index),
3047                delta: Some(delta.clone()),
3048                content: None,
3049                reason: None,
3050            },
3051            StreamEvent::TextEnd {
3052                content_index,
3053                content,
3054                ..
3055            } => EventSummary {
3056                kind: "text_end".to_string(),
3057                content_index: Some(*content_index),
3058                delta: None,
3059                content: Some(content.clone()),
3060                reason: None,
3061            },
3062            StreamEvent::Done { reason, .. } => EventSummary {
3063                kind: "done".to_string(),
3064                content_index: None,
3065                delta: None,
3066                content: None,
3067                reason: Some(reason_to_string(*reason)),
3068            },
3069            StreamEvent::Error { reason, .. } => EventSummary {
3070                kind: "error".to_string(),
3071                content_index: None,
3072                delta: None,
3073                content: None,
3074                reason: Some(reason_to_string(*reason)),
3075            },
3076            _ => EventSummary {
3077                kind: "other".to_string(),
3078                content_index: None,
3079                delta: None,
3080                content: None,
3081                reason: None,
3082            },
3083        }
3084    }
3085
3086    fn reason_to_string(reason: StopReason) -> String {
3087        match reason {
3088            StopReason::Stop => "stop".to_string(),
3089            StopReason::ToolUse => "tool_use".to_string(),
3090            StopReason::Length => "length".to_string(),
3091            StopReason::Error => "error".to_string(),
3092            StopReason::Aborted => "aborted".to_string(),
3093        }
3094    }
3095
3096    #[test]
3097    fn test_stream_fixtures() {
3098        let fixture = load_fixture("openai_responses_stream.json");
3099        for case in fixture.cases {
3100            let events = collect_events(&case.events);
3101            let summaries: Vec<EventSummary> = events.iter().map(summarize_event).collect();
3102            assert_eq!(summaries, case.expected, "case: {}", case.name);
3103        }
3104    }
3105
3106    #[test]
3107    fn test_stream_ndjson_fixtures() {
3108        let fixture = load_fixture("openai_responses_stream.json");
3109        for case in fixture.cases {
3110            let body = ndjson_body_from_events(&case.events);
3111            let events = collect_stream_events_from_ndjson_body(&body);
3112            let summaries: Vec<EventSummary> = events.iter().map(summarize_event).collect();
3113            assert_eq!(summaries, case.expected, "case: {}", case.name);
3114        }
3115    }
3116
3117    #[test]
3118    fn ndjson_rejects_oversized_event() {
3119        let delta = "x".repeat(128);
3120        let event = json!({
3121            "type": "response.output_text.delta",
3122            "item_id": "msg_1",
3123            "content_index": 0,
3124            "delta": delta,
3125        });
3126        let line = serde_json::to_string(&event).expect("serialize event");
3127        let max = line.len().saturating_sub(1).max(1);
3128        let body = format!("{line}\n");
3129
3130        let runtime = RuntimeBuilder::current_thread()
3131            .build()
3132            .expect("runtime build");
3133        let err = runtime.block_on(async move {
3134            let byte_stream = stream::iter(vec![Ok(body.into_bytes())]);
3135            let mut event_source = NdjsonStream::with_max_event_bytes(byte_stream, max);
3136            match event_source.next().await {
3137                Some(Err(e)) => Some(e),
3138                _ => None,
3139            }
3140        });
3141
3142        let err = err.expect("expected oversized NDJSON error");
3143        assert!(
3144            err.to_string().contains("NDJSON event exceeds"),
3145            "unexpected error: {err}"
3146        );
3147    }
3148
3149    fn ndjson_body_for_deltas(deltas: &[String]) -> String {
3150        let mut lines = Vec::with_capacity(deltas.len() + 1);
3151        for delta in deltas {
3152            let event = json!({
3153                "type": "response.output_text.delta",
3154                "item_id": "msg_1",
3155                "content_index": 0,
3156                "delta": delta,
3157            });
3158            lines.push(serde_json::to_string(&event).expect("serialize event"));
3159        }
3160        let done = json!({
3161            "type": "response.completed",
3162            "response": {
3163                "incomplete_details": null,
3164                "usage": { "input_tokens": 1, "output_tokens": 1, "total_tokens": 2 }
3165            }
3166        });
3167        lines.push(serde_json::to_string(&done).expect("serialize event"));
3168        let mut body = lines.join("\n");
3169        body.push('\n');
3170        body
3171    }
3172
3173    fn chunk_bytes(input: &[u8], chunk_sizes: &[usize]) -> Vec<Vec<u8>> {
3174        if chunk_sizes.is_empty() {
3175            return vec![input.to_vec()];
3176        }
3177        let mut chunks = Vec::new();
3178        let mut offset = 0usize;
3179        let mut idx = 0usize;
3180        while offset < input.len() {
3181            let size = chunk_sizes[idx % chunk_sizes.len()].max(1);
3182            let end = (offset + size).min(input.len());
3183            chunks.push(input[offset..end].to_vec());
3184            offset = end;
3185            idx += 1;
3186        }
3187        chunks
3188    }
3189
    /// Parse `body` as NDJSON delivered in the given chunk sizes and collect
    /// every event emitted by `StreamState` until completion.
    fn collect_ndjson_events_from_chunks(body: &str, chunk_sizes: &[usize]) -> Vec<StreamEvent> {
        let bytes = body.as_bytes();
        let chunks = chunk_bytes(bytes, chunk_sizes);
        let runtime = RuntimeBuilder::current_thread()
            .build()
            .expect("runtime build");
        runtime.block_on(async move {
            let byte_stream = stream::iter(chunks.into_iter().map(Ok));
            let event_source = NdjsonStream::new(byte_stream);
            let mut state = StreamState::new(
                event_source,
                "gpt-test".to_string(),
                "openai-responses".to_string(),
                "openai".to_string(),
            );

            let mut out = Vec::new();
            loop {
                // Drain queued events before checking `finished`, so output
                // produced alongside completion is never lost.
                if let Some(event) = state.pending_events.pop_front() {
                    out.push(event);
                    continue;
                }

                if state.finished {
                    break;
                }

                match state.event_source.next().await {
                    Some(item) => {
                        let msg = item.expect("NDJSON event");
                        state.process_event(&msg.data).expect("process_event");
                    }
                    None => {
                        // NDJSON has no explicit terminator; the recorded
                        // terminal response must yield the final Done event.
                        assert!(
                            state.finish_terminal_response(),
                            "stream ended without Done event"
                        );
                    }
                }
            }

            out
        })
    }
3234
3235    fn summarize_events(events: &[StreamEvent]) -> Vec<EventSummary> {
3236        events.iter().map(summarize_event).collect()
3237    }
3238
    /// Proptest strategy producing short strings of lowercase letters,
    /// digits, and spaces (1–8 characters).
    fn delta_strategy() -> impl Strategy<Value = String> {
        string_regex("[a-z0-9 ]{1,8}").expect("delta regex")
    }
3242
    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 64,
            max_shrink_iters: 64,
            .. ProptestConfig::default()
        })]

        // Invariant: the NDJSON parser's output must not depend on how the
        // byte stream is chunked — any chunking of a body must produce the
        // same event summaries as delivering the whole body in one chunk.
        #[test]
        fn ndjson_chunking_invariant(
            deltas in prop::collection::vec(delta_strategy(), 1..6),
            chunk_sizes in prop::collection::vec(1usize..16, 0..8),
        ) {
            let body = ndjson_body_for_deltas(&deltas);
            let expected = summarize_events(&collect_ndjson_events_from_chunks(&body, &[]));
            let actual = summarize_events(&collect_ndjson_events_from_chunks(&body, &chunk_sizes));
            prop_assert_eq!(actual, expected);
        }
    }
3261}
3262
3263// ============================================================================
3264// Fuzzing support
3265// ============================================================================
3266
#[cfg(feature = "fuzzing")]
pub mod fuzz {
    use super::*;
    use futures::stream;
    use std::pin::Pin;

    type FuzzByteStream =
        Pin<Box<futures::stream::Empty<std::result::Result<Vec<u8>, std::io::Error>>>>;
    type FuzzStream = crate::sse::SseStream<FuzzByteStream>;

    /// Opaque wrapper around the OpenAI Responses stream processor state.
    pub struct Processor(StreamState<FuzzStream>);

    impl Default for Processor {
        fn default() -> Self {
            Self::new()
        }
    }

    impl Processor {
        /// Create a fresh processor with default state.
        pub fn new() -> Self {
            // The fuzz harness feeds payloads directly via `process_event`,
            // so the underlying byte source can stay permanently empty.
            let source = crate::sse::SseStream::new(Box::pin(stream::empty::<
                std::result::Result<Vec<u8>, std::io::Error>,
            >()));
            Self(StreamState::new(
                source,
                "gpt-responses-fuzz".into(),
                "openai-responses".into(),
                "openai".into(),
            ))
        }

        /// Feed one SSE data payload and return any emitted `StreamEvent`s.
        pub fn process_event(&mut self, data: &str) -> crate::error::Result<Vec<StreamEvent>> {
            self.0.process_event(data)?;
            let emitted: Vec<StreamEvent> = self.0.pending_events.drain(..).collect();
            Ok(emitted)
        }
    }
}
3305}