Skip to main content

codex_convert_proxy/proxy/
streaming_handler.rs

1//! Streaming response handler for SSE conversion.
2//!
3//! Extracts the streaming response conversion logic from `response_body_filter`
4//! to improve code organization and single responsibility principle.
5
6use std::sync::Arc;
7
8use tracing::{debug, error, warn};
9
10use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
11use crate::providers::Provider;
12use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
13use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
14use crate::types::chat_api::ChatStreamChunk;
15use crate::util::parse_sse;
16
17use super::context::ProxyContext;
18
19/// Handler for streaming SSE response conversion.
20/// Processes Chat API SSE chunks and converts them to Responses API SSE events.
21pub struct StreamingResponseHandler<'a> {
22    /// Reference to proxy context for state access.
23    ctx: &'a mut ProxyContext,
24    /// Provider clone for transformations (owned to avoid lifetime complexity).
25    provider: Option<Box<dyn Provider + Send + Sync>>,
26    /// Whether to log bodies for debugging.
27    log_body: bool,
28    /// Conversation store for persisting completed turns.
29    conversation_store: Arc<ConversationStore>,
30}
31
32impl<'a> StreamingResponseHandler<'a> {
33    /// Create a new streaming handler.
34    pub fn new(
35        ctx: &'a mut ProxyContext,
36        provider: Option<Box<dyn Provider + Send + Sync>>,
37        log_body: bool,
38        conversation_store: Arc<ConversationStore>,
39    ) -> Self {
40        Self {
41            ctx,
42            provider,
43            log_body,
44            conversation_store,
45        }
46    }
47
48    /// Process a single streaming frame/body chunk.
49    ///
50    /// Parses SSE events from the accumulated response body (starting from last
51    /// parsed offset), converts each ChatStreamChunk to Responses API events,
52    /// and returns the combined SSE output.
53    ///
54    /// Returns `None` if no conversion events were generated.
55    pub fn process_stream_frame(&mut self) -> Option<String> {
56        // Use accumulated body for SSE parsing (events may span multiple frames)
57        // Only parse from the last parsed offset to avoid re-processing events
58        let text = std::str::from_utf8(&self.ctx.response_body).unwrap_or("");
59        let unparsed_text = &text[self.ctx.stream_body_parsed_offset..];
60
61        debug!(
62            "[STREAM_RAW] is_streaming=true, body={}",
63            String::from_utf8_lossy(&self.ctx.response_body)
64                .chars()
65                .take(200)
66                .collect::<String>()
67        );
68
69        let mut converted_chunks: Vec<String> = Vec::new();
70
71        // Use SSE utility module to parse only new events
72        let (events, parse_end_pos) = parse_sse(unparsed_text);
73        let new_events_count = events.len();
74        debug!(
75            "[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
76            new_events_count,
77            self.ctx.stream_body_parsed_offset,
78            parse_end_pos
79        );
80
81        for event in events {
82            // Skip [DONE] marker events - they don't contain JSON
83            if event.data == "[DONE]" {
84                continue;
85            }
86
87            // Parse as ChatStreamChunk
88            match serde_json::from_str::<ChatStreamChunk>(&event.data) {
89                Ok(chunk) => {
90                    self.ctx.stream_chunks_parsed += 1;
91                    let mut chunk = chunk;
92
93                    // Apply provider transformation
94                    self.apply_provider_transform(&mut chunk);
95
96                    // Convert to Response API events
97                    self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
98                }
99                Err(e) => {
100                    debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
101                }
102            }
103        }
104
105        // Update the parse offset to avoid re-parsing on next frame
106        // Use parse_end_pos (relative to unparsed_text) to calculate absolute position
107        if new_events_count > 0 {
108            self.ctx.stream_body_parsed_offset += parse_end_pos;
109        }
110
111        // Compact parsed prefix periodically to keep streaming memory bounded.
112        if self.ctx.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
113            self.ctx.response_body.drain(..self.ctx.stream_body_parsed_offset);
114            debug!(
115                "[STREAM_PARSE] compacted parsed prefix bytes={}",
116                self.ctx.stream_body_parsed_offset
117            );
118            self.ctx.stream_body_parsed_offset = 0;
119        }
120
121        if !converted_chunks.is_empty() {
122            Some(converted_chunks.join(""))
123        } else {
124            None
125        }
126    }
127
128    /// Finalize the stream by appending response.completed event.
129    ///
130    /// Should be called at end_of_body when streaming conversion is enabled.
131    /// Returns SSE events for the completed response, including [DONE] marker.
132    pub fn finalize_stream(&mut self) -> Vec<String> {
133        let mut converted_chunks: Vec<String> = Vec::new();
134
135        if let Some(ref mut state) = self.ctx.stream_state
136            && !state.is_completed {
137                if self.ctx.stream_chunks_parsed == 0 {
138                    warn!(
139                        "[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
140                        self.ctx.upstream_status,
141                        self.ctx.upstream_content_type
142                    );
143                    state.is_completed = true;
144                } else {
145                    let response_obj = state.build_response_object();
146                    if let Some(mut messages) = self.ctx.pending_conversation_messages.clone() {
147                        let assistant_tool_calls: Vec<ToolCall> = state
148                            .completed_tool_calls
149                            .iter()
150                            .map(|tc| ToolCall {
151                                id: tc.call_id.clone(),
152                                tool_type: "function".to_string(),
153                                function: FunctionCall {
154                                    name: tc.name.clone(),
155                                    arguments: tc.arguments.clone(),
156                                },
157                            })
158                            .collect();
159                        messages.push(ChatMessage {
160                            role: MessageRole::Assistant,
161                            content: Content::String(if state.full_text.is_empty() {
162                                state.refusal_text.clone()
163                            } else {
164                                state.full_text.clone()
165                            }),
166                            name: None,
167                            annotations: None,
168                            tool_calls: if assistant_tool_calls.is_empty() {
169                                None
170                            } else {
171                                Some(assistant_tool_calls)
172                            },
173                            tool_call_id: None,
174                            function_call: None,
175                            refusal: if state.refusal_text.is_empty() {
176                                None
177                            } else {
178                                Some(state.refusal_text.clone())
179                            },
180                        });
181                        self.conversation_store.insert(
182                            response_obj.id.clone(),
183                            ConversationSnapshot {
184                                instructions: self.ctx.pending_instructions.clone(),
185                                messages,
186                            },
187                        );
188                    }
189                    debug!(
190                        "[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
191                        response_obj.id,
192                        response_obj.output.len(),
193                        state.is_reasoning_added,
194                        state.is_output_item_added,
195                        state.completed_tool_calls.len(),
196                        self.ctx.stream_chunks_parsed
197                    );
198                    if self.log_body
199                        && let Ok(json) = serde_json::to_string(&response_obj) {
200                            debug!("[STREAM_COMPLETE_JSON] {}", json);
201                        }
202                    let completed_event = ResponseStreamEvent::Completed {
203                        response: response_obj,
204                    };
205                    let sse_data = event_to_sse(&completed_event);
206                    converted_chunks.push(sse_data);
207                    // Append SSE [DONE] marker to signal stream end
208                    converted_chunks.push("data: [DONE]\n\n".to_string());
209                    state.is_completed = true;
210                }
211            }
212
213        converted_chunks
214    }
215
216    /// Apply provider-specific transformation to stream chunk.
217    fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
218        if let Some(ref provider) = self.provider {
219            provider.transform_stream_chunk(chunk);
220        }
221    }
222
223    /// Convert a ChatStreamChunk to Response API events.
224    fn convert_chunk_to_events(
225        &mut self,
226        chunk: &mut ChatStreamChunk,
227        converted_chunks: &mut Vec<String>,
228    ) {
229        if let Some(ref mut state) = self.ctx.stream_state {
230            // Update usage from this chunk
231            state.update_usage(chunk);
232
233            match chat_chunk_to_response_events(chunk, state) {
234                Ok(events) => {
235                    let sse_data: String = events
236                        .iter()
237                        .map(event_to_sse)
238                        .collect();
239                    if !sse_data.is_empty() {
240                        debug!("[STREAM_CHUNK] {}", sse_data);
241                        converted_chunks.push(sse_data);
242                    }
243                }
244                Err(e) => {
245                    error!("Failed to convert stream chunk: {}", e);
246                }
247            }
248        }
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    // Integration tests for StreamingResponseHandler behavior
255    // are covered by proxy::filters tests
256}