1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
//! Streaming response handler for SSE conversion.
//!
//! Holds the streaming response conversion logic extracted from `response_body_filter`,
//! keeping that filter focused on a single responsibility.
use std::sync::Arc;
use tracing::{debug, error, warn};
use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
use crate::providers::Provider;
use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
use crate::types::chat_api::ChatStreamChunk;
use crate::util::parse_sse;
use super::context::ProxyContext;
/// Handler for streaming SSE response conversion.
/// Processes Chat API SSE chunks and converts them to Responses API SSE events.
///
/// The handler borrows the proxy context mutably for its lifetime `'a`, so it is
/// meant to be created for a single filter invocation and then dropped.
pub struct StreamingResponseHandler<'a> {
/// Mutable borrow of the proxy context: the handler reads and updates its
/// response buffers, stream state, diagnostics and follow-up data.
ctx: &'a mut ProxyContext,
/// Optional provider whose `transform_stream_chunk` hook is applied to every
/// parsed chunk; `None` skips the transformation.
provider: Option<Arc<dyn Provider>>,
/// When `true`, the final response object is additionally logged as JSON at
/// debug level when the stream is finalized.
log_body: bool,
/// Shared store that receives a snapshot of the completed conversation turn,
/// keyed by response id, when follow-up conversation messages are pending.
conversation_store: Arc<ConversationStore>,
}
impl<'a> StreamingResponseHandler<'a> {
/// Create a new streaming handler.
pub fn new(
ctx: &'a mut ProxyContext,
provider: Option<Arc<dyn Provider>>,
log_body: bool,
conversation_store: Arc<ConversationStore>,
) -> Self {
Self {
ctx,
provider,
log_body,
conversation_store,
}
}
/// Process a single streaming frame/body chunk.
///
/// Parses SSE events from the accumulated response body (starting from last
/// parsed offset), converts each ChatStreamChunk to Responses API events,
/// and returns the combined SSE output.
///
/// Returns `None` if no conversion events were generated.
pub fn process_stream_frame(&mut self) -> Option<String> {
// Use accumulated body for SSE parsing (events may span multiple frames)
// Only parse from the last parsed offset to avoid re-processing events
let text = std::str::from_utf8(&self.ctx.buffers.response_body).unwrap_or("");
let unparsed_text = &text[self.ctx.buffers.stream_body_parsed_offset..];
debug!(
"[STREAM_RAW] is_streaming=true, body={}",
String::from_utf8_lossy(&self.ctx.buffers.response_body)
.chars()
.take(200)
.collect::<String>()
);
let mut converted_chunks: Vec<String> = Vec::new();
// Use SSE utility module to parse only new events
let (events, parse_end_pos) = parse_sse(unparsed_text);
let new_events_count = events.len();
debug!(
"[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
new_events_count,
self.ctx.buffers.stream_body_parsed_offset,
parse_end_pos
);
for event in events {
// Skip [DONE] marker events - they don't contain JSON
if event.data == "[DONE]" {
continue;
}
// Parse as ChatStreamChunk
match serde_json::from_str::<ChatStreamChunk>(&event.data) {
Ok(chunk) => {
self.ctx.diagnostics.stream_chunks_parsed += 1;
let mut chunk = chunk;
// Apply provider transformation
self.apply_provider_transform(&mut chunk);
// Convert to Response API events
self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
}
Err(e) => {
debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
}
}
}
// Update the parse offset to avoid re-parsing on next frame
// Use parse_end_pos (relative to unparsed_text) to calculate absolute position
if new_events_count > 0 {
self.ctx.buffers.stream_body_parsed_offset += parse_end_pos;
}
// Compact parsed prefix periodically to keep streaming memory bounded.
if self.ctx.buffers.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
self.ctx.buffers.response_body.drain(..self.ctx.buffers.stream_body_parsed_offset);
debug!(
"[STREAM_PARSE] compacted parsed prefix bytes={}",
self.ctx.buffers.stream_body_parsed_offset
);
self.ctx.buffers.stream_body_parsed_offset = 0;
}
if !converted_chunks.is_empty() {
Some(converted_chunks.join(""))
} else {
None
}
}
/// Finalize the stream by appending response.completed event.
///
/// Should be called at end_of_body when streaming conversion is enabled.
/// Returns SSE events for the completed response, including [DONE] marker.
pub fn finalize_stream(&mut self) -> Vec<String> {
let mut converted_chunks: Vec<String> = Vec::new();
if let Some(ref mut state) = self.ctx.stream_state
&& !state.emit.is_completed {
if self.ctx.diagnostics.stream_chunks_parsed == 0 {
warn!(
"[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
self.ctx.diagnostics.upstream_status,
self.ctx.diagnostics.upstream_content_type
);
state.emit.is_completed = true;
} else {
let response_obj = state.build_response_object();
if let Some(mut messages) = self.ctx.follow_up.pending_conversation_messages.clone() {
let assistant_tool_calls: Vec<ToolCall> = state
.tool_calls
.completed
.iter()
.map(|tc| ToolCall {
id: tc.call_id.clone(),
tool_type: "function".to_string(),
function: FunctionCall {
name: tc.name.clone(),
arguments: tc.arguments.clone(),
},
})
.collect();
messages.push(ChatMessage {
role: MessageRole::Assistant,
content: Content::String(if state.text.full_text.is_empty() {
state.text.refusal_text.clone()
} else {
state.text.full_text.clone()
}),
name: None,
annotations: None,
tool_calls: if assistant_tool_calls.is_empty() {
None
} else {
Some(assistant_tool_calls)
},
tool_call_id: None,
function_call: None,
refusal: if state.text.refusal_text.is_empty() {
None
} else {
Some(state.text.refusal_text.clone())
},
});
self.conversation_store.insert(
response_obj.id.clone(),
ConversationSnapshot {
instructions: self.ctx.follow_up.pending_instructions.clone(),
messages,
},
);
}
debug!(
"[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
response_obj.id,
response_obj.output.len(),
state.emit.is_reasoning_added,
state.emit.is_output_item_added,
state.tool_calls.completed.len(),
self.ctx.diagnostics.stream_chunks_parsed
);
if self.log_body
&& let Ok(json) = serde_json::to_string(&response_obj) {
debug!("[STREAM_COMPLETE_JSON] {}", json);
}
let completed_event = ResponseStreamEvent::Completed {
response: response_obj,
};
let seq = state.take_sequence_number();
let sse_data = event_to_sse(&completed_event, seq);
converted_chunks.push(sse_data);
// Append SSE [DONE] marker to signal stream end
converted_chunks.push("data: [DONE]\n\n".to_string());
state.emit.is_completed = true;
}
}
converted_chunks
}
/// Apply provider-specific transformation to stream chunk.
fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
if let Some(ref provider) = self.provider {
provider.transform_stream_chunk(chunk);
}
}
/// Convert a ChatStreamChunk to Response API events.
fn convert_chunk_to_events(
&mut self,
chunk: &mut ChatStreamChunk,
converted_chunks: &mut Vec<String>,
) {
if let Some(ref mut state) = self.ctx.stream_state {
// Update usage from this chunk
state.update_usage(chunk);
match chat_chunk_to_response_events(chunk, state) {
Ok(events) => {
let mut sse_data = String::new();
for event in &events {
let seq = state.take_sequence_number();
sse_data.push_str(&event_to_sse(event, seq));
}
if !sse_data.is_empty() {
debug!("[STREAM_CHUNK] {}", sse_data);
converted_chunks.push(sse_data);
}
}
Err(e) => {
error!("Failed to convert stream chunk: {}", e);
}
}
}
}
}
#[cfg(test)]
mod tests {
// Intentionally empty: integration tests for StreamingResponseHandler
// behavior are covered by the proxy::filters tests.
}