Skip to main content

marque_engine/
batch.rs

1//! Concurrent batch processing over many documents.
2//!
3//! `BatchEngine` wraps `Engine` behind an `Arc` and uses `ConcurrencyController`
4//! from `recoco-utils` to enforce row and byte limits on in-flight work.
5//!
6//! CPU-bound lint/fix work is dispatched to tokio's blocking thread pool via
7//! `spawn_blocking`, keeping the async executor free for I/O-bound coordination.
8//!
9//! Results stream out in **completion order** (fastest documents first), not
10//! submission order. Callers correlate results by the `id` field echoed back
11//! alongside each result.
12//!
13//! # Example
14//!
15//! ```rust,no_run
16//! use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
17//! use futures::StreamExt;
18//!
19//! # async fn example(engine: Engine) {
20//! let batch = BatchEngine::new(engine, BatchOptions {
21//!     max_concurrent_docs: Some(16),
22//!     max_inflight_bytes: Some(256 * 1024 * 1024), // 256 MiB
23//! });
24//!
25//! let docs = vec![
26//!     ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
27//!     ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
28//! ];
29//!
30//! let mut results = batch.lint_many(docs);
31//! while let Some((id, result)) = results.next().await {
32//!     match result {
33//!         Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
34//!         Err(e) => eprintln!("{id}: failed: {e}"),
35//!     }
36//! }
37//! # }
38//! ```
39
40use std::sync::Arc;
41
42use futures::{Stream, StreamExt, stream};
43use recoco_utils::concur_control::{ConcurrencyController, Options as ConcurOptions};
44
45use crate::{Engine, FixResult, LintResult};
46
47/// Error returned when a single document in a batch fails to process.
48///
49/// Batch APIs surface this per-document so a panic or cancellation in one
50/// document does not abort the entire batch run.
51#[derive(Debug)]
52pub enum BatchError {
53    /// The blocking lint/fix task panicked or was cancelled.
54    TaskFailed(tokio::task::JoinError),
55}
56
57impl BatchError {
58    /// Returns `true` if the error was caused by a panic in the worker task.
59    ///
60    /// CI pipelines and supervisors should treat this as an application bug
61    /// that warrants investigation (not a transient infrastructure issue).
62    pub fn is_panic(&self) -> bool {
63        match self {
64            Self::TaskFailed(e) => e.is_panic(),
65        }
66    }
67
68    /// Returns `true` if the error was caused by task cancellation (e.g.,
69    /// runtime shutdown, explicit abort).
70    ///
71    /// Cancellation is an expected operational event — callers that see
72    /// this during a graceful shutdown should typically log-and-continue,
73    /// not alert.
74    pub fn is_cancelled(&self) -> bool {
75        match self {
76            Self::TaskFailed(e) => e.is_cancelled(),
77        }
78    }
79}
80
81impl std::fmt::Display for BatchError {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match self {
84            Self::TaskFailed(e) => {
85                let kind = if e.is_panic() {
86                    "panicked"
87                } else if e.is_cancelled() {
88                    "was cancelled"
89                } else {
90                    "failed"
91                };
92                write!(f, "batch task {kind}: {e}")
93            }
94        }
95    }
96}
97
98impl std::error::Error for BatchError {
99    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
100        match self {
101            Self::TaskFailed(e) => Some(e),
102        }
103    }
104}
105
106impl From<tokio::task::JoinError> for BatchError {
107    fn from(e: tokio::task::JoinError) -> Self {
108        Self::TaskFailed(e)
109    }
110}
111
/// Concurrency limits for batch processing.
///
/// Both limits are optional and independent. When both are set, whichever
/// one is exhausted first is the one that holds back further work at that
/// moment.
#[derive(Debug, Clone)]
pub struct BatchOptions {
    /// Maximum number of documents in flight simultaneously.
    ///
    /// `BatchEngine::new` feeds this single value into two places:
    ///
    /// 1. `ConcurrencyController`'s `max_inflight_rows` — limits how many
    ///    documents can hold a controller permit at the same time.
    /// 2. The `buffer_unordered` cap — limits how many per-document futures
    ///    are created and polled ahead of readiness.
    ///
    /// With `Some(n)` both places receive `n`. With `None` the controller
    /// receives `None` (presumably "no row limit" — confirm against
    /// `recoco-utils`), while the `buffer_unordered` cap still falls back to
    /// 32, so the number of documents being driven at once stays bounded.
    ///
    /// `BatchOptions::default()` sets this to `Some(32)`.
    pub max_concurrent_docs: Option<usize>,

    /// Maximum total bytes of document content in flight simultaneously.
    ///
    /// Useful for memory-bounded batch runs over large corpora. `None` means
    /// unlimited (byte accounting is still tracked for observability).
    /// Forwarded unchanged as the controller's `max_inflight_bytes`.
    pub max_inflight_bytes: Option<usize>,
}
138
139impl Default for BatchOptions {
140    fn default() -> Self {
141        Self {
142            max_concurrent_docs: Some(32),
143            max_inflight_bytes: None,
144        }
145    }
146}
147
148/// Wraps `Engine` for concurrent multi-document processing with backpressure.
149///
150/// The underlying `Engine` is shared via `Arc`; cloning `BatchEngine` is cheap.
151pub struct BatchEngine {
152    engine: Arc<Engine>,
153    controller: Arc<ConcurrencyController>,
154    /// Buffer cap forwarded to `buffer_unordered`.
155    concurrent: usize,
156}
157
158impl BatchEngine {
159    pub fn new(engine: Engine, options: BatchOptions) -> Self {
160        let concurrent = options.max_concurrent_docs.unwrap_or(32);
161        let controller = ConcurrencyController::new(&ConcurOptions {
162            max_inflight_rows: options.max_concurrent_docs,
163            max_inflight_bytes: options.max_inflight_bytes,
164        });
165        Self {
166            engine: Arc::new(engine),
167            controller: Arc::new(controller),
168            concurrent,
169        }
170    }
171
172    /// Lint many documents concurrently. Yields `(id, Result)` in
173    /// completion order; an `Err` indicates the per-document task panicked
174    /// or was cancelled — it does not abort the batch.
175    pub fn lint_many(
176        &self,
177        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
178    ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
179        let engine = Arc::clone(&self.engine);
180        let controller = Arc::clone(&self.controller);
181        let concurrent = self.concurrent;
182
183        stream::iter(docs)
184            .map(move |(id, data)| {
185                let engine = Arc::clone(&engine);
186                let controller = Arc::clone(&controller);
187                async move {
188                    let byte_len = data.len();
189                    let _permit = controller
190                        .acquire(Some(|| byte_len))
191                        .await
192                        .expect("ConcurrencyController semaphore unexpectedly closed");
193                    let result = tokio::task::spawn_blocking(move || engine.lint(&data))
194                        .await
195                        .map_err(BatchError::from);
196                    (id, result)
197                }
198            })
199            .buffer_unordered(concurrent)
200    }
201
202    /// Fix many documents concurrently. Yields `(id, Result)` in
203    /// completion order; an `Err` indicates the per-document task panicked
204    /// or was cancelled — it does not abort the batch.
205    pub fn fix_many(
206        &self,
207        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
208    ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
209        let engine = Arc::clone(&self.engine);
210        let controller = Arc::clone(&self.controller);
211        let concurrent = self.concurrent;
212
213        stream::iter(docs)
214            .map(move |(id, data)| {
215                let engine = Arc::clone(&engine);
216                let controller = Arc::clone(&controller);
217                async move {
218                    let byte_len = data.len();
219                    let _permit = controller
220                        .acquire(Some(|| byte_len))
221                        .await
222                        .expect("ConcurrencyController semaphore unexpectedly closed");
223                    let result = tokio::task::spawn_blocking(move || {
224                        engine.fix(&data, crate::FixMode::Apply)
225                    })
226                    .await
227                    .map_err(BatchError::from);
228                    (id, result)
229                }
230            })
231            .buffer_unordered(concurrent)
232    }
233}