batchy 0.1.0

Transparently batch concurrent requests into efficient bulk operations
Documentation

Batchy

Transparently batch concurrent requests into efficient bulk operations.

Batchy merges multiple concurrent requests into larger batches, forwarding them to a single processing call. It's perfect for ML inference, database queries, API calls, or any scenario where batching improves throughput.

Features

  • Transparent batching: Callers submit single requests and receive single responses - batching is invisible at the call site
  • Configurable strategy: Control max batch size, queue size, and wait time
  • Backpressure: Built-in queue limits prevent memory exhaustion under high load
  • Error handling: Batch-level errors are delivered to all affected callers
  • Async-native: Built on tokio for high-performance async workloads

Quick Start

[dependencies]
batchy = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Usage

use batchy::{Batcher, BatcherConfig};
use tokio;

#[tokio::main]
async fn main() {
    // Create a batcher that processes text prompts
    let config = BatcherConfig {
        max_batch: 16,           // Max 16 requests per batch
        queue_size: 64,          // Queue up to 64 pending requests
        max_wait_ms: 50,         // Wait max 50ms for batch to fill
    };

    let batcher = Batcher::new(config, |prompts: Vec<String>| async move {
        // Your batch processing logic here
        // For ML inference, this would be your model inference call
        let results: Vec<String> = prompts
            .iter()
            .map(|p| format!("Processed: {}", p))
            .collect();
        Ok(results)
    });

    // Submit requests concurrently - they'll be automatically batched
    let handles: Vec<_> = (0..10)
        .map(|i| batcher.run(format!("request-{}", i)))
        .collect();

    // Await all results
    let results = futures::future::join_all(handles).await;
    
    for result in results {
        println!("{:?}", result);
    }
}

Configuration

Field Default Description
max_batch 32 Maximum requests merged into one processing call
queue_size 128 Size of the internal request queue (backpressure when full)
max_wait_ms 50 Maximum wait time for batch to fill under low load
use batchy::BatcherConfig;

// Using builder pattern
let config = BatcherConfigBuilder::default()
    .max_batch(64)
    .max_wait_ms(100)
    .build()
    .unwrap();

Batching Strategy

  1. First request arrives, worker starts timer
  2. Worker accumulates requests until:
    • max_batch is reached (immediate processing), OR
    • max_wait_ms expires (process what we have)
  3. Processing call executes with accumulated batch
  4. Results are fanned back to respective callers

Under high load: batches fill to max_batch for maximum throughput.
Under low load: each request waits at most max_wait_ms for bounded latency.

Error Handling

The processor returns Result<Vec<Res>, E>:

  • Err(e): The entire batch failed - every caller in that batch receives a clone of e
  • Ok(results): One result per input, in the same order
let batcher = Batcher::new(config, |items: Vec<i32>| async move {
    if items.is_empty() {
        return Err("Empty batch".to_string());
    }
    
    // Simulate processing error
    if items.iter().any(|&x| x < 0) {
        return Err("Negative values not allowed".to_string());
    }
    
    Ok(items.into_iter().map(|x| x * 2).collect())
});

CPU-Heavy Work

For CPU-intensive operations like ML inference, wrap your work in spawn_blocking:

let batcher = Batcher::new(config, move |items: Vec<YourInput>| async move {
    tokio::task::spawn_blocking(move || {
        // CPU-heavy work here (e.g., model inference)
        your_cpu_heavy_function(items)
    })
    .await
    .map_err(|e| format!("Task panicked: {}", e))?
});

Use Cases

  • ML Inference: Batch multiple inputs for GPU efficiency
  • Database Queries: Combine individual queries into bulk operations
  • API Calls: Aggregate requests to respect rate limits
  • File I/O: Batch disk writes for better throughput

License

MIT