uni_plugin_wasm_rt/pool.rs
1//! Per-plugin instance cache with a concurrency cap.
2//!
3//! One [`InstancePool`] per loaded plugin. **It does not reuse live
4//! instances.** Every [`InstancePool::acquire`] constructs a *fresh*
5//! instance via the loader-supplied factory; the factory is expected to
6//! be cheap because the heavy artifacts (a compiled wasmtime `Component`
7//! plus its `InstancePre`, or extism's prepared `Manifest`) are cached
8//! by the loader and the factory only spins up a fresh `Store`+instance.
9//!
10//! Freshness per acquire is a *security* property, not just hygiene:
11//!
12//! - A reused `Store<HostState>` would leak guest linear memory,
13//! globals, and WASI context across unrelated invocations — a `Pure`
14//! function could carry state between two unrelated queries (bug #2).
15//! - A trapped store recycled back into a warm pool would re-trap or
16//! read poisoned memory on its next use (bug #3).
17//!
//! Re-instantiating per acquire closes both: fresh state every call, and
//! a trapped instance is simply dropped (the [`PooledInstance`] guard's
//! `Drop` hands it to [`InstancePool::release`], which drops it and
//! decrements the live counter) and never handed out again.
21//!
22//! What remains of the old pool is the **concurrency cap**:
23//! `PoolConfig::max_instances` bounds how many instances may be live at
24//! once (so a flood of concurrent UDF calls can't exhaust wasmtime
25//! memory), enforced via the same CAS-guarded `live` counter the old
//! capacity check used. [`PoolMetrics`] keeps a sane meaning:
//! `misses` counts fresh constructions (every *successful* acquire),
//! `hits` is now always zero (no warm reuse), `exhausted` counts cap
//! rejections, and `live` is the current in-flight count.
30//!
31//! Generic over both:
32//!
33//! - **`T`** — the per-invoke instance type (`extism::Plugin`, a
34//! wasmtime component instance wrapper, or a dummy in tests).
35//! - **`E`** — the loader-specific error type. The factory returns
36//! `Result<T, E>`; `acquire` constructs `E` from a
37//! resource-exhaustion message via [`PoolResourceLimit`].
38
39use std::sync::Arc;
40use std::sync::atomic::{AtomicU64, Ordering};
41
42use parking_lot::Mutex;
43
/// Configuration for a single [`InstancePool`].
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Upper bound on how many instances may be in flight at once.
    ///
    /// This is the pool's concurrency semaphore and therefore bounds the
    /// wasmtime memory footprint. The default of `4` mirrors the
    /// proposal's `Capability::ConcurrentInstances` default.
    pub max_instances: usize,
    /// Legacy pre-warm count; accepted but no longer used.
    ///
    /// Every [`InstancePool::acquire`] now builds a brand-new instance (a
    /// recycled store could leak guest state between calls), so there is
    /// nothing to pre-warm. The field is kept only so existing
    /// `PoolConfig { max_instances, warm_count }` literals still compile
    /// and downstream config surfaces keep their shape.
    pub warm_count: usize,
}

impl Default for PoolConfig {
    /// Four concurrent instances, warm count of one (ignored).
    fn default() -> Self {
        let max_instances = 4;
        let warm_count = 1;
        PoolConfig {
            warm_count,
            max_instances,
        }
    }
}
72
/// Counters exposed by a pool; read by the `host.metric_counter` host
/// imports.
///
/// All fields are plain atomics so they can be read concurrently while
/// the pool updates them.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Warm-reuse hits. Stays `0`: instances are never recycled.
    pub hits: AtomicU64,
    /// Fresh constructions; bumped once per successful acquire.
    pub misses: AtomicU64,
    /// Acquire attempts rejected because `max_instances` was reached.
    pub exhausted: AtomicU64,
    /// Instances currently in flight (slots reserved and not yet freed).
    pub live: AtomicU64,
}
85
/// Lets [`InstancePool::acquire`] build a loader's own error value when
/// the pool is at capacity.
///
/// A loader typically implements it in a single line:
///
/// ```ignore
/// impl uni_plugin_wasm_rt::PoolResourceLimit for ExtismError {
///     fn resource_limit(msg: String) -> Self { Self::ResourceLimit(msg) }
/// }
/// ```
pub trait PoolResourceLimit {
    /// Build a "resource limit exceeded" error carrying the diagnostic
    /// `msg`. Invoked when `max_instances` has been reached.
    #[must_use]
    fn resource_limit(msg: String) -> Self;
}
102
103/// A per-plugin instance cache with a concurrency cap.
104///
105/// Generic over the per-invoke instance type `T` and the loader's error
106/// type `E`. Production use: `InstancePool<extism::Plugin, ExtismError>`
107/// or `InstancePool<ScalarPluginInstance, WasmError>`.
108///
109/// **Does not reuse instances** — every [`Self::acquire`] builds a fresh
110/// one and every release drops it. See the module docs for why.
111pub struct InstancePool<T, E>
112where
113 T: Send + 'static,
114 E: PoolResourceLimit + Send + Sync + 'static,
115{
116 cfg: PoolConfig,
117 factory: Mutex<Box<dyn Fn() -> Result<T, E> + Send + Sync>>,
118 metrics: Arc<PoolMetrics>,
119}
120
121impl<T, E> std::fmt::Debug for InstancePool<T, E>
122where
123 T: Send + 'static,
124 E: PoolResourceLimit + Send + Sync + 'static,
125{
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("InstancePool")
128 .field("cfg", &self.cfg)
129 .field(
130 "metrics.misses",
131 &self.metrics.misses.load(Ordering::Relaxed),
132 )
133 .field("metrics.live", &self.metrics.live.load(Ordering::Relaxed))
134 .finish_non_exhaustive()
135 }
136}
137
138impl<T, E> InstancePool<T, E>
139where
140 T: Send + 'static,
141 E: PoolResourceLimit + Send + Sync + 'static,
142{
143 /// Construct a pool that builds fresh instances via `factory`.
144 ///
145 /// `cfg.warm_count` is accepted for API compatibility but ignored:
146 /// nothing is pre-warmed, because instances are never reused.
147 ///
148 /// # Errors
149 ///
150 /// This constructor is infallible in practice; the `E` in the return
151 /// type is retained so the signature is stable across the refactor.
152 pub fn new(
153 cfg: PoolConfig,
154 factory: impl Fn() -> Result<T, E> + Send + Sync + 'static,
155 ) -> Result<Self, E> {
156 let factory = Mutex::new(Box::new(factory) as Box<dyn Fn() -> Result<T, E> + Send + Sync>);
157 Ok(Self {
158 cfg,
159 factory,
160 metrics: Arc::new(PoolMetrics::default()),
161 })
162 }
163
164 /// Acquire a *fresh* instance, honoring the concurrency cap.
165 ///
166 /// Reserves a live slot (CAS against `max_instances`), then builds a
167 /// brand-new instance via the factory. No warm reuse — the returned
168 /// instance has clean state. Releasing it (via [`PooledInstance`]'s
169 /// drop) frees the slot.
170 ///
171 /// # Errors
172 ///
173 /// - `E::resource_limit(...)` when `max_instances` is reached.
174 /// - Whatever the factory returns on construction failure.
175 pub fn acquire(&self) -> Result<T, E> {
176 // Reserve a live slot atomically. CAS-loop guarantees the
177 // invariant `live <= max` even under concurrent acquirers.
178 let max = self.cfg.max_instances as u64;
179 loop {
180 let live = self.metrics.live.load(Ordering::SeqCst);
181 if live >= max {
182 self.metrics.exhausted.fetch_add(1, Ordering::SeqCst);
183 return Err(E::resource_limit(format!(
184 "instance pool at capacity ({} live)",
185 self.cfg.max_instances
186 )));
187 }
188 if self
189 .metrics
190 .live
191 .compare_exchange(live, live + 1, Ordering::SeqCst, Ordering::SeqCst)
192 .is_ok()
193 {
194 break;
195 }
196 }
197 // The slot is reserved; construct a fresh instance. If
198 // construction fails, give the slot back.
199 let inst = match (self.factory.lock())() {
200 Ok(v) => v,
201 Err(err) => {
202 self.metrics.live.fetch_sub(1, Ordering::SeqCst);
203 return Err(err);
204 }
205 };
206 self.metrics.misses.fetch_add(1, Ordering::SeqCst);
207 Ok(inst)
208 }
209
210 /// Release an instance, freeing its concurrency slot.
211 ///
212 /// The instance is dropped here (never recycled), so its `Drop` impl
213 /// runs any cleanup. A trapped instance is therefore discarded, not
214 /// handed back out.
215 pub fn release(&self, inst: T) {
216 drop(inst);
217 self.metrics.live.fetch_sub(1, Ordering::SeqCst);
218 }
219
220 /// Snapshot the current metrics.
221 #[must_use]
222 pub fn metrics(&self) -> Arc<PoolMetrics> {
223 Arc::clone(&self.metrics)
224 }
225
226 /// Pool configuration, for diagnostics.
227 #[must_use]
228 pub fn config(&self) -> &PoolConfig {
229 &self.cfg
230 }
231}
232
233/// RAII handle to an instance acquired from an [`InstancePool`].
234///
235/// Holds the fresh instance and frees its concurrency slot on drop
236/// (dropping the instance — never recycling it). Adapters use this to
237/// make "acquire-call-drop" exception-safe: if the plugin call panics or
238/// traps, the slot still frees and the (possibly poisoned) instance is
239/// discarded.
240pub struct PooledInstance<T, E>
241where
242 T: Send + 'static,
243 E: PoolResourceLimit + Send + Sync + 'static,
244{
245 pool: Arc<InstancePool<T, E>>,
246 inst: Option<T>,
247}
248
249impl<T, E> std::fmt::Debug for PooledInstance<T, E>
250where
251 T: Send + 'static,
252 E: PoolResourceLimit + Send + Sync + 'static,
253{
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 f.debug_struct("PooledInstance")
256 .field("has_inst", &self.inst.is_some())
257 .finish_non_exhaustive()
258 }
259}
260
261impl<T, E> PooledInstance<T, E>
262where
263 T: Send + 'static,
264 E: PoolResourceLimit + Send + Sync + 'static,
265{
266 /// Acquire a fresh `PooledInstance` from the pool.
267 ///
268 /// # Errors
269 ///
270 /// Propagates [`InstancePool::acquire`].
271 pub fn acquire(pool: Arc<InstancePool<T, E>>) -> Result<Self, E> {
272 let inst = pool.acquire()?;
273 Ok(Self {
274 pool,
275 inst: Some(inst),
276 })
277 }
278
279 /// Mutable access to the instance.
280 ///
281 /// # Panics
282 ///
283 /// If called after [`Self::take`].
284 pub fn get_mut(&mut self) -> &mut T {
285 self.inst
286 .as_mut()
287 .expect("PooledInstance accessed after take/drop")
288 }
289
290 /// Consume the wrapper, returning the inner instance without freeing
291 /// its concurrency slot via the pool.
292 ///
293 /// Retained for API compatibility. With per-invoke instances there is
294 /// no "corrupted vs clean" distinction at the pool level (a dropped
295 /// instance is always discarded), but `take` still moves the instance
296 /// out and decrements the live counter so callers that need ownership
297 /// keep working.
298 pub fn take(mut self) -> T {
299 let inst = self.inst.take().expect("PooledInstance already taken");
300 self.pool.metrics.live.fetch_sub(1, Ordering::SeqCst);
301 inst
302 }
303}
304
305impl<T, E> Drop for PooledInstance<T, E>
306where
307 T: Send + 'static,
308 E: PoolResourceLimit + Send + Sync + 'static,
309{
310 fn drop(&mut self) {
311 if let Some(inst) = self.inst.take() {
312 // Always discards the instance and frees the slot — never
313 // recycles, so a trapped store can't be handed out again.
314 self.pool.release(inst);
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, thiserror::Error)]
    enum TestErr {
        #[error("resource limit: {0}")]
        ResourceLimit(String),
    }

    impl PoolResourceLimit for TestErr {
        fn resource_limit(msg: String) -> Self {
            Self::ResourceLimit(msg)
        }
    }

    #[derive(Debug)]
    // The payload is only inspected by some tests.
    #[allow(dead_code)]
    struct Dummy(u32);

    type TestPool = InstancePool<Dummy, TestErr>;

    #[test]
    fn acquire_constructs_fresh_each_time() {
        let n = Arc::new(AtomicU64::new(0));
        let nc = Arc::clone(&n);
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 4,
                warm_count: 1,
            },
            move || Ok(Dummy(nc.fetch_add(1, Ordering::SeqCst) as u32)),
        )
        .unwrap();

        // Nothing pre-warmed: live starts at zero.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);

        let a = pool.acquire().unwrap();
        let b = pool.acquire().unwrap();
        // Distinct fresh instances, both counted as misses (no warm reuse).
        assert_ne!(a.0, b.0);
        assert_eq!(pool.metrics.misses.load(Ordering::SeqCst), 2);
        assert_eq!(pool.metrics.hits.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn release_frees_the_slot() {
        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: 1,
                    warm_count: 0,
                },
                || Ok(Dummy(0)),
            )
            .unwrap(),
        );
        {
            let _h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
            assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 1);
            // At capacity while held.
            assert!(PooledInstance::acquire(Arc::clone(&pool)).is_err());
        }
        // Slot freed on drop: acquirable again.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
        let _h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
    }

    /// Raw `acquire` / `release` (without the RAII wrapper) keep the live
    /// counter balanced and make the slot reusable.
    #[test]
    fn raw_release_frees_the_slot() {
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 1,
                warm_count: 0,
            },
            || Ok(Dummy(3)),
        )
        .unwrap();
        let inst = pool.acquire().unwrap();
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 1);
        pool.release(inst);
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
        let again = pool.acquire().unwrap();
        assert_eq!(again.0, 3);
    }

    #[test]
    fn exhaustion_returns_resource_limit() {
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 1,
                warm_count: 0,
            },
            || Ok(Dummy(0)),
        )
        .unwrap();
        let _held = pool.acquire().unwrap();
        let err = pool.acquire().unwrap_err();
        assert!(matches!(err, TestErr::ResourceLimit(_)));
        assert_eq!(pool.metrics.exhausted.load(Ordering::SeqCst), 1);
    }

    /// A failing factory gives its reserved slot back. Every attempt
    /// surfaces the factory's own error (never a cap rejection), and
    /// nothing is counted as a construction or an exhaustion.
    #[test]
    fn factory_error_returns_the_slot() {
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 1,
                warm_count: 0,
            },
            || Err(TestErr::ResourceLimit("factory failed".to_owned())),
        )
        .unwrap();

        for _ in 0..3 {
            let TestErr::ResourceLimit(msg) = pool.acquire().unwrap_err();
            assert_eq!(msg, "factory failed");
        }
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.misses.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.exhausted.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn pooled_instance_take_does_not_double_free() {
        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: 2,
                    warm_count: 0,
                },
                || Ok(Dummy(7)),
            )
            .unwrap(),
        );
        let h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 1);
        let taken = h.take();
        assert_eq!(taken.0, 7);
        // `take` decremented live; drop of `taken` does nothing extra.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn config_default_matches_proposal() {
        let c = PoolConfig::default();
        assert_eq!(c.max_instances, 4);
        assert_eq!(c.warm_count, 1);
    }

    /// The concurrency cap holds even under contention: at most
    /// `max_instances` acquires succeed concurrently; the rest get
    /// `resource_limit`. (The CAS-guarded `live` counter is the same one
    /// the old capacity check used.)
    #[test]
    fn concurrent_acquire_never_exceeds_max() {
        use std::sync::Barrier;
        use std::thread;

        const MAX: usize = 4;
        const THREADS: usize = 32;

        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: MAX,
                    warm_count: 0,
                },
                || Ok(Dummy(0)),
            )
            .unwrap(),
        );

        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let p = Arc::clone(&pool);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                p.acquire().ok()
            }));
        }

        let mut held = Vec::with_capacity(THREADS);
        for h in handles {
            if let Some(inst) = h.join().unwrap() {
                held.push(inst);
            }
        }

        assert_eq!(held.len(), MAX, "exactly max_instances must be live");
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), MAX as u64);
        assert_eq!(
            pool.metrics.exhausted.load(Ordering::SeqCst),
            (THREADS - MAX) as u64
        );
    }
}