Concurrent batch processing over many documents.
BatchEngine wraps Engine behind an Arc and uses ConcurrencyController
from recoco-utils to enforce row and byte limits on in-flight work.
CPU-bound lint/fix work is dispatched to tokio’s blocking thread pool via
spawn_blocking, keeping the async executor free for I/O-bound coordination.
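The limit-then-dispatch idea above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the crate's implementation: `Budget`, `InFlight`, and `run_limited` are hypothetical names, plain OS threads stand in for tokio's blocking pool, and a `Mutex` plus `Condvar` stand in for `ConcurrencyController`, whose actual API is not shown in this page.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical bookkeeping for work that is currently in flight.
#[derive(Default)]
struct InFlight {
    docs: usize,
    bytes: usize,
    peak_docs: usize,
    peak_bytes: usize,
}

/// Hypothetical budget that caps both document count and total bytes.
/// Assumes `max_docs >= 1`.
struct Budget {
    state: Mutex<InFlight>,
    freed: Condvar,
    max_docs: usize,
    max_bytes: usize,
}

impl Budget {
    fn new(max_docs: usize, max_bytes: usize) -> Self {
        Budget {
            state: Mutex::new(InFlight::default()),
            freed: Condvar::new(),
            max_docs,
            max_bytes,
        }
    }

    /// Blocks until the document fits under both limits, then reserves room.
    fn acquire(&self, bytes: usize) {
        let mut s = self.state.lock().unwrap();
        // A document larger than the byte limit is admitted once nothing
        // else is in flight, so it cannot wait forever.
        while s.docs > 0 && (s.docs >= self.max_docs || s.bytes + bytes > self.max_bytes) {
            s = self.freed.wait(s).unwrap();
        }
        s.docs += 1;
        s.bytes += bytes;
        s.peak_docs = s.peak_docs.max(s.docs);
        s.peak_bytes = s.peak_bytes.max(s.bytes);
    }

    /// Returns the reservation and wakes any submitter waiting for room.
    fn release(&self, bytes: usize) {
        let mut s = self.state.lock().unwrap();
        s.docs -= 1;
        s.bytes -= bytes;
        self.freed.notify_all();
    }
}

/// Processes every document under the budget and reports the observed
/// peaks as `(peak_docs, peak_bytes)`.
fn run_limited(docs: Vec<Vec<u8>>, max_docs: usize, max_bytes: usize) -> (usize, usize) {
    let budget = Arc::new(Budget::new(max_docs, max_bytes));
    let mut handles = Vec::new();
    for doc in docs {
        // Backpressure: the submitting loop blocks here until there is room.
        budget.acquire(doc.len());
        let budget = Arc::clone(&budget);
        handles.push(thread::spawn(move || {
            // Stand-in for CPU-bound lint/fix work.
            let _checksum: u64 = doc.iter().map(|&b| u64::from(b)).sum();
            thread::sleep(Duration::from_millis(5));
            budget.release(doc.len());
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let s = budget.state.lock().unwrap();
    (s.peak_docs, s.peak_bytes)
}
```

Calling `run_limited` with eight 10-byte documents and `max_docs = 2` never has more than two documents in flight at once. The design point is that admission happens before dispatch, so a slow worker pool slows the submitter down instead of letting queued input grow without bound.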
Results stream out in completion order (fastest documents first), not
submission order. Callers correlate results by the id field echoed back
alongside each result.
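Because arrival order is not submission order, a caller that needs results per document can collect them into a map keyed by id. The following std-only sketch shows that pattern with threads and a channel in place of the async stream; `fake_lint` and `lint_all` are hypothetical helpers, and the per-document delay exists only to simulate documents of different cost.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a lint pass: after a simulated delay,
/// counts the `//` separators in the document.
fn fake_lint(doc: &[u8], delay_ms: u64) -> Result<usize, String> {
    thread::sleep(Duration::from_millis(delay_ms));
    let text = std::str::from_utf8(doc).map_err(|e| e.to_string())?;
    Ok(text.matches("//").count())
}

/// Runs each `(id, bytes, delay_ms)` document on its own thread and
/// returns the results keyed by id.
fn lint_all(docs: Vec<(String, Vec<u8>, u64)>) -> HashMap<String, Result<usize, String>> {
    let (tx, rx) = mpsc::channel();
    for (id, bytes, delay_ms) in docs {
        let tx = tx.clone();
        thread::spawn(move || {
            let result = fake_lint(&bytes, delay_ms);
            // Echo the id back so the receiver can correlate the result.
            let _ = tx.send((id, result));
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);
    // Messages arrive in completion order; keying by id makes that order irrelevant.
    rx.into_iter().collect()
}
```

With this shape, a failure in one document is just an `Err` stored under that document's id, and the other entries are unaffected.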
§Example
use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
use futures::StreamExt;

let batch = BatchEngine::new(engine, BatchOptions {
    max_concurrent_docs: Some(16),
    max_inflight_bytes: Some(256 * 1024 * 1024), // 256 MiB
});

let docs = vec![
    ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
    ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
];

let mut results = batch.lint_many(docs);
while let Some((id, result)) = results.next().await {
    match result {
        Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
        Err(e) => eprintln!("{id}: failed: {e}"),
    }
}

Structs§
- BatchEngine - Wraps Engine for concurrent multi-document processing with backpressure.
- BatchOptions - Concurrency limits for batch processing.
Enums§
- BatchError - Error returned when a single document in a batch fails to process.