llm-connector 0.7.1

Next-generation Rust library for LLM protocol abstraction with native multi-modal support. Supports 12+ providers (OpenAI, Anthropic, Google, Aliyun, Zhipu, Ollama, Tencent, Volcengine, LongCat, Moonshot, DeepSeek, Xiaomi) with clean Protocol/Provider separation, type-safe interface, and universal streaming.
Documentation
//! Server-Sent Events (SSE) streaming utilities
//!
//! This module provides SSE parsing for streaming responses from LLM providers.
//! All major commercial LLM providers (OpenAI, Anthropic, DeepSeek, etc.) use SSE for streaming.

use crate::error::LlmConnectorError;
use futures_util::{Stream, StreamExt};
use std::pin::Pin;

/// Parse Server-Sent Events (SSE) from an HTTP response and yield one JSON string per event.
/// This function handles the SSE format where events are separated by double newlines.
#[inline]
pub fn sse_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let stream = response.bytes_stream();

    let events_stream = stream
        .scan(String::new(), move |buffer, chunk_result| {
            let mut out: Vec<Result<String, LlmConnectorError>> = Vec::new();
            match chunk_result {
                Ok(chunk) => {
                    let chunk_str = String::from_utf8_lossy(&chunk).replace("\r\n", "\n");
                    buffer.push_str(&chunk_str);

                    // Extract all complete SSE events separated by double-newline
                    while let Some(boundary_idx) = buffer.find("\n\n") {
                        let event_str: String = buffer.drain(..boundary_idx + 2).collect();

                        // Aggregate all `data:` lines
                        let mut data_lines: Vec<String> = Vec::new();
                        for raw_line in event_str.split('\n') {
                            // Trim trailing CR (already normalized), and leading/trailing spaces
                            let line = raw_line.trim_end();
                            if let Some(rest) = line
                                .strip_prefix("data: ")
                                .or_else(|| line.strip_prefix("data:"))
                            {
                                let payload = rest.trim_start();
                                // Skip final marker
                                if payload.trim() == "[DONE]" {
                                    // ignore terminal marker
                                    continue;
                                }
                                // Collect payload line (may span multiple `data:` lines)
                                if !payload.is_empty() {
                                    data_lines.push(payload.to_string());
                                }
                            }
                        }

                        if !data_lines.is_empty() {
                            // Per SSE spec, multiple data lines are joined with `\n`
                            out.push(Ok(data_lines.join("\n")));
                        }
                    }
                }
                Err(e) => {
                    out.push(Err(LlmConnectorError::NetworkError(e.to_string())));
                }
            }
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);

    Box::pin(events_stream)
}

/// Parse line-delimited JSON events (non-SSE) from an HTTP response.
/// Each line is treated as a standalone JSON payload.
#[inline]
pub fn json_lines_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let stream = response.bytes_stream();

    let events_stream = stream
        .scan(String::new(), move |buffer, chunk_result| {
            let mut out: Vec<Result<String, LlmConnectorError>> = Vec::new();
            match chunk_result {
                Ok(chunk) => {
                    let chunk_str = String::from_utf8_lossy(&chunk).replace("\r\n", "\n");
                    buffer.push_str(&chunk_str);

                    // Extract complete lines
                    while let Some(boundary_idx) = buffer.find('\n') {
                        let line: String = buffer.drain(..boundary_idx + 1).collect();
                        let trimmed = line.trim();
                        if trimmed.is_empty() {
                            continue;
                        }
                        if trimmed == "[DONE]" {
                            continue;
                        }
                        out.push(Ok(trimmed.to_string()));
                    }
                }
                Err(e) => {
                    out.push(Err(LlmConnectorError::NetworkError(e.to_string())));
                }
            }
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);

    Box::pin(events_stream)
}

/// Convert SSE string stream to StreamingResponse stream
///
/// This function handles tool_calls accumulation for streaming responses.
/// In OpenAI's streaming API, tool_calls are sent incrementally across multiple chunks,
/// and need to be accumulated by index.
#[cfg(feature = "streaming")]
pub fn sse_to_streaming_response(response: reqwest::Response) -> crate::types::ChatStream {
    use crate::types::{StreamingResponse, ToolCall};
    use futures_util::StreamExt;
    use std::collections::HashMap;

    let string_stream = sse_events(response);

    // State for accumulating tool_calls across chunks
    let response_stream = string_stream.scan(
        HashMap::<usize, ToolCall>::new(),
        |accumulated_tool_calls, result| {
            let processed = result.and_then(|json_str| {
                // Try to parse as StreamingResponse
                let mut streaming_response = serde_json::from_str::<StreamingResponse>(&json_str)
                    .map_err(|e| {
                    crate::error::LlmConnectorError::ParseError(format!(
                        "Failed to parse streaming response: {}",
                        e
                    ))
                })?;

                // 🔧 Fix: Populate the convenience `content` field from choices[0].delta
                // This is critical for Volcengine and other OpenAI-compatible providers
                //
                // Priority order:
                // 1. delta.content (standard OpenAI format)
                // 2. delta.reasoning_content (Volcengine Doubao-Seed-Code, DeepSeek R1)
                // 3. delta.reasoning (Qwen, DeepSeek)
                // 4. delta.thought (OpenAI o1)
                // 5. delta.thinking (Anthropic)
                if streaming_response.content.is_empty()
                    && let Some(choice) = streaming_response.choices.first()
                {
                    let content_to_use = choice
                        .delta
                        .content
                        .as_ref()
                        .filter(|s| !s.is_empty())
                        .or(choice.delta.reasoning_content.as_ref())
                        .or(choice.delta.reasoning.as_ref())
                        .or(choice.delta.thought.as_ref())
                        .or(choice.delta.thinking.as_ref());

                    if let Some(content) = content_to_use {
                        streaming_response.content = content.clone();
                    }
                }

                // 🔧 Fix: Accumulate tool_calls across chunks
                // OpenAI streaming API sends tool_calls incrementally with an index field
                if let Some(choice) = streaming_response.choices.first_mut()
                    && let Some(delta_tool_calls) = &choice.delta.tool_calls
                {
                    for delta_call in delta_tool_calls {
                        let index = delta_call.index.unwrap_or(0);

                        accumulated_tool_calls
                            .entry(index)
                            .and_modify(|existing| existing.merge_delta(delta_call))
                            .or_insert_with(|| delta_call.clone());
                    }

                    // Replace delta.tool_calls with accumulated complete tool_calls
                    // Only include complete tool_calls (have id, type, and name)
                    let complete_calls: Vec<ToolCall> = accumulated_tool_calls
                        .values()
                        .filter(|call| call.is_complete())
                        .cloned()
                        .collect();

                    if !complete_calls.is_empty() {
                        choice.delta.tool_calls = Some(complete_calls);
                    } else {
                        // Don't send incomplete tool_calls to avoid duplicate execution
                        choice.delta.tool_calls = None;
                    }
                }

                Ok(streaming_response)
            });

            std::future::ready(Some(processed))
        },
    );

    Box::pin(response_stream)
}

#[cfg(test)]
mod tests {
    #[test]
    #[cfg(feature = "streaming")]
    fn test_streaming_response_content_population() {
        use crate::types::StreamingResponse;

        // Test 1: Standard OpenAI format with content
        let json_standard = r#"{
            "id": "test-1",
            "object": "chat.completion.chunk",
            "created": 1234567890,
            "model": "gpt-4",
            "choices": [{
                "index": 0,
                "delta": {
                    "role": "assistant",
                    "content": "Hello world"
                },
                "finish_reason": null
            }]
        }"#;

        let mut response: StreamingResponse = serde_json::from_str(json_standard).unwrap();

        // Simulate the content population logic
        if response.content.is_empty()
            && let Some(choice) = response.choices.first()
        {
            let content_to_use = choice
                .delta
                .content
                .as_ref()
                .filter(|s| !s.is_empty())
                .or(choice.delta.reasoning_content.as_ref())
                .or(choice.delta.reasoning.as_ref())
                .or(choice.delta.thought.as_ref())
                .or(choice.delta.thinking.as_ref());

            if let Some(content) = content_to_use {
                response.content = content.clone();
            }
        }

        assert_eq!(response.content, "Hello world");

        // Test 2: Volcengine Doubao-Seed-Code format with reasoning_content
        let json_volcengine = r#"{
            "id": "test-2",
            "object": "chat.completion.chunk",
            "created": 1234567890,
            "model": "doubao-seed-code",
            "choices": [{
                "index": 0,
                "delta": {
                    "role": "assistant",
                    "content": "",
                    "reasoning_content": "I am Doubao"
                },
                "finish_reason": null
            }]
        }"#;

        let mut response: StreamingResponse = serde_json::from_str(json_volcengine).unwrap();

        // Simulate the content population logic
        if response.content.is_empty()
            && let Some(choice) = response.choices.first()
        {
            let content_to_use = choice
                .delta
                .content
                .as_ref()
                .filter(|s| !s.is_empty())
                .or(choice.delta.reasoning_content.as_ref())
                .or(choice.delta.reasoning.as_ref())
                .or(choice.delta.thought.as_ref())
                .or(choice.delta.thinking.as_ref());

            if let Some(content) = content_to_use {
                response.content = content.clone();
            }
        }

        assert_eq!(response.content, "I am Doubao");

        // Test 3: DeepSeek format with reasoning
        let json_deepseek = r#"{
            "id": "test-3",
            "object": "chat.completion.chunk",
            "created": 1234567890,
            "model": "deepseek-r1",
            "choices": [{
                "index": 0,
                "delta": {
                    "role": "assistant",
                    "content": "",
                    "reasoning": "Let me think..."
                },
                "finish_reason": null
            }]
        }"#;

        let mut response: StreamingResponse = serde_json::from_str(json_deepseek).unwrap();

        // Simulate the content population logic
        if response.content.is_empty()
            && let Some(choice) = response.choices.first()
        {
            let content_to_use = choice
                .delta
                .content
                .as_ref()
                .filter(|s| !s.is_empty())
                .or(choice.delta.reasoning_content.as_ref())
                .or(choice.delta.reasoning.as_ref())
                .or(choice.delta.thought.as_ref())
                .or(choice.delta.thinking.as_ref());

            if let Some(content) = content_to_use {
                response.content = content.clone();
            }
        }

        assert_eq!(response.content, "Let me think...");
    }

    #[test]
    fn test_sse_event_parsing() {
        // Test SSE format parsing
        let sse_data = "data: {\"test\": \"value\"}\n\n";
        assert!(sse_data.starts_with("data: "));

        let sse_done = "data: [DONE]\n\n";
        assert!(sse_done.contains("[DONE]"));
    }
}