marque_engine/batch.rs
1//! Concurrent batch processing over many documents.
2//!
3//! `BatchEngine` wraps `Engine` behind an `Arc` and uses `ConcurrencyController`
4//! from `recoco-utils` to enforce row and byte limits on in-flight work.
5//!
6//! CPU-bound lint/fix work is dispatched to tokio's blocking thread pool via
7//! `spawn_blocking`, keeping the async executor free for I/O-bound coordination.
8//!
//! Results stream out in **completion order** (each document is yielded as
//! soon as its work finishes), not submission order. Callers correlate results
//! by the `id` field echoed back alongside each result.
12//!
13//! # Example
14//!
15//! ```rust,no_run
16//! use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
17//! use futures::StreamExt;
18//!
19//! # async fn example(engine: Engine) {
20//! let batch = BatchEngine::new(engine, BatchOptions {
21//! max_concurrent_docs: Some(16),
22//! max_inflight_bytes: Some(256 * 1024 * 1024), // 256 MiB
23//! });
24//!
25//! let docs = vec![
26//! ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
27//! ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
28//! ];
29//!
30//! let mut results = batch.lint_many(docs);
31//! while let Some((id, result)) = results.next().await {
32//! match result {
33//! Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
34//! Err(e) => eprintln!("{id}: failed: {e}"),
35//! }
36//! }
37//! # }
38//! ```
39
40use std::sync::Arc;
41
42use futures::{Stream, StreamExt, stream};
43use recoco_utils::concur_control::{ConcurrencyController, Options as ConcurOptions};
44
45use crate::{Engine, FixResult, LintResult};
46
47/// Error returned when a single document in a batch fails to process.
48///
49/// Batch APIs surface this per-document so a panic or cancellation in one
50/// document does not abort the entire batch run.
51#[derive(Debug)]
52pub enum BatchError {
53 /// The blocking lint/fix task panicked or was cancelled.
54 TaskFailed(tokio::task::JoinError),
55}
56
57impl BatchError {
58 /// Returns `true` if the error was caused by a panic in the worker task.
59 ///
60 /// CI pipelines and supervisors should treat this as an application bug
61 /// that warrants investigation (not a transient infrastructure issue).
62 pub fn is_panic(&self) -> bool {
63 match self {
64 Self::TaskFailed(e) => e.is_panic(),
65 }
66 }
67
68 /// Returns `true` if the error was caused by task cancellation (e.g.,
69 /// runtime shutdown, explicit abort).
70 ///
71 /// Cancellation is an expected operational event — callers that see
72 /// this during a graceful shutdown should typically log-and-continue,
73 /// not alert.
74 pub fn is_cancelled(&self) -> bool {
75 match self {
76 Self::TaskFailed(e) => e.is_cancelled(),
77 }
78 }
79}
80
81impl std::fmt::Display for BatchError {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 Self::TaskFailed(e) => {
85 let kind = if e.is_panic() {
86 "panicked"
87 } else if e.is_cancelled() {
88 "was cancelled"
89 } else {
90 "failed"
91 };
92 write!(f, "batch task {kind}: {e}")
93 }
94 }
95 }
96}
97
98impl std::error::Error for BatchError {
99 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
100 match self {
101 Self::TaskFailed(e) => Some(e),
102 }
103 }
104}
105
106impl From<tokio::task::JoinError> for BatchError {
107 fn from(e: tokio::task::JoinError) -> Self {
108 Self::TaskFailed(e)
109 }
110}
111
112/// Concurrency limits for batch processing.
113///
114/// Both limits are optional and independent. When both are set the more
115/// restrictive one governs at any given moment.
116pub struct BatchOptions {
117 /// Maximum documents in-flight simultaneously.
118 ///
119 /// This field drives **two** independent limits that both happen to
120 /// share this value:
121 ///
122 /// 1. `ConcurrencyController::max_inflight_rows` — the semaphore that
123 /// rate-limits how many documents can hold permits at the same time.
124 /// 2. `buffer_unordered` cap — how many per-document futures are
125 /// created and polled ahead of readiness.
126 ///
127 /// In practice they are always set together: the effective maximum is
128 /// the minimum of whichever blocks first for a given workload.
129 /// Defaults to 32.
130 pub max_concurrent_docs: Option<usize>,
131
132 /// Maximum total bytes of document content in-flight simultaneously.
133 ///
134 /// Useful for memory-bounded batch runs over large corpora. `None` means
135 /// unlimited (byte accounting is still tracked for observability).
136 pub max_inflight_bytes: Option<usize>,
137}
138
139impl Default for BatchOptions {
140 fn default() -> Self {
141 Self {
142 max_concurrent_docs: Some(32),
143 max_inflight_bytes: None,
144 }
145 }
146}
147
148/// Wraps `Engine` for concurrent multi-document processing with backpressure.
149///
150/// The underlying `Engine` is shared via `Arc`; cloning `BatchEngine` is cheap.
151pub struct BatchEngine {
152 engine: Arc<Engine>,
153 controller: Arc<ConcurrencyController>,
154 /// Buffer cap forwarded to `buffer_unordered`.
155 concurrent: usize,
156}
157
158impl BatchEngine {
159 pub fn new(engine: Engine, options: BatchOptions) -> Self {
160 let concurrent = options.max_concurrent_docs.unwrap_or(32);
161 let controller = ConcurrencyController::new(&ConcurOptions {
162 max_inflight_rows: options.max_concurrent_docs,
163 max_inflight_bytes: options.max_inflight_bytes,
164 });
165 Self {
166 engine: Arc::new(engine),
167 controller: Arc::new(controller),
168 concurrent,
169 }
170 }
171
172 /// Lint many documents concurrently. Yields `(id, Result)` in
173 /// completion order; an `Err` indicates the per-document task panicked
174 /// or was cancelled — it does not abort the batch.
175 pub fn lint_many(
176 &self,
177 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
178 ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
179 let engine = Arc::clone(&self.engine);
180 let controller = Arc::clone(&self.controller);
181 let concurrent = self.concurrent;
182
183 stream::iter(docs)
184 .map(move |(id, data)| {
185 let engine = Arc::clone(&engine);
186 let controller = Arc::clone(&controller);
187 async move {
188 let byte_len = data.len();
189 let _permit = controller
190 .acquire(Some(|| byte_len))
191 .await
192 .expect("ConcurrencyController semaphore unexpectedly closed");
193 let result = tokio::task::spawn_blocking(move || engine.lint(&data))
194 .await
195 .map_err(BatchError::from);
196 (id, result)
197 }
198 })
199 .buffer_unordered(concurrent)
200 }
201
202 /// Fix many documents concurrently. Yields `(id, Result)` in
203 /// completion order; an `Err` indicates the per-document task panicked
204 /// or was cancelled — it does not abort the batch.
205 pub fn fix_many(
206 &self,
207 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
208 ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
209 let engine = Arc::clone(&self.engine);
210 let controller = Arc::clone(&self.controller);
211 let concurrent = self.concurrent;
212
213 stream::iter(docs)
214 .map(move |(id, data)| {
215 let engine = Arc::clone(&engine);
216 let controller = Arc::clone(&controller);
217 async move {
218 let byte_len = data.len();
219 let _permit = controller
220 .acquire(Some(|| byte_len))
221 .await
222 .expect("ConcurrencyController semaphore unexpectedly closed");
223 let result = tokio::task::spawn_blocking(move || {
224 engine.fix(&data, crate::FixMode::Apply)
225 })
226 .await
227 .map_err(BatchError::from);
228 (id, result)
229 }
230 })
231 .buffer_unordered(concurrent)
232 }
233}