// sqry_nl/classifier/pool.rs
//! NL07 — bounded classifier pool.
//!
//! [`ClassifierPool`] holds `N` independent loaded
//! [`crate::classifier::IntentClassifier`] sessions, each wrapped in a
//! [`crate::classifier::SharedClassifier`]. Concurrent translate calls
//! acquire a slot, classify, and release the slot back to the pool —
//! capping resident memory at `N × per-classifier RSS` regardless of
//! request fan-in.
//!
//! # Why a `crossbeam_channel::bounded` channel and not a queue + condvar?
//!
//! A naive `ArrayQueue + Mutex<bool> + Condvar` triple has a lost-wakeup
//! window: a producer that pushes between the consumer's queue-empty
//! check and `condvar.wait` will never wake the consumer. Crossbeam's
//! bounded channel wraps that wait/notify atomically, so a `recv()`
//! observed-empty + `send()` race cannot drop the wakeup. The acquire
//! path is `recv()`; the release path is `send()` of the same
//! [`SharedClassifier`].
//!
//! # Pool invariant — N distinct loaded sessions
//!
//! The constructor [`ClassifierPool::new`] calls the user-supplied
//! `loader` exactly `capacity` times. Each call MUST yield a freshly
//! loaded [`crate::classifier::IntentClassifier`] (separate
//! `IntentClassifier::load` invocation, separate `ort::Session`
//! allocation, separate model-weight buffer). The
//! [`crate::classifier::SharedClassifier`]s wrapping those classifiers
//! never alias one another — distinct slots = distinct sessions = the
//! pool fans out across N parallel inference workers.
//!
//! # Panic safety — slot return via `scopeguard::guard`
//!
//! [`PoolGuard`] wraps the held [`SharedClassifier`] in a
//! [`scopeguard::guard`] whose deferred closure performs the channel
//! send back into the pool. The scopeguard's drop hook runs on every
//! exit path — normal scope exit AND panicking unwind — so a panic
//! inside [`crate::classifier::IntentClassifier::classify`] cannot
//! leak a slot.
//!
//! Why scopeguard rather than just a hand-rolled `Drop` impl on
//! `PoolGuard` (which Rust would also run on unwind)? The DAG NL07
//! `critical_decisions` list mandates scopeguard as the panic-safety
//! primitive: it makes the contract structurally explicit at the call
//! site (the closure that "must run" is named at the point the
//! invariant is established), and it survives any future refactor
//! that adds an intermediate fallible step between
//! [`ClassifierPool::acquire`] and the final slot release. The
//! crate-level `scopeguard` dependency is declared in
//! `sqry-nl/Cargo.toml` for exactly this purpose.
//!
//! # No tokio dependency
//!
//! The pool is sync. `recv()` blocks the current thread. Async callers
//! (sqry-daemon, sqry-lsp) MUST wrap [`ClassifierPool::acquire`] in
//! [`tokio::task::spawn_blocking`] at their boundary — sqry-nl itself
//! never imports tokio.

58use crate::classifier::{IntentClassifier, SharedClassifier};
59use crate::error::NlError;
60use crossbeam_channel::{Receiver, Sender, bounded};
61
/// Lower bound for the pool capacity (NFR-2: one classifier minimum).
///
/// Enforced via `clamp` in both [`ClassifierPool::new`] and
/// [`resolve_pool_size`].
pub const POOL_MIN: usize = 1;

/// Upper bound for the pool capacity (NFR-2: cap RSS at 8 sessions).
///
/// Enforced via `clamp` in both [`ClassifierPool::new`] and
/// [`resolve_pool_size`].
pub const POOL_MAX: usize = 8;

/// Default pool size when neither config nor env-var supply one.
///
/// FR-15 requires at least 2 concurrent inference workers so the
/// daemon's MCP host and the LSP server can serve overlapping
/// `sqry_ask` calls without serialising on a single classifier
/// session.
pub const POOL_DEFAULT: usize = 2;
75
/// Bounded pool of N independently-loaded classifiers.
///
/// See module docs for the invariants this type enforces.
pub struct ClassifierPool {
    // Release side of the slot channel. `acquire` clones this into
    // each guard's scopeguard closure so the slot can be sent back
    // without borrowing the pool.
    sender: Sender<SharedClassifier>,
    // Acquire side: `acquire()` blocks on `recv()` until a slot is free.
    receiver: Receiver<SharedClassifier>,
    // Post-clamp capacity — also the channel bound and the number of
    // loader invocations performed by `new`.
    capacity: usize,
}
84
85impl ClassifierPool {
86    /// Build a pool of `capacity` independently-loaded classifiers.
87    ///
88    /// `capacity` is clamped into `[POOL_MIN, POOL_MAX]` (NFR-2).
89    /// Calls `loader` exactly `capacity` times — see the pool
90    /// invariant in the module-level docs.
91    ///
92    /// # Errors
93    ///
94    /// Propagates the first [`NlError`] returned by `loader`. On
95    /// failure the partially-built pool is dropped, releasing any
96    /// already-loaded classifiers.
97    pub fn new<L>(capacity: usize, mut loader: L) -> Result<Self, NlError>
98    where
99        L: FnMut() -> Result<IntentClassifier, NlError>,
100    {
101        let capacity = capacity.clamp(POOL_MIN, POOL_MAX);
102        let (sender, receiver) = bounded::<SharedClassifier>(capacity);
103        for _ in 0..capacity {
104            let classifier = loader()?;
105            let shared = SharedClassifier::new(classifier);
106            // The channel was just created with capacity == count, so
107            // every send fits without blocking. A failure here is a
108            // programmer error (the channel can't be disconnected
109            // before we've returned a Receiver to anyone), so panic.
110            sender
111                .send(shared)
112                .expect("crossbeam_channel just created with capacity == iteration count");
113        }
114        Ok(Self {
115            sender,
116            receiver,
117            capacity,
118        })
119    }
120
121    /// Pool capacity (post-clamp).
122    #[must_use]
123    pub fn capacity(&self) -> usize {
124        self.capacity
125    }
126
127    /// Acquire a slot, blocking the current thread until one is
128    /// available. The returned [`PoolGuard`] returns the slot on
129    /// drop (panic-safe via [`scopeguard::guard`]).
130    ///
131    /// # Panics
132    ///
133    /// Panics only if the channel has been disconnected — which is
134    /// impossible during normal use because the [`ClassifierPool`]
135    /// itself owns the receiver, so the only way to disconnect it is
136    /// to drop the pool. Acquiring a guard on a dropped pool would be
137    /// a use-after-free style bug.
138    pub fn acquire(&self) -> PoolGuard<'_> {
139        let shared = self
140            .receiver
141            .recv()
142            .expect("ClassifierPool channel disconnected — pool dropped while in use");
143        // `scopeguard::guard` wraps `shared` so its on-drop closure —
144        // the channel `send` that returns the slot to the pool — runs
145        // on every exit path, including panicking unwind. Cloning the
146        // `Sender` is cheap (it's an Arc internally) and lets the
147        // closure outlive the borrow on `&self`. Capturing the sender
148        // by move into the closure makes the panic-safety contract
149        // structurally explicit at the acquire site, per NL07
150        // critical_decisions.
151        let sender = self.sender.clone();
152        let on_release: SlotReturn = Box::new(move |shared| {
153            // Best-effort: a disconnected channel here means the pool
154            // was dropped while a guard was live, which is a tear-down
155            // sequencing bug. We swallow the error because Drop must
156            // not panic during unwind (would abort).
157            let _ = sender.send(shared);
158        });
159        let scoped = scopeguard::guard(shared, on_release);
160        PoolGuard {
161            scoped: Some(scoped),
162            _pool: std::marker::PhantomData,
163        }
164    }
165}
166
167impl std::fmt::Debug for ClassifierPool {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("ClassifierPool")
170            .field("capacity", &self.capacity)
171            .field("available", &self.receiver.len())
172            .finish()
173    }
174}
175
/// RAII guard returned by [`ClassifierPool::acquire`].
///
/// Holds a single [`SharedClassifier`] for the duration of one
/// translate call. Returns the slot to the pool on drop, including
/// the panicking-unwind path. Slot return is implemented via
/// [`scopeguard::guard`] so the panic-safety contract is structurally
/// explicit (see module-level docs).
///
/// The `'a` lifetime ties this guard to the parent [`ClassifierPool`]
/// borrow so callers cannot stash a guard past the pool's lifetime.
/// The actual slot-return mechanism is a cloned [`Sender`] inside the
/// scopeguard closure, not a reference to the pool — but the lifetime
/// keeps API ergonomics consistent with the previous Drop-based
/// implementation.
pub struct PoolGuard<'a> {
    /// Scopeguard wrapping the held [`SharedClassifier`]. Its on-drop
    /// closure performs the channel send back to the pool. Note there
    /// is no hand-written `Drop` impl on `PoolGuard`: dropping this
    /// field fires the scopeguard directly. The `Option` wrapper is
    /// belt-and-suspenders for any future refactor that wants to
    /// disarm or re-arm the guard via `Option::take`.
    scoped: Option<scopeguard::ScopeGuard<SharedClassifier, SlotReturn>>,
    /// Zero-sized tie to the pool borrow (see the lifetime note above).
    _pool: std::marker::PhantomData<&'a ClassifierPool>,
}
200
/// Type alias for the boxed return-to-pool closure. Boxing erases the
/// anonymous closure type so `PoolGuard`'s field type can be named
/// concretely, keeping its size stable across the public API surface
/// regardless of the captured-sender representation.
type SlotReturn = Box<dyn FnOnce(SharedClassifier) + Send + 'static>;
206
207// `scopeguard::guard` is generic over the closure type, so we can't
208// directly name `ScopeGuard<SharedClassifier, SlotReturn>` from the
209// closure expression in `acquire` (closure types are anonymous). We
210// erase the closure to a `Box<dyn FnOnce>` at the call site by
211// constructing the boxed closure first, then passing it into
212// `scopeguard::guard`.
213impl<'a> PoolGuard<'a> {
214    /// Borrow the held [`SharedClassifier`].
215    ///
216    /// # Panics
217    ///
218    /// Panics if called after the guard's `Drop` impl ran (impossible
219    /// through the normal API — the borrow is bound by `'a`).
220    #[must_use]
221    pub fn classifier(&self) -> &SharedClassifier {
222        // `scopeguard::ScopeGuard` derefs to `&T` (in our case
223        // `&SharedClassifier`). Explicit `Deref::deref` keeps clippy
224        // happy — autoderef would otherwise complain at the bare
225        // `&**scoped` form.
226        use std::ops::Deref;
227        let scoped = self
228            .scoped
229            .as_ref()
230            .expect("PoolGuard accessed after drop — invariant violated");
231        scoped.deref()
232    }
233}
234
// PoolGuard has no hand-written `Drop` impl, and that is intentional:
// dropping the `Option<ScopeGuard>` field (whether by scope exit or
// panic-unwind) fires the scopeguard's on-drop closure, which performs
// the channel send. This is the structural panic-safety contract
// scopeguard provides — see module-level docs.
240
241/// Resolve the effective pool size from a configured value, the
242/// `SQRY_NL_POOL_SIZE` environment variable, and the default.
243///
244/// Resolution order (highest priority first):
245/// 1. `configured` (e.g. `TranslatorConfig::classifier_pool_size`).
246/// 2. `SQRY_NL_POOL_SIZE` env var (parsed as `usize`).
247/// 3. [`POOL_DEFAULT`].
248///
249/// The result is clamped into `[POOL_MIN, POOL_MAX]` per NFR-2.
250#[must_use]
251pub fn resolve_pool_size(configured: Option<usize>) -> usize {
252    let raw = configured
253        .or_else(|| {
254            std::env::var("SQRY_NL_POOL_SIZE")
255                .ok()
256                .and_then(|s| s.trim().parse::<usize>().ok())
257        })
258        .unwrap_or(POOL_DEFAULT);
259    raw.clamp(POOL_MIN, POOL_MAX)
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ClassificationResult, Intent};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Tiny stand-in for [`IntentClassifier`] so unit tests don't need
    /// the ONNX dylib + model fixtures. We can't construct a real
    /// `IntentClassifier` in unit-test scope (it owns an
    /// `ort::Session`), so the `loader` argument to
    /// [`ClassifierPool::new`] would normally need real artifacts.
    ///
    /// Instead, the unit tests below exercise [`resolve_pool_size`]
    /// and the channel mechanics by building a pool over a mocked
    /// `IntentClassifier` only when the test needs the full pool.
    /// See `sqry-nl/tests/pool_concurrent_load.rs` for the integration
    /// test that uses the real `IntentClassifier::load`.
    fn _silence_unused_warning() {
        let _ = ClassificationResult {
            intent: Intent::Ambiguous,
            confidence: 0.0,
            all_probabilities: vec![],
            model_version: "test".into(),
        };
    }

    /// Process-global env-var lock serialising the
    /// `resolve_pool_size_*` tests that touch `SQRY_NL_POOL_SIZE`, so
    /// `cargo test`'s parallel runner cannot observe one test's stale
    /// `set_var` from another. This is a `std::sync::Mutex`; poisoning
    /// (a panic while the lock is held) is tolerated at each lock site
    /// via `unwrap_or_else(|e| e.into_inner())`, so one failing test
    /// cannot wedge its siblings.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Pool size resolution honours config > env > default.
    #[test]
    fn resolve_pool_size_prefers_configured() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY: env vars are process-global; the ENV_LOCK serialises
        // every test in this module that mutates SQRY_NL_POOL_SIZE.
        unsafe { std::env::set_var("SQRY_NL_POOL_SIZE", "6") };
        assert_eq!(resolve_pool_size(Some(3)), 3);
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
    }

    /// With no configured value, the env var supplies the size.
    #[test]
    fn resolve_pool_size_falls_back_to_env() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY: see resolve_pool_size_prefers_configured.
        unsafe { std::env::set_var("SQRY_NL_POOL_SIZE", "5") };
        assert_eq!(resolve_pool_size(None), 5);
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
    }

    /// With neither config nor env var, POOL_DEFAULT wins.
    #[test]
    fn resolve_pool_size_default_when_unset() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // Make sure no stale value is leaking from a sibling test.
        // SAFETY: see resolve_pool_size_prefers_configured.
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
        assert_eq!(resolve_pool_size(None), POOL_DEFAULT);
    }

    /// Oversized configured values clamp down to POOL_MAX.
    #[test]
    fn resolve_pool_size_clamped_to_max() {
        assert_eq!(resolve_pool_size(Some(999)), POOL_MAX);
    }

    /// Zero clamps up to POOL_MIN.
    #[test]
    fn resolve_pool_size_clamped_to_min() {
        assert_eq!(resolve_pool_size(Some(0)), POOL_MIN);
    }

    /// `ClassifierPool::new` short-circuits on the first loader error.
    ///
    /// NOTE(review): despite its name, this test cannot actually
    /// observe the capacity clamp — the loader fails on its very
    /// first call, so exactly one invocation happens regardless of
    /// the requested capacity. What it does prove is that `new` bails
    /// immediately on a loader error instead of looping on toward the
    /// (unclamped) 999. The real clamp assertion (capacity() ==
    /// POOL_MAX for a 999 request) needs loadable classifiers and
    /// lives in `tests/pool_concurrent_load.rs`.
    #[test]
    fn capacity_clamps_above_max() {
        // Loader that counts invocations and always fails.
        let count = Arc::new(AtomicUsize::new(0));
        let count_inner = Arc::clone(&count);
        let res = ClassifierPool::new(999, move || -> Result<IntentClassifier, NlError> {
            count_inner.fetch_add(1, Ordering::SeqCst);
            Err(NlError::Config("synthetic loader failure".into()))
        });
        assert!(res.is_err());
        // Only one loader call before the failure short-circuits;
        // critically, no run-away iteration to 999.
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    /// The pool's acquire/release mechanics reduce to bounded-channel
    /// recv/send round-trips. We exercise that directly on a `u64`
    /// channel because a real [`SharedClassifier`] can't be allocated
    /// in unit-test scope (it wraps an `IntentClassifier`); the
    /// channel mechanics are independent of the payload type.
    #[test]
    fn channel_recv_send_round_trips() {
        let (tx, rx) = bounded::<u64>(2);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        // Acquire both, "use" them, send them back.
        let a = rx.recv().unwrap();
        let b = rx.recv().unwrap();
        assert_eq!(rx.len(), 0);
        tx.send(a).unwrap();
        tx.send(b).unwrap();
        assert_eq!(rx.len(), 2);
    }
}