codex_convert_proxy/proxy/
streaming_handler.rs1use std::sync::Arc;
7
8use tracing::{debug, error, warn};
9
10use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
11use crate::providers::Provider;
12use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
13use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
14use crate::types::chat_api::ChatStreamChunk;
15use crate::util::parse_sse;
16
17use super::context::ProxyContext;
18
19pub struct StreamingResponseHandler<'a> {
22 ctx: &'a mut ProxyContext,
24 provider: Option<Box<dyn Provider + Send + Sync>>,
26 log_body: bool,
28 conversation_store: Arc<ConversationStore>,
30}
31
32impl<'a> StreamingResponseHandler<'a> {
33 pub fn new(
35 ctx: &'a mut ProxyContext,
36 provider: Option<Box<dyn Provider + Send + Sync>>,
37 log_body: bool,
38 conversation_store: Arc<ConversationStore>,
39 ) -> Self {
40 Self {
41 ctx,
42 provider,
43 log_body,
44 conversation_store,
45 }
46 }
47
48 pub fn process_stream_frame(&mut self) -> Option<String> {
56 let text = std::str::from_utf8(&self.ctx.response_body).unwrap_or("");
59 let unparsed_text = &text[self.ctx.stream_body_parsed_offset..];
60
61 debug!(
62 "[STREAM_RAW] is_streaming=true, body={}",
63 String::from_utf8_lossy(&self.ctx.response_body)
64 .chars()
65 .take(200)
66 .collect::<String>()
67 );
68
69 let mut converted_chunks: Vec<String> = Vec::new();
70
71 let (events, parse_end_pos) = parse_sse(unparsed_text);
73 let new_events_count = events.len();
74 debug!(
75 "[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
76 new_events_count,
77 self.ctx.stream_body_parsed_offset,
78 parse_end_pos
79 );
80
81 for event in events {
82 if event.data == "[DONE]" {
84 continue;
85 }
86
87 match serde_json::from_str::<ChatStreamChunk>(&event.data) {
89 Ok(chunk) => {
90 self.ctx.stream_chunks_parsed += 1;
91 let mut chunk = chunk;
92
93 self.apply_provider_transform(&mut chunk);
95
96 self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
98 }
99 Err(e) => {
100 debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
101 }
102 }
103 }
104
105 if new_events_count > 0 {
108 self.ctx.stream_body_parsed_offset += parse_end_pos;
109 }
110
111 if self.ctx.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
113 self.ctx.response_body.drain(..self.ctx.stream_body_parsed_offset);
114 debug!(
115 "[STREAM_PARSE] compacted parsed prefix bytes={}",
116 self.ctx.stream_body_parsed_offset
117 );
118 self.ctx.stream_body_parsed_offset = 0;
119 }
120
121 if !converted_chunks.is_empty() {
122 Some(converted_chunks.join(""))
123 } else {
124 None
125 }
126 }
127
128 pub fn finalize_stream(&mut self) -> Vec<String> {
133 let mut converted_chunks: Vec<String> = Vec::new();
134
135 if let Some(ref mut state) = self.ctx.stream_state
136 && !state.is_completed {
137 if self.ctx.stream_chunks_parsed == 0 {
138 warn!(
139 "[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
140 self.ctx.upstream_status,
141 self.ctx.upstream_content_type
142 );
143 state.is_completed = true;
144 } else {
145 let response_obj = state.build_response_object();
146 if let Some(mut messages) = self.ctx.pending_conversation_messages.clone() {
147 let assistant_tool_calls: Vec<ToolCall> = state
148 .completed_tool_calls
149 .iter()
150 .map(|tc| ToolCall {
151 id: tc.call_id.clone(),
152 tool_type: "function".to_string(),
153 function: FunctionCall {
154 name: tc.name.clone(),
155 arguments: tc.arguments.clone(),
156 },
157 })
158 .collect();
159 messages.push(ChatMessage {
160 role: MessageRole::Assistant,
161 content: Content::String(if state.full_text.is_empty() {
162 state.refusal_text.clone()
163 } else {
164 state.full_text.clone()
165 }),
166 name: None,
167 annotations: None,
168 tool_calls: if assistant_tool_calls.is_empty() {
169 None
170 } else {
171 Some(assistant_tool_calls)
172 },
173 tool_call_id: None,
174 function_call: None,
175 refusal: if state.refusal_text.is_empty() {
176 None
177 } else {
178 Some(state.refusal_text.clone())
179 },
180 });
181 self.conversation_store.insert(
182 response_obj.id.clone(),
183 ConversationSnapshot {
184 instructions: self.ctx.pending_instructions.clone(),
185 messages,
186 },
187 );
188 }
189 debug!(
190 "[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
191 response_obj.id,
192 response_obj.output.len(),
193 state.is_reasoning_added,
194 state.is_output_item_added,
195 state.completed_tool_calls.len(),
196 self.ctx.stream_chunks_parsed
197 );
198 if self.log_body
199 && let Ok(json) = serde_json::to_string(&response_obj) {
200 debug!("[STREAM_COMPLETE_JSON] {}", json);
201 }
202 let completed_event = ResponseStreamEvent::Completed {
203 response: response_obj,
204 };
205 let sse_data = event_to_sse(&completed_event);
206 converted_chunks.push(sse_data);
207 converted_chunks.push("data: [DONE]\n\n".to_string());
209 state.is_completed = true;
210 }
211 }
212
213 converted_chunks
214 }
215
216 fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
218 if let Some(ref provider) = self.provider {
219 provider.transform_stream_chunk(chunk);
220 }
221 }
222
223 fn convert_chunk_to_events(
225 &mut self,
226 chunk: &mut ChatStreamChunk,
227 converted_chunks: &mut Vec<String>,
228 ) {
229 if let Some(ref mut state) = self.ctx.stream_state {
230 state.update_usage(chunk);
232
233 match chat_chunk_to_response_events(chunk, state) {
234 Ok(events) => {
235 let sse_data: String = events
236 .iter()
237 .map(event_to_sse)
238 .collect();
239 if !sse_data.is_empty() {
240 debug!("[STREAM_CHUNK] {}", sse_data);
241 converted_chunks.push(sse_data);
242 }
243 }
244 Err(e) => {
245 error!("Failed to convert stream chunk: {}", e);
246 }
247 }
248 }
249 }
250}
251
#[cfg(test)]
mod tests {
    // NOTE(review): this module contains no tests in the source as reviewed.
}