use anyllm_proxy::admin;
use anyllm_proxy::config::{self, Config, MultiConfig};
use anyllm_proxy::server::routes;
use dashmap::DashMap;
use reqwest::{multipart, Client};
use std::sync::{Arc, OnceLock};
// Process-wide virtual-key map for this test binary. It is created lazily by
// `shared_vk_map`, which also hands a clone of the `Arc` to the proxy middleware.
static TEST_VK_MAP: OnceLock<Arc<DashMap<[u8; 32], admin::keys::VirtualKeyMeta>>> = OnceLock::new();
// Process-wide HMAC secret for this test binary. It is created lazily by
// `shared_hmac_secret`, which also hands a clone of the `Arc` to the proxy middleware.
static TEST_HMAC_SECRET: OnceLock<Arc<Vec<u8>>> = OnceLock::new();
/// Returns a handle to the virtual-key map shared by all tests in this binary.
///
/// On the first call an empty map is created, a clone of its `Arc` is passed to
/// `middleware::set_virtual_keys`, and the map is cached in `TEST_VK_MAP`. Later
/// calls return another handle to that same map.
fn shared_vk_map() -> Arc<DashMap<[u8; 32], admin::keys::VirtualKeyMeta>> {
    let registered = TEST_VK_MAP.get_or_init(|| {
        let fresh = Arc::new(DashMap::new());
        anyllm_proxy::server::middleware::set_virtual_keys(Arc::clone(&fresh));
        fresh
    });
    Arc::clone(registered)
}
/// Returns a handle to the HMAC secret shared by all tests in this binary.
///
/// On the first call the fixed test secret is allocated, a clone of its `Arc` is
/// passed to `middleware::set_hmac_secret`, and the secret is cached in
/// `TEST_HMAC_SECRET`. Later calls return another handle to the same bytes.
fn shared_hmac_secret() -> Arc<Vec<u8>> {
    let registered = TEST_HMAC_SECRET.get_or_init(|| {
        let fresh = Arc::new(b"batch-api-test-hmac-secret".to_vec());
        anyllm_proxy::server::middleware::set_hmac_secret(Arc::clone(&fresh));
        fresh
    });
    Arc::clone(registered)
}
/// Registers `raw_key` in the shared virtual-key map under the id `key_id`.
///
/// The key is hashed with the shared test HMAC secret and the hex digest is
/// decoded into the map's fixed-size key. The stored metadata uses the
/// `Developer` role and leaves expiry, rate limits, budget, and model/route
/// restrictions unset.
///
/// # Panics
///
/// Panics if `hash_from_hex` does not accept the digest produced by `hmac_hash_key`.
fn insert_test_virtual_key(raw_key: &str, key_id: i64) {
    let secret = shared_hmac_secret();
    let hex_digest = admin::keys::hmac_hash_key(raw_key, &secret);
    let digest = admin::keys::hash_from_hex(&hex_digest).unwrap();

    let registry = shared_vk_map();
    let meta = admin::keys::VirtualKeyMeta {
        id: key_id,
        description: Some("batch-api-tenant-scope-test".to_string()),
        expires_at: None,
        rpm_limit: None,
        tpm_limit: None,
        rate_state: Arc::new(admin::keys::RateLimitState::new()),
        role: admin::keys::KeyRole::Developer,
        max_budget_usd: None,
        budget_duration: None,
        period_start: None,
        period_spend_usd: 0.0,
        allowed_models: None,
        allowed_routes: None,
    };
    registry.insert(digest, meta);
}
/// Builds the single-backend configuration used by most tests in this file:
/// an OpenAI backend in chat format, authenticated with the bearer token
/// `test-key`, with body logging and degradation warnings switched off.
fn test_config() -> Config {
    let api_key = "test-key";
    let model_mapping = config::ModelMapping {
        big_model: "gpt-4o".into(),
        small_model: "gpt-4o-mini".into(),
    };

    Config {
        backend: config::BackendKind::OpenAI,
        openai_api_key: String::from(api_key),
        openai_base_url: String::from("https://api.openai.com"),
        listen_port: 0,
        model_mapping,
        tls: config::TlsConfig::default(),
        backend_auth: config::BackendAuth::BearerToken(api_key.into()),
        log_bodies: false,
        expose_degradation_warnings: false,
        openai_api_format: config::OpenAIApiFormat::Chat,
        provider_id: None,
    }
}
async fn make_test_batch_engine() -> std::sync::Arc<
anyllm_batch_engine::BatchEngine<
anyllm_batch_engine::queue::sqlite::SqliteQueue,
anyllm_batch_engine::webhook::sqlite::SqliteWebhookQueue,
>,
> {
use anyllm_batch_engine::{
db::init_batch_engine_tables, file_store::FileStore, queue::sqlite::SqliteQueue,
webhook::sqlite::SqliteWebhookQueue, BatchEngine,
};
let conn = rusqlite::Connection::open_in_memory().unwrap();
init_batch_engine_tables(&conn).unwrap();
let db = std::sync::Arc::new(std::sync::Mutex::new(conn));
std::sync::Arc::new(BatchEngine {
queue: std::sync::Arc::new(SqliteQueue::new(db.clone())),
file_store: FileStore::new(db.clone()),
webhook_queue: std::sync::Arc::new(SqliteWebhookQueue::new(db)),
global_webhook_urls: vec![],
webhook_signing_secret: None,
})
}
/// Starts a proxy instance on an ephemeral localhost port and returns its base
/// URL (`http://127.0.0.1:<port>`).
///
/// The app is built from `test_config()`, a test `SharedState` wired to the
/// shared virtual-key map and HMAC secret, and a fresh in-memory batch engine.
/// The server runs on a detached tokio task.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_test_server_with_shared() -> String {
    // Every test calls this helper and the harness runs tests on parallel threads.
    // Write the process-wide environment variable once rather than on each call.
    static OPEN_RELAY_ENV: std::sync::Once = std::sync::Once::new();
    OPEN_RELAY_ENV.call_once(|| std::env::set_var("PROXY_OPEN_RELAY", "true"));

    let config = test_config();
    let multi = MultiConfig::from_single_config(&config);

    let mut shared = admin::state::SharedState::new_for_test();
    shared.virtual_keys = shared_vk_map();
    shared.hmac_secret = shared_hmac_secret();

    let engine = make_test_batch_engine().await;
    let app = routes::app_multi_with_shared(multi, Some(shared), None, None, Some(engine), None);

    // Port 0 lets the OS choose a free port, so parallel tests do not collide.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
    format!("http://{addr}")
}
/// Returns a two-line JSONL document (no trailing newline) holding two chat
/// requests with the custom ids `req-1` and `req-2`.
fn valid_jsonl() -> &'static str {
    const PAYLOAD: &str = concat!(
        r#"{"custom_id": "req-1", "body": {"model": "gpt-4o", "messages": [{"role": "user", "content": "Hello"}]}}"#,
        "\n",
        r#"{"custom_id": "req-2", "body": {"model": "gpt-4o", "messages": [{"role": "user", "content": "World"}]}}"#,
    );
    PAYLOAD
}
/// Builds the multipart upload form used for batch input files: a `purpose`
/// text field set to `batch` and a `file` part named `test.jsonl` carrying the
/// bytes of `valid_jsonl()` with MIME type `application/jsonl`.
fn valid_batch_form() -> multipart::Form {
    let payload = valid_jsonl().as_bytes().to_vec();
    let file_part = multipart::Part::bytes(payload)
        .file_name("test.jsonl")
        .mime_str("application/jsonl")
        .unwrap();
    multipart::Form::new()
        .text("purpose", "batch")
        .part("file", file_part)
}
/// Uploads the form from `valid_batch_form()` to `POST {base}/v1/files` using
/// `api_key` in the `x-api-key` header, and returns the `id` of the created file.
///
/// # Panics
///
/// Panics if the request fails, the status is not 200, the body is not JSON, or
/// the response has no string `id`.
async fn upload_batch_file(client: &Client, base: &str, api_key: &str) -> String {
    let endpoint = format!("{base}/v1/files");
    let request = client
        .post(endpoint)
        .header("x-api-key", api_key)
        .multipart(valid_batch_form());
    let response = request.send().await.unwrap();
    assert_eq!(response.status(), 200);

    let uploaded: serde_json::Value = response.json().await.unwrap();
    let id = uploaded["id"].as_str().unwrap();
    id.to_string()
}
/// Creates a batch for `file_id` via `POST {base}/v1/batches` using `api_key`
/// in the `x-api-key` header, and returns the `id` of the created batch.
///
/// # Panics
///
/// Panics if the request fails, the status is not 200, the body is not JSON, or
/// the response has no string `id`.
async fn create_batch(client: &Client, base: &str, api_key: &str, file_id: &str) -> String {
    let payload = serde_json::json!({
        "input_file_id": file_id,
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h"
    });
    let endpoint = format!("{base}/v1/batches");
    let response = client
        .post(endpoint)
        .header("x-api-key", api_key)
        .json(&payload)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 200);

    let created: serde_json::Value = response.json().await.unwrap();
    let id = created["id"].as_str().unwrap();
    id.to_string()
}
#[tokio::test]
async fn upload_file_and_create_batch() {
let base = spawn_test_server_with_shared().await;
let client = Client::new();
let form = multipart::Form::new().text("purpose", "batch").part(
"file",
multipart::Part::bytes(valid_jsonl().as_bytes().to_vec())
.file_name("test.jsonl")
.mime_str("application/jsonl")
.unwrap(),
);
let resp = client
.post(format!("{base}/v1/files"))
.header("x-api-key", "test")
.multipart(form)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let file_obj: serde_json::Value = resp.json().await.unwrap();
assert_eq!(file_obj["object"], "file");
assert_eq!(file_obj["purpose"], "batch");
assert!(file_obj["id"].as_str().unwrap().starts_with("file-"));
let file_id = file_obj["id"].as_str().unwrap().to_string();
let resp = client
.post(format!("{base}/v1/batches"))
.header("x-api-key", "test")
.json(&serde_json::json!({
"input_file_id": file_id,
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let batch_obj: serde_json::Value = resp.json().await.unwrap();
assert_eq!(batch_obj["object"], "batch");
assert!(batch_obj["id"].as_str().unwrap().starts_with("batch"));
assert_eq!(batch_obj["status"], "validating");
assert_eq!(batch_obj["input_file_id"], file_id);
assert_eq!(batch_obj["request_counts"]["total"], 2);
let batch_id = batch_obj["id"].as_str().unwrap().to_string();
let resp = client
.get(format!("{base}/v1/batches/{batch_id}"))
.header("x-api-key", "test")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let fetched: serde_json::Value = resp.json().await.unwrap();
assert_eq!(fetched["id"], batch_id);
let resp = client
.get(format!("{base}/v1/batches"))
.header("x-api-key", "test")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let list: serde_json::Value = resp.json().await.unwrap();
assert_eq!(list["object"], "list");
assert!(!list["data"].as_array().unwrap().is_empty());
}
#[tokio::test]
async fn virtual_key_batch_routes_are_tenant_scoped() {
let key_a = "sk-vk-batch-tenant-a";
let key_b = "sk-vk-batch-tenant-b";
insert_test_virtual_key(key_a, 101);
insert_test_virtual_key(key_b, 202);
let base = spawn_test_server_with_shared().await;
let client = Client::new();
let file_a = upload_batch_file(&client, &base, key_a).await;
let file_b = upload_batch_file(&client, &base, key_b).await;
let batch_a = create_batch(&client, &base, key_a, &file_a).await;
let batch_b = create_batch(&client, &base, key_b, &file_b).await;
let list_a: serde_json::Value = client
.get(format!("{base}/v1/batches"))
.header("x-api-key", key_a)
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let ids_a: Vec<&str> = list_a["data"]
.as_array()
.unwrap()
.iter()
.filter_map(|job| job["id"].as_str())
.collect();
assert!(ids_a.contains(&batch_a.as_str()));
assert!(!ids_a.contains(&batch_b.as_str()));
let list_b: serde_json::Value = client
.get(format!("{base}/v1/batches"))
.header("x-api-key", key_b)
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let ids_b: Vec<&str> = list_b["data"]
.as_array()
.unwrap()
.iter()
.filter_map(|job| job["id"].as_str())
.collect();
assert!(ids_b.contains(&batch_b.as_str()));
assert!(!ids_b.contains(&batch_a.as_str()));
let cross_create = client
.post(format!("{base}/v1/batches"))
.header("x-api-key", key_a)
.json(&serde_json::json!({
"input_file_id": file_b,
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
}))
.send()
.await
.unwrap();
assert_eq!(cross_create.status(), 400);
let cross_get = client
.get(format!("{base}/v1/batches/{batch_b}"))
.header("x-api-key", key_a)
.send()
.await
.unwrap();
assert_eq!(cross_get.status(), 404);
let cross_cancel = client
.post(format!("{base}/v1/batches/{batch_b}/cancel"))
.header("x-api-key", key_a)
.send()
.await
.unwrap();
assert_eq!(cross_cancel.status(), 404);
let owner_get = client
.get(format!("{base}/v1/batches/{batch_b}"))
.header("x-api-key", key_b)
.send()
.await
.unwrap();
assert_eq!(owner_get.status(), 200);
let owner_cancel = client
.post(format!("{base}/v1/batches/{batch_b}/cancel"))
.header("x-api-key", key_b)
.send()
.await
.unwrap();
assert_eq!(owner_cancel.status(), 200);
}
/// Uploading a file whose content is not JSON must be rejected with 400.
#[tokio::test]
async fn upload_invalid_jsonl_returns_400() {
    let base = spawn_test_server_with_shared().await;
    let http = Client::new();

    // No MIME type is set on this part; only the file name is supplied.
    let bad_part = multipart::Part::bytes(b"not valid json".to_vec()).file_name("bad.jsonl");
    let form = multipart::Form::new()
        .text("purpose", "batch")
        .part("file", bad_part);

    let endpoint = format!("{base}/v1/files");
    let response = http
        .post(endpoint)
        .header("x-api-key", "test")
        .multipart(form)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 400);
}
/// Creating a batch that references an unknown input file id must return 400.
#[tokio::test]
async fn create_batch_with_missing_file_returns_400() {
    let base = spawn_test_server_with_shared().await;
    let http = Client::new();

    let payload = serde_json::json!({
        "input_file_id": "file-nonexistent",
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h"
    });
    let endpoint = format!("{base}/v1/batches");
    let response = http
        .post(endpoint)
        .header("x-api-key", "test")
        .json(&payload)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 400);
}
/// Fetching a batch id that was never created must return 404.
#[tokio::test]
async fn get_nonexistent_batch_returns_404() {
    let base = spawn_test_server_with_shared().await;
    let http = Client::new();

    let endpoint = format!("{base}/v1/batches/batch-does-not-exist");
    let request = http.get(endpoint).header("x-api-key", "test");
    let response = request.send().await.unwrap();
    assert_eq!(response.status(), 404);
}
#[tokio::test]
async fn unsupported_backend_returns_501() {
std::env::set_var("PROXY_OPEN_RELAY", "true");
let config = Config {
backend: config::BackendKind::Anthropic,
openai_api_key: "test-key".to_string(),
openai_base_url: "https://api.anthropic.com".to_string(),
listen_port: 0,
model_mapping: config::ModelMapping {
big_model: "claude-sonnet-4-6".into(),
small_model: "claude-haiku-4-5".into(),
},
tls: config::TlsConfig::default(),
backend_auth: config::BackendAuth::BearerToken("test-key".into()),
log_bodies: false,
expose_degradation_warnings: false,
openai_api_format: config::OpenAIApiFormat::Chat,
provider_id: None,
};
let multi = MultiConfig::from_single_config(&config);
let shared = admin::state::SharedState::new_for_test();
let engine = make_test_batch_engine().await;
let app = routes::app_multi_with_shared(multi, Some(shared), None, None, Some(engine), None);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });
let base = format!("http://{addr}");
let client = Client::new();
let resp = client
.post(format!("{base}/v1/batches"))
.header("x-api-key", "test")
.json(&serde_json::json!({
"input_file_id": "file-abc",
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 501);
}
#[tokio::test]
async fn cancel_queued_batch() {
let base = spawn_test_server_with_shared().await;
let client = Client::new();
let form = multipart::Form::new().text("purpose", "batch").part(
"file",
multipart::Part::bytes(valid_jsonl().as_bytes().to_vec())
.file_name("test.jsonl")
.mime_str("application/jsonl")
.unwrap(),
);
let resp = client
.post(format!("{base}/v1/files"))
.header("x-api-key", "test")
.multipart(form)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let file_obj: serde_json::Value = resp.json().await.unwrap();
let file_id = file_obj["id"].as_str().unwrap().to_string();
let resp = client
.post(format!("{base}/v1/batches"))
.header("x-api-key", "test")
.json(&serde_json::json!({
"input_file_id": file_id,
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
}))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let batch_obj: serde_json::Value = resp.json().await.unwrap();
let batch_id = batch_obj["id"].as_str().unwrap().to_string();
let resp = client
.post(format!("{base}/v1/batches/{batch_id}/cancel"))
.header("x-api-key", "test")
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let cancelled: serde_json::Value = resp.json().await.unwrap();
let status = cancelled["status"].as_str().unwrap();
assert!(
status == "cancelling" || status == "cancelled",
"expected cancelling or cancelled, got {status}"
);
}
/// Posting an empty `requests` array to `/v1/messages/batches` must return 400.
#[tokio::test]
async fn anthropic_batch_rejects_empty_requests() {
    let base = spawn_test_server_with_shared().await;
    let http = Client::new();

    // The body is serialized by hand and sent with an explicit content type.
    let payload = serde_json::to_string(&serde_json::json!({"requests": []})).unwrap();
    let endpoint = format!("{base}/v1/messages/batches");
    let response = http
        .post(endpoint)
        .header("x-api-key", "test")
        .header("content-type", "application/json")
        .body(payload)
        .send()
        .await
        .unwrap();
    assert_eq!(response.status(), 400);
}