Module batch

Concurrent batch processing over many documents.

BatchEngine wraps Engine behind an Arc and uses ConcurrencyController from recoco-utils to enforce row and byte limits on in-flight work.
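To make the idea of row and byte limits on in-flight work concrete, here is a minimal, dependency-free sketch of an admission gate. It is not the `ConcurrencyController` API from recoco-utils: the `InflightBudget` type and its `try_acquire` and `release` methods are hypothetical names used only for illustration, and the sketch assumes one "row" corresponds to one document. A real controller would presumably make callers wait asynchronously for capacity rather than return `false`.

```rust
use std::sync::Mutex;

/// Hypothetical admission gate (NOT the recoco-utils API): tracks how many
/// documents and how many bytes are currently in flight.
#[derive(Debug)]
pub struct InflightBudget {
    max_docs: usize,
    max_bytes: usize,
    /// (documents in flight, bytes in flight)
    state: Mutex<(usize, usize)>,
}

impl InflightBudget {
    pub fn new(max_docs: usize, max_bytes: usize) -> Self {
        Self { max_docs, max_bytes, state: Mutex::new((0, 0)) }
    }

    /// Admits a document of `bytes` size only if both limits still hold
    /// afterwards. Returns `false` without changing state otherwise.
    pub fn try_acquire(&self, bytes: usize) -> bool {
        let mut state = self.state.lock().unwrap();
        let (docs, used) = *state;
        if docs + 1 > self.max_docs || used.saturating_add(bytes) > self.max_bytes {
            return false;
        }
        *state = (docs + 1, used + bytes);
        true
    }

    /// Returns a finished document's share of the budget.
    pub fn release(&self, bytes: usize) {
        let mut state = self.state.lock().unwrap();
        state.0 = state.0.saturating_sub(1);
        state.1 = state.1.saturating_sub(bytes);
    }
}
```

Checking both limits under a single lock keeps the count and the byte total consistent with each other, so a large document cannot slip in between two separate checks.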

CPU-bound lint/fix work is dispatched to tokio’s blocking thread pool via spawn_blocking, keeping the async executor free for I/O-bound coordination.

Results stream out in completion order (fastest documents first), not submission order. Callers correlate results by the id field echoed back alongside each result.
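Completion-order delivery can be illustrated without the crate itself. The sketch below swaps in plain `std` threads and an `mpsc` channel for tokio's blocking pool and an async stream, so it is an analogy rather than how `BatchEngine` is implemented. The function name `run_in_completion_order` and the `(id, cost_ms)` job shape are hypothetical; the sleep stands in for CPU-bound lint work.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs each `(id, cost_ms)` job on its own thread and returns `(id, cost_ms)`
/// pairs in the order the jobs finished, not the order they were submitted.
pub fn run_in_completion_order(jobs: Vec<(String, u64)>) -> Vec<(String, u64)> {
    let (tx, rx) = mpsc::channel();
    for (id, cost_ms) in jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for CPU-bound work on one document.
            thread::sleep(Duration::from_millis(cost_ms));
            // Echo the id back so the receiver can correlate the result.
            let _ = tx.send((id, cost_ms));
        });
    }
    // Drop the original sender so the receiver ends once every worker is done.
    drop(tx);
    rx.into_iter().collect()
}
```

Because each result carries its own id, the receiver never needs to know the submission position of a job, which is what makes out-of-order delivery safe.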

Example

use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
use futures::StreamExt;
use std::time::Duration;

// `BatchOptions` is `#[non_exhaustive]`, so construct via
// `Default::default()` + field assignment.
let mut options = BatchOptions::default();
options.max_concurrent_docs = Some(16);
options.max_inflight_bytes = Some(256 * 1024 * 1024); // 256 MiB
options.per_doc_deadline = Some(Duration::from_secs(5));
// `engine` is an existing `Engine`; its construction is omitted here.
let batch = BatchEngine::new(engine, options);

let docs = vec![
    ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
    ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
];

// The loop below uses `.await`, so it must run inside an async context.
let mut results = batch.lint_many(docs);
while let Some((id, result)) = results.next().await {
    match result {
        Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
        Err(e) => eprintln!("{id}: failed: {e}"),
    }
}
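When a caller needs results in submission order after all, the echoed id is enough to restore it. The helper below is a hypothetical sketch, not part of this module: it assumes ids are unique within a batch, takes the collected `(id, result)` pairs, and returns one entry per submitted id, with `None` for any id that produced no result.

```rust
use std::collections::HashMap;

/// Hypothetical helper: re-orders completed `(id, value)` pairs to match the
/// order in `submitted`. Assumes ids are unique within the batch.
pub fn in_submission_order<T>(
    submitted: &[String],
    completed: Vec<(String, T)>,
) -> Vec<(String, Option<T>)> {
    // Index results by id so each lookup is O(1).
    let mut by_id: HashMap<String, T> = completed.into_iter().collect();
    submitted
        .iter()
        // `remove` moves the value out; a missing id yields `None`.
        .map(|id| (id.clone(), by_id.remove(id)))
        .collect()
}
```

This buffers the whole batch before yielding anything, so it trades away the latency benefit of streaming; it suits callers that need a final ordered report rather than incremental output.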

Structs

BatchEngine
Wraps Engine for concurrent multi-document processing with backpressure.
BatchOptions
Concurrency limits and per-document budgets for batch processing.

Enums

BatchError
Error returned when a single document in a batch fails to process.