codex-convert-proxy 0.1.2

A high-performance proxy server that converts between different AI API formats
Documentation
//! Streaming response handler for SSE conversion.
//!
//! Extracts the streaming response conversion logic from `response_body_filter`
//! to improve code organization and follow the single-responsibility principle.

use std::sync::Arc;

use tracing::{debug, error, warn};

use crate::convert::{chat_chunk_to_response_events, event_to_sse, ResponseStreamEvent};
use crate::providers::Provider;
use crate::proxy::context_store::{ConversationSnapshot, ConversationStore};
use crate::types::chat_api::{ChatMessage, Content, FunctionCall, MessageRole, ToolCall};
use crate::types::chat_api::ChatStreamChunk;
use crate::util::parse_sse;

use super::context::ProxyContext;

/// Handler for streaming SSE response conversion.
///
/// Processes Chat API SSE chunks and converts them to Responses API SSE events.
/// The handler borrows the per-request [`ProxyContext`] mutably for its whole
/// lifetime `'a`, because converting a stream updates the parse offset,
/// diagnostics counters and stream state stored in that context.
pub struct StreamingResponseHandler<'a> {
    /// Mutable borrow of the proxy context. The handler reads the buffered
    /// response bytes from it and updates the parse offset, stream state and
    /// diagnostics through it.
    ctx: &'a mut ProxyContext,
    /// Optional shared provider handle. When present, its
    /// `transform_stream_chunk` is applied to every parsed chunk before conversion.
    provider: Option<Arc<dyn Provider>>,
    /// Whether to log bodies for debugging. When `true`, the serialized
    /// completed response object is logged at debug level on finalization.
    log_body: bool,
    /// Conversation store for persisting completed turns. A snapshot keyed by
    /// the response id is inserted when the stream is finalized and pending
    /// conversation messages exist.
    conversation_store: Arc<ConversationStore>,
}

impl<'a> StreamingResponseHandler<'a> {
    /// Create a new streaming handler.
    pub fn new(
        ctx: &'a mut ProxyContext,
        provider: Option<Arc<dyn Provider>>,
        log_body: bool,
        conversation_store: Arc<ConversationStore>,
    ) -> Self {
        Self {
            ctx,
            provider,
            log_body,
            conversation_store,
        }
    }

    /// Process a single streaming frame/body chunk.
    ///
    /// Parses SSE events from the accumulated response body (starting from last
    /// parsed offset), converts each ChatStreamChunk to Responses API events,
    /// and returns the combined SSE output.
    ///
    /// Returns `None` if no conversion events were generated.
    pub fn process_stream_frame(&mut self) -> Option<String> {
        // Use accumulated body for SSE parsing (events may span multiple frames)
        // Only parse from the last parsed offset to avoid re-processing events
        let text = std::str::from_utf8(&self.ctx.buffers.response_body).unwrap_or("");
        let unparsed_text = &text[self.ctx.buffers.stream_body_parsed_offset..];

        debug!(
            "[STREAM_RAW] is_streaming=true, body={}",
            String::from_utf8_lossy(&self.ctx.buffers.response_body)
                .chars()
                .take(200)
                .collect::<String>()
        );

        let mut converted_chunks: Vec<String> = Vec::new();

        // Use SSE utility module to parse only new events
        let (events, parse_end_pos) = parse_sse(unparsed_text);
        let new_events_count = events.len();
        debug!(
            "[STREAM_PARSE] Found {} new SSE events (offset={}, parse_end={})",
            new_events_count,
            self.ctx.buffers.stream_body_parsed_offset,
            parse_end_pos
        );

        for event in events {
            // Skip [DONE] marker events - they don't contain JSON
            if event.data == "[DONE]" {
                continue;
            }

            // Parse as ChatStreamChunk
            match serde_json::from_str::<ChatStreamChunk>(&event.data) {
                Ok(chunk) => {
                    self.ctx.diagnostics.stream_chunks_parsed += 1;
                    let mut chunk = chunk;

                    // Apply provider transformation
                    self.apply_provider_transform(&mut chunk);

                    // Convert to Response API events
                    self.convert_chunk_to_events(&mut chunk, &mut converted_chunks);
                }
                Err(e) => {
                    debug!("[STREAM_PARSE] Failed to parse JSON: {}", e);
                }
            }
        }

        // Update the parse offset to avoid re-parsing on next frame
        // Use parse_end_pos (relative to unparsed_text) to calculate absolute position
        if new_events_count > 0 {
            self.ctx.buffers.stream_body_parsed_offset += parse_end_pos;
        }

        // Compact parsed prefix periodically to keep streaming memory bounded.
        if self.ctx.buffers.stream_body_parsed_offset >= crate::constants::STREAM_PARSE_COMPACT_THRESHOLD {
            self.ctx.buffers.response_body.drain(..self.ctx.buffers.stream_body_parsed_offset);
            debug!(
                "[STREAM_PARSE] compacted parsed prefix bytes={}",
                self.ctx.buffers.stream_body_parsed_offset
            );
            self.ctx.buffers.stream_body_parsed_offset = 0;
        }

        if !converted_chunks.is_empty() {
            Some(converted_chunks.join(""))
        } else {
            None
        }
    }

    /// Finalize the stream by appending response.completed event.
    ///
    /// Should be called at end_of_body when streaming conversion is enabled.
    /// Returns SSE events for the completed response, including [DONE] marker.
    pub fn finalize_stream(&mut self) -> Vec<String> {
        let mut converted_chunks: Vec<String> = Vec::new();

        if let Some(ref mut state) = self.ctx.stream_state
            && !state.emit.is_completed {
                if self.ctx.diagnostics.stream_chunks_parsed == 0 {
                    warn!(
                        "[STREAM_COMPLETE_SKIP] skip response.completed because no valid upstream chunks were parsed (status={:?}, content_type={:?})",
                        self.ctx.diagnostics.upstream_status,
                        self.ctx.diagnostics.upstream_content_type
                    );
                    state.emit.is_completed = true;
                } else {
                    let response_obj = state.build_response_object();
                    if let Some(mut messages) = self.ctx.follow_up.pending_conversation_messages.clone() {
                        let assistant_tool_calls: Vec<ToolCall> = state
                            .tool_calls
                            .completed
                            .iter()
                            .map(|tc| ToolCall {
                                id: tc.call_id.clone(),
                                tool_type: "function".to_string(),
                                function: FunctionCall {
                                    name: tc.name.clone(),
                                    arguments: tc.arguments.clone(),
                                },
                            })
                            .collect();
                        messages.push(ChatMessage {
                            role: MessageRole::Assistant,
                            content: Content::String(if state.text.full_text.is_empty() {
                                state.text.refusal_text.clone()
                            } else {
                                state.text.full_text.clone()
                            }),
                            name: None,
                            annotations: None,
                            tool_calls: if assistant_tool_calls.is_empty() {
                                None
                            } else {
                                Some(assistant_tool_calls)
                            },
                            tool_call_id: None,
                            function_call: None,
                            refusal: if state.text.refusal_text.is_empty() {
                                None
                            } else {
                                Some(state.text.refusal_text.clone())
                            },
                        });
                        self.conversation_store.insert(
                            response_obj.id.clone(),
                            ConversationSnapshot {
                                instructions: self.ctx.follow_up.pending_instructions.clone(),
                                messages,
                            },
                        );
                    }
                    debug!(
                        "[STREAM_COMPLETE] response_id={}, output_count={}, has_reasoning={}, has_text={}, tool_calls={}, parsed_chunks={}",
                        response_obj.id,
                        response_obj.output.len(),
                        state.emit.is_reasoning_added,
                        state.emit.is_output_item_added,
                        state.tool_calls.completed.len(),
                        self.ctx.diagnostics.stream_chunks_parsed
                    );
                    if self.log_body
                        && let Ok(json) = serde_json::to_string(&response_obj) {
                            debug!("[STREAM_COMPLETE_JSON] {}", json);
                        }
                    let completed_event = ResponseStreamEvent::Completed {
                        response: response_obj,
                    };
                    let seq = state.take_sequence_number();
                    let sse_data = event_to_sse(&completed_event, seq);
                    converted_chunks.push(sse_data);
                    // Append SSE [DONE] marker to signal stream end
                    converted_chunks.push("data: [DONE]\n\n".to_string());
                    state.emit.is_completed = true;
                }
            }

        converted_chunks
    }

    /// Apply provider-specific transformation to stream chunk.
    fn apply_provider_transform(&mut self, chunk: &mut ChatStreamChunk) {
        if let Some(ref provider) = self.provider {
            provider.transform_stream_chunk(chunk);
        }
    }

    /// Convert a ChatStreamChunk to Response API events.
    fn convert_chunk_to_events(
        &mut self,
        chunk: &mut ChatStreamChunk,
        converted_chunks: &mut Vec<String>,
    ) {
        if let Some(ref mut state) = self.ctx.stream_state {
            // Update usage from this chunk
            state.update_usage(chunk);

            match chat_chunk_to_response_events(chunk, state) {
                Ok(events) => {
                    let mut sse_data = String::new();
                    for event in &events {
                        let seq = state.take_sequence_number();
                        sse_data.push_str(&event_to_sse(event, seq));
                    }
                    if !sse_data.is_empty() {
                        debug!("[STREAM_CHUNK] {}", sse_data);
                        converted_chunks.push(sse_data);
                    }
                }
                Err(e) => {
                    error!("Failed to convert stream chunk: {}", e);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    // No unit tests are defined in this module: integration tests for
    // StreamingResponseHandler behavior are covered by the proxy::filters tests.
}