marque_engine/batch.rs
1// SPDX-FileCopyrightText: 2026 Knitli Inc.
2//
3// SPDX-License-Identifier: LicenseRef-MarqueLicense-1.0
4
5//! Concurrent batch processing over many documents.
6//!
7//! `BatchEngine` wraps `Engine` behind an `Arc` and uses `ConcurrencyController`
8//! from `recoco-utils` to enforce row and byte limits on in-flight work.
9//!
10//! CPU-bound lint/fix work is dispatched to tokio's blocking thread pool via
11//! `spawn_blocking`, keeping the async executor free for I/O-bound coordination.
12//!
13//! Results stream out in **completion order** (fastest documents first), not
14//! submission order. Callers correlate results by the `id` field echoed back
15//! alongside each result.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
21//! use futures::StreamExt;
22//! use std::time::Duration;
23//!
24//! # async fn example(engine: Engine) {
25//! // `BatchOptions` is `#[non_exhaustive]`, so construct via
26//! // `Default::default()` + field assignment.
27//! let mut options = BatchOptions::default();
28//! options.max_concurrent_docs = Some(16);
29//! options.max_inflight_bytes = Some(256 * 1024 * 1024); // 256 MiB
30//! options.per_doc_deadline = Some(Duration::from_secs(5));
31//! let batch = BatchEngine::new(engine, options);
32//!
33//! let docs = vec![
34//! ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
35//! ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
36//! ];
37//!
38//! let mut results = batch.lint_many(docs);
39//! while let Some((id, result)) = results.next().await {
40//! match result {
41//! Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
42//! Err(e) => eprintln!("{id}: failed: {e}"),
43//! }
44//! }
45//! # }
46//! ```
47
48use std::sync::Arc;
49// Batch processing uses `std::time::Instant`. `BatchEngine` depends
50// on tokio (gated behind the `batch` Cargo feature), and tokio
51// itself does not target `wasm32-unknown-unknown`, so this module
52// never reaches the WASM clock-polyfill question — std's `Instant`
53// is sufficient.
54use std::time::{Duration, Instant};
55
56use futures::{Stream, StreamExt, stream};
57use recoco_utils::concur_control::{ConcurrencyController, Options as ConcurOptions};
58
59use crate::{Engine, EngineError, FixOptions, FixResult, LintOptions, LintResult};
60
61/// Error returned when a single document in a batch fails to process.
62///
63/// Batch APIs surface this per-document so a panic, cancellation, or
64/// graceful shutdown of the underlying concurrency controller does not
65/// abort the entire batch run.
66///
67/// `#[non_exhaustive]` because future infrastructure-level errors
68/// (deadline expired, cache write-through failed, queue overflow,
69/// etc.) will land as new variants alongside the existing two. A
70/// downstream `match` should always carry a wildcard arm; without
71/// `non_exhaustive` every new variant would be a semver-breaking
72/// change for consumers, which would either pin them to a stale
73/// version or pressure us to never grow the surface.
74#[derive(Debug)]
75#[non_exhaustive]
76pub enum BatchError {
77 /// The blocking lint/fix task panicked or was cancelled.
78 TaskFailed(tokio::task::JoinError),
79 /// The `ConcurrencyController` semaphore was closed while this
80 /// document was waiting for a permit. Indicates the runtime is in
81 /// shutdown — the caller has no work to do beyond observing the
82 /// error and ending its loop.
83 ///
84 /// Whitepaper §9.4 / gap register #8 carved this out as a separate
85 /// variant so deployment supervisors can distinguish it from a
86 /// real worker-task panic. `is_panic()` returns `false` for this
87 /// variant; `is_shutdown()` returns `true`.
88 ShutdownInProgress,
89 /// `fix_many` aborted this document's fix pass because the
90 /// per-document deadline (set on `BatchOptions::per_doc_deadline`)
91 /// expired. Spec 005 §R4 / Constitution V Principle V — no partial
92 /// `FixResult` is ever produced; the caller receives the partial
93 /// `LintResult` so it can render whatever diagnostics the engine
94 /// surfaced before the abort.
95 ///
96 /// `is_deadline_exceeded()` returns `true` for this variant only.
97 /// `is_panic()` and `is_shutdown()` return `false` — a deadline
98 /// trip is a routine operational signal, not a worker bug or
99 /// runtime shutdown.
100 ///
101 /// Note: only the **fix** path produces this variant. `lint_many`
102 /// surfaces a deadline-truncated lint as `Ok(LintResult { truncated:
103 /// true, .. })` so the partial diagnostics flow through the same
104 /// success channel — there is no asymmetric response shape on the
105 /// lint side because no audit-stream invariant is at risk.
106 DocumentDeadlineExceeded {
107 /// The lint pass produced before the deadline tripped. May
108 /// itself be truncated (`partial_lint.truncated`) if the
109 /// deadline expired during the lint phase rather than the
110 /// fix-application phase.
111 partial_lint: LintResult,
112 },
113}
114
115impl BatchError {
116 /// Returns `true` if the error was caused by a panic in the worker task.
117 ///
118 /// CI pipelines and supervisors should treat this as an application bug
119 /// that warrants investigation (not a transient infrastructure issue).
120 pub fn is_panic(&self) -> bool {
121 match self {
122 Self::TaskFailed(e) => e.is_panic(),
123 Self::ShutdownInProgress => false,
124 Self::DocumentDeadlineExceeded { .. } => false,
125 }
126 }
127
128 /// Returns `true` if the error was caused by task cancellation (e.g.,
129 /// runtime shutdown, explicit abort).
130 ///
131 /// Cancellation is an expected operational event — callers that see
132 /// this during a graceful shutdown should typically log-and-continue,
133 /// not alert.
134 pub fn is_cancelled(&self) -> bool {
135 match self {
136 Self::TaskFailed(e) => e.is_cancelled(),
137 Self::ShutdownInProgress => false,
138 Self::DocumentDeadlineExceeded { .. } => false,
139 }
140 }
141
142 /// Returns `true` if the error was caused by the `ConcurrencyController`
143 /// semaphore being closed while this document was awaiting a permit.
144 ///
145 /// Distinct from `is_cancelled()` (which fires when a worker task is
146 /// aborted mid-execution) and from `is_panic()` (which fires on a real
147 /// bug). Shutdown is the routine end-of-life signal — supervisors
148 /// should drain any remaining items in the result stream and exit.
149 pub fn is_shutdown(&self) -> bool {
150 matches!(self, Self::ShutdownInProgress)
151 }
152
153 /// Returns `true` if this error was caused by the per-document
154 /// deadline expiring during a `fix_many` call.
155 ///
156 /// Routine operational signal — the document took longer to
157 /// process than its budget allowed. Callers should render the
158 /// embedded `partial_lint` diagnostics and either skip the
159 /// document or retry with a larger budget. Distinct from
160 /// `is_panic()` (worker bug) and `is_shutdown()` (runtime
161 /// end-of-life).
162 pub fn is_deadline_exceeded(&self) -> bool {
163 matches!(self, Self::DocumentDeadlineExceeded { .. })
164 }
165}
166
167impl std::fmt::Display for BatchError {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::TaskFailed(e) => {
171 let kind = if e.is_panic() {
172 "panicked"
173 } else if e.is_cancelled() {
174 "was cancelled"
175 } else {
176 "failed"
177 };
178 write!(f, "batch task {kind}: {e}")
179 }
180 Self::ShutdownInProgress => {
181 f.write_str("ConcurrencyController semaphore closed (shutdown in progress)")
182 }
183 Self::DocumentDeadlineExceeded { partial_lint } => write!(
184 f,
185 "document deadline exceeded after {}/{} candidates",
186 partial_lint.candidates_processed, partial_lint.candidates_total
187 ),
188 }
189 }
190}
191
192impl std::error::Error for BatchError {
193 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
194 match self {
195 Self::TaskFailed(e) => Some(e),
196 Self::ShutdownInProgress => None,
197 // Like `EngineError::DeadlineExceeded`, the deadline trip
198 // is not caused by an inner error — it reports a runtime
199 // condition (the deadline elapsed) with no underlying
200 // failure to chain.
201 Self::DocumentDeadlineExceeded { .. } => None,
202 }
203 }
204}
205
206impl From<tokio::task::JoinError> for BatchError {
207 fn from(e: tokio::task::JoinError) -> Self {
208 Self::TaskFailed(e)
209 }
210}
211
212impl From<tokio::sync::AcquireError> for BatchError {
213 fn from(_: tokio::sync::AcquireError) -> Self {
214 // `AcquireError` carries no information beyond "semaphore was
215 // closed" — and the semaphore here is owned by `BatchEngine`,
216 // so closure means the engine is shutting down. Surface that
217 // intent explicitly rather than re-exporting tokio's type.
218 Self::ShutdownInProgress
219 }
220}
221
/// Concurrency limits and per-document budgets for batch processing.
///
/// All fields are optional and independent. When both concurrency limits
/// are set, the more restrictive one governs at any given moment.
/// `per_doc_deadline` is orthogonal: it applies separately to each
/// document's permit-acquired execution slice.
///
/// `Debug` and `Clone` are derived so options can be logged and reused.
/// For example, one options value can be passed to several
/// `*_with_options` calls.
///
/// # Breaking change in this release
///
/// This struct gained `#[non_exhaustive]` and a new `per_doc_deadline`
/// field in spec 005 Phase 3d. **Downstream code that previously
/// constructed `BatchOptions` with a struct literal**
/// (`BatchOptions { max_concurrent_docs, max_inflight_bytes }`) **will
/// no longer compile**. `#[non_exhaustive]` blocks cross-crate
/// struct-literal construction unconditionally, even when every
/// existing field is supplied. Switch to `Default::default()` plus
/// public field assignment, as shown below. (The CHANGELOG / release
/// notes for this version call this out explicitly.)
///
/// `#[non_exhaustive]` was added so future per-doc concerns (memory
/// budgets, per-rule deadlines, cancellation tokens) can join without
/// a further breaking-change cycle for downstream callers using the
/// recommended construction pattern.
///
/// ```rust,no_run
/// use marque_engine::BatchOptions;
/// use std::time::Duration;
///
/// let mut opts = BatchOptions::default();
/// opts.per_doc_deadline = Some(Duration::from_secs(5));
/// ```
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct BatchOptions {
    /// Maximum documents in-flight simultaneously.
    ///
    /// This one value drives **two** independent limits:
    ///
    /// 1. `ConcurrencyController::max_inflight_rows`: the semaphore that
    ///    limits how many documents can hold permits at the same time.
    /// 2. The `buffer_unordered` cap: how many per-document futures are
    ///    created and polled ahead of readiness.
    ///
    /// Because both limits share this value, neither can be tuned on its
    /// own. Whichever saturates first for a given workload is the one
    /// that governs.
    ///
    /// `None` removes the semaphore's row limit, but the `buffer_unordered`
    /// cap still falls back to 32. Defaults to `Some(32)`.
    pub max_concurrent_docs: Option<usize>,

    /// Maximum total bytes of document content in-flight simultaneously.
    ///
    /// Useful for memory-bounded batch runs over large corpora. `None` means
    /// unlimited (byte accounting is still tracked for observability).
    pub max_inflight_bytes: Option<usize>,

    /// Per-document wall-clock budget (spec 005 §R2).
    ///
    /// When `Some(d)`, each document's lint/fix call gets its own deadline
    /// of `Instant::now() + d`. The deadline is stamped **after** the
    /// document acquires its concurrency permit, so time spent waiting on
    /// the `ConcurrencyController` does not consume the budget. A slow
    /// document never borrows from a fast document's slice.
    ///
    /// On expiry, lint returns `Ok(LintResult { truncated: true, .. })`,
    /// because partial diagnostics still matter to the caller. Fix returns
    /// `Err(BatchError::DocumentDeadlineExceeded { partial_lint })`, per
    /// Constitution V Principle V: no partial `FixResult` is ever
    /// produced.
    ///
    /// `None` (the default) means no per-document deadline.
    pub per_doc_deadline: Option<Duration>,
}
292
293impl Default for BatchOptions {
294 fn default() -> Self {
295 Self {
296 max_concurrent_docs: Some(32),
297 max_inflight_bytes: None,
298 per_doc_deadline: None,
299 }
300 }
301}
302
303/// Wraps `Engine` for concurrent multi-document processing with backpressure.
304///
305/// The underlying `Engine` is shared via `Arc`; cloning `BatchEngine` is cheap.
306pub struct BatchEngine {
307 engine: Arc<Engine>,
308 controller: Arc<ConcurrencyController>,
309 /// Buffer cap forwarded to `buffer_unordered`.
310 concurrent: usize,
311 /// Default per-document deadline (spec 005 §R2). Stamped into an
312 /// `Instant` after each document acquires its concurrency permit
313 /// — so a slow earlier document does not consume budget allotted
314 /// to a later one, and `ConcurrencyController` wait time does
315 /// not count against the engine's slice. `None` means no
316 /// deadline; the construction-time default flows through
317 /// `lint_many` / `fix_many`. Per-call `_with_options` variants
318 /// can override.
319 per_doc_deadline: Option<Duration>,
320}
321
322impl BatchEngine {
323 pub fn new(engine: Engine, options: BatchOptions) -> Self {
324 let concurrent = options.max_concurrent_docs.unwrap_or(32);
325 let controller = ConcurrencyController::new(&ConcurOptions {
326 max_inflight_rows: options.max_concurrent_docs,
327 max_inflight_bytes: options.max_inflight_bytes,
328 });
329 Self {
330 engine: Arc::new(engine),
331 controller: Arc::new(controller),
332 concurrent,
333 per_doc_deadline: options.per_doc_deadline,
334 }
335 }
336
337 /// Lint many documents concurrently. Yields `(id, Result)` in
338 /// completion order; an `Err` indicates the per-document task
339 /// panicked, was cancelled, or could not start because shutdown
340 /// is in progress (the `ConcurrencyController` semaphore was
341 /// closed) — it does not abort the batch.
342 ///
343 /// Honors `BatchOptions::per_doc_deadline` from construction time
344 /// (spec 005 §R2). A deadline-truncated lint surfaces as
345 /// `Ok(LintResult { truncated: true, .. })` — the partial
346 /// diagnostics are useful, so they flow through the success
347 /// channel rather than `Err`.
348 pub fn lint_many(
349 &self,
350 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
351 ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
352 self.lint_many_inner(docs, self.per_doc_deadline)
353 }
354
355 /// Same as [`lint_many`] but reads `per_doc_deadline` from the
356 /// supplied [`BatchOptions`] instead of the construction-time
357 /// default. Other fields on `opts` are reserved for future
358 /// per-call overrides; in MVP only `per_doc_deadline` is honored.
359 ///
360 /// [`lint_many`]: BatchEngine::lint_many
361 pub fn lint_many_with_options(
362 &self,
363 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
364 opts: &BatchOptions,
365 ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
366 self.lint_many_inner(docs, opts.per_doc_deadline)
367 }
368
369 fn lint_many_inner(
370 &self,
371 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
372 per_doc_deadline: Option<Duration>,
373 ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
374 let engine = Arc::clone(&self.engine);
375 let controller = Arc::clone(&self.controller);
376 let concurrent = self.concurrent;
377
378 stream::iter(docs)
379 .map(move |(id, data)| {
380 let engine = Arc::clone(&engine);
381 let controller = Arc::clone(&controller);
382 async move {
383 let byte_len = data.len();
384 // Whitepaper §9.4 / gap register #8: surface a closed
385 // controller as `BatchError::ShutdownInProgress` rather
386 // than `.expect()`-panicking. The `From<AcquireError>`
387 // impl above maps the only possible error.
388 let _permit = match controller.acquire(Some(|| byte_len)).await {
389 Ok(p) => p,
390 Err(e) => return (id, Err(BatchError::from(e))),
391 };
392 // Spec 005 §R2: the deadline is stamped AFTER permit
393 // acquisition so slow `ConcurrencyController` waits
394 // (a backed-up batch) don't consume the document's
395 // engine budget.
396 let result = tokio::task::spawn_blocking(move || {
397 // `checked_add` overflow must not silently drop
398 // the deadline (which would let an unbounded
399 // pass run after the operator explicitly
400 // configured a budget). Treat overflow as
401 // `deadline = now`, which the engine's pre-pass
402 // check (`now >= deadline`) treats as expired
403 // and aborts on entry.
404 let deadline = per_doc_deadline.map(|d| {
405 let now = Instant::now();
406 now.checked_add(d).unwrap_or(now)
407 });
408 // In-crate construction may use struct-update
409 // syntax across `#[non_exhaustive]` — only the
410 // outside-the-defining-crate boundary is restricted.
411 let opts = LintOptions {
412 deadline,
413 ..LintOptions::default()
414 };
415 engine.lint_with_options(&data, &opts)
416 })
417 .await
418 .map_err(BatchError::from);
419 (id, result)
420 }
421 })
422 .buffer_unordered(concurrent)
423 }
424
425 /// Fix many documents concurrently. Yields `(id, Result)` in
426 /// completion order; an `Err` indicates the per-document task
427 /// panicked, was cancelled, hit the per-document deadline, or the
428 /// runtime is shutting down — it does not abort the batch.
429 ///
430 /// Honors `BatchOptions::per_doc_deadline` from construction
431 /// time. A deadline trip on the fix path returns
432 /// `Err(BatchError::DocumentDeadlineExceeded { partial_lint })`
433 /// per Constitution V Principle V — no partial `FixResult` is
434 /// ever produced. Match on `is_deadline_exceeded()` to
435 /// distinguish from worker bugs (`is_panic()`) or shutdown
436 /// (`is_shutdown()`).
437 pub fn fix_many(
438 &self,
439 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
440 ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
441 self.fix_many_inner(docs, self.per_doc_deadline)
442 }
443
444 /// Same as [`fix_many`] but reads `per_doc_deadline` from the
445 /// supplied [`BatchOptions`] instead of the construction-time
446 /// default. Other fields on `opts` are reserved for future
447 /// per-call overrides; in MVP only `per_doc_deadline` is honored.
448 ///
449 /// [`fix_many`]: BatchEngine::fix_many
450 pub fn fix_many_with_options(
451 &self,
452 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
453 opts: &BatchOptions,
454 ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
455 self.fix_many_inner(docs, opts.per_doc_deadline)
456 }
457
458 fn fix_many_inner(
459 &self,
460 docs: impl IntoIterator<Item = (String, Vec<u8>)>,
461 per_doc_deadline: Option<Duration>,
462 ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
463 let engine = Arc::clone(&self.engine);
464 let controller = Arc::clone(&self.controller);
465 let concurrent = self.concurrent;
466
467 stream::iter(docs)
468 .map(move |(id, data)| {
469 let engine = Arc::clone(&engine);
470 let controller = Arc::clone(&controller);
471 async move {
472 let byte_len = data.len();
473 let _permit = match controller.acquire(Some(|| byte_len)).await {
474 Ok(p) => p,
475 Err(e) => return (id, Err(BatchError::from(e))),
476 };
477 // Spec 005 §R2 — same per-permit stamping as
478 // `lint_many`. Spec 005 §R4: a deadline trip on the
479 // fix path returns `Err(EngineError::DeadlineExceeded
480 // { partial_lint })`, which we re-shape into
481 // `BatchError::DocumentDeadlineExceeded` so callers
482 // matching on `BatchError` see the deadline-trip
483 // signal at the same level as panic / shutdown.
484 let result = tokio::task::spawn_blocking(move || {
485 // Same overflow semantics as `lint_many_inner` —
486 // overflow folds to `deadline = now` (which the
487 // engine treats as already expired) so the
488 // operator-configured deadline is never silently
489 // disabled.
490 let deadline = per_doc_deadline.map(|d| {
491 let now = Instant::now();
492 now.checked_add(d).unwrap_or(now)
493 });
494 let opts = FixOptions {
495 deadline,
496 ..FixOptions::default()
497 };
498 engine.fix_with_options(&data, crate::FixMode::Apply, &opts)
499 })
500 .await;
501 let mapped = match result {
502 Ok(Ok(fix_result)) => Ok(fix_result),
503 Ok(Err(EngineError::DeadlineExceeded { partial_lint })) => {
504 Err(BatchError::DocumentDeadlineExceeded { partial_lint })
505 }
506 // `EngineError::InvalidThreshold` cannot fire here
507 // because `FixOptions` carries no `threshold_override`
508 // (default is `None`, falling back to the engine's
509 // pre-validated config threshold). A future addition
510 // of per-doc threshold overrides on `BatchOptions`
511 // would need to thread `EngineError::InvalidThreshold`
512 // into a new `BatchError` variant; until then the
513 // arm is `unreachable!` so a silent breakage is
514 // visible at the next test run.
515 Ok(Err(EngineError::InvalidThreshold(_))) => unreachable!(
516 "BatchEngine does not set FixOptions::threshold_override; \
517 InvalidThreshold cannot fire"
518 ),
519 // `EngineError` is `#[non_exhaustive]` for crate
520 // outsiders, but inside `marque-engine` we see all
521 // variants — adding a future variant will produce
522 // a non-exhaustive-match error here, forcing an
523 // explicit `BatchError` mapping decision rather
524 // than a silently-eaten wildcard.
525 Err(join_error) => Err(BatchError::from(join_error)),
526 };
527 (id, mapped)
528 }
529 })
530 .buffer_unordered(concurrent)
531 }
532}
533
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Builds a single-threaded runtime for the async conversion paths
    /// exercised below.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("current_thread runtime builds")
    }

    #[test]
    fn shutdown_error_is_not_panic_or_cancellation() {
        let e = BatchError::ShutdownInProgress;
        assert!(!e.is_panic());
        assert!(!e.is_cancelled());
        assert!(e.is_shutdown());
        assert!(!e.is_deadline_exceeded());
    }

    #[test]
    fn shutdown_error_display_names_the_state() {
        let e = BatchError::ShutdownInProgress;
        let s = e.to_string();
        // The Display string must convey "shutdown" cleanly so a log grep
        // on operator dashboards picks it up. We don't assert exact
        // wording, only the discriminating substrings.
        assert!(
            s.contains("shutdown"),
            "ShutdownInProgress Display should name the state explicitly: got {s:?}"
        );
        assert!(
            s.contains("closed"),
            "Display should name the underlying signal (semaphore closed): got {s:?}"
        );
    }

    #[test]
    fn shutdown_error_has_no_source() {
        // Whitepaper §9.4: `ShutdownInProgress` is a terminal signal, not a
        // wrapped error. Any `source()` here would be misleading: the
        // underlying `AcquireError` carries no information beyond "closed".
        let e = BatchError::ShutdownInProgress;
        assert!(
            std::error::Error::source(&e).is_none(),
            "ShutdownInProgress must not chain to a source"
        );
    }

    #[test]
    fn from_acquire_error_yields_shutdown_variant() {
        // Drive the conversion path the runtime uses. Acquiring against a
        // closed semaphore produces `AcquireError`, which `BatchError::from`
        // must convert to `ShutdownInProgress`.
        let sem = tokio::sync::Semaphore::new(1);
        sem.close();
        // `try_acquire` on a closed semaphore returns
        // `TryAcquireError::Closed`, not `AcquireError`. Only the async
        // `acquire().await` returns `AcquireError`, so drive it on a tiny
        // single-thread runtime.
        let rt = current_thread_runtime();
        let acquire_err = rt.block_on(async { sem.acquire().await }).unwrap_err();
        let batch_err: BatchError = acquire_err.into();
        assert!(batch_err.is_shutdown());
        assert!(!batch_err.is_panic());
    }

    #[test]
    fn panicking_task_maps_to_task_failed_panic() {
        let rt = current_thread_runtime();
        let handle: tokio::task::JoinHandle<()> =
            rt.spawn(async { panic!("intentional panic: BatchError::TaskFailed coverage") });
        let join_err = rt.block_on(handle).unwrap_err();
        let e = BatchError::from(join_err);
        assert!(e.is_panic());
        assert!(!e.is_cancelled());
        assert!(!e.is_shutdown());
        assert!(!e.is_deadline_exceeded());
        assert!(
            std::error::Error::source(&e).is_some(),
            "TaskFailed must chain to the underlying JoinError"
        );
        let s = e.to_string();
        assert!(s.contains("panicked"), "panic Display should say so: got {s:?}");
    }

    #[test]
    fn aborted_task_maps_to_task_failed_cancelled() {
        let rt = current_thread_runtime();
        let handle: tokio::task::JoinHandle<()> = rt.spawn(std::future::pending());
        handle.abort();
        let join_err = rt.block_on(handle).unwrap_err();
        let e = BatchError::from(join_err);
        assert!(e.is_cancelled());
        assert!(!e.is_panic());
        assert!(!e.is_shutdown());
        assert!(!e.is_deadline_exceeded());
        let s = e.to_string();
        assert!(
            s.contains("was cancelled"),
            "cancellation Display should say so: got {s:?}"
        );
    }

    #[test]
    fn default_options_match_documented_values() {
        let opts = BatchOptions::default();
        assert_eq!(opts.max_concurrent_docs, Some(32));
        assert_eq!(opts.max_inflight_bytes, None);
        assert_eq!(opts.per_doc_deadline, None);
    }
}