codex_convert_proxy/proxy/
streaming_handler.rs1use std::sync::Arc;
7
8use tracing::{debug, error, warn};
9
10use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
11use crate::providers::Provider;
12use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
13use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
14use crate::types::chat_api::ChatStreamChunk;
15use crate::util::parse_sse;
16
17use super::context::ProxyContext;
18
19pub struct StreamingResponseHandler<'a> {
22 ctx: &'a mut ProxyContext,
24 provider: Option<Arc<dyn Provider>>,
26 log_body: bool,
28 conversation_store: Arc<ConversationStore>,
30}
31
32impl<'a> StreamingResponseHandler<'a> {
33 pub fn new(
35 ctx: &'a mut ProxyContext,
36 provider: Option<Arc<dyn Provider>>,
37 log_body: bool,
38 conversation_store: Arc<ConversationStore>,
39 ) -> Self {
40 Self {
41 ctx,
42 provider,
43 log_body,
44 conversation_store,
45 }
46 }
47
48 pub fn process_stream_frame(&mut self) -> Option<String> {
56 let text = std::str::from_utf8(&self.ctx.buffers.response_body).unwrap_or("");
59 let unparsed_text = &text[self.ctx.buffers.stream_body_parsed_offset..];
60
61 debug!(
62 "[STREAM_RAW] is_streaming=true, body={}",
63 String::from_utf8_lossy(&self.ctx.buffers.response_body)
64 .chars()
65 .take(200)
66 .collect::<String>()
67 );
68
69 let mut converted_chunks: Vec<String> = Vec::new();
70
71 let (events, parse_end_pos) = parse_sse(unparsed_text);
73 let new_events_count = events.len();
74 debug!(
75 "[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
76 new_events_count,
77 self.ctx.buffers.stream_body_parsed_offset,
78 parse_end_pos
79 );
80
81 for event in events {
82 if event.data == "[DONE]" {
84 continue;
85 }
86
87 match serde_json::from_str::<ChatStreamChunk>(&event.data) {
89 Ok(chunk) => {
90 self.ctx.diagnostics.stream_chunks_parsed += 1;
91 let mut chunk = chunk;
92
93 self.apply_provider_transform(&mut chunk);
95
96 self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
98 }
99 Err(e) => {
100 debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
101 }
102 }
103 }
104
105 if new_events_count > 0 {
108 self.ctx.buffers.stream_body_parsed_offset += parse_end_pos;
109 }
110
111 if self.ctx.buffers.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
113 self.ctx.buffers.response_body.drain(..self.ctx.buffers.stream_body_parsed_offset);
114 debug!(
115 "[STREAM_PARSE] compacted parsed prefix bytes={}",
116 self.ctx.buffers.stream_body_parsed_offset
117 );
118 self.ctx.buffers.stream_body_parsed_offset = 0;
119 }
120
121 if !converted_chunks.is_empty() {
122 Some(converted_chunks.join(""))
123 } else {
124 None
125 }
126 }
127
128 pub fn finalize_stream(&mut self) -> Vec<String> {
133 let mut converted_chunks: Vec<String> = Vec::new();
134
135 if let Some(ref mut state) = self.ctx.stream_state
136 && !state.emit.is_completed {
137 if self.ctx.diagnostics.stream_chunks_parsed == 0 {
138 warn!(
139 "[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
140 self.ctx.diagnostics.upstream_status,
141 self.ctx.diagnostics.upstream_content_type
142 );
143 state.emit.is_completed = true;
144 } else {
145 let response_obj = state.build_response_object();
146 if let Some(mut messages) = self.ctx.follow_up.pending_conversation_messages.clone() {
147 let assistant_tool_calls: Vec<ToolCall> = state
148 .tool_calls
149 .completed
150 .iter()
151 .map(|tc| ToolCall {
152 id: tc.call_id.clone(),
153 tool_type: "function".to_string(),
154 function: FunctionCall {
155 name: tc.name.clone(),
156 arguments: tc.arguments.clone(),
157 },
158 })
159 .collect();
160 messages.push(ChatMessage {
161 role: MessageRole::Assistant,
162 content: Content::String(if state.text.full_text.is_empty() {
163 state.text.refusal_text.clone()
164 } else {
165 state.text.full_text.clone()
166 }),
167 name: None,
168 annotations: None,
169 tool_calls: if assistant_tool_calls.is_empty() {
170 None
171 } else {
172 Some(assistant_tool_calls)
173 },
174 tool_call_id: None,
175 function_call: None,
176 refusal: if state.text.refusal_text.is_empty() {
177 None
178 } else {
179 Some(state.text.refusal_text.clone())
180 },
181 });
182 self.conversation_store.insert(
183 response_obj.id.clone(),
184 ConversationSnapshot {
185 instructions: self.ctx.follow_up.pending_instructions.clone(),
186 messages,
187 },
188 );
189 }
190 debug!(
191 "[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
192 response_obj.id,
193 response_obj.output.len(),
194 state.emit.is_reasoning_added,
195 state.emit.is_output_item_added,
196 state.tool_calls.completed.len(),
197 self.ctx.diagnostics.stream_chunks_parsed
198 );
199 if self.log_body
200 && let Ok(json) = serde_json::to_string(&response_obj) {
201 debug!("[STREAM_COMPLETE_JSON] {}", json);
202 }
203 let completed_event = ResponseStreamEvent::Completed {
204 response: response_obj,
205 };
206 let seq = state.take_sequence_number();
207 let sse_data = event_to_sse(&completed_event, seq);
208 converted_chunks.push(sse_data);
209 converted_chunks.push("data: [DONE]\n\n".to_string());
211 state.emit.is_completed = true;
212 }
213 }
214
215 converted_chunks
216 }
217
218 fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
220 if let Some(ref provider) = self.provider {
221 provider.transform_stream_chunk(chunk);
222 }
223 }
224
225 fn convert_chunk_to_events(
227 &mut self,
228 chunk: &mut ChatStreamChunk,
229 converted_chunks: &mut Vec<String>,
230 ) {
231 if let Some(ref mut state) = self.ctx.stream_state {
232 state.update_usage(chunk);
234
235 match chat_chunk_to_response_events(chunk, state) {
236 Ok(events) => {
237 let mut sse_data = String::new();
238 for event in &events {
239 let seq = state.take_sequence_number();
240 sse_data.push_str(&event_to_sse(event, seq));
241 }
242 if !sse_data.is_empty() {
243 debug!("[STREAM_CHUNK] {}", sse_data);
244 converted_chunks.push(sse_data);
245 }
246 }
247 Err(e) => {
248 error!("Failed to convert stream chunk: {}", e);
249 }
250 }
251 }
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 }