gephyr 1.16.18

Gephyr is a headless local AI relay/proxy API handling OpenAI, Claude, and Gemini-compatible APIs
Documentation
use super::models::*;
use bytes::Bytes;
use futures::StreamExt;
use serde_json::Value;
use std::collections::HashMap;
pub async fn collect_stream_to_json<S, E>(mut stream: S) -> Result<OpenAIResponse, String>
where
    S: futures::Stream<Item = Result<Bytes, E>> + Unpin,
    E: std::fmt::Display,
{
    let mut response = OpenAIResponse {
        id: "chatcmpl-unknown".to_string(),
        object: "chat.completion".to_string(),
        created: chrono::Utc::now().timestamp() as u64,
        model: "unknown".to_string(),
        choices: Vec::new(),
        usage: None,
    };

    let mut role: Option<String> = None;
    let mut content_parts: Vec<String> = Vec::new();
    let mut reasoning_parts: Vec<String> = Vec::new();
    let mut finish_reason: Option<String> = None;
    let mut tool_calls_map: HashMap<u32, (String, String, String, Vec<String>)> = HashMap::new();

    while let Some(chunk_result) = stream.next().await {
        let chunk = chunk_result.map_err(|e| format!("Stream error: {}", e))?;
        let text = String::from_utf8_lossy(&chunk);

        for line in text.lines() {
            let line = line.trim();
            if line.starts_with("data: ") {
                let data_str = line.trim_start_matches("data: ").trim();
                if data_str == "[DONE]" {
                    continue;
                }

                if let Ok(json) = serde_json::from_str::<Value>(data_str) {
                    if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
                        response.id = id.to_string();
                    }
                    if let Some(model) = json.get("model").and_then(|v| v.as_str()) {
                        response.model = model.to_string();
                    }
                    if let Some(created) = json.get("created").and_then(|v| v.as_u64()) {
                        response.created = created;
                    }
                    if let Some(usage) = json.get("usage") {
                        if let Ok(u) = serde_json::from_value::<OpenAIUsage>(usage.clone()) {
                            response.usage = Some(u);
                        }
                    }
                    if let Some(choices) = json.get("choices").and_then(|v| v.as_array()) {
                        if let Some(choice) = choices.first() {
                            if let Some(delta) = choice.get("delta") {
                                if let Some(r) = delta.get("role").and_then(|v| v.as_str()) {
                                    role = Some(r.to_string());
                                }
                                if let Some(c) = delta.get("content").and_then(|v| v.as_str()) {
                                    content_parts.push(c.to_string());
                                }
                                if let Some(rc) =
                                    delta.get("reasoning_content").and_then(|v| v.as_str())
                                {
                                    reasoning_parts.push(rc.to_string());
                                }
                                if let Some(tcs) =
                                    delta.get("tool_calls").and_then(|v| v.as_array())
                                {
                                    for tc in tcs {
                                        let index =
                                            tc.get("index").and_then(|v| v.as_u64()).unwrap_or(0)
                                                as u32;

                                        let entry =
                                            tool_calls_map.entry(index).or_insert_with(|| {
                                                (
                                                    String::new(),
                                                    String::from("function"),
                                                    String::new(),
                                                    Vec::new(),
                                                )
                                            });

                                        if let Some(id) = tc.get("id").and_then(|v| v.as_str()) {
                                            if !id.is_empty() {
                                                entry.0 = id.to_string();
                                            }
                                        }

                                        if let Some(tc_type) =
                                            tc.get("type").and_then(|v| v.as_str())
                                        {
                                            if !tc_type.is_empty() {
                                                entry.1 = tc_type.to_string();
                                            }
                                        }

                                        if let Some(func) = tc.get("function") {
                                            if let Some(name) =
                                                func.get("name").and_then(|v| v.as_str())
                                            {
                                                if !name.is_empty() {
                                                    entry.2 = name.to_string();
                                                }
                                            }
                                            if let Some(args) =
                                                func.get("arguments").and_then(|v| v.as_str())
                                            {
                                                entry.3.push(args.to_string());
                                            }
                                        }
                                    }
                                }
                            }

                            if let Some(fr) = choice.get("finish_reason").and_then(|v| v.as_str()) {
                                finish_reason = Some(fr.to_string());
                            }
                        }
                    }
                }
            }
        }
    }
    let full_content = content_parts.join("");
    let full_reasoning = if reasoning_parts.is_empty() {
        None
    } else {
        Some(reasoning_parts.join(""))
    };
    let final_tool_calls: Option<Vec<ToolCall>> = if tool_calls_map.is_empty() {
        None
    } else {
        let mut calls: Vec<(u32, ToolCall)> = tool_calls_map
            .into_iter()
            .map(|(index, (id, tc_type, name, args_parts))| {
                (
                    index,
                    ToolCall {
                        id,
                        r#type: tc_type,
                        function: ToolFunction {
                            name,
                            arguments: args_parts.join(""),
                        },
                    },
                )
            })
            .collect();
        calls.sort_by_key(|(index, _)| *index);
        Some(calls.into_iter().map(|(_, tc)| tc).collect())
    };

    let message = OpenAIMessage {
        role: role.unwrap_or("assistant".to_string()),
        content: Some(OpenAIContent::String(full_content)),
        reasoning_content: full_reasoning,
        tool_calls: final_tool_calls,
        tool_call_id: None,
        name: None,
    };

    response.choices.push(Choice {
        index: 0,
        message,
        finish_reason: finish_reason.or(Some("stop".to_string())),
    });

    Ok(response)
}