harmont_cli/plugin/
pool.rs1#![allow(
27 clippy::missing_errors_doc,
28 clippy::missing_panics_doc,
29 clippy::expect_used,
30 clippy::collapsible_if,
31 clippy::missing_const_for_fn,
32 clippy::needless_pass_by_value
33)]
34
35use std::path::PathBuf;
36use std::sync::Arc;
37
38use anyhow::{Context, Result};
39use extism::{Manifest as ExtismManifest, Plugin, Wasm};
40use tokio::sync::{Mutex, Semaphore};
41
42use super::host_fns;
43
44#[derive(Debug, Clone)]
45enum PluginBytes {
46 Embedded(&'static [u8]),
47 Disk(PathBuf),
48}
49
50#[derive(Debug)]
51pub struct PluginPool {
52 bytes: PluginBytes,
53 instances: Mutex<Vec<Plugin>>,
54 semaphore: Arc<Semaphore>,
55 max_instances: usize,
56 allowed_hosts: Vec<String>,
62}
63
64impl PluginPool {
65 pub fn from_bytes(bytes: &'static [u8], max_instances: usize) -> Result<Self> {
66 Self::from_bytes_with_hosts(bytes, max_instances, Vec::new())
67 }
68
69 pub fn from_bytes_with_hosts(
70 bytes: &'static [u8],
71 max_instances: usize,
72 allowed_hosts: Vec<String>,
73 ) -> Result<Self> {
74 let max_instances = max_instances.max(1);
75 let pool = Self {
76 bytes: PluginBytes::Embedded(bytes),
77 instances: Mutex::new(Vec::with_capacity(max_instances)),
78 semaphore: Arc::new(Semaphore::new(max_instances)),
79 max_instances,
80 allowed_hosts,
81 };
82 let plugin = pool
84 .spawn_instance()
85 .context("preallocate first plugin instance")?;
86 pool.instances
87 .try_lock()
88 .expect("just-created mutex is uncontended")
89 .push(plugin);
90 Ok(pool)
91 }
92
93 pub fn from_file(path: PathBuf, max_instances: usize) -> Result<Self> {
94 Self::from_file_with_hosts(path, max_instances, Vec::new())
95 }
96
97 pub fn from_file_with_hosts(
98 path: PathBuf,
99 max_instances: usize,
100 allowed_hosts: Vec<String>,
101 ) -> Result<Self> {
102 let max_instances = max_instances.max(1);
103 let pool = Self {
104 bytes: PluginBytes::Disk(path.clone()),
105 instances: Mutex::new(Vec::with_capacity(max_instances)),
106 semaphore: Arc::new(Semaphore::new(max_instances)),
107 max_instances,
108 allowed_hosts,
109 };
110 let plugin = pool.spawn_instance().with_context(|| {
111 format!("preallocate first plugin instance from {}", path.display())
112 })?;
113 pool.instances
114 .try_lock()
115 .expect("just-created mutex is uncontended")
116 .push(plugin);
117 Ok(pool)
118 }
119
120 fn spawn_instance(&self) -> Result<Plugin> {
121 let wasm = match &self.bytes {
122 PluginBytes::Embedded(b) => Wasm::data(*b),
123 PluginBytes::Disk(p) => Wasm::file(p),
124 };
125 let manifest =
126 ExtismManifest::new([wasm]).with_allowed_hosts(self.allowed_hosts.iter().cloned());
127 Plugin::new(&manifest, host_fns::all(), true).context("spawn extism plugin instance")
128 }
129
130 pub async fn acquire(&self) -> Result<PoolGuard<'_>> {
136 let permit = self
138 .semaphore
139 .clone()
140 .acquire_owned()
141 .await
142 .context("semaphore closed")?;
143 let plugin = {
145 let mut g = self.instances.lock().await;
146 g.pop()
147 };
148 let plugin = if let Some(p) = plugin {
149 p
150 } else {
151 self.spawn_instance()?
152 };
153 Ok(PoolGuard {
154 pool: self,
155 plugin: Some(plugin),
156 _permit: permit,
157 })
158 }
159
160 fn put_back(&self, plugin: Plugin) {
161 if let Ok(mut g) = self.instances.try_lock()
165 && g.len() < self.max_instances
166 {
167 g.push(plugin);
168 }
169 }
170
171 #[must_use]
172 pub fn max_instances(&self) -> usize {
173 self.max_instances
174 }
175}
176
177#[derive(Debug)]
178pub struct PoolGuard<'a> {
179 pool: &'a PluginPool,
180 plugin: Option<Plugin>,
181 _permit: tokio::sync::OwnedSemaphorePermit,
182}
183
184impl PoolGuard<'_> {
185 pub fn plugin(&mut self) -> &mut Plugin {
186 self.plugin.as_mut().expect("plugin present until drop")
187 }
188}
189
190impl Drop for PoolGuard<'_> {
191 fn drop(&mut self) {
192 if let Some(p) = self.plugin.take() {
193 self.pool.put_back(p);
194 }
195 }
196}