anyllm_batch_engine 0.9.4

Batch orchestration engine with job queue, workers, and event-driven notifications

anyllm_batch_engine

Batch orchestration engine: job queue, file storage, worker pool, and webhook delivery. HTTP-agnostic. Part of the anyllm-proxy workspace.

What this crate is

The backend of an OpenAI-style Batch API, packaged as a library. Callers (typically anyllm_proxy) wire its public types into their own HTTP layer; this crate owns the persistence, the worker loop, and the webhook signing.

It owns:

  • A SQLite-backed job queue (rusqlite, bundled, no external server).
  • A file store for input and output JSONL.
  • An async worker pool that pulls queued jobs and forwards them to a translator-compatible backend.
  • HMAC-signed webhook delivery for completion notifications.
  • JSONL request validation (validate_jsonl).
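The crate's worker pool is async, and its internals are not described on this page. As a rough, std-only illustration of the pull-and-forward shape described above, the sketch below runs a fixed number of threads that share one job channel and pass each item to a `forward` function. That function stands in for the translator-compatible backend. `run_pool` and `forward` are hypothetical names; this is not the crate's API.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical: `workers` threads pull items from one shared channel and
/// pass each to `forward`, returning (item, result) pairs in completion order.
pub fn run_pool<F>(items: Vec<String>, workers: usize, forward: F) -> Vec<(String, String)>
where
    F: Fn(&str) -> String + Send + Sync + 'static,
{
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (out_tx, out_rx) = mpsc::channel();
    let forward = Arc::new(forward);

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&job_rx);
            let tx = out_tx.clone();
            let forward = Arc::clone(&forward);
            thread::spawn(move || loop {
                // The lock guard is a temporary, so it is released before forwarding.
                let next = rx.lock().unwrap().recv();
                match next {
                    Ok(item) => {
                        let result = (*forward)(&item);
                        tx.send((item, result)).unwrap();
                    }
                    Err(_) => break, // sender dropped and channel drained
                }
            })
        })
        .collect();

    for item in items {
        job_tx.send(item).unwrap();
    }
    drop(job_tx); // closing the job channel lets idle workers exit
    drop(out_tx); // only the workers' clones keep the result channel open
    for h in handles {
        h.join().unwrap();
    }
    out_rx.into_iter().collect()
}
```

Results arrive in completion order rather than submission order, so callers should match them by an identifier such as custom_id instead of by position.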

It deliberately does not own:

  • HTTP handlers or routes. The proxy crate exposes the OpenAI-compatible REST surface.
  • Authentication or rate limiting. Those belong to the surrounding server.

Where it fits

Five-crate workspace:

  • anyllm_translate - pure format mapping, no I/O.
  • anyllm_providers - provider and model catalog.
  • anyllm_client - async HTTP client.
  • anyllm_batch_engine (this crate) - batch job queue and worker pool.
  • anyllm_proxy - axum HTTP server that wires this crate into the /v1/batches and /v1/files endpoints.

Depend on this crate directly if you are building your own batch API surface. Depend on anyllm_proxy if you want the HTTP endpoints and admin UI for free.

Add it

[dependencies]
anyllm_batch_engine = "0.9"

Library examples

1. Validate a JSONL submission before accepting it

validate_jsonl is pure and synchronous. It enforces the OpenAI batch contract: every line is JSON, every custom_id is unique and ≤64 chars, every body is an object with a model field, ≤50,000 lines, ≤100 MB. Use it in any HTTP handler or CLI before persisting the file.

use anyllm_batch_engine::validate_jsonl;
use std::io::Cursor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two well-formed request lines with distinct custom_ids.
    let input = br#"{"custom_id":"a","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4o-mini","messages":[{"role":"user","content":"hi"}]}}
{"custom_id":"b","method":"POST","url":"/v1/chat/completions","body":{"model":"gpt-4o-mini","messages":[{"role":"user","content":"there"}]}}
"#;

    // On error, reject the upload before anything is persisted.
    let result = validate_jsonl(Cursor::new(&input[..]))
        .map_err(|e| format!("rejected: {e}"))?;
    println!("accepted {} items", result.line_count);
    Ok(())
}
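The size, line-count, and custom_id rules listed above can be restated as a small std-only check. This is a minimal sketch, not the crate's implementation. It assumes the custom_ids have already been extracted from each parsed line, counts the 64-character limit in Unicode scalar values, and reads "100 MB" as 100 × 1024 × 1024 bytes. `check_batch_limits` and `LimitError` are hypothetical names.

```rust
use std::collections::HashSet;

// Limits from the OpenAI batch contract described above.
const MAX_LINES: usize = 50_000;
const MAX_CUSTOM_ID_CHARS: usize = 64;
// Assumption: "100 MB" interpreted as 100 MiB.
const MAX_BYTES: usize = 100 * 1024 * 1024;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum LimitError {
    TooLarge(usize),
    TooManyLines(usize),
    CustomIdTooLong { line: usize },
    DuplicateCustomId { line: usize, id: String },
}

/// Checks one submission. `custom_ids` holds one entry per JSONL line, in file order.
pub fn check_batch_limits(custom_ids: &[&str], total_bytes: usize) -> Result<(), LimitError> {
    if total_bytes > MAX_BYTES {
        return Err(LimitError::TooLarge(total_bytes));
    }
    if custom_ids.len() > MAX_LINES {
        return Err(LimitError::TooManyLines(custom_ids.len()));
    }
    let mut seen = HashSet::new();
    for (i, id) in custom_ids.iter().enumerate() {
        let line = i + 1; // report 1-based line numbers
        if id.chars().count() > MAX_CUSTOM_ID_CHARS {
            return Err(LimitError::CustomIdTooLong { line });
        }
        // HashSet::insert returns false when the id was already present.
        if !seen.insert(*id) {
            return Err(LimitError::DuplicateCustomId { line, id: id.to_string() });
        }
    }
    Ok(())
}
```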

2. Wire up an in-memory engine for tests

The crate ships SQLite-backed JobQueue and WebhookQueue implementations. Both can share an in-memory connection, which is what the crate's own tests use; this is the smallest end-to-end example.

use anyllm_batch_engine::{
    BatchEngine, BatchSubmission, ExecutionMode, SourceFormat, SubmissionItem,
};
use anyllm_batch_engine::db::init_batch_engine_tables;
use anyllm_batch_engine::file_store::FileStore;
use anyllm_batch_engine::queue::sqlite::SqliteQueue;
use anyllm_batch_engine::webhook::sqlite::SqliteWebhookQueue;
use rusqlite::Connection;
use std::sync::{Arc, Mutex};

// Assumes a tokio runtime (features "macros" and "rt-multi-thread").
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fresh in-memory database; everything is discarded when the process exits.
    let conn = Connection::open_in_memory()?;
    init_batch_engine_tables(&conn)?;
    // One connection shared by the job queue, file store, and webhook queue.
    let db = Arc::new(Mutex::new(conn));

    let engine = BatchEngine {
        queue: Arc::new(SqliteQueue::new(db.clone())),
        file_store: FileStore::new(db.clone()),
        webhook_queue: Arc::new(SqliteWebhookQueue::new(db)),
        global_webhook_urls: vec![], // no webhook delivery in tests
        webhook_signing_secret: None,
    };

    // Upload a (here, trivial) JSONL file the submission will reference.
    engine.file_store
        .insert("file-demo", None, None, b"{\"custom_id\":\"a\",\"body\":{\"model\":\"gpt-4o-mini\"}}", 1)
        .await?;

    let job = engine.submit(BatchSubmission {
        items: vec![SubmissionItem {
            custom_id: "a".into(),
            model: "gpt-4o-mini".into(),
            body: serde_json::json!({"messages":[{"role":"user","content":"hi"}]}),
            source_format: SourceFormat::OpenAI,
        }],
        execution_mode: ExecutionMode::ProxyNative,
        input_file_id: "file-demo".into(), // the id used in the upload above
        key_id: None,
        webhook_url: None,
        metadata: None,
        priority: 0,
    }).await?;

    println!("queued {} with {} items", job.id, job.request_counts.total);
    Ok(())
}

3. Persistent engine + global webhooks

For a long-running process, open the connection on a file instead of in memory and populate global_webhook_urls, so that every job lifecycle event (batch.queued, batch.cancelled, ...) is delivered to those URLs and signed with HMAC-SHA256 using webhook_signing_secret.

use anyllm_batch_engine::BatchEngine;
use anyllm_batch_engine::db::init_batch_engine_tables;
use anyllm_batch_engine::file_store::FileStore;
use anyllm_batch_engine::queue::sqlite::SqliteQueue;
use anyllm_batch_engine::webhook::sqlite::SqliteWebhookQueue;
use rusqlite::Connection;
use std::sync::{Arc, Mutex};

let conn = Connection::open("/var/lib/myapp/batches.sqlite")?;
init_batch_engine_tables(&conn)?;
let db = Arc::new(Mutex::new(conn));

let engine = BatchEngine {
    queue: Arc::new(SqliteQueue::new(db.clone())),
    file_store: FileStore::new(db.clone()),
    webhook_queue: Arc::new(SqliteWebhookQueue::new(db)),
    global_webhook_urls: vec!["https://hooks.example.com/batch".into()],
    webhook_signing_secret: Some(std::env::var("WEBHOOK_SIGNING_SECRET")?),
};

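Webhook receivers should verify each delivery against its X-Signature-256 header, which carries the HMAC-SHA256 signature computed with the shared webhook_signing_secret.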
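A receiver-side check might look like the sketch below. It is a minimal sketch under assumptions this page does not confirm: that the signature covers the raw request body, that the header value is lowercase hex optionally prefixed with `sha256=`, and that the HMAC key is the bytes of webhook_signing_secret. To stay dependency-free it includes a compact SHA-256 (FIPS 180-4) and HMAC (RFC 2104); production code would normally use the `hmac` and `sha2` crates instead. `verify_signature` is a hypothetical helper name.

```rust
const K: [u32; 64] = [
    0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
    0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
    0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
    0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
    0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
    0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
    0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
    0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
];

fn sha256(msg: &[u8]) -> [u8; 32] {
    let mut h: [u32; 8] = [
        0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a,
        0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
    ];
    // Padding: 0x80, zeros up to 56 mod 64, then the bit length as big-endian u64.
    let mut data = msg.to_vec();
    data.push(0x80);
    while data.len() % 64 != 56 {
        data.push(0);
    }
    data.extend_from_slice(&((msg.len() as u64) * 8).to_be_bytes());
    for block in data.chunks(64) {
        let mut w = [0u32; 64];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([block[4 * i], block[4 * i + 1], block[4 * i + 2], block[4 * i + 3]]);
        }
        for i in 16..64 {
            let s0 = w[i - 15].rotate_right(7) ^ w[i - 15].rotate_right(18) ^ (w[i - 15] >> 3);
            let s1 = w[i - 2].rotate_right(17) ^ w[i - 2].rotate_right(19) ^ (w[i - 2] >> 10);
            w[i] = w[i - 16].wrapping_add(s0).wrapping_add(w[i - 7]).wrapping_add(s1);
        }
        let [mut a, mut b, mut c, mut d, mut e, mut f, mut g, mut hh] = h;
        for i in 0..64 {
            let s1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let ch = (e & f) ^ (!e & g);
            let t1 = hh.wrapping_add(s1).wrapping_add(ch).wrapping_add(K[i]).wrapping_add(w[i]);
            let s0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let maj = (a & b) ^ (a & c) ^ (b & c);
            let t2 = s0.wrapping_add(maj);
            hh = g; g = f; f = e; e = d.wrapping_add(t1);
            d = c; c = b; b = a; a = t1.wrapping_add(t2);
        }
        for (x, y) in h.iter_mut().zip([a, b, c, d, e, f, g, hh]) {
            *x = x.wrapping_add(y);
        }
    }
    let mut out = [0u8; 32];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

fn hmac_sha256(key: &[u8], msg: &[u8]) -> [u8; 32] {
    // Keys longer than the 64-byte block are hashed first; shorter ones are zero-padded.
    let mut k = [0u8; 64];
    if key.len() > 64 {
        k[..32].copy_from_slice(&sha256(key));
    } else {
        k[..key.len()].copy_from_slice(key);
    }
    let mut inner: Vec<u8> = k.iter().map(|b| b ^ 0x36).collect();
    inner.extend_from_slice(msg);
    let mut outer: Vec<u8> = k.iter().map(|b| b ^ 0x5c).collect();
    outer.extend_from_slice(&sha256(&inner));
    sha256(&outer)
}

/// Hypothetical helper: accepts the header as bare hex or "sha256=<hex>" (assumed format).
pub fn verify_signature(secret: &[u8], raw_body: &[u8], header: &str) -> bool {
    let trimmed = header.trim();
    let given = trimmed.strip_prefix("sha256=").unwrap_or(trimmed).to_ascii_lowercase();
    let expected: String = hmac_sha256(secret, raw_body).iter().map(|b| format!("{b:02x}")).collect();
    if given.len() != expected.len() {
        return false;
    }
    // Constant-time comparison: accumulate differences instead of returning early.
    given.bytes().zip(expected.bytes()).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
```

Compute the HMAC over the body bytes exactly as received, before any JSON parsing or re-serialization, since re-encoding can change the bytes and break the match.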

4. Polling and result retrieval

Once jobs are queued, use get, list, and get_items to build a status page or retrieve results.

use anyllm_batch_engine::{BatchId, BatchStatus};

// `engine` is a BatchEngine built as in example 2 or 3; these calls run inside an async fn.
let job = engine.get(&BatchId("batch_...".into())).await?;
if let Some(job) = job {
    if job.status == BatchStatus::Completed {
        for item in engine.get_items(&job.id).await? {
            if let Some(result) = item.result {
                println!("{} -> {} {}", item.custom_id, result.status_code, result.body);
            }
        }
    }
}

// Paginated listing for an admin UI:
let recent = engine.list(None, None, 50).await?;
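A status page or CLI usually polls get until a job reaches a terminal state. The sketch below shows a capped exponential backoff loop in std-only Rust. `poll_until` is a hypothetical helper, and `fetch` stands in for a call such as `engine.get(...)`. That call is async in the crate, so in async code you would use your runtime's sleep instead of `std::thread::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: calls `fetch` until `is_done` accepts the value or
/// `max_attempts` is exhausted, doubling the delay between calls up to `max_delay`.
pub fn poll_until<T>(
    mut fetch: impl FnMut() -> T,
    is_done: impl Fn(&T) -> bool,
    initial_delay: Duration,
    max_delay: Duration,
    max_attempts: u32,
) -> Option<T> {
    let mut delay = initial_delay;
    for attempt in 1..=max_attempts {
        let value = fetch();
        if is_done(&value) {
            return Some(value);
        }
        // Skip the sleep after the final attempt.
        if attempt < max_attempts {
            sleep(delay);
            delay = (delay * 2).min(max_delay);
        }
    }
    None
}
```

Capping the delay keeps a long-running batch from being checked too rarely, while the doubling keeps many idle pollers from hammering the database.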

5. Custom queue or webhook backends

BatchEngine<Q, W> is generic over the JobQueue and WebhookQueue traits. Implement either trait against Postgres, Redis, or your own store and plug it in without touching engine code. The bundled SqliteQueue / SqliteWebhookQueue are the reference implementations.

Public surface

Top-level re-exports (see src/lib.rs):

  • BatchEngine - the orchestrator (generic over queue and webhook implementations).
  • EngineError, QueueError - error types.
  • Types from job - BatchJob, BatchStatus, BatchItem, BatchSubmission, SubmissionItem, ExecutionMode, SourceFormat, BatchId, ItemId, RequestCounts.
  • validate_jsonl, ValidatedJsonl - input validation for OpenAI-style batch requests.

Modules:

Module       Purpose
engine       BatchEngine lifecycle: spawn workers, submit jobs, query status.
queue        SQLite-backed FIFO with claim/ack semantics.
db           Schema and rusqlite helpers.
file_store   Input/output JSONL on disk, content-addressed.
job          Job, status, and input/output types.
validation   JSONL request validation.
webhook      HMAC-signed completion callbacks.
error        EngineError and QueueError.
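The claim/ack semantics named for the queue module can be illustrated with an in-memory FIFO. This is a hypothetical sketch: `MemoryQueue`, `claim`, `ack`, and `reclaim_expired` are invented names, and it does not implement the crate's JobQueue trait, whose methods are not listed on this page. A claimed job is leased to one worker and leaves the queue only on ack; if the lease expires first, the job goes back to the ready queue. Time is passed in explicitly to keep the sketch deterministic.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical in-memory FIFO with leased claims.
pub struct MemoryQueue {
    ready: VecDeque<String>,
    leased: HashMap<String, u64>, // job id -> lease deadline (caller's clock)
    lease_secs: u64,
}

impl MemoryQueue {
    pub fn new(lease_secs: u64) -> Self {
        Self { ready: VecDeque::new(), leased: HashMap::new(), lease_secs }
    }

    pub fn enqueue(&mut self, job_id: &str) {
        self.ready.push_back(job_id.to_string());
    }

    /// Hands the oldest ready job to a worker and starts its lease.
    pub fn claim(&mut self, now: u64) -> Option<String> {
        let id = self.ready.pop_front()?;
        self.leased.insert(id.clone(), now + self.lease_secs);
        Some(id)
    }

    /// The worker finished: the job leaves the queue for good.
    pub fn ack(&mut self, job_id: &str) -> bool {
        self.leased.remove(job_id).is_some()
    }

    /// Moves jobs whose lease has run out (e.g. the worker crashed) back to the ready queue.
    pub fn reclaim_expired(&mut self, now: u64) -> usize {
        let mut expired: Vec<String> = self
            .leased
            .iter()
            .filter(|&(_, &deadline)| deadline <= now)
            .map(|(id, _)| id.clone())
            .collect();
        expired.sort(); // HashMap iteration order is arbitrary; sort for determinism
        for id in &expired {
            self.leased.remove(id);
            self.ready.push_back(id.clone());
        }
        expired.len()
    }
}
```

In this sketch, leasing instead of deleting on claim is what lets a crashed worker's job be retried. The trade-off is that a slow worker can have its job reclaimed and processed twice, so downstream handling should tolerate duplicates.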

Tests

cargo test -p anyllm_batch_engine