Concurrent batch processing over many documents.
BatchEngine wraps an Engine in an Arc and uses ConcurrencyController
from recoco-utils to cap both the number of in-flight documents (rows) and their total size in bytes.
CPU-bound lint/fix work is dispatched to tokio’s blocking thread pool via
spawn_blocking, keeping the async executor free for I/O-bound coordination.
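The dual limit described above can be illustrated without recoco-utils. The sketch below is a std-only analog, not the ConcurrencyController API: the `InflightBudget` type and its methods are hypothetical, and it blocks a thread on a `Condvar`, whereas an async controller would presumably wait without blocking the executor. A document is admitted only when both the document count and the byte total stay within their caps; an oversized document is admitted when nothing else is in flight so it cannot wait forever.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical limiter (not the recoco-utils API): caps in-flight
/// documents ("rows") and in-flight bytes at the same time.
pub struct InflightBudget {
    max_docs: usize,
    max_bytes: usize,
    // (documents in flight, bytes in flight)
    state: Mutex<(usize, usize)>,
    freed: Condvar,
}

impl InflightBudget {
    pub fn new(max_docs: usize, max_bytes: usize) -> Self {
        Self { max_docs, max_bytes, state: Mutex::new((0, 0)), freed: Condvar::new() }
    }

    /// Both budgets must have room; a lone oversized document is still admitted.
    fn admits(&self, (docs, used): (usize, usize), bytes: usize) -> bool {
        docs < self.max_docs && (docs == 0 || used + bytes <= self.max_bytes)
    }

    /// Non-blocking attempt to reserve room for one document of `bytes`.
    pub fn try_acquire(&self, bytes: usize) -> bool {
        let mut state = self.state.lock().unwrap();
        if self.admits(*state, bytes) {
            state.0 += 1;
            state.1 += bytes;
            true
        } else {
            false
        }
    }

    /// Blocks the calling thread until the document fits (backpressure).
    pub fn acquire(&self, bytes: usize) {
        let mut state = self.state.lock().unwrap();
        while !self.admits(*state, bytes) {
            state = self.freed.wait(state).unwrap();
        }
        state.0 += 1;
        state.1 += bytes;
    }

    /// Returns the document's reservation and wakes any waiters.
    pub fn release(&self, bytes: usize) {
        let mut state = self.state.lock().unwrap();
        state.0 -= 1;
        state.1 -= bytes;
        drop(state);
        self.freed.notify_all();
    }

    pub fn in_flight(&self) -> (usize, usize) {
        *self.state.lock().unwrap()
    }
}
```

Checking both limits in one predicate under one lock keeps the two counters consistent; reserving documents and bytes separately could let a waiter hold one budget while starving on the other.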
Results stream out in completion order (whichever document finishes first is yielded first), not
submission order. Callers correlate results by the id echoed back
alongside each result: the stream yields (id, result) pairs.
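Why ids must travel with results can be seen in a small std-only sketch. Here one OS thread per document stands in for tokio's blocking pool, and the hypothetical `fake_lint` function (it just counts `//` separators) stands in for real lint work; neither is part of marque_engine. Results arrive on the channel in completion order, and a caller that needs submission order can rebuild it by keying on the id.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for CPU-bound lint work: counts `//` separators.
fn fake_lint(text: &str) -> usize {
    text.matches("//").count()
}

/// Runs each document on its own thread and returns `(id, result)` pairs
/// in the order the workers finished, not the order they were submitted.
fn lint_unordered(docs: Vec<(String, String)>) -> Vec<(String, usize)> {
    let (tx, rx) = mpsc::channel();
    for (id, text) in docs {
        let tx = tx.clone();
        thread::spawn(move || {
            let n = fake_lint(&text);
            // The id is sent alongside the result so the receiver can correlate it.
            tx.send((id, n)).expect("receiver is still alive");
        });
    }
    // Drop the original sender so the iterator ends after the last worker sends.
    drop(tx);
    rx.into_iter().collect()
}

/// Restores submission order by looking each id up in the completed results.
fn in_submission_order(ids: &[String], done: Vec<(String, usize)>) -> Vec<usize> {
    let mut by_id: HashMap<String, usize> = done.into_iter().collect();
    ids.iter()
        .map(|id| by_id.remove(id).expect("every submitted id completes"))
        .collect()
}
```

Spawning one thread per document keeps the sketch short but is unbounded; a real pool, like tokio's blocking pool, reuses a bounded set of threads.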
Example
```rust
use futures::StreamExt;
use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
use std::time::Duration;

// `.await` needs an async context; `engine` is an already-constructed `Engine`.
async fn lint_two_docs(engine: Engine) {
    // `BatchOptions` is `#[non_exhaustive]`, so construct it via
    // `Default::default()` and then assign fields.
    let mut options = BatchOptions::default();
    options.max_concurrent_docs = Some(16);
    options.max_inflight_bytes = Some(256 * 1024 * 1024); // 256 MiB
    options.per_doc_deadline = Some(Duration::from_secs(5));

    let batch = BatchEngine::new(engine, options);

    // Each document is an (id, bytes) pair; the id comes back with its result.
    let docs = vec![
        ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
        ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
    ];

    // Results arrive in completion order, not submission order.
    let mut results = batch.lint_many(docs);
    while let Some((id, result)) = results.next().await {
        match result {
            Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
            Err(e) => eprintln!("{id}: failed: {e}"),
        }
    }
}
```
Structs
- BatchEngine - Wraps Engine for concurrent multi-document processing with backpressure.
- BatchOptions - Concurrency limits and per-document budgets for batch processing.
Enums
- BatchError - Error returned when a single document in a batch fails to process.
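This page does not say which variants BatchError has or how per_doc_deadline is enforced, so the following is only a std-only sketch of one way a per-document budget can surface as a per-document error. The `DocError` enum and `with_deadline` function are hypothetical. The work runs on a worker thread and the caller waits at most `deadline` via `Receiver::recv_timeout`; a worker that panics before sending shows up as a disconnected channel and is reported as a failure rather than a timeout.

```rust
use std::fmt;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical per-document error; not the real `BatchError` definition.
#[derive(Debug, PartialEq)]
enum DocError {
    DeadlineExceeded(Duration),
    Failed(String),
}

impl fmt::Display for DocError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DocError::DeadlineExceeded(d) => write!(f, "deadline of {d:?} exceeded"),
            DocError::Failed(msg) => write!(f, "processing failed: {msg}"),
        }
    }
}

impl std::error::Error for DocError {}

/// Runs `work` on a worker thread and waits at most `deadline` for it.
/// On timeout the worker is not stopped; its late result is simply discarded.
fn with_deadline<T, F>(deadline: Duration, work: F) -> Result<T, DocError>
where
    T: Send + 'static,
    F: FnOnce() -> Result<T, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error: the caller may already have given up.
        let _ = tx.send(work());
    });
    match rx.recv_timeout(deadline) {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(msg)) => Err(DocError::Failed(msg)),
        Err(mpsc::RecvTimeoutError::Timeout) => Err(DocError::DeadlineExceeded(deadline)),
        // The sender was dropped without a result, e.g. the worker panicked.
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            Err(DocError::Failed("worker exited without a result".to_owned()))
        }
    }
}
```

Because the error is returned per document, one slow or malformed document fails on its own while the rest of the batch keeps going. Note that the deadline only bounds how long the caller waits: tokio documents that tasks started with spawn_blocking cannot be aborted, which mirrors the un-stoppable worker thread here.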