//! omnillm 0.1.5
//!
//! Production-grade LLM API gateway with multi-key load balancing, per-key
//! rate limiting, circuit breaking, and cost tracking.
use serde_json::{json, Value};

use crate::types::{LlmStreamEvent, TokenUsage};

use super::super::{ProtocolError, ProviderProtocol, ProviderStreamFrame};
use super::helpers::*;
use super::response::{emit_openai_responses_response, parse_openai_responses_response};

pub(in crate::protocol) fn parse_openai_responses_stream_event(
    frame: &ProviderStreamFrame,
    body: &Value,
) -> Result<Option<LlmStreamEvent>, ProtocolError> {
    let event = frame
        .event
        .clone()
        .or_else(|| body.get("type").and_then(Value::as_str).map(str::to_owned))
        .unwrap_or_default();

    Ok(match event.as_str() {
        "response.created" | "response.in_progress" => Some(LlmStreamEvent::ResponseStarted {
            response_id: body
                .get("response")
                .and_then(|v| v.get("id"))
                .and_then(Value::as_str)
                .map(str::to_owned)
                .or_else(|| body.get("id").and_then(Value::as_str).map(str::to_owned)),
            model: body
                .get("response")
                .and_then(|v| v.get("model"))
                .or_else(|| body.get("model"))
                .and_then(Value::as_str)
                .unwrap_or("unknown")
                .to_string(),
            provider_protocol: ProviderProtocol::OpenAiResponses,
        }),
        "response.output_text.delta" => Some(LlmStreamEvent::TextDelta {
            delta: body
                .get("delta")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string(),
        }),
        "response.function_call_arguments.delta" => Some(LlmStreamEvent::ToolCallDelta {
            call_id: body
                .get("item_id")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string(),
            name: body
                .get("name")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string(),
            delta: body
                .get("delta")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string(),
        }),
        "response.output_item.added" => body
            .get("item")
            .map(parse_openai_responses_single_output_item)
            .transpose()?
            .map(|item| LlmStreamEvent::OutputItemAdded { item }),
        "response.reasoning_summary_text.delta" => Some(LlmStreamEvent::ReasoningDelta {
            delta: body
                .get("delta")
                .and_then(Value::as_str)
                .unwrap_or("")
                .to_string(),
        }),
        "response.completed" => {
            let response = body.get("response").unwrap_or(body);
            Some(LlmStreamEvent::Completed {
                response: parse_openai_responses_response(response)?,
            })
        }
        "response.error" | "error" => Some(LlmStreamEvent::Error {
            message: body
                .get("message")
                .or_else(|| body.get("error").and_then(|value| value.get("message")))
                .and_then(Value::as_str)
                .unwrap_or("unknown error")
                .to_string(),
        }),
        _ => None,
    })
}

pub(in crate::protocol) fn emit_openai_responses_stream_event(
    event: &LlmStreamEvent,
) -> Result<Option<ProviderStreamFrame>, ProtocolError> {
    let frame = match event {
        LlmStreamEvent::ResponseStarted {
            response_id, model, ..
        } => ProviderStreamFrame {
            event: Some("response.created".into()),
            data: serde_json::to_string(&json!({
                "type": "response.created",
                "response": { "id": response_id, "model": model }
            }))?,
        },
        LlmStreamEvent::TextDelta { delta } => ProviderStreamFrame {
            event: Some("response.output_text.delta".into()),
            data: serde_json::to_string(&json!({
                "type": "response.output_text.delta",
                "delta": delta
            }))?,
        },
        LlmStreamEvent::ToolCallDelta {
            call_id,
            name,
            delta,
        } => ProviderStreamFrame {
            event: Some("response.function_call_arguments.delta".into()),
            data: serde_json::to_string(&json!({
                "type": "response.function_call_arguments.delta",
                "item_id": call_id,
                "name": name,
                "delta": delta
            }))?,
        },
        LlmStreamEvent::OutputItemAdded { item } => ProviderStreamFrame {
            event: Some("response.output_item.added".into()),
            data: serde_json::to_string(&json!({
                "type": "response.output_item.added",
                "item": openai_responses_output_item(item.clone())?
            }))?,
        },
        LlmStreamEvent::ReasoningDelta { delta } => ProviderStreamFrame {
            event: Some("response.reasoning_summary_text.delta".into()),
            data: serde_json::to_string(&json!({
                "type": "response.reasoning_summary_text.delta",
                "delta": delta
            }))?,
        },
        LlmStreamEvent::Completed { response } => ProviderStreamFrame {
            event: Some("response.completed".into()),
            data: serde_json::to_string(&json!({
                "type": "response.completed",
                "response": emit_openai_responses_response(response)?
            }))?,
        },
        LlmStreamEvent::Error { message } => ProviderStreamFrame {
            event: Some("response.error".into()),
            data: serde_json::to_string(&json!({ "type": "response.error", "message": message }))?,
        },
        LlmStreamEvent::Usage { .. }
        | LlmStreamEvent::ContentPartAdded { .. }
        | LlmStreamEvent::ToolResult { .. } => return Ok(None),
    };
    Ok(Some(frame))
}

/// Translates one OpenAI Chat Completions stream chunk into zero or more
/// [`LlmStreamEvent`]s.
///
/// Only the first entry of `choices` is inspected. A single chunk can
/// yield several events (e.g. a role marker plus content plus usage).
///
/// # Errors
///
/// Currently always returns `Ok`; the `Result` keeps the signature aligned
/// with the other protocol parsers.
pub(in crate::protocol) fn parse_openai_chat_stream_events(
    body: &Value,
) -> Result<Vec<LlmStreamEvent>, ProtocolError> {
    let mut events = Vec::new();

    let first_choice = body
        .get("choices")
        .and_then(Value::as_array)
        .and_then(|choices| choices.first());

    if let Some(choice) = first_choice {
        let delta = choice.get("delta");

        // A `role` in the delta marks the first chunk of a response.
        if let Some(role) = delta
            .and_then(|value| value.get("role"))
            .and_then(Value::as_str)
        {
            // NOTE(review): when `model` is absent the role string (e.g.
            // "assistant") is used as the model name — mirrors the original
            // behavior; confirm this fallback is intended.
            let model = body
                .get("model")
                .and_then(Value::as_str)
                .unwrap_or(role)
                .to_string();
            events.push(LlmStreamEvent::ResponseStarted {
                response_id: body.get("id").and_then(Value::as_str).map(str::to_owned),
                model,
                provider_protocol: ProviderProtocol::OpenAiChatCompletions,
            });
        }

        // Non-empty text content becomes a TextDelta.
        let content = delta
            .and_then(|value| value.get("content"))
            .and_then(Value::as_str);
        if let Some(text) = content.filter(|text| !text.is_empty()) {
            events.push(LlmStreamEvent::TextDelta {
                delta: text.to_string(),
            });
        }

        // Each partial tool call in the chunk becomes its own delta event.
        let tool_calls = delta
            .and_then(|value| value.get("tool_calls"))
            .and_then(Value::as_array);
        for tool_call in tool_calls.into_iter().flatten() {
            // Read a string field from an optional JSON object, "" if absent.
            let string_at = |object: Option<&Value>, key: &str| -> String {
                object
                    .and_then(|v| v.get(key))
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string()
            };
            let function = tool_call.get("function");
            events.push(LlmStreamEvent::ToolCallDelta {
                call_id: string_at(Some(tool_call), "id"),
                name: string_at(function, "name"),
                delta: string_at(function, "arguments"),
            });
        }
    }

    // Usage arrives on its own (typically final) chunk.
    if let Some(usage) = body.get("usage") {
        let count = |key: &str| usage.get(key).and_then(Value::as_u64);
        events.push(LlmStreamEvent::Usage {
            usage: TokenUsage {
                prompt_tokens: count("prompt_tokens").unwrap_or(0) as u32,
                completion_tokens: count("completion_tokens").unwrap_or(0) as u32,
                total_tokens: count("total_tokens").map(|value| value as u32),
                prompt_cache: parse_openai_prompt_cache_usage(Some(usage)),
            },
        });
    }

    Ok(events)
}

/// Serializes an [`LlmStreamEvent`] as an OpenAI Chat Completions stream
/// chunk.
///
/// Completion is signaled by the literal `[DONE]` sentinel rather than a
/// JSON payload; variants with no Chat Completions representation yield
/// `Ok(None)`.
///
/// # Errors
///
/// Propagates JSON encoding failures as [`ProtocolError`]s.
pub(in crate::protocol) fn emit_openai_chat_stream_event(
    event: &LlmStreamEvent,
) -> Result<Option<ProviderStreamFrame>, ProtocolError> {
    // The terminal frame is the bare `[DONE]` sentinel, not JSON.
    if matches!(event, LlmStreamEvent::Completed { .. }) {
        return Ok(Some(ProviderStreamFrame {
            event: None,
            data: "[DONE]".into(),
        }));
    }

    let payload = match event {
        LlmStreamEvent::ResponseStarted {
            response_id, model, ..
        } => json!({
            "id": response_id,
            "model": model,
            "choices": [{ "index": 0, "delta": { "role": "assistant" } }]
        }),
        LlmStreamEvent::TextDelta { delta } => json!({
            "choices": [{ "index": 0, "delta": { "content": delta } }]
        }),
        LlmStreamEvent::ToolCallDelta {
            call_id,
            name,
            delta,
        } => json!({
            "choices": [{
                "index": 0,
                "delta": {
                    "tool_calls": [{
                        "id": call_id,
                        "type": "function",
                        "function": { "name": name, "arguments": delta }
                    }]
                }
            }]
        }),
        LlmStreamEvent::Usage { usage } => json!({
            "usage": openai_chat_usage_json(usage)
        }),
        // Everything else has no Chat Completions representation.
        _ => return Ok(None),
    };

    Ok(Some(ProviderStreamFrame {
        event: None,
        data: serde_json::to_string(&payload)?,
    }))
}