// sqry_nl/classifier/pool.rs
1//! NL07 — bounded classifier pool.
2//!
3//! [`ClassifierPool`] holds `N` independent loaded
4//! [`crate::classifier::IntentClassifier`] sessions, each wrapped in a
5//! [`crate::classifier::SharedClassifier`]. Concurrent translate calls
6//! acquire a slot, classify, and release the slot back to the pool —
7//! capping resident memory at `N × per-classifier RSS` regardless of
8//! request fan-in.
9//!
10//! # Why a `crossbeam_channel::bounded` channel and not a queue + condvar?
11//!
12//! A naive `ArrayQueue + Mutex<bool> + Condvar` triple has a lost-wakeup
13//! window: a producer that pushes between the consumer's queue-empty
14//! check and `condvar.wait` will never wake the consumer. Crossbeam's
15//! bounded channel wraps that wait/notify atomically, so a `recv()`
16//! observed-empty + `send()` race cannot drop the wakeup. The acquire
17//! path is `recv()`; the release path is `send()` of the same
18//! [`SharedClassifier`].
19//!
20//! # Pool invariant — N distinct loaded sessions
21//!
22//! The constructor [`ClassifierPool::new`] calls the user-supplied
23//! `loader` exactly `capacity` times. Each call MUST yield a freshly
24//! loaded [`crate::classifier::IntentClassifier`] (separate
25//! `IntentClassifier::load` invocation, separate `ort::Session`
26//! allocation, separate model-weight buffer). The
27//! [`crate::classifier::SharedClassifier`]s wrapping those classifiers
28//! never alias one another — distinct slots = distinct sessions = the
29//! pool fans out across N parallel inference workers.
30//!
31//! # Panic safety — slot return via `scopeguard::guard`
32//!
33//! [`PoolGuard`] wraps the held [`SharedClassifier`] in a
34//! [`scopeguard::guard`] whose deferred closure performs the channel
35//! send back into the pool. The scopeguard's drop hook runs on every
36//! exit path — normal scope exit AND panicking-unwind — so a panic
37//! inside [`crate::classifier::IntentClassifier::classify`] cannot
38//! leak a slot.
39//!
40//! Why scopeguard rather than just a hand-rolled `Drop` impl on
41//! `PoolGuard` (which Rust would also run on unwind)? The DAG NL07
42//! `critical_decisions` list mandates scopeguard as the panic-safety
43//! primitive: it makes the contract structurally explicit at the call
44//! site (the closure that "must run" is named at the point the
45//! invariant is established), and it survives any future refactor
46//! that adds an intermediate fallible step between
47//! [`ClassifierPool::acquire`] and the final slot release. The
48//! crate-level `scopeguard` dependency is declared in
49//! `sqry-nl/Cargo.toml` for exactly this purpose.
50//!
51//! # No tokio dependency
52//!
53//! The pool is sync. `recv()` blocks the current thread. Async callers
54//! (sqry-daemon, sqry-lsp) MUST wrap [`ClassifierPool::acquire`] in
55//! [`tokio::task::spawn_blocking`] at their boundary — sqry-nl itself
56//! never imports tokio.
57
58use crate::classifier::{IntentClassifier, SharedClassifier};
59use crate::error::NlError;
60use crossbeam_channel::{Receiver, Sender, bounded};
61
/// Lower bound for the pool capacity (NFR-2: one classifier minimum).
pub const POOL_MIN: usize = 1;

/// Upper bound for the pool capacity (NFR-2: cap RSS at 8 sessions).
pub const POOL_MAX: usize = 8;

/// Default pool size when neither config nor env-var supply one.
///
/// FR-15 requires at least 2 concurrent inference workers so the
/// daemon's MCP host and the LSP server can serve overlapping
/// `sqry_ask` calls without serialising on a single classifier
/// session.
///
/// Resolution order and clamping are implemented by
/// [`resolve_pool_size`].
pub const POOL_DEFAULT: usize = 2;
75
/// Bounded pool of N independently-loaded classifiers.
///
/// See module docs for the invariants this type enforces.
pub struct ClassifierPool {
    /// Release path: each [`PoolGuard`] holds a clone of this `Sender`
    /// and sends its slot back here on drop.
    sender: Sender<SharedClassifier>,
    /// Acquire path: [`ClassifierPool::acquire`] blocks on `recv()`
    /// from this end; its queue length is the number of idle slots.
    receiver: Receiver<SharedClassifier>,
    /// Post-clamp capacity — the total number of loaded sessions.
    capacity: usize,
}
84
85impl ClassifierPool {
86 /// Build a pool of `capacity` independently-loaded classifiers.
87 ///
88 /// `capacity` is clamped into `[POOL_MIN, POOL_MAX]` (NFR-2).
89 /// Calls `loader` exactly `capacity` times — see the pool
90 /// invariant in the module-level docs.
91 ///
92 /// # Errors
93 ///
94 /// Propagates the first [`NlError`] returned by `loader`. On
95 /// failure the partially-built pool is dropped, releasing any
96 /// already-loaded classifiers.
97 pub fn new<L>(capacity: usize, mut loader: L) -> Result<Self, NlError>
98 where
99 L: FnMut() -> Result<IntentClassifier, NlError>,
100 {
101 let capacity = capacity.clamp(POOL_MIN, POOL_MAX);
102 let (sender, receiver) = bounded::<SharedClassifier>(capacity);
103 for _ in 0..capacity {
104 let classifier = loader()?;
105 let shared = SharedClassifier::new(classifier);
106 // The channel was just created with capacity == count, so
107 // every send fits without blocking. A failure here is a
108 // programmer error (the channel can't be disconnected
109 // before we've returned a Receiver to anyone), so panic.
110 sender
111 .send(shared)
112 .expect("crossbeam_channel just created with capacity == iteration count");
113 }
114 Ok(Self {
115 sender,
116 receiver,
117 capacity,
118 })
119 }
120
121 /// Pool capacity (post-clamp).
122 #[must_use]
123 pub fn capacity(&self) -> usize {
124 self.capacity
125 }
126
127 /// Acquire a slot, blocking the current thread until one is
128 /// available. The returned [`PoolGuard`] returns the slot on
129 /// drop (panic-safe via [`scopeguard::guard`]).
130 ///
131 /// # Panics
132 ///
133 /// Panics only if the channel has been disconnected — which is
134 /// impossible during normal use because the [`ClassifierPool`]
135 /// itself owns the receiver, so the only way to disconnect it is
136 /// to drop the pool. Acquiring a guard on a dropped pool would be
137 /// a use-after-free style bug.
138 pub fn acquire(&self) -> PoolGuard<'_> {
139 let shared = self
140 .receiver
141 .recv()
142 .expect("ClassifierPool channel disconnected — pool dropped while in use");
143 // `scopeguard::guard` wraps `shared` so its on-drop closure —
144 // the channel `send` that returns the slot to the pool — runs
145 // on every exit path, including panicking unwind. Cloning the
146 // `Sender` is cheap (it's an Arc internally) and lets the
147 // closure outlive the borrow on `&self`. Capturing the sender
148 // by move into the closure makes the panic-safety contract
149 // structurally explicit at the acquire site, per NL07
150 // critical_decisions.
151 let sender = self.sender.clone();
152 let on_release: SlotReturn = Box::new(move |shared| {
153 // Best-effort: a disconnected channel here means the pool
154 // was dropped while a guard was live, which is a tear-down
155 // sequencing bug. We swallow the error because Drop must
156 // not panic during unwind (would abort).
157 let _ = sender.send(shared);
158 });
159 let scoped = scopeguard::guard(shared, on_release);
160 PoolGuard {
161 scoped: Some(scoped),
162 _pool: std::marker::PhantomData,
163 }
164 }
165}
166
167impl std::fmt::Debug for ClassifierPool {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("ClassifierPool")
170 .field("capacity", &self.capacity)
171 .field("available", &self.receiver.len())
172 .finish()
173 }
174}
175
/// RAII guard returned by [`ClassifierPool::acquire`].
///
/// Holds a single [`SharedClassifier`] for the duration of one
/// translate call. Returns the slot to the pool on drop, including
/// the panicking-unwind path. Slot return is implemented via
/// [`scopeguard::guard`] so the panic-safety contract is structurally
/// explicit (see module-level docs).
///
/// The `'a` lifetime ties this guard to the parent [`ClassifierPool`]
/// borrow so callers cannot stash a guard past the pool's lifetime.
/// The actual slot-return mechanism is a cloned [`Sender`] inside the
/// scopeguard closure, not a reference to the pool — but the lifetime
/// keeps API ergonomics consistent with the previous Drop-based
/// implementation.
pub struct PoolGuard<'a> {
    /// Scopeguard wrapping the held [`SharedClassifier`]; its on-drop
    /// closure performs the channel send that returns the slot.
    /// `PoolGuard` deliberately has no `Drop` impl of its own —
    /// dropping this `Option<ScopeGuard>` (whether by scope exit or
    /// panic-unwind) is what fires the closure. The `Option` wrapper
    /// is headroom for a future refactor that wants to disarm or
    /// re-arm the guard via `Option::take`; nothing takes it today.
    scoped: Option<scopeguard::ScopeGuard<SharedClassifier, SlotReturn>>,
    /// Zero-sized tie to the pool borrow — see the lifetime note in
    /// the type-level docs.
    _pool: std::marker::PhantomData<&'a ClassifierPool>,
}
200
/// Type alias for the boxed return-to-pool closure. A boxed closure
/// keeps `PoolGuard`'s type signature concrete so its size is stable
/// across the public API surface, regardless of the captured-sender
/// representation. (Closure types are anonymous, so without the box
/// the `ScopeGuard`'s second type parameter could not be named in a
/// struct field.)
type SlotReturn = Box<dyn FnOnce(SharedClassifier) + Send + 'static>;
206
207// `scopeguard::guard` is generic over the closure type, so we can't
208// directly name `ScopeGuard<SharedClassifier, SlotReturn>` from the
209// closure expression in `acquire` (closure types are anonymous). We
210// erase the closure to a `Box<dyn FnOnce>` at the call site by
211// constructing the boxed closure first, then passing it into
212// `scopeguard::guard`.
213impl<'a> PoolGuard<'a> {
214 /// Borrow the held [`SharedClassifier`].
215 ///
216 /// # Panics
217 ///
218 /// Panics if called after the guard's `Drop` impl ran (impossible
219 /// through the normal API — the borrow is bound by `'a`).
220 #[must_use]
221 pub fn classifier(&self) -> &SharedClassifier {
222 // `scopeguard::ScopeGuard` derefs to `&T` (in our case
223 // `&SharedClassifier`). Explicit `Deref::deref` keeps clippy
224 // happy — autoderef would otherwise complain at the bare
225 // `&**scoped` form.
226 use std::ops::Deref;
227 let scoped = self
228 .scoped
229 .as_ref()
230 .expect("PoolGuard accessed after drop — invariant violated");
231 scoped.deref()
232 }
233}
234
235// PoolGuard's own `Drop` is intentionally a no-op: the scopeguard
236// inside `scoped` runs ITS on-drop closure when the `Option<ScopeGuard>`
237// is dropped (whether by going out of scope or by panic-unwind), which
238// performs the channel send. This is the structural panic-safety
239// contract scopeguard provides — see module-level docs.
240
241/// Resolve the effective pool size from a configured value, the
242/// `SQRY_NL_POOL_SIZE` environment variable, and the default.
243///
244/// Resolution order (highest priority first):
245/// 1. `configured` (e.g. `TranslatorConfig::classifier_pool_size`).
246/// 2. `SQRY_NL_POOL_SIZE` env var (parsed as `usize`).
247/// 3. [`POOL_DEFAULT`].
248///
249/// The result is clamped into `[POOL_MIN, POOL_MAX]` per NFR-2.
250#[must_use]
251pub fn resolve_pool_size(configured: Option<usize>) -> usize {
252 let raw = configured
253 .or_else(|| {
254 std::env::var("SQRY_NL_POOL_SIZE")
255 .ok()
256 .and_then(|s| s.trim().parse::<usize>().ok())
257 })
258 .unwrap_or(POOL_DEFAULT);
259 raw.clamp(POOL_MIN, POOL_MAX)
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ClassificationResult, Intent};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Tiny stand-in for [`IntentClassifier`] so unit tests don't need
    /// the ONNX dylib + model fixtures. We can't construct a real
    /// `IntentClassifier` in unit-test scope (it owns an
    /// `ort::Session`), so the `loader` argument to
    /// [`ClassifierPool::new`] would normally need real artifacts.
    ///
    /// Instead, the unit tests below exercise [`resolve_pool_size`]
    /// and the channel mechanics by building a pool over a mocked
    /// `IntentClassifier` only when the test needs the full pool.
    /// See `sqry-nl/tests/pool_concurrent_load.rs` for the integration
    /// test that uses the real `IntentClassifier::load`.
    fn _silence_unused_warning() {
        let _ = ClassificationResult {
            intent: Intent::Ambiguous,
            confidence: 0.0,
            all_probabilities: vec![],
            model_version: "test".into(),
        };
    }

    /// Process-global env-var lock serialises the
    /// `resolve_pool_size_*` tests that touch `SQRY_NL_POOL_SIZE` so
    /// `cargo test`'s parallel runner cannot observe one test's stale
    /// `set_var` from another. This is `std::sync::Mutex`, which CAN
    /// be poisoned by a panicking test — each locker recovers via
    /// `unwrap_or_else(|e| e.into_inner())` below instead of
    /// propagating the poison.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Pool size resolution honours config > env > default.
    #[test]
    fn resolve_pool_size_prefers_configured() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY: env vars are process-global; the ENV_LOCK serialises
        // every test in this module that mutates SQRY_NL_POOL_SIZE.
        unsafe { std::env::set_var("SQRY_NL_POOL_SIZE", "6") };
        assert_eq!(resolve_pool_size(Some(3)), 3);
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
    }

    #[test]
    fn resolve_pool_size_falls_back_to_env() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // SAFETY: see resolve_pool_size_prefers_configured.
        unsafe { std::env::set_var("SQRY_NL_POOL_SIZE", "5") };
        assert_eq!(resolve_pool_size(None), 5);
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
    }

    #[test]
    fn resolve_pool_size_default_when_unset() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        // Make sure no stale value is leaking from a sibling test.
        // SAFETY: see resolve_pool_size_prefers_configured.
        unsafe { std::env::remove_var("SQRY_NL_POOL_SIZE") };
        assert_eq!(resolve_pool_size(None), POOL_DEFAULT);
    }

    // The two clamp tests pass `Some(..)`, which takes priority over
    // the env var, so they do not need ENV_LOCK.
    #[test]
    fn resolve_pool_size_clamped_to_max() {
        assert_eq!(resolve_pool_size(Some(999)), POOL_MAX);
    }

    #[test]
    fn resolve_pool_size_clamped_to_min() {
        assert_eq!(resolve_pool_size(Some(0)), POOL_MIN);
    }

    /// `ClassifierPool::new` short-circuits on the first loader error.
    ///
    /// NOTE(review): despite the name, the clamp to `POOL_MAX` is NOT
    /// observable here — the loader fails on its very first call, so
    /// the only assertable fact is that exactly one invocation happens
    /// before `?` bails (no run-away iteration toward 999). The real
    /// "request 999, get POOL_MAX sessions" assertion needs a
    /// constructible classifier and lives in the integration test
    /// `tests/pool_concurrent_load.rs`.
    #[test]
    fn capacity_clamps_above_max() {
        // Loader that counts invocations and always fails, so the
        // error path is exercised without a real IntentClassifier.
        let count = Arc::new(AtomicUsize::new(0));
        let count_inner = Arc::clone(&count);
        let res = ClassifierPool::new(999, move || -> Result<IntentClassifier, NlError> {
            count_inner.fetch_add(1, Ordering::SeqCst);
            Err(NlError::Config("synthetic loader failure".into()))
        });
        assert!(res.is_err());
        // Exactly one loader call before the failure short-circuits.
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    /// PoolGuard returns the slot on drop. We exercise this with a
    /// hand-built channel that does NOT carry real
    /// [`SharedClassifier`]s — the pool mechanics are independent of
    /// what's inside the channel, and a `SharedClassifier` can't be
    /// allocated in unit-test scope (it wraps an `IntentClassifier`).
    /// For the acquire/release round-trip we use the channel directly.
    #[test]
    fn channel_recv_send_round_trips() {
        let (tx, rx) = bounded::<u64>(2);
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        // Acquire both, "use" them, send them back.
        let a = rx.recv().unwrap();
        let b = rx.recv().unwrap();
        assert_eq!(rx.len(), 0);
        tx.send(a).unwrap();
        tx.send(b).unwrap();
        assert_eq!(rx.len(), 2);
    }
}