codexia 0.3.1

OpenAI- and Anthropic-compatible local API gateway backed by Codex OAuth.
Documentation
use crate::{
    Error, Result,
    codex::sse,
    openai::{
        response::Usage,
        types::{FunctionCall, ToolCall},
    },
};
use futures_util::StreamExt;
use reqwest::Response;
use serde_json::Value;

/// Aggregated output built from a streamed Codex response.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ChatOutput {
    /// Concatenated assistant text collected from deltas or final message items.
    pub text: String,
    /// Function calls emitted by the model during the response.
    pub tool_calls: Vec<ToolCall>,
    /// Token usage reported by the upstream response, when available.
    pub usage: Option<Usage>,
    /// OpenAI-compatible finish reason derived from the terminal response event.
    pub finish_reason: String,
}

/// Consumes a streamed Codex HTTP response and folds its SSE events into one output.
///
/// # Errors
///
/// Returns an error when the SSE stream contains an upstream failure event or
/// cannot be decoded into JSON events.
pub async fn collect_output(response: Response) -> Result<ChatOutput> {
    let mut events = Box::pin(sse::json_events(Box::pin(response.bytes_stream())));
    let mut output = ChatOutput {
        finish_reason: "stop".to_owned(),
        ..ChatOutput::default()
    };

    while let Some(event) = events.next().await {
        let event = event?;
        apply_event(&mut output, &event)?;
        if is_done_event(&event) {
            break;
        }
    }

    if !output.tool_calls.is_empty() {
        "tool_calls".clone_into(&mut output.finish_reason);
    }

    Ok(output)
}

/// Applies one parsed Codex event to an in-progress chat output.
///
/// # Errors
///
/// Returns an error when the event encodes an upstream failure.
pub fn apply_event(output: &mut ChatOutput, event: &Value) -> Result<()> {
    if let Some(message) = event_error(event) {
        return Err(Error::upstream(message));
    }

    if let Some(delta) = text_delta(event) {
        output.text.push_str(&delta);
    }

    // Streaming item completion events are the earliest stable point where
    // function calls can be collected without waiting for the final envelope.
    if matches!(event_type(event), Some("response.output_item.done")) {
        if let Some(item) = event.get("item") {
            apply_output_item(output, item, false);
        }
    }

    if let Some(response) = event.get("response") {
        if let Some(usage) = parse_usage(response.get("usage")) {
            output.usage = Some(usage);
        }
        // Some responses only expose final text in the completed output array,
        // so backfill it if no incremental text deltas were observed.
        if output.text.is_empty() {
            for item in response
                .get("output")
                .and_then(Value::as_array)
                .into_iter()
                .flatten()
            {
                apply_output_item(output, item, true);
            }
        }
    }

    if is_done_event(event) {
        finish_reason(event).clone_into(&mut output.finish_reason);
    }

    Ok(())
}

/// Extracts a text delta from a streaming event, if the event carries one.
#[must_use]
pub fn text_delta(event: &Value) -> Option<String> {
    matches!(event_type(event), Some("response.output_text.delta"))
        .then(|| {
            event
                .get("delta")
                .and_then(Value::as_str)
                .map(str::to_owned)
        })
        .flatten()
}

/// Returns whether an event marks the end of a streamed Codex response.
#[must_use]
pub fn is_done_event(event: &Value) -> bool {
    matches!(
        event_type(event),
        Some("response.completed" | "response.done" | "response.incomplete")
    )
}

/// Maps a terminal Codex event to the finish reason exposed to chat clients.
#[must_use]
pub fn finish_reason(event: &Value) -> &'static str {
    match event_type(event) {
        Some("response.incomplete") => "length",
        _ => "stop",
    }
}

/// Extracts an upstream error message from an event, if the event represents failure.
#[must_use]
pub fn event_error(event: &Value) -> Option<String> {
    match event_type(event) {
        Some("error") => event
            .get("message")
            .or_else(|| event.get("code"))
            .and_then(Value::as_str)
            .map(str::to_owned)
            .or_else(|| Some(event.to_string())),
        Some("response.failed") => event
            .pointer("/response/error/message")
            .and_then(Value::as_str)
            .map(str::to_owned)
            .or_else(|| Some("Codex response failed".to_owned())),
        _ => None,
    }
}

/// Extracts a completed function call from a streaming output-item event.
pub fn event_tool_call(event: &Value) -> Option<ToolCall> {
    matches!(event_type(event), Some("response.output_item.done"))
        .then(|| event.get("item"))
        .flatten()
        .and_then(parse_tool_call)
}

/// Extracts function calls from the final `response.output` payload on a terminal event.
pub fn response_tool_calls(event: &Value) -> Vec<ToolCall> {
    event
        .get("response")
        .and_then(|response| response.get("output"))
        .and_then(Value::as_array)
        .into_iter()
        .flatten()
        .filter_map(parse_tool_call)
        .collect()
}

fn apply_output_item(output: &mut ChatOutput, item: &Value, fill_text: bool) {
    match item.get("type").and_then(Value::as_str) {
        Some("message") if fill_text => {
            let text = item
                .get("content")
                .and_then(Value::as_array)
                .into_iter()
                .flatten()
                .filter_map(output_text)
                .collect::<Vec<_>>()
                .join("");
            output.text.push_str(&text);
        }
        Some("function_call") => {
            if let Some(tool_call) = parse_tool_call(item) {
                if !output.tool_calls.iter().any(|call| call.id == tool_call.id) {
                    output.tool_calls.push(tool_call);
                }
            }
        }
        _ => {}
    }
}

fn output_text(part: &Value) -> Option<&str> {
    match part.get("type").and_then(Value::as_str) {
        Some("output_text") => part.get("text").and_then(Value::as_str),
        Some("refusal") => part.get("refusal").and_then(Value::as_str),
        _ => None,
    }
}

fn parse_tool_call(item: &Value) -> Option<ToolCall> {
    let id = item
        .get("call_id")
        .or_else(|| item.get("id"))?
        .as_str()?
        .to_owned();
    let name = item.get("name")?.as_str()?.to_owned();
    let arguments = item
        .get("arguments")
        .and_then(Value::as_str)
        .unwrap_or("{}")
        .to_owned();

    Some(ToolCall {
        id,
        kind: "function".to_owned(),
        function: FunctionCall { name, arguments },
    })
}

fn parse_usage(value: Option<&Value>) -> Option<Usage> {
    let usage = value?;
    let prompt_tokens = u32::try_from(usage.get("input_tokens")?.as_u64()?).ok()?;
    let completion_tokens = u32::try_from(usage.get("output_tokens")?.as_u64()?).ok()?;
    let total_tokens = usage
        .get("total_tokens")
        .and_then(Value::as_u64)
        .map_or_else(
            || Some(prompt_tokens.saturating_add(completion_tokens)),
            |value| u32::try_from(value).ok(),
        )?;

    Some(Usage {
        prompt_tokens,
        completion_tokens,
        total_tokens,
    })
}

fn event_type(event: &Value) -> Option<&str> {
    event.get("type").and_then(Value::as_str)
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn applies_text_delta_and_usage() {
        let mut output = ChatOutput::default();
        apply_event(
            &mut output,
            &json!({"type": "response.output_text.delta", "delta": "hi"}),
        )
        .unwrap();
        apply_event(
            &mut output,
            &json!({
                "type": "response.completed",
                "response": {"usage": {"input_tokens": 2, "output_tokens": 3, "total_tokens": 5}}
            }),
        )
        .unwrap();

        assert_eq!(output.text, "hi");
        assert_eq!(output.usage.unwrap().total_tokens, 5);
    }

    #[test]
    fn extracts_final_text_when_no_delta_was_seen() {
        let mut output = ChatOutput::default();
        apply_event(
            &mut output,
            &json!({
                "type": "response.completed",
                "response": {
                    "output": [{
                        "type": "message",
                        "content": [{"type": "output_text", "text": "final"}]
                    }]
                }
            }),
        )
        .unwrap();

        assert_eq!(output.text, "final");
    }

    #[test]
    fn extracts_function_call_items() {
        let mut output = ChatOutput::default();
        apply_event(
            &mut output,
            &json!({
                "type": "response.output_item.done",
                "item": {
                    "type": "function_call",
                    "call_id": "call_1",
                    "name": "lookup",
                    "arguments": "{\"q\":\"x\"}"
                }
            }),
        )
        .unwrap();

        assert_eq!(output.tool_calls[0].function.name, "lookup");
    }

    #[test]
    fn extracts_tool_call_from_streaming_item_done_event() {
        let tool_call = event_tool_call(&json!({
            "type": "response.output_item.done",
            "item": {
                "type": "function_call",
                "call_id": "call_1",
                "name": "lookup",
                "arguments": "{\"q\":\"x\"}"
            }
        }))
        .unwrap();

        assert_eq!(tool_call.id, "call_1");
        assert_eq!(tool_call.function.arguments, "{\"q\":\"x\"}");
    }

    #[test]
    fn extracts_tool_calls_from_final_response_output() {
        let tool_calls = response_tool_calls(&json!({
            "type": "response.completed",
            "response": {
                "output": [{
                    "type": "function_call",
                    "call_id": "call_1",
                    "name": "lookup",
                    "arguments": "{}"
                }]
            }
        }));

        assert_eq!(tool_calls.len(), 1);
        assert_eq!(tool_calls[0].function.name, "lookup");
    }
}