Skip to main content

marque_engine/
batch.rs

1// SPDX-FileCopyrightText: 2026 Knitli Inc.
2//
3// SPDX-License-Identifier: LicenseRef-MarqueLicense-1.0
4
5//! Concurrent batch processing over many documents.
6//!
7//! `BatchEngine` wraps `Engine` behind an `Arc` and uses `ConcurrencyController`
8//! from `recoco-utils` to enforce row and byte limits on in-flight work.
9//!
10//! CPU-bound lint/fix work is dispatched to tokio's blocking thread pool via
11//! `spawn_blocking`, keeping the async executor free for I/O-bound coordination.
12//!
13//! Results stream out in **completion order** (fastest documents first), not
14//! submission order. Callers correlate results by the `id` field echoed back
15//! alongside each result.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use marque_engine::{Engine, batch::{BatchEngine, BatchOptions}};
21//! use futures::StreamExt;
22//! use std::time::Duration;
23//!
24//! # async fn example(engine: Engine) {
25//! // `BatchOptions` is `#[non_exhaustive]`, so construct via
26//! // `Default::default()` + field assignment.
27//! let mut options = BatchOptions::default();
28//! options.max_concurrent_docs = Some(16);
29//! options.max_inflight_bytes = Some(256 * 1024 * 1024); // 256 MiB
30//! options.per_doc_deadline = Some(Duration::from_secs(5));
31//! let batch = BatchEngine::new(engine, options);
32//!
33//! let docs = vec![
34//!     ("doc1".to_owned(), b"TOP SECRET//SI".to_vec()),
35//!     ("doc2".to_owned(), b"SECRET//NOFORN".to_vec()),
36//! ];
37//!
38//! let mut results = batch.lint_many(docs);
39//! while let Some((id, result)) = results.next().await {
40//!     match result {
41//!         Ok(lint) => println!("{id}: {} diagnostics", lint.diagnostics.len()),
42//!         Err(e) => eprintln!("{id}: failed: {e}"),
43//!     }
44//! }
45//! # }
46//! ```
47
48use std::sync::Arc;
49// Batch processing uses `std::time::Instant`. `BatchEngine` depends
50// on tokio (gated behind the `batch` Cargo feature), and tokio
51// itself does not target `wasm32-unknown-unknown`, so this module
52// never reaches the WASM clock-polyfill question — std's `Instant`
53// is sufficient.
54use std::time::{Duration, Instant};
55
56use futures::{Stream, StreamExt, stream};
57use recoco_utils::concur_control::{ConcurrencyController, Options as ConcurOptions};
58
59use crate::{Engine, EngineError, FixOptions, FixResult, LintOptions, LintResult};
60
/// Error returned when a single document in a batch fails to process.
///
/// Batch APIs surface this per-document so a panic, cancellation,
/// per-document deadline trip, or graceful shutdown of the underlying
/// concurrency controller does not abort the entire batch run.
///
/// `#[non_exhaustive]` because future infrastructure-level errors
/// (cache write-through failed, queue overflow, etc.) will land as new
/// variants alongside the existing ones. A downstream `match` should
/// always carry a wildcard arm; without `non_exhaustive` every new
/// variant would be a semver-breaking change for consumers, which
/// would either pin them to a stale version or pressure us to never
/// grow the surface.
#[derive(Debug)]
#[non_exhaustive]
pub enum BatchError {
    /// The blocking lint/fix task panicked or was cancelled.
    TaskFailed(tokio::task::JoinError),
    /// The `ConcurrencyController` semaphore was closed while this
    /// document was waiting for a permit. Indicates the runtime is in
    /// shutdown — the caller has no work to do beyond observing the
    /// error and ending its loop.
    ///
    /// Whitepaper §9.4 / gap register #8 carved this out as a separate
    /// variant so deployment supervisors can distinguish it from a
    /// real worker-task panic. `is_panic()` returns `false` for this
    /// variant; `is_shutdown()` returns `true`.
    ShutdownInProgress,
    /// `fix_many` aborted this document's fix pass because the
    /// per-document deadline (set on `BatchOptions::per_doc_deadline`)
    /// expired. Spec 005 §R4 / Constitution V Principle V — no partial
    /// `FixResult` is ever produced; the caller receives the partial
    /// `LintResult` so it can render whatever diagnostics the engine
    /// surfaced before the abort.
    ///
    /// `is_deadline_exceeded()` returns `true` for this variant only.
    /// `is_panic()` and `is_shutdown()` return `false` — a deadline
    /// trip is a routine operational signal, not a worker bug or
    /// runtime shutdown.
    ///
    /// Note: only the **fix** path produces this variant. `lint_many`
    /// surfaces a deadline-truncated lint as `Ok(LintResult { truncated:
    /// true, .. })` so the partial diagnostics flow through the same
    /// success channel — there is no asymmetric response shape on the
    /// lint side because no audit-stream invariant is at risk.
    DocumentDeadlineExceeded {
        /// The lint pass produced before the deadline tripped. May
        /// itself be truncated (`partial_lint.truncated`) if the
        /// deadline expired during the lint phase rather than the
        /// fix-application phase.
        partial_lint: LintResult,
    },
}
114
115impl BatchError {
116    /// Returns `true` if the error was caused by a panic in the worker task.
117    ///
118    /// CI pipelines and supervisors should treat this as an application bug
119    /// that warrants investigation (not a transient infrastructure issue).
120    pub fn is_panic(&self) -> bool {
121        match self {
122            Self::TaskFailed(e) => e.is_panic(),
123            Self::ShutdownInProgress => false,
124            Self::DocumentDeadlineExceeded { .. } => false,
125        }
126    }
127
128    /// Returns `true` if the error was caused by task cancellation (e.g.,
129    /// runtime shutdown, explicit abort).
130    ///
131    /// Cancellation is an expected operational event — callers that see
132    /// this during a graceful shutdown should typically log-and-continue,
133    /// not alert.
134    pub fn is_cancelled(&self) -> bool {
135        match self {
136            Self::TaskFailed(e) => e.is_cancelled(),
137            Self::ShutdownInProgress => false,
138            Self::DocumentDeadlineExceeded { .. } => false,
139        }
140    }
141
142    /// Returns `true` if the error was caused by the `ConcurrencyController`
143    /// semaphore being closed while this document was awaiting a permit.
144    ///
145    /// Distinct from `is_cancelled()` (which fires when a worker task is
146    /// aborted mid-execution) and from `is_panic()` (which fires on a real
147    /// bug). Shutdown is the routine end-of-life signal — supervisors
148    /// should drain any remaining items in the result stream and exit.
149    pub fn is_shutdown(&self) -> bool {
150        matches!(self, Self::ShutdownInProgress)
151    }
152
153    /// Returns `true` if this error was caused by the per-document
154    /// deadline expiring during a `fix_many` call.
155    ///
156    /// Routine operational signal — the document took longer to
157    /// process than its budget allowed. Callers should render the
158    /// embedded `partial_lint` diagnostics and either skip the
159    /// document or retry with a larger budget. Distinct from
160    /// `is_panic()` (worker bug) and `is_shutdown()` (runtime
161    /// end-of-life).
162    pub fn is_deadline_exceeded(&self) -> bool {
163        matches!(self, Self::DocumentDeadlineExceeded { .. })
164    }
165}
166
167impl std::fmt::Display for BatchError {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::TaskFailed(e) => {
171                let kind = if e.is_panic() {
172                    "panicked"
173                } else if e.is_cancelled() {
174                    "was cancelled"
175                } else {
176                    "failed"
177                };
178                write!(f, "batch task {kind}: {e}")
179            }
180            Self::ShutdownInProgress => {
181                f.write_str("ConcurrencyController semaphore closed (shutdown in progress)")
182            }
183            Self::DocumentDeadlineExceeded { partial_lint } => write!(
184                f,
185                "document deadline exceeded after {}/{} candidates",
186                partial_lint.candidates_processed, partial_lint.candidates_total
187            ),
188        }
189    }
190}
191
192impl std::error::Error for BatchError {
193    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
194        match self {
195            Self::TaskFailed(e) => Some(e),
196            Self::ShutdownInProgress => None,
197            // Like `EngineError::DeadlineExceeded`, the deadline trip
198            // is not caused by an inner error — it reports a runtime
199            // condition (the deadline elapsed) with no underlying
200            // failure to chain.
201            Self::DocumentDeadlineExceeded { .. } => None,
202        }
203    }
204}
205
206impl From<tokio::task::JoinError> for BatchError {
207    fn from(e: tokio::task::JoinError) -> Self {
208        Self::TaskFailed(e)
209    }
210}
211
212impl From<tokio::sync::AcquireError> for BatchError {
213    fn from(_: tokio::sync::AcquireError) -> Self {
214        // `AcquireError` carries no information beyond "semaphore was
215        // closed" — and the semaphore here is owned by `BatchEngine`,
216        // so closure means the engine is shutting down. Surface that
217        // intent explicitly rather than re-exporting tokio's type.
218        Self::ShutdownInProgress
219    }
220}
221
/// Concurrency limits and per-document budgets for batch processing.
///
/// All fields are optional and independent. When both concurrency limits
/// are set the more restrictive one governs at any given moment;
/// `per_doc_deadline` is orthogonal and applies separately to each
/// document's permit-acquired execution slice.
///
/// The type is `Debug` and `Clone` so option sets can be logged and
/// reused across calls. It deliberately does not derive `Copy` or
/// `PartialEq`, which keeps room for future fields that cannot support
/// them.
///
/// # Breaking change in this release
///
/// This struct gained `#[non_exhaustive]` and a new `per_doc_deadline`
/// field in spec 005 Phase 3d. **Downstream code that previously
/// constructed `BatchOptions` with a struct literal**
/// (`BatchOptions { max_concurrent_docs, max_inflight_bytes }`) **will
/// no longer compile** — `#[non_exhaustive]` blocks cross-crate
/// struct-literal construction unconditionally, even when every
/// existing field is supplied. Switch to
/// `Default::default()` + public field assignment, shown below. (The
/// CHANGELOG / release notes for this version surface this explicitly.)
///
/// `#[non_exhaustive]` was added so future per-doc concerns (memory
/// budgets, per-rule deadlines, cancellation tokens) can join without
/// a further breaking-change cycle for downstream callers using the
/// recommended construction pattern.
///
/// ```rust,no_run
/// use marque_engine::BatchOptions;
/// use std::time::Duration;
///
/// let mut opts = BatchOptions::default();
/// opts.per_doc_deadline = Some(Duration::from_secs(5));
/// ```
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct BatchOptions {
    /// Maximum documents in-flight simultaneously.
    ///
    /// This field drives **two** independent limits that both happen to
    /// share this value:
    ///
    /// 1. `ConcurrencyController::max_inflight_rows` — the semaphore that
    ///    rate-limits how many documents can hold permits at the same time.
    /// 2. `buffer_unordered` cap — how many per-document futures are
    ///    created and polled ahead of readiness.
    ///
    /// In practice they are always set together: the effective maximum is
    /// the minimum of whichever blocks first for a given workload.
    /// Defaults to 32.
    pub max_concurrent_docs: Option<usize>,

    /// Maximum total bytes of document content in-flight simultaneously.
    ///
    /// Useful for memory-bounded batch runs over large corpora. `None` means
    /// unlimited (byte accounting is still tracked for observability).
    pub max_inflight_bytes: Option<usize>,

    /// Per-document wall-clock budget (spec 005 §R2). When `Some(d)`,
    /// each document's lint/fix call gets its own deadline of
    /// `Instant::now() + d` stamped **after** the document acquires
    /// its concurrency permit — `ConcurrencyController` wait time
    /// does not consume the budget. A slow document does not borrow
    /// from a fast document's slice.
    ///
    /// On expiry: lint returns `Ok(LintResult { truncated: true, .. })`
    /// (partial diagnostics matter to the caller). Fix returns
    /// `Err(BatchError::DocumentDeadlineExceeded { partial_lint })`
    /// per Constitution V Principle V — no partial `FixResult` is
    /// ever produced.
    ///
    /// `None` (default) means no per-document deadline.
    pub per_doc_deadline: Option<Duration>,
}
292
293impl Default for BatchOptions {
294    fn default() -> Self {
295        Self {
296            max_concurrent_docs: Some(32),
297            max_inflight_bytes: None,
298            per_doc_deadline: None,
299        }
300    }
301}
302
303/// Wraps `Engine` for concurrent multi-document processing with backpressure.
304///
305/// The underlying `Engine` is shared via `Arc`; cloning `BatchEngine` is cheap.
306pub struct BatchEngine {
307    engine: Arc<Engine>,
308    controller: Arc<ConcurrencyController>,
309    /// Buffer cap forwarded to `buffer_unordered`.
310    concurrent: usize,
311    /// Default per-document deadline (spec 005 §R2). Stamped into an
312    /// `Instant` after each document acquires its concurrency permit
313    /// — so a slow earlier document does not consume budget allotted
314    /// to a later one, and `ConcurrencyController` wait time does
315    /// not count against the engine's slice. `None` means no
316    /// deadline; the construction-time default flows through
317    /// `lint_many` / `fix_many`. Per-call `_with_options` variants
318    /// can override.
319    per_doc_deadline: Option<Duration>,
320}
321
322impl BatchEngine {
323    pub fn new(engine: Engine, options: BatchOptions) -> Self {
324        let concurrent = options.max_concurrent_docs.unwrap_or(32);
325        let controller = ConcurrencyController::new(&ConcurOptions {
326            max_inflight_rows: options.max_concurrent_docs,
327            max_inflight_bytes: options.max_inflight_bytes,
328        });
329        Self {
330            engine: Arc::new(engine),
331            controller: Arc::new(controller),
332            concurrent,
333            per_doc_deadline: options.per_doc_deadline,
334        }
335    }
336
337    /// Lint many documents concurrently. Yields `(id, Result)` in
338    /// completion order; an `Err` indicates the per-document task
339    /// panicked, was cancelled, or could not start because shutdown
340    /// is in progress (the `ConcurrencyController` semaphore was
341    /// closed) — it does not abort the batch.
342    ///
343    /// Honors `BatchOptions::per_doc_deadline` from construction time
344    /// (spec 005 §R2). A deadline-truncated lint surfaces as
345    /// `Ok(LintResult { truncated: true, .. })` — the partial
346    /// diagnostics are useful, so they flow through the success
347    /// channel rather than `Err`.
348    pub fn lint_many(
349        &self,
350        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
351    ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
352        self.lint_many_inner(docs, self.per_doc_deadline)
353    }
354
355    /// Same as [`lint_many`] but reads `per_doc_deadline` from the
356    /// supplied [`BatchOptions`] instead of the construction-time
357    /// default. Other fields on `opts` are reserved for future
358    /// per-call overrides; in MVP only `per_doc_deadline` is honored.
359    ///
360    /// [`lint_many`]: BatchEngine::lint_many
361    pub fn lint_many_with_options(
362        &self,
363        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
364        opts: &BatchOptions,
365    ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
366        self.lint_many_inner(docs, opts.per_doc_deadline)
367    }
368
369    fn lint_many_inner(
370        &self,
371        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
372        per_doc_deadline: Option<Duration>,
373    ) -> impl Stream<Item = (String, Result<LintResult, BatchError>)> {
374        let engine = Arc::clone(&self.engine);
375        let controller = Arc::clone(&self.controller);
376        let concurrent = self.concurrent;
377
378        stream::iter(docs)
379            .map(move |(id, data)| {
380                let engine = Arc::clone(&engine);
381                let controller = Arc::clone(&controller);
382                async move {
383                    let byte_len = data.len();
384                    // Whitepaper §9.4 / gap register #8: surface a closed
385                    // controller as `BatchError::ShutdownInProgress` rather
386                    // than `.expect()`-panicking. The `From<AcquireError>`
387                    // impl above maps the only possible error.
388                    let _permit = match controller.acquire(Some(|| byte_len)).await {
389                        Ok(p) => p,
390                        Err(e) => return (id, Err(BatchError::from(e))),
391                    };
392                    // Spec 005 §R2: the deadline is stamped AFTER permit
393                    // acquisition so slow `ConcurrencyController` waits
394                    // (a backed-up batch) don't consume the document's
395                    // engine budget.
396                    let result = tokio::task::spawn_blocking(move || {
397                        // `checked_add` overflow must not silently drop
398                        // the deadline (which would let an unbounded
399                        // pass run after the operator explicitly
400                        // configured a budget). Treat overflow as
401                        // `deadline = now`, which the engine's pre-pass
402                        // check (`now >= deadline`) treats as expired
403                        // and aborts on entry.
404                        let deadline = per_doc_deadline.map(|d| {
405                            let now = Instant::now();
406                            now.checked_add(d).unwrap_or(now)
407                        });
408                        // In-crate construction may use struct-update
409                        // syntax across `#[non_exhaustive]` — only the
410                        // outside-the-defining-crate boundary is restricted.
411                        let opts = LintOptions {
412                            deadline,
413                            ..LintOptions::default()
414                        };
415                        engine.lint_with_options(&data, &opts)
416                    })
417                    .await
418                    .map_err(BatchError::from);
419                    (id, result)
420                }
421            })
422            .buffer_unordered(concurrent)
423    }
424
425    /// Fix many documents concurrently. Yields `(id, Result)` in
426    /// completion order; an `Err` indicates the per-document task
427    /// panicked, was cancelled, hit the per-document deadline, or the
428    /// runtime is shutting down — it does not abort the batch.
429    ///
430    /// Honors `BatchOptions::per_doc_deadline` from construction
431    /// time. A deadline trip on the fix path returns
432    /// `Err(BatchError::DocumentDeadlineExceeded { partial_lint })`
433    /// per Constitution V Principle V — no partial `FixResult` is
434    /// ever produced. Match on `is_deadline_exceeded()` to
435    /// distinguish from worker bugs (`is_panic()`) or shutdown
436    /// (`is_shutdown()`).
437    pub fn fix_many(
438        &self,
439        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
440    ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
441        self.fix_many_inner(docs, self.per_doc_deadline)
442    }
443
444    /// Same as [`fix_many`] but reads `per_doc_deadline` from the
445    /// supplied [`BatchOptions`] instead of the construction-time
446    /// default. Other fields on `opts` are reserved for future
447    /// per-call overrides; in MVP only `per_doc_deadline` is honored.
448    ///
449    /// [`fix_many`]: BatchEngine::fix_many
450    pub fn fix_many_with_options(
451        &self,
452        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
453        opts: &BatchOptions,
454    ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
455        self.fix_many_inner(docs, opts.per_doc_deadline)
456    }
457
458    fn fix_many_inner(
459        &self,
460        docs: impl IntoIterator<Item = (String, Vec<u8>)>,
461        per_doc_deadline: Option<Duration>,
462    ) -> impl Stream<Item = (String, Result<FixResult, BatchError>)> {
463        let engine = Arc::clone(&self.engine);
464        let controller = Arc::clone(&self.controller);
465        let concurrent = self.concurrent;
466
467        stream::iter(docs)
468            .map(move |(id, data)| {
469                let engine = Arc::clone(&engine);
470                let controller = Arc::clone(&controller);
471                async move {
472                    let byte_len = data.len();
473                    let _permit = match controller.acquire(Some(|| byte_len)).await {
474                        Ok(p) => p,
475                        Err(e) => return (id, Err(BatchError::from(e))),
476                    };
477                    // Spec 005 §R2 — same per-permit stamping as
478                    // `lint_many`. Spec 005 §R4: a deadline trip on the
479                    // fix path returns `Err(EngineError::DeadlineExceeded
480                    // { partial_lint })`, which we re-shape into
481                    // `BatchError::DocumentDeadlineExceeded` so callers
482                    // matching on `BatchError` see the deadline-trip
483                    // signal at the same level as panic / shutdown.
484                    let result = tokio::task::spawn_blocking(move || {
485                        // Same overflow semantics as `lint_many_inner` —
486                        // overflow folds to `deadline = now` (which the
487                        // engine treats as already expired) so the
488                        // operator-configured deadline is never silently
489                        // disabled.
490                        let deadline = per_doc_deadline.map(|d| {
491                            let now = Instant::now();
492                            now.checked_add(d).unwrap_or(now)
493                        });
494                        let opts = FixOptions {
495                            deadline,
496                            ..FixOptions::default()
497                        };
498                        engine.fix_with_options(&data, crate::FixMode::Apply, &opts)
499                    })
500                    .await;
501                    let mapped = match result {
502                        Ok(Ok(fix_result)) => Ok(fix_result),
503                        Ok(Err(EngineError::DeadlineExceeded { partial_lint })) => {
504                            Err(BatchError::DocumentDeadlineExceeded { partial_lint })
505                        }
506                        // `EngineError::InvalidThreshold` cannot fire here
507                        // because `FixOptions` carries no `threshold_override`
508                        // (default is `None`, falling back to the engine's
509                        // pre-validated config threshold). A future addition
510                        // of per-doc threshold overrides on `BatchOptions`
511                        // would need to thread `EngineError::InvalidThreshold`
512                        // into a new `BatchError` variant; until then the
513                        // arm is `unreachable!` so a silent breakage is
514                        // visible at the next test run.
515                        Ok(Err(EngineError::InvalidThreshold(_))) => unreachable!(
516                            "BatchEngine does not set FixOptions::threshold_override; \
517                             InvalidThreshold cannot fire"
518                        ),
519                        // `EngineError` is `#[non_exhaustive]` for crate
520                        // outsiders, but inside `marque-engine` we see all
521                        // variants — adding a future variant will produce
522                        // a non-exhaustive-match error here, forcing an
523                        // explicit `BatchError` mapping decision rather
524                        // than a silently-eaten wildcard.
525                        Err(join_error) => Err(BatchError::from(join_error)),
526                    };
527                    (id, mapped)
528                }
529            })
530            .buffer_unordered(concurrent)
531    }
532}
533
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Shutdown is its own category: not a panic, not a cancellation.
    #[test]
    fn shutdown_error_is_not_panic_or_cancellation() {
        let shutdown = BatchError::ShutdownInProgress;
        assert!(shutdown.is_shutdown());
        assert!(!shutdown.is_panic());
        assert!(!shutdown.is_cancelled());
    }

    /// The rendered message must carry the substrings operators grep
    /// for. Exact wording is deliberately left unpinned.
    #[test]
    fn shutdown_error_display_names_the_state() {
        let s = BatchError::ShutdownInProgress.to_string();
        assert!(
            s.contains("shutdown"),
            "ShutdownInProgress Display should name the state explicitly: got {s:?}"
        );
        assert!(
            s.contains("closed"),
            "Display should name the underlying signal (semaphore closed): got {s:?}"
        );
    }

    /// Whitepaper §9.4: `ShutdownInProgress` is a terminal signal, not a
    /// wrapped error. The underlying `AcquireError` says nothing beyond
    /// "closed", so exposing a `source()` would be misleading.
    #[test]
    fn shutdown_error_has_no_source() {
        let shutdown = BatchError::ShutdownInProgress;
        let chained = std::error::Error::source(&shutdown);
        assert!(
            chained.is_none(),
            "ShutdownInProgress must not chain to a source"
        );
    }

    /// Exercises the conversion the runtime relies on: acquiring from a
    /// closed semaphore yields `AcquireError`, which must convert to
    /// `ShutdownInProgress`.
    #[test]
    fn from_acquire_error_yields_shutdown_variant() {
        let semaphore = tokio::sync::Semaphore::new(1);
        semaphore.close();
        // `try_acquire` on a closed semaphore gives
        // `TryAcquireError::Closed`, not `AcquireError`; only the async
        // `acquire().await` path produces `AcquireError`. A minimal
        // single-thread runtime is enough to drive that future.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("current_thread runtime builds");
        let acquire_err = runtime.block_on(semaphore.acquire()).unwrap_err();
        let converted = BatchError::from(acquire_err);
        assert!(converted.is_shutdown());
        assert!(!converted.is_panic());
    }
}