Skip to main content

barbacane_wasm/
pool.rs

1//! Instance pooling for WASM plugins.
2//!
3//! Per SPEC-003 section 6.1, each (plugin name, config) pair produces a
4//! separate WASM instance. Under load, instances are cloned from the
5//! AOT-compiled module.
6
7use std::sync::Arc;
8
9use dashmap::DashMap;
10use sha2::{Digest, Sha256};
11
12use crate::cache::ResponseCache;
13use crate::engine::{CompiledModule, WasmEngine};
14use crate::error::WasmError;
15use crate::http_client::HttpClient;
16use crate::instance::PluginInstance;
17use crate::limits::PluginLimits;
18use crate::rate_limiter::RateLimiter;
19use crate::secrets::SecretsStore;
20
/// Key for identifying a plugin instance.
///
/// A key pairs a plugin name with a hash of that plugin's configuration.
/// Equality and hashing are derived over both fields, so two keys match only
/// when the name and the config hash are both identical.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InstanceKey {
    /// Plugin name.
    pub name: String,

    /// Hash of the serialized config for deduplication.
    ///
    /// When the key is built through [`InstanceKey::new`], this is derived
    /// from the JSON serialization of the config value.
    pub config_hash: String,
}
30
31impl InstanceKey {
32    /// Create an instance key from a plugin name and config.
33    pub fn new(name: &str, config: &serde_json::Value) -> Self {
34        let config_str = serde_json::to_string(config).unwrap_or_default();
35        let config_hash = compute_hash(&config_str);
36
37        Self {
38            name: name.to_string(),
39            config_hash,
40        }
41    }
42}
43
44/// Compute a short hash of the given string.
45fn compute_hash(s: &str) -> String {
46    let mut hasher = Sha256::new();
47    hasher.update(s.as_bytes());
48    let result = hasher.finalize();
49    // Use first 16 chars of hex for reasonable uniqueness
50    hex::encode(&result[..8])
51}
52
/// A resolved plugin ready for instantiation.
///
/// Bundles the compiled module with its configuration in two forms: the
/// parsed JSON value and a byte buffer intended to be handed to the plugin's
/// init call.
// NOTE(review): presumably `config_json` is the serialization of `config`;
// nothing in this type enforces that — confirm at the construction site.
#[allow(dead_code)] // Public API for plugin resolution pipeline
pub struct ResolvedPlugin {
    /// The compiled WASM module.
    pub module: CompiledModule,

    /// The plugin config (JSON).
    pub config: serde_json::Value,

    /// Pre-serialized config for passing to init.
    pub config_json: Vec<u8>,
}
65
/// Pool of WASM plugin instances.
///
/// Maintains a cache of compiled modules and manages instance creation.
/// The optional host services (HTTP client, secrets, rate limiter, response
/// cache, NATS and Kafka publishers) are stored once here and handed to each
/// instance the pool creates.
pub struct InstancePool {
    /// The WASM engine.
    engine: Arc<WasmEngine>,

    /// Resource limits for instances.
    limits: PluginLimits,

    /// HTTP client for plugins that need outbound HTTP calls.
    http_client: Option<Arc<HttpClient>>,

    /// Resolved secrets store (shared across all instances).
    secrets: Option<SecretsStore>,

    /// Rate limiter (shared across all instances).
    rate_limiter: Option<RateLimiter>,

    /// Response cache (shared across all instances).
    response_cache: Option<ResponseCache>,

    /// NATS publisher (shared across all instances).
    nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,

    /// Kafka publisher (shared across all instances).
    kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,

    /// Cache of compiled modules by plugin name.
    modules: DashMap<String, CompiledModule>,

    /// Registered instance keys.
    ///
    /// The map value is `()`, so this currently acts as a set of known keys
    /// rather than a store of live instances.
    /// In a production implementation, this would be a proper pool with
    /// checkout/return semantics. For now, we create new instances.
    instances: DashMap<InstanceKey, ()>,

    /// Plugin configs by key, stored as the raw bytes passed to plugin init.
    configs: DashMap<InstanceKey, Vec<u8>>,
}
105
106impl InstancePool {
107    /// Create a new instance pool.
108    pub fn new(engine: Arc<WasmEngine>, limits: PluginLimits) -> Self {
109        Self {
110            engine,
111            limits,
112            http_client: None,
113            secrets: None,
114            rate_limiter: None,
115            response_cache: None,
116            nats_publisher: None,
117            kafka_publisher: None,
118            modules: DashMap::new(),
119            instances: DashMap::new(),
120            configs: DashMap::new(),
121        }
122    }
123
124    /// Create a new instance pool with an HTTP client for outbound calls.
125    pub fn with_http_client(
126        engine: Arc<WasmEngine>,
127        limits: PluginLimits,
128        http_client: Arc<HttpClient>,
129    ) -> Self {
130        Self {
131            engine,
132            limits,
133            http_client: Some(http_client),
134            secrets: None,
135            rate_limiter: None,
136            response_cache: None,
137            nats_publisher: None,
138            kafka_publisher: None,
139            modules: DashMap::new(),
140            instances: DashMap::new(),
141            configs: DashMap::new(),
142        }
143    }
144
145    /// Create a new instance pool with HTTP client and secrets store.
146    pub fn with_http_client_and_secrets(
147        engine: Arc<WasmEngine>,
148        limits: PluginLimits,
149        http_client: Arc<HttpClient>,
150        secrets: SecretsStore,
151    ) -> Self {
152        Self {
153            engine,
154            limits,
155            http_client: Some(http_client),
156            secrets: Some(secrets),
157            rate_limiter: None,
158            response_cache: None,
159            nats_publisher: None,
160            kafka_publisher: None,
161            modules: DashMap::new(),
162            instances: DashMap::new(),
163            configs: DashMap::new(),
164        }
165    }
166
167    /// Create a new instance pool with all options.
168    #[allow(clippy::too_many_arguments)]
169    pub fn with_all_options(
170        engine: Arc<WasmEngine>,
171        limits: PluginLimits,
172        http_client: Option<Arc<HttpClient>>,
173        secrets: Option<SecretsStore>,
174        rate_limiter: Option<RateLimiter>,
175        response_cache: Option<ResponseCache>,
176        nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
177        kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
178    ) -> Self {
179        Self {
180            engine,
181            limits,
182            http_client,
183            secrets,
184            rate_limiter,
185            response_cache,
186            nats_publisher,
187            kafka_publisher,
188            modules: DashMap::new(),
189            instances: DashMap::new(),
190            configs: DashMap::new(),
191        }
192    }
193
194    /// Register a compiled module in the pool.
195    pub fn register_module(&self, module: CompiledModule) {
196        self.modules.insert(module.name.clone(), module);
197    }
198
199    /// Register a plugin config.
200    pub fn register_config(&self, key: InstanceKey, config_json: Vec<u8>) {
201        self.configs.insert(key.clone(), config_json);
202        self.instances.insert(key, ());
203    }
204
205    /// Get or create an instance for the given key.
206    pub fn get_instance(&self, key: &InstanceKey) -> Result<PluginInstance, WasmError> {
207        // Get the compiled module
208        let module = self
209            .modules
210            .get(&key.name)
211            .ok_or_else(|| WasmError::InitFailed(format!("plugin not found: {}", key.name)))?;
212
213        // Get the config
214        let config_json = self
215            .configs
216            .get(key)
217            .ok_or_else(|| WasmError::InitFailed(format!("config not found for: {}", key.name)))?;
218
219        // Create a new instance with all options
220        let mut instance = PluginInstance::new_with_all_options(
221            self.engine.engine(),
222            &module,
223            self.limits.clone(),
224            self.http_client.clone(),
225            self.secrets.clone(),
226            self.rate_limiter.clone(),
227            self.response_cache.clone(),
228            self.nats_publisher.clone(),
229            self.kafka_publisher.clone(),
230        )?;
231
232        // Initialize with config
233        let result = instance.init(&config_json)?;
234        if result != 0 {
235            return Err(WasmError::InitFailed(format!(
236                "plugin {} init returned {}",
237                key.name, result
238            )));
239        }
240
241        Ok(instance)
242    }
243
244    /// Check if a plugin is registered.
245    pub fn has_plugin(&self, name: &str) -> bool {
246        self.modules.contains_key(name)
247    }
248
249    /// Check if a plugin has body_access capability.
250    pub fn body_access(&self, name: &str) -> bool {
251        self.modules
252            .get(name)
253            .map(|m| m.body_access)
254            .unwrap_or(false)
255    }
256
257    /// Get the number of registered modules.
258    pub fn module_count(&self) -> usize {
259        self.modules.len()
260    }
261
262    /// Get the number of registered instance keys.
263    pub fn instance_key_count(&self) -> usize {
264        self.instances.len()
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Keys for one plugin follow the config: equal configs give equal keys,
    /// differing configs give differing keys.
    #[test]
    fn instance_key_from_config() {
        let plugin = "rate-limit";
        let baseline = InstanceKey::new(plugin, &json!({"quota": 100, "window": 60}));
        let duplicate = InstanceKey::new(plugin, &json!({"quota": 100, "window": 60}));
        let altered = InstanceKey::new(plugin, &json!({"quota": 200, "window": 60}));

        // Identical config => identical key.
        assert_eq!(baseline, duplicate);

        // Changed config => distinct key.
        assert_ne!(baseline, altered);
    }

    /// The plugin name alone is enough to tell two keys apart.
    #[test]
    fn instance_key_different_plugins() {
        let empty_config = json!({});
        let first = InstanceKey::new("plugin-a", &empty_config);
        let second = InstanceKey::new("plugin-b", &empty_config);

        assert_ne!(first, second);
    }

    /// A freshly created pool has no modules and no instance keys.
    #[test]
    fn create_pool() {
        let pool = InstancePool::new(
            Arc::new(WasmEngine::new().unwrap()),
            PluginLimits::default(),
        );

        assert_eq!(pool.module_count(), 0);
        assert_eq!(pool.instance_key_count(), 0);
    }
}
302}