use crate::anthropic::{
MessageBatchRequest, MessageBatchResult, MessageBatchResultType, error_body,
from_openai_response_value,
};
use crate::codex::convert::responses_to_codex_request;
use crate::server::handlers::responses::{
anthropic_responses_request, collect_response_input_items,
};
use axum::http::{HeaderMap, header::HOST};
pub(in crate::server::handlers) fn build_batch_id() -> String {
format!(
"msgbatch_{}_{:08x}",
crate::config::now_unix(),
rand::random::<u32>()
)
}
pub(in crate::server::handlers) fn batch_results_url(
headers: &HeaderMap,
batch_id: &str,
) -> String {
let host = headers
.get(HOST)
.and_then(|value| value.to_str().ok())
.unwrap_or("127.0.0.1:14550");
format!("http://{host}/v1/messages/batches/{batch_id}/results")
}
pub(in crate::server::handlers) async fn run_message_batch_worker(
batches: crate::server::store::BatchStore,
token_manager: crate::token::TokenManager,
codex: crate::codex::client::CodexClient,
batch_id: String,
requests: Vec<MessageBatchRequest>,
) {
let mut requests = std::collections::VecDeque::from(requests);
while let Some(item) = requests.pop_front() {
if batches.cancel_requested(&batch_id).await.unwrap_or(false) {
let mut canceled = vec![MessageBatchResult {
custom_id: item.custom_id,
result: MessageBatchResultType::Canceled,
}];
canceled.extend(requests.into_iter().map(|pending| MessageBatchResult {
custom_id: pending.custom_id,
result: MessageBatchResultType::Canceled,
}));
finalize_canceled_batch(&batches, &batch_id, canceled).await;
return;
}
let result = match anthropic_responses_request(&item.params).and_then(|response_request| {
collect_response_input_items(&response_request, None)
.map(|input_items| (response_request, input_items))
}) {
Ok((response_request, input_items)) => match token_manager.credentials().await {
Ok(credentials) => match codex
.complete_response(
responses_to_codex_request(&response_request, &input_items),
&credentials,
)
.await
{
Ok(response) => MessageBatchResult {
custom_id: item.custom_id,
result: MessageBatchResultType::Succeeded {
message: from_openai_response_value(&response, &response_request.model),
},
},
Err(error) => MessageBatchResult {
custom_id: item.custom_id,
result: MessageBatchResultType::Errored {
error: error_body(&error),
},
},
},
Err(error) => MessageBatchResult {
custom_id: item.custom_id,
result: MessageBatchResultType::Errored {
error: error_body(&error),
},
},
},
Err(error) => MessageBatchResult {
custom_id: item.custom_id,
result: MessageBatchResultType::Errored {
error: error_body(&error),
},
},
};
batches
.update(&batch_id, move |stored| {
stored.results.push(result.clone());
match &result.result {
MessageBatchResultType::Succeeded { .. } => {
stored.batch.request_counts.succeeded =
stored.batch.request_counts.succeeded.saturating_add(1);
}
MessageBatchResultType::Errored { .. } => {
stored.batch.request_counts.errored =
stored.batch.request_counts.errored.saturating_add(1);
}
MessageBatchResultType::Canceled => {
stored.batch.request_counts.canceled =
stored.batch.request_counts.canceled.saturating_add(1);
}
}
stored.batch.request_counts.processing =
stored.batch.request_counts.processing.saturating_sub(1);
})
.await;
}
let _ = batches
.update(&batch_id, |stored| {
stored.batch.processing_status = "ended";
stored.batch.ended_at = Some(chrono::Utc::now().to_rfc3339());
stored.cancel_requested = false;
})
.await;
}
async fn finalize_canceled_batch(
batches: &crate::server::store::BatchStore,
batch_id: &str,
canceled_results: Vec<MessageBatchResult>,
) {
let remaining = u32::try_from(canceled_results.len()).unwrap_or(u32::MAX);
let _ = batches
.update(batch_id, move |stored| {
stored.results.extend(canceled_results);
stored.batch.request_counts.canceled = stored
.batch
.request_counts
.canceled
.saturating_add(remaining);
stored.batch.request_counts.processing = stored
.batch
.request_counts
.processing
.saturating_sub(remaining);
stored.batch.processing_status = "ended";
stored.batch.ended_at = Some(chrono::Utc::now().to_rfc3339());
stored.cancel_requested = false;
})
.await;
}