Skip to main content

harmont_cli/plugin/
pool.rs

1//! Instance pool for a loaded plugin.
2//!
3//! Each `LoadedPlugin` owns a `PluginPool`. Concurrent calls into the
4//! plugin acquire an instance from the pool (creating one on demand
5//! up to a pre-set max); when the call finishes, the instance returns
6//! to the pool for reuse. Bounded by a `tokio::sync::Semaphore` so the
7//! orchestrator's parallelism doesn't exceed `max_instances`.
8
9// Pedantic-bucket nags accepted at module scope:
10// - `missing_errors_doc`: every fallible fn returns `anyhow::Result`
11//   with rich `context` messages.
12// - `missing_panics_doc` on `PluginPool::from_*`: the only panic path
13//   is the `try_lock().expect()` on a Mutex we just constructed; it
14//   cannot be contended. Documenting it would be noise.
15// - `expect_used`: same — these are on freshly-created Mutexes and
16//   are infallible by construction.
17// - `collapsible_if`: the nested `if g.len() < self.max_instances`
18//   reads more clearly one rule per line.
19// - `needless_pass_by_value` on `from_file(path: PathBuf, ...)`: we
20//   clone the path into `bytes` AND store the original in the pool
21//   field; passing by value avoids forcing every caller to clone.
22//   Suppressed at the call site below.
23// - `missing_const_for_fn`/`missing_panics_doc` on `PoolGuard::plugin`:
24//   the `expect` lives on an `Option` we control; the guard contract
25//   guarantees the plugin is present until drop.
26#![allow(
27    clippy::missing_errors_doc,
28    clippy::missing_panics_doc,
29    clippy::expect_used,
30    clippy::collapsible_if,
31    clippy::missing_const_for_fn,
32    clippy::needless_pass_by_value
33)]
34
35use std::path::PathBuf;
36use std::sync::Arc;
37
38use anyhow::{Context, Result};
39use extism::{Manifest as ExtismManifest, Plugin, Wasm};
40use tokio::sync::{Mutex, Semaphore};
41
42use super::host_fns;
43
44#[derive(Debug, Clone)]
45enum PluginBytes {
46    Embedded(&'static [u8]),
47    Disk(PathBuf),
48}
49
50#[derive(Debug)]
51pub struct PluginPool {
52    bytes: PluginBytes,
53    instances: Mutex<Vec<Plugin>>,
54    semaphore: Arc<Semaphore>,
55    max_instances: usize,
56    /// HTTPS hosts the plugin is permitted to contact via extism's
57    /// HTTP host fn. Threaded into the per-instance
58    /// [`ExtismManifest`] at spawn time. Empty means "no outbound
59    /// HTTP", which is the default until the plugin's own manifest
60    /// declares otherwise.
61    allowed_hosts: Vec<String>,
62}
63
64impl PluginPool {
65    pub fn from_bytes(bytes: &'static [u8], max_instances: usize) -> Result<Self> {
66        Self::from_bytes_with_hosts(bytes, max_instances, Vec::new())
67    }
68
69    pub fn from_bytes_with_hosts(
70        bytes: &'static [u8],
71        max_instances: usize,
72        allowed_hosts: Vec<String>,
73    ) -> Result<Self> {
74        let max_instances = max_instances.max(1);
75        let pool = Self {
76            bytes: PluginBytes::Embedded(bytes),
77            instances: Mutex::new(Vec::with_capacity(max_instances)),
78            semaphore: Arc::new(Semaphore::new(max_instances)),
79            max_instances,
80            allowed_hosts,
81        };
82        // Pre-instantiate one — the first acquire is the most latency-sensitive.
83        let plugin = pool
84            .spawn_instance()
85            .context("preallocate first plugin instance")?;
86        pool.instances
87            .try_lock()
88            .expect("just-created mutex is uncontended")
89            .push(plugin);
90        Ok(pool)
91    }
92
93    pub fn from_file(path: PathBuf, max_instances: usize) -> Result<Self> {
94        Self::from_file_with_hosts(path, max_instances, Vec::new())
95    }
96
97    pub fn from_file_with_hosts(
98        path: PathBuf,
99        max_instances: usize,
100        allowed_hosts: Vec<String>,
101    ) -> Result<Self> {
102        let max_instances = max_instances.max(1);
103        let pool = Self {
104            bytes: PluginBytes::Disk(path.clone()),
105            instances: Mutex::new(Vec::with_capacity(max_instances)),
106            semaphore: Arc::new(Semaphore::new(max_instances)),
107            max_instances,
108            allowed_hosts,
109        };
110        let plugin = pool.spawn_instance().with_context(|| {
111            format!("preallocate first plugin instance from {}", path.display())
112        })?;
113        pool.instances
114            .try_lock()
115            .expect("just-created mutex is uncontended")
116            .push(plugin);
117        Ok(pool)
118    }
119
120    fn spawn_instance(&self) -> Result<Plugin> {
121        let wasm = match &self.bytes {
122            PluginBytes::Embedded(b) => Wasm::data(*b),
123            PluginBytes::Disk(p) => Wasm::file(p),
124        };
125        let manifest =
126            ExtismManifest::new([wasm]).with_allowed_hosts(self.allowed_hosts.iter().cloned());
127        Plugin::new(&manifest, host_fns::all(), true).context("spawn extism plugin instance")
128    }
129
130    /// Acquire an instance. Returns a guard that holds the instance
131    /// until dropped; on drop, the instance returns to the pool.
132    ///
133    /// If the pool is at capacity, blocks on the semaphore until a
134    /// slot is freed.
135    pub async fn acquire(&self) -> Result<PoolGuard<'_>> {
136        // Reserve a slot.
137        let permit = self
138            .semaphore
139            .clone()
140            .acquire_owned()
141            .await
142            .context("semaphore closed")?;
143        // Take an instance from the pool, or spawn a fresh one.
144        let plugin = {
145            let mut g = self.instances.lock().await;
146            g.pop()
147        };
148        let plugin = if let Some(p) = plugin {
149            p
150        } else {
151            self.spawn_instance()?
152        };
153        Ok(PoolGuard {
154            pool: self,
155            plugin: Some(plugin),
156            _permit: permit,
157        })
158    }
159
160    fn put_back(&self, plugin: Plugin) {
161        // Best-effort: if the pool is full (more than max), drop on floor.
162        // The semaphore guarantees we never have more than `max_instances`
163        // outstanding, so the pool can hold up to `max_instances` safely.
164        if let Ok(mut g) = self.instances.try_lock()
165            && g.len() < self.max_instances
166        {
167            g.push(plugin);
168        }
169    }
170
171    #[must_use]
172    pub fn max_instances(&self) -> usize {
173        self.max_instances
174    }
175}
176
177#[derive(Debug)]
178pub struct PoolGuard<'a> {
179    pool: &'a PluginPool,
180    plugin: Option<Plugin>,
181    _permit: tokio::sync::OwnedSemaphorePermit,
182}
183
184impl PoolGuard<'_> {
185    pub fn plugin(&mut self) -> &mut Plugin {
186        self.plugin.as_mut().expect("plugin present until drop")
187    }
188}
189
190impl Drop for PoolGuard<'_> {
191    fn drop(&mut self) {
192        if let Some(p) = self.plugin.take() {
193            self.pool.put_back(p);
194        }
195    }
196}