Skip to main content

uni_plugin_wasm_rt/
pool.rs

1//! Per-plugin instance cache with a concurrency cap.
2//!
3//! One [`InstancePool`] per loaded plugin. **It does not reuse live
4//! instances.** Every [`InstancePool::acquire`] constructs a *fresh*
5//! instance via the loader-supplied factory; the factory is expected to
6//! be cheap because the heavy artifacts (a compiled wasmtime `Component`
7//! plus its `InstancePre`, or extism's prepared `Manifest`) are cached
8//! by the loader and the factory only spins up a fresh `Store`+instance.
9//!
10//! Freshness per acquire is a *security* property, not just hygiene:
11//!
12//! - A reused `Store<HostState>` would leak guest linear memory,
13//!   globals, and WASI context across unrelated invocations — a `Pure`
14//!   function could carry state between two unrelated queries (bug #2).
15//! - A trapped store recycled back into a warm pool would re-trap or
16//!   read poisoned memory on its next use (bug #3).
17//!
//! Re-instantiating per acquire closes both: fresh state every call, and
//! a trapped instance is simply dropped — releasing it (through
//! [`InstancePool::release`] or [`PooledInstance`]'s `Drop`) decrements
//! the live counter — and it is never handed out again.
21//!
22//! What remains of the old pool is the **concurrency cap**:
23//! `PoolConfig::max_instances` bounds how many instances may be live at
24//! once (so a flood of concurrent UDF calls can't exhaust wasmtime
25//! memory), enforced via the same CAS-guarded `live` counter the old
//! capacity check used. [`PoolMetrics`] keeps a sane meaning —
//! `misses` counts fresh constructions (one per *successful* acquire),
//! `hits` is now always zero (no warm reuse), `exhausted` counts cap
//! rejections, and `live` is the current in-flight count.
30//!
31//! Generic over both:
32//!
33//! - **`T`** — the per-invoke instance type (`extism::Plugin`, a
34//!   wasmtime component instance wrapper, or a dummy in tests).
35//! - **`E`** — the loader-specific error type. The factory returns
36//!   `Result<T, E>`; `acquire` constructs `E` from a
37//!   resource-exhaustion message via [`PoolResourceLimit`].
38
39use std::sync::Arc;
40use std::sync::atomic::{AtomicU64, Ordering};
41
42use parking_lot::Mutex;
43
/// Per-pool configuration.
///
/// Cloned into the pool by `InstancePool::new`; the pool reads only
/// `max_instances` at runtime.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Maximum concurrent live instances.
    ///
    /// Bounds the wasmtime memory footprint. Default `4` matches the
    /// `Capability::ConcurrentInstances` default in the proposal. Acts
    /// as a concurrency cap: at most this many instances may be in
    /// flight at once. Unlike a blocking semaphore, an acquire over the
    /// cap is rejected immediately with a resource-limit error instead of
    /// waiting. A value of `0` therefore rejects every acquire.
    pub max_instances: usize,
    /// Retained for API compatibility; no longer pre-warms anything.
    ///
    /// Instances are now built fresh per [`InstancePool::acquire`] (so a
    /// reused store can't leak guest state across calls), so there is no
    /// warm pool to populate. The field stays so existing
    /// `PoolConfig { max_instances, warm_count }` construction sites keep
    /// compiling and downstream config surfaces keep their shape.
    pub warm_count: usize,
}
63
64impl Default for PoolConfig {
65    fn default() -> Self {
66        Self {
67            max_instances: 4,
68            warm_count: 1,
69        }
70    }
71}
72
/// Pool metrics surface — read by `host.metric_counter` host imports.
///
/// `InstancePool::metrics` hands out the pool's own shared counters (an
/// `Arc` to this struct), not a copy, so reads observe later updates.
#[derive(Debug, Default)]
pub struct PoolMetrics {
    /// Warm-reuse hits. Always `0` since instances are never reused.
    pub hits: AtomicU64,
    /// Fresh constructions — one per successful acquire. Factory failures
    /// and cap rejections are not counted here.
    pub misses: AtomicU64,
    /// Acquires that failed because `max_instances` was reached.
    pub exhausted: AtomicU64,
    /// Currently-live (in-flight) instances: incremented when an acquire
    /// reserves a slot, decremented when the instance is released, its
    /// `PooledInstance` handle is dropped, or it is moved out via `take`.
    pub live: AtomicU64,
}
85
/// Loader-error trait used by [`InstancePool::acquire`] to construct
/// the "pool at capacity" error.
///
/// Each loader implements this with one line:
///
/// ```ignore
/// impl uni_plugin_wasm_rt::PoolResourceLimit for ExtismError {
///     fn resource_limit(msg: String) -> Self { Self::ResourceLimit(msg) }
/// }
/// ```
pub trait PoolResourceLimit {
    /// Construct a "resource limit exceeded" instance from a diagnostic
    /// message. Called when the pool's `max_instances` is reached.
    ///
    /// The pool currently passes a message of the form
    /// `instance pool at capacity (N live)`, where `N` is the configured
    /// `max_instances`.
    #[must_use]
    fn resource_limit(msg: String) -> Self;
}
102
/// A per-plugin instance cache with a concurrency cap.
///
/// Generic over the per-invoke instance type `T` and the loader's error
/// type `E`. Production use: `InstancePool<extism::Plugin, ExtismError>`
/// or `InstancePool<ScalarPluginInstance, WasmError>`.
///
/// **Does not reuse instances** — every [`Self::acquire`] builds a fresh
/// one and every release drops it. See the module docs for why.
pub struct InstancePool<T, E>
where
    T: Send + 'static,
    E: PoolResourceLimit + Send + Sync + 'static,
{
    /// Configuration supplied to `new`; only `max_instances` is consulted.
    cfg: PoolConfig,
    /// Builds one fresh instance per acquire.
    // NOTE(review): the boxed closure is already `Send + Sync`, so this
    // mutex is not required to share it. Because `acquire` holds the lock
    // while calling the factory, instance construction is serialized across
    // threads. Confirm that is intended.
    factory: Mutex<Box<dyn Fn() -> Result<T, E> + Send + Sync>>,
    /// Counters shared with callers via `metrics()`.
    metrics: Arc<PoolMetrics>,
}
120
121impl<T, E> std::fmt::Debug for InstancePool<T, E>
122where
123    T: Send + 'static,
124    E: PoolResourceLimit + Send + Sync + 'static,
125{
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.debug_struct("InstancePool")
128            .field("cfg", &self.cfg)
129            .field(
130                "metrics.misses",
131                &self.metrics.misses.load(Ordering::Relaxed),
132            )
133            .field("metrics.live", &self.metrics.live.load(Ordering::Relaxed))
134            .finish_non_exhaustive()
135    }
136}
137
138impl<T, E> InstancePool<T, E>
139where
140    T: Send + 'static,
141    E: PoolResourceLimit + Send + Sync + 'static,
142{
143    /// Construct a pool that builds fresh instances via `factory`.
144    ///
145    /// `cfg.warm_count` is accepted for API compatibility but ignored:
146    /// nothing is pre-warmed, because instances are never reused.
147    ///
148    /// # Errors
149    ///
150    /// This constructor is infallible in practice; the `E` in the return
151    /// type is retained so the signature is stable across the refactor.
152    pub fn new(
153        cfg: PoolConfig,
154        factory: impl Fn() -> Result<T, E> + Send + Sync + 'static,
155    ) -> Result<Self, E> {
156        let factory = Mutex::new(Box::new(factory) as Box<dyn Fn() -> Result<T, E> + Send + Sync>);
157        Ok(Self {
158            cfg,
159            factory,
160            metrics: Arc::new(PoolMetrics::default()),
161        })
162    }
163
164    /// Acquire a *fresh* instance, honoring the concurrency cap.
165    ///
166    /// Reserves a live slot (CAS against `max_instances`), then builds a
167    /// brand-new instance via the factory. No warm reuse — the returned
168    /// instance has clean state. Releasing it (via [`PooledInstance`]'s
169    /// drop) frees the slot.
170    ///
171    /// # Errors
172    ///
173    /// - `E::resource_limit(...)` when `max_instances` is reached.
174    /// - Whatever the factory returns on construction failure.
175    pub fn acquire(&self) -> Result<T, E> {
176        // Reserve a live slot atomically. CAS-loop guarantees the
177        // invariant `live <= max` even under concurrent acquirers.
178        let max = self.cfg.max_instances as u64;
179        loop {
180            let live = self.metrics.live.load(Ordering::SeqCst);
181            if live >= max {
182                self.metrics.exhausted.fetch_add(1, Ordering::SeqCst);
183                return Err(E::resource_limit(format!(
184                    "instance pool at capacity ({} live)",
185                    self.cfg.max_instances
186                )));
187            }
188            if self
189                .metrics
190                .live
191                .compare_exchange(live, live + 1, Ordering::SeqCst, Ordering::SeqCst)
192                .is_ok()
193            {
194                break;
195            }
196        }
197        // The slot is reserved; construct a fresh instance. If
198        // construction fails, give the slot back.
199        let inst = match (self.factory.lock())() {
200            Ok(v) => v,
201            Err(err) => {
202                self.metrics.live.fetch_sub(1, Ordering::SeqCst);
203                return Err(err);
204            }
205        };
206        self.metrics.misses.fetch_add(1, Ordering::SeqCst);
207        Ok(inst)
208    }
209
210    /// Release an instance, freeing its concurrency slot.
211    ///
212    /// The instance is dropped here (never recycled), so its `Drop` impl
213    /// runs any cleanup. A trapped instance is therefore discarded, not
214    /// handed back out.
215    pub fn release(&self, inst: T) {
216        drop(inst);
217        self.metrics.live.fetch_sub(1, Ordering::SeqCst);
218    }
219
220    /// Snapshot the current metrics.
221    #[must_use]
222    pub fn metrics(&self) -> Arc<PoolMetrics> {
223        Arc::clone(&self.metrics)
224    }
225
226    /// Pool configuration, for diagnostics.
227    #[must_use]
228    pub fn config(&self) -> &PoolConfig {
229        &self.cfg
230    }
231}
232
/// RAII handle to an instance acquired from an [`InstancePool`].
///
/// Holds the fresh instance and frees its concurrency slot on drop
/// (dropping the instance — never recycling it). Adapters use this to
/// make "acquire-call-drop" exception-safe: if the plugin call panics or
/// traps, the slot still frees and the (possibly poisoned) instance is
/// discarded.
pub struct PooledInstance<T, E>
where
    T: Send + 'static,
    E: PoolResourceLimit + Send + Sync + 'static,
{
    /// Pool whose concurrency slot this handle occupies.
    pool: Arc<InstancePool<T, E>>,
    /// The instance: `Some` from `acquire` until it is moved out by `take`
    /// or handed to the pool's `release` on drop.
    inst: Option<T>,
}
248
249impl<T, E> std::fmt::Debug for PooledInstance<T, E>
250where
251    T: Send + 'static,
252    E: PoolResourceLimit + Send + Sync + 'static,
253{
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        f.debug_struct("PooledInstance")
256            .field("has_inst", &self.inst.is_some())
257            .finish_non_exhaustive()
258    }
259}
260
261impl<T, E> PooledInstance<T, E>
262where
263    T: Send + 'static,
264    E: PoolResourceLimit + Send + Sync + 'static,
265{
266    /// Acquire a fresh `PooledInstance` from the pool.
267    ///
268    /// # Errors
269    ///
270    /// Propagates [`InstancePool::acquire`].
271    pub fn acquire(pool: Arc<InstancePool<T, E>>) -> Result<Self, E> {
272        let inst = pool.acquire()?;
273        Ok(Self {
274            pool,
275            inst: Some(inst),
276        })
277    }
278
279    /// Mutable access to the instance.
280    ///
281    /// # Panics
282    ///
283    /// If called after [`Self::take`].
284    pub fn get_mut(&mut self) -> &mut T {
285        self.inst
286            .as_mut()
287            .expect("PooledInstance accessed after take/drop")
288    }
289
290    /// Consume the wrapper, returning the inner instance without freeing
291    /// its concurrency slot via the pool.
292    ///
293    /// Retained for API compatibility. With per-invoke instances there is
294    /// no "corrupted vs clean" distinction at the pool level (a dropped
295    /// instance is always discarded), but `take` still moves the instance
296    /// out and decrements the live counter so callers that need ownership
297    /// keep working.
298    pub fn take(mut self) -> T {
299        let inst = self.inst.take().expect("PooledInstance already taken");
300        self.pool.metrics.live.fetch_sub(1, Ordering::SeqCst);
301        inst
302    }
303}
304
305impl<T, E> Drop for PooledInstance<T, E>
306where
307    T: Send + 'static,
308    E: PoolResourceLimit + Send + Sync + 'static,
309{
310    fn drop(&mut self) {
311        if let Some(inst) = self.inst.take() {
312            // Always discards the instance and frees the slot — never
313            // recycles, so a trapped store can't be handed out again.
314            self.pool.release(inst);
315        }
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, thiserror::Error)]
    enum TestErr {
        #[error("resource limit: {0}")]
        ResourceLimit(String),
    }

    impl PoolResourceLimit for TestErr {
        fn resource_limit(msg: String) -> Self {
            Self::ResourceLimit(msg)
        }
    }

    /// Stand-in instance; the payload lets tests tell instances apart.
    #[derive(Debug)]
    // NOTE(review): the payload is read below (`a.0`, `taken.0`), so this
    // allow may be stale.
    #[allow(dead_code)]
    struct Dummy(u32);

    type TestPool = InstancePool<Dummy, TestErr>;

    #[test]
    fn acquire_constructs_fresh_each_time() {
        let n = Arc::new(AtomicU64::new(0));
        let nc = Arc::clone(&n);
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 4,
                warm_count: 1,
            },
            move || Ok(Dummy(nc.fetch_add(1, Ordering::SeqCst) as u32)),
        )
        .unwrap();

        // Nothing pre-warmed: live starts at zero.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);

        let a = pool.acquire().unwrap();
        let b = pool.acquire().unwrap();
        // Distinct fresh instances, both counted as misses (no warm reuse).
        assert_ne!(a.0, b.0);
        assert_eq!(pool.metrics.misses.load(Ordering::SeqCst), 2);
        assert_eq!(pool.metrics.hits.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn release_frees_the_slot() {
        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: 1,
                    warm_count: 0,
                },
                || Ok(Dummy(0)),
            )
            .unwrap(),
        );
        {
            let _h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
            assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 1);
            // At capacity while held.
            assert!(PooledInstance::acquire(Arc::clone(&pool)).is_err());
        }
        // Slot freed on drop — acquirable again.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
        let _h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
    }

    #[test]
    fn exhaustion_returns_resource_limit() {
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 1,
                warm_count: 0,
            },
            || Ok(Dummy(0)),
        )
        .unwrap();
        let _held = pool.acquire().unwrap();
        let err = pool.acquire().unwrap_err();
        assert!(matches!(err, TestErr::ResourceLimit(_)));
        assert_eq!(pool.metrics.exhausted.load(Ordering::SeqCst), 1);
    }

    /// A factory error must hand the reserved slot back, and must count
    /// neither as a construction nor as a cap rejection.
    #[test]
    fn factory_error_returns_the_slot() {
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 1,
                warm_count: 0,
            },
            || Err(TestErr::ResourceLimit("factory failed".to_owned())),
        )
        .unwrap();

        let err = pool.acquire().unwrap_err();
        assert!(matches!(err, TestErr::ResourceLimit(ref m) if m == "factory failed"));
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.misses.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.exhausted.load(Ordering::SeqCst), 0);
    }

    /// `max_instances: 0` rejects every acquire without calling the factory.
    #[test]
    fn zero_capacity_rejects_every_acquire() {
        let calls = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&calls);
        let pool = TestPool::new(
            PoolConfig {
                max_instances: 0,
                warm_count: 0,
            },
            move || {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(Dummy(0))
            },
        )
        .unwrap();

        assert!(matches!(pool.acquire(), Err(TestErr::ResourceLimit(_))));
        assert_eq!(calls.load(Ordering::SeqCst), 0);
        assert_eq!(pool.metrics.exhausted.load(Ordering::SeqCst), 1);
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn pooled_instance_take_does_not_double_free() {
        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: 2,
                    warm_count: 0,
                },
                || Ok(Dummy(7)),
            )
            .unwrap(),
        );
        let h = PooledInstance::acquire(Arc::clone(&pool)).unwrap();
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 1);
        let taken = h.take();
        assert_eq!(taken.0, 7);
        // `take` decremented live; drop of `taken` does nothing extra.
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn config_default_matches_proposal() {
        let c = PoolConfig::default();
        assert_eq!(c.max_instances, 4);
        assert_eq!(c.warm_count, 1);
    }

    /// The concurrency cap holds even under contention: at most
    /// `max_instances` acquires succeed concurrently; the rest get
    /// `resource_limit`. (The CAS-guarded `live` counter is the same one
    /// the old capacity check used.)
    #[test]
    fn concurrent_acquire_never_exceeds_max() {
        use std::sync::Barrier;
        use std::thread;

        const MAX: usize = 4;
        const THREADS: usize = 32;

        let pool = Arc::new(
            TestPool::new(
                PoolConfig {
                    max_instances: MAX,
                    warm_count: 0,
                },
                || Ok(Dummy(0)),
            )
            .unwrap(),
        );

        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let p = Arc::clone(&pool);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                p.acquire().ok()
            }));
        }

        let mut held = Vec::with_capacity(THREADS);
        for h in handles {
            if let Some(inst) = h.join().unwrap() {
                held.push(inst);
            }
        }

        assert_eq!(held.len(), MAX, "exactly max_instances must be live");
        assert_eq!(pool.metrics.live.load(Ordering::SeqCst), MAX as u64);
        assert_eq!(
            pool.metrics.exhausted.load(Ordering::SeqCst),
            (THREADS - MAX) as u64
        );
    }
}