Skip to main content

codex_convert_proxy/proxy/
streaming_handler.rs

1//! Streaming response handler for SSE conversion.
2//!
3//! Extracts the streaming response conversion logic from `response_body_filter`
4//! to improve code organization and single responsibility principle.
5
6use std::sync::Arc;
7
8use tracing::{debug, error, warn};
9
10use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
11use crate::providers::Provider;
12use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
13use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
14use crate::types::chat_api::ChatStreamChunk;
15use crate::util::parse_sse;
16
17use super::context::ProxyContext;
18
19/// Handler for streaming SSE response conversion.
20/// Processes Chat API SSE chunks and converts them to Responses API SSE events.
21pub struct StreamingResponseHandler<'a> {
22    /// Reference to proxy context for state access.
23    ctx: &'a mut ProxyContext,
24    /// Shared provider handle for transformations.
25    provider: Option<Arc<dyn Provider>>,
26    /// Whether to log bodies for debugging.
27    log_body: bool,
28    /// Conversation store for persisting completed turns.
29    conversation_store: Arc<ConversationStore>,
30}
31
32impl<'a> StreamingResponseHandler<'a> {
33    /// Create a new streaming handler.
34    pub fn new(
35        ctx: &'a mut ProxyContext,
36        provider: Option<Arc<dyn Provider>>,
37        log_body: bool,
38        conversation_store: Arc<ConversationStore>,
39    ) -> Self {
40        Self {
41            ctx,
42            provider,
43            log_body,
44            conversation_store,
45        }
46    }
47
48    /// Process a single streaming frame/body chunk.
49    ///
50    /// Parses SSE events from the accumulated response body (starting from last
51    /// parsed offset), converts each ChatStreamChunk to Responses API events,
52    /// and returns the combined SSE output.
53    ///
54    /// Returns `None` if no conversion events were generated.
55    pub fn process_stream_frame(&mut self) -> Option<String> {
56        // Use accumulated body for SSE parsing (events may span multiple frames)
57        // Only parse from the last parsed offset to avoid re-processing events
58        let text = std::str::from_utf8(&self.ctx.buffers.response_body).unwrap_or("");
59        let unparsed_text = &text[self.ctx.buffers.stream_body_parsed_offset..];
60
61        debug!(
62            "[STREAM_RAW] is_streaming=true, body={}",
63            String::from_utf8_lossy(&self.ctx.buffers.response_body)
64                .chars()
65                .take(200)
66                .collect::<String>()
67        );
68
69        let mut converted_chunks: Vec<String> = Vec::new();
70
71        // Use SSE utility module to parse only new events
72        let (events, parse_end_pos) = parse_sse(unparsed_text);
73        let new_events_count = events.len();
74        debug!(
75            "[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
76            new_events_count,
77            self.ctx.buffers.stream_body_parsed_offset,
78            parse_end_pos
79        );
80
81        for event in events {
82            // Skip [DONE] marker events - they don't contain JSON
83            if event.data == "[DONE]" {
84                continue;
85            }
86
87            // Parse as ChatStreamChunk
88            match serde_json::from_str::<ChatStreamChunk>(&event.data) {
89                Ok(chunk) => {
90                    self.ctx.diagnostics.stream_chunks_parsed += 1;
91                    let mut chunk = chunk;
92
93                    // Apply provider transformation
94                    self.apply_provider_transform(&mut chunk);
95
96                    // Convert to Response API events
97                    self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
98                }
99                Err(e) => {
100                    debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
101                }
102            }
103        }
104
105        // Update the parse offset to avoid re-parsing on next frame
106        // Use parse_end_pos (relative to unparsed_text) to calculate absolute position
107        if new_events_count > 0 {
108            self.ctx.buffers.stream_body_parsed_offset += parse_end_pos;
109        }
110
111        // Compact parsed prefix periodically to keep streaming memory bounded.
112        if self.ctx.buffers.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
113            self.ctx.buffers.response_body.drain(..self.ctx.buffers.stream_body_parsed_offset);
114            debug!(
115                "[STREAM_PARSE] compacted parsed prefix bytes={}",
116                self.ctx.buffers.stream_body_parsed_offset
117            );
118            self.ctx.buffers.stream_body_parsed_offset = 0;
119        }
120
121        if !converted_chunks.is_empty() {
122            Some(converted_chunks.join(""))
123        } else {
124            None
125        }
126    }
127
128    /// Finalize the stream by appending response.completed event.
129    ///
130    /// Should be called at end_of_body when streaming conversion is enabled.
131    /// Returns SSE events for the completed response, including [DONE] marker.
132    pub fn finalize_stream(&mut self) -> Vec<String> {
133        let mut converted_chunks: Vec<String> = Vec::new();
134
135        if let Some(ref mut state) = self.ctx.stream_state
136            && !state.emit.is_completed {
137                if self.ctx.diagnostics.stream_chunks_parsed == 0 {
138                    warn!(
139                        "[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
140                        self.ctx.diagnostics.upstream_status,
141                        self.ctx.diagnostics.upstream_content_type
142                    );
143                    state.emit.is_completed = true;
144                } else {
145                    let response_obj = state.build_response_object();
146                    if let Some(mut messages) = self.ctx.follow_up.pending_conversation_messages.clone() {
147                        let assistant_tool_calls: Vec<ToolCall> = state
148                            .tool_calls
149                            .completed
150                            .iter()
151                            .map(|tc| ToolCall {
152                                id: tc.call_id.clone(),
153                                tool_type: "function".to_string(),
154                                function: FunctionCall {
155                                    name: tc.name.clone(),
156                                    arguments: tc.arguments.clone(),
157                                },
158                            })
159                            .collect();
160                        messages.push(ChatMessage {
161                            role: MessageRole::Assistant,
162                            content: Content::String(if state.text.full_text.is_empty() {
163                                state.text.refusal_text.clone()
164                            } else {
165                                state.text.full_text.clone()
166                            }),
167                            name: None,
168                            annotations: None,
169                            tool_calls: if assistant_tool_calls.is_empty() {
170                                None
171                            } else {
172                                Some(assistant_tool_calls)
173                            },
174                            tool_call_id: None,
175                            function_call: None,
176                            refusal: if state.text.refusal_text.is_empty() {
177                                None
178                            } else {
179                                Some(state.text.refusal_text.clone())
180                            },
181                        });
182                        self.conversation_store.insert(
183                            response_obj.id.clone(),
184                            ConversationSnapshot {
185                                instructions: self.ctx.follow_up.pending_instructions.clone(),
186                                messages,
187                            },
188                        );
189                    }
190                    debug!(
191                        "[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
192                        response_obj.id,
193                        response_obj.output.len(),
194                        state.emit.is_reasoning_added,
195                        state.emit.is_output_item_added,
196                        state.tool_calls.completed.len(),
197                        self.ctx.diagnostics.stream_chunks_parsed
198                    );
199                    if self.log_body
200                        && let Ok(json) = serde_json::to_string(&response_obj) {
201                            debug!("[STREAM_COMPLETE_JSON] {}", json);
202                        }
203                    let completed_event = ResponseStreamEvent::Completed {
204                        response: response_obj,
205                    };
206                    let seq = state.take_sequence_number();
207                    let sse_data = event_to_sse(&completed_event, seq);
208                    converted_chunks.push(sse_data);
209                    // Append SSE [DONE] marker to signal stream end
210                    converted_chunks.push("data: [DONE]\n\n".to_string());
211                    state.emit.is_completed = true;
212                }
213            }
214
215        converted_chunks
216    }
217
218    /// Apply provider-specific transformation to stream chunk.
219    fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
220        if let Some(ref provider) = self.provider {
221            provider.transform_stream_chunk(chunk);
222        }
223    }
224
225    /// Convert a ChatStreamChunk to Response API events.
226    fn convert_chunk_to_events(
227        &mut self,
228        chunk: &mut ChatStreamChunk,
229        converted_chunks: &mut Vec<String>,
230    ) {
231        if let Some(ref mut state) = self.ctx.stream_state {
232            // Update usage from this chunk
233            state.update_usage(chunk);
234
235            match chat_chunk_to_response_events(chunk, state) {
236                Ok(events) => {
237                    let mut sse_data = String::new();
238                    for event in &events {
239                        let seq = state.take_sequence_number();
240                        sse_data.push_str(&event_to_sse(event, seq));
241                    }
242                    if !sse_data.is_empty() {
243                        debug!("[STREAM_CHUNK] {}", sse_data);
244                        converted_chunks.push(sse_data);
245                    }
246                }
247                Err(e) => {
248                    error!("Failed to convert stream chunk: {}", e);
249                }
250            }
251        }
252    }
253}
254
#[cfg(test)]
mod tests {
    // Intentionally empty: StreamingResponseHandler behavior is exercised by
    // the integration tests in proxy::filters.
}