codexia 1.0.1

OpenAI- and Anthropic-compatible local API gateway backed by Codex OAuth.
Documentation
use crate::anthropic::{
    MessageBatchRequest, MessageBatchResult, MessageBatchResultType, error_body,
    from_openai_response_value,
};
use crate::codex::convert::responses_to_codex_request;
use crate::server::handlers::responses::{
    anthropic_responses_request, collect_response_input_items,
};
use axum::http::{HeaderMap, header::HOST};

pub(in crate::server::handlers) fn build_batch_id() -> String {
    format!(
        "msgbatch_{}_{:08x}",
        crate::config::now_unix(),
        rand::random::<u32>()
    )
}

pub(in crate::server::handlers) fn batch_results_url(
    headers: &HeaderMap,
    batch_id: &str,
) -> String {
    let host = headers
        .get(HOST)
        .and_then(|value| value.to_str().ok())
        .unwrap_or("127.0.0.1:14550");
    format!("http://{host}/v1/messages/batches/{batch_id}/results")
}

pub(in crate::server::handlers) async fn run_message_batch_worker(
    batches: crate::server::store::BatchStore,
    token_manager: crate::token::TokenManager,
    codex: crate::codex::client::CodexClient,
    batch_id: String,
    requests: Vec<MessageBatchRequest>,
) {
    let mut requests = std::collections::VecDeque::from(requests);
    while let Some(item) = requests.pop_front() {
        if batches.cancel_requested(&batch_id).await.unwrap_or(false) {
            let mut canceled = vec![MessageBatchResult {
                custom_id: item.custom_id,
                result: MessageBatchResultType::Canceled,
            }];
            canceled.extend(requests.into_iter().map(|pending| MessageBatchResult {
                custom_id: pending.custom_id,
                result: MessageBatchResultType::Canceled,
            }));
            finalize_canceled_batch(&batches, &batch_id, canceled).await;
            return;
        }

        let result = match anthropic_responses_request(&item.params).and_then(|response_request| {
            collect_response_input_items(&response_request, None)
                .map(|input_items| (response_request, input_items))
        }) {
            Ok((response_request, input_items)) => match token_manager.credentials().await {
                Ok(credentials) => match codex
                    .complete_response(
                        responses_to_codex_request(&response_request, &input_items),
                        &credentials,
                    )
                    .await
                {
                    Ok(response) => MessageBatchResult {
                        custom_id: item.custom_id,
                        result: MessageBatchResultType::Succeeded {
                            message: from_openai_response_value(&response, &response_request.model),
                        },
                    },
                    Err(error) => MessageBatchResult {
                        custom_id: item.custom_id,
                        result: MessageBatchResultType::Errored {
                            error: error_body(&error),
                        },
                    },
                },
                Err(error) => MessageBatchResult {
                    custom_id: item.custom_id,
                    result: MessageBatchResultType::Errored {
                        error: error_body(&error),
                    },
                },
            },
            Err(error) => MessageBatchResult {
                custom_id: item.custom_id,
                result: MessageBatchResultType::Errored {
                    error: error_body(&error),
                },
            },
        };

        batches
            .update(&batch_id, move |stored| {
                stored.results.push(result.clone());
                match &result.result {
                    MessageBatchResultType::Succeeded { .. } => {
                        stored.batch.request_counts.succeeded =
                            stored.batch.request_counts.succeeded.saturating_add(1);
                    }
                    MessageBatchResultType::Errored { .. } => {
                        stored.batch.request_counts.errored =
                            stored.batch.request_counts.errored.saturating_add(1);
                    }
                    MessageBatchResultType::Canceled => {
                        stored.batch.request_counts.canceled =
                            stored.batch.request_counts.canceled.saturating_add(1);
                    }
                }
                stored.batch.request_counts.processing =
                    stored.batch.request_counts.processing.saturating_sub(1);
            })
            .await;
    }

    let _ = batches
        .update(&batch_id, |stored| {
            stored.batch.processing_status = "ended";
            stored.batch.ended_at = Some(chrono::Utc::now().to_rfc3339());
            stored.cancel_requested = false;
        })
        .await;
}

async fn finalize_canceled_batch(
    batches: &crate::server::store::BatchStore,
    batch_id: &str,
    canceled_results: Vec<MessageBatchResult>,
) {
    let remaining = u32::try_from(canceled_results.len()).unwrap_or(u32::MAX);
    let _ = batches
        .update(batch_id, move |stored| {
            stored.results.extend(canceled_results);
            stored.batch.request_counts.canceled = stored
                .batch
                .request_counts
                .canceled
                .saturating_add(remaining);
            stored.batch.request_counts.processing = stored
                .batch
                .request_counts
                .processing
                .saturating_sub(remaining);
            stored.batch.processing_status = "ended";
            stored.batch.ended_at = Some(chrono::Utc::now().to_rfc3339());
            stored.cancel_requested = false;
        })
        .await;
}