1use std::sync::Arc;
8
9use dashmap::DashMap;
10use sha2::{Digest, Sha256};
11
12use crate::cache::ResponseCache;
13use crate::engine::{CompiledModule, WasmEngine};
14use crate::error::WasmError;
15use crate::http_client::HttpClient;
16use crate::instance::PluginInstance;
17use crate::limits::PluginLimits;
18use crate::rate_limiter::RateLimiter;
19use crate::secrets::SecretsStore;
20
/// Identifies one plugin configuration: a plugin name paired with a short
/// fingerprint of the configuration it should be initialized with.
///
/// Two keys are equal (and hash identically) only when both the name and the
/// configuration fingerprint match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InstanceKey {
    /// Plugin name; `InstancePool::get_instance` uses it to look up the
    /// compiled module.
    pub name: String,

    /// Hex-encoded fingerprint of the serialized configuration, as produced
    /// by [`InstanceKey::new`].
    pub config_hash: String,
}
30
31impl InstanceKey {
32 pub fn new(name: &str, config: &serde_json::Value) -> Self {
34 let config_str = serde_json::to_string(config).unwrap_or_default();
35 let config_hash = compute_hash(&config_str);
36
37 Self {
38 name: name.to_string(),
39 config_hash,
40 }
41 }
42}
43
44fn compute_hash(s: &str) -> String {
46 let mut hasher = Sha256::new();
47 hasher.update(s.as_bytes());
48 let result = hasher.finalize();
49 hex::encode(&result[..8])
51}
52
53#[allow(dead_code)] pub struct ResolvedPlugin {
56 pub module: CompiledModule,
58
59 pub config: serde_json::Value,
61
62 pub config_json: Vec<u8>,
64}
65
66pub struct InstancePool {
70 engine: Arc<WasmEngine>,
72
73 limits: PluginLimits,
75
76 http_client: Option<Arc<HttpClient>>,
78
79 secrets: Option<SecretsStore>,
81
82 rate_limiter: Option<RateLimiter>,
84
85 response_cache: Option<ResponseCache>,
87
88 nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
90
91 kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
93
94 modules: DashMap<String, CompiledModule>,
96
97 instances: DashMap<InstanceKey, ()>,
101
102 configs: DashMap<InstanceKey, Vec<u8>>,
104}
105
106impl InstancePool {
107 pub fn new(engine: Arc<WasmEngine>, limits: PluginLimits) -> Self {
109 Self {
110 engine,
111 limits,
112 http_client: None,
113 secrets: None,
114 rate_limiter: None,
115 response_cache: None,
116 nats_publisher: None,
117 kafka_publisher: None,
118 modules: DashMap::new(),
119 instances: DashMap::new(),
120 configs: DashMap::new(),
121 }
122 }
123
124 pub fn with_http_client(
126 engine: Arc<WasmEngine>,
127 limits: PluginLimits,
128 http_client: Arc<HttpClient>,
129 ) -> Self {
130 Self {
131 engine,
132 limits,
133 http_client: Some(http_client),
134 secrets: None,
135 rate_limiter: None,
136 response_cache: None,
137 nats_publisher: None,
138 kafka_publisher: None,
139 modules: DashMap::new(),
140 instances: DashMap::new(),
141 configs: DashMap::new(),
142 }
143 }
144
145 pub fn with_http_client_and_secrets(
147 engine: Arc<WasmEngine>,
148 limits: PluginLimits,
149 http_client: Arc<HttpClient>,
150 secrets: SecretsStore,
151 ) -> Self {
152 Self {
153 engine,
154 limits,
155 http_client: Some(http_client),
156 secrets: Some(secrets),
157 rate_limiter: None,
158 response_cache: None,
159 nats_publisher: None,
160 kafka_publisher: None,
161 modules: DashMap::new(),
162 instances: DashMap::new(),
163 configs: DashMap::new(),
164 }
165 }
166
167 #[allow(clippy::too_many_arguments)]
169 pub fn with_all_options(
170 engine: Arc<WasmEngine>,
171 limits: PluginLimits,
172 http_client: Option<Arc<HttpClient>>,
173 secrets: Option<SecretsStore>,
174 rate_limiter: Option<RateLimiter>,
175 response_cache: Option<ResponseCache>,
176 nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
177 kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
178 ) -> Self {
179 Self {
180 engine,
181 limits,
182 http_client,
183 secrets,
184 rate_limiter,
185 response_cache,
186 nats_publisher,
187 kafka_publisher,
188 modules: DashMap::new(),
189 instances: DashMap::new(),
190 configs: DashMap::new(),
191 }
192 }
193
194 pub fn register_module(&self, module: CompiledModule) {
196 self.modules.insert(module.name.clone(), module);
197 }
198
199 pub fn register_config(&self, key: InstanceKey, config_json: Vec<u8>) {
201 self.configs.insert(key.clone(), config_json);
202 self.instances.insert(key, ());
203 }
204
205 pub fn get_instance(&self, key: &InstanceKey) -> Result<PluginInstance, WasmError> {
207 let module = self
209 .modules
210 .get(&key.name)
211 .ok_or_else(|| WasmError::InitFailed(format!("plugin not found: {}", key.name)))?;
212
213 let config_json = self
215 .configs
216 .get(key)
217 .ok_or_else(|| WasmError::InitFailed(format!("config not found for: {}", key.name)))?;
218
219 let mut instance = PluginInstance::new_with_all_options(
221 self.engine.engine(),
222 &module,
223 self.limits.clone(),
224 self.http_client.clone(),
225 self.secrets.clone(),
226 self.rate_limiter.clone(),
227 self.response_cache.clone(),
228 self.nats_publisher.clone(),
229 self.kafka_publisher.clone(),
230 )?;
231
232 let result = instance.init(&config_json)?;
234 if result != 0 {
235 return Err(WasmError::InitFailed(format!(
236 "plugin {} init returned {}",
237 key.name, result
238 )));
239 }
240
241 Ok(instance)
242 }
243
244 pub fn has_plugin(&self, name: &str) -> bool {
246 self.modules.contains_key(name)
247 }
248
249 pub fn body_access(&self, name: &str) -> bool {
251 self.modules
252 .get(name)
253 .map(|m| m.body_access)
254 .unwrap_or(false)
255 }
256
257 pub fn module_count(&self) -> usize {
259 self.modules.len()
260 }
261
262 pub fn instance_key_count(&self) -> usize {
264 self.instances.len()
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Equal name + equal config gives equal keys; changing a config value
    /// gives a different key.
    #[test]
    fn instance_key_from_config() {
        let plugin = "rate-limit";
        let baseline = InstanceKey::new(plugin, &json!({"quota": 100, "window": 60}));
        let duplicate = InstanceKey::new(plugin, &json!({"quota": 100, "window": 60}));
        let changed_quota = InstanceKey::new(plugin, &json!({"quota": 200, "window": 60}));

        assert_eq!(baseline, duplicate);

        assert_ne!(baseline, changed_quota);
    }

    /// The plugin name alone is enough to distinguish two keys.
    #[test]
    fn instance_key_different_plugins() {
        let empty_config = json!({});
        let for_a = InstanceKey::new("plugin-a", &empty_config);
        let for_b = InstanceKey::new("plugin-b", &empty_config);

        assert_ne!(for_a, for_b);
    }

    /// A freshly created pool has no modules and no instance keys.
    #[test]
    fn create_pool() {
        let engine = Arc::new(WasmEngine::new().unwrap());
        let pool = InstancePool::new(engine, PluginLimits::default());

        assert_eq!(pool.module_count(), 0);
        assert_eq!(pool.instance_key_count(), 0);
    }
}