use std::sync::Arc;
use dashmap::DashMap;
use sha2::{Digest, Sha256};
use crate::cache::ResponseCache;
use crate::engine::{CompiledModule, WasmEngine};
use crate::error::WasmError;
use crate::http_client::HttpClient;
use crate::instance::PluginInstance;
use crate::limits::PluginLimits;
use crate::rate_limiter::RateLimiter;
use crate::secrets::SecretsStore;
/// Lookup key for one (plugin, configuration) pair registered with an
/// [`InstancePool`].
///
/// Two keys are equal when both the plugin name and the configuration hash
/// match; the derived `Hash`/`Eq` make it usable as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InstanceKey {
/// Plugin name; `InstancePool::get_instance` uses it to look up the
/// compiled module.
pub name: String,
/// Digest of the plugin configuration. When the key is built with
/// `InstanceKey::new` this is the hex encoding of the first 8 bytes of the
/// SHA-256 of the serialized config.
pub config_hash: String,
}
impl InstanceKey {
    /// Builds the key for plugin `name` configured with `config`.
    ///
    /// The configuration is serialized with `serde_json::to_string` and the
    /// resulting text is reduced to a short hex digest by `compute_hash`.
    /// If serialization fails, the empty string is hashed instead, so the
    /// constructor never fails.
    ///
    /// NOTE(review): the digest is taken over the serialized text, so it
    /// presumably depends on the order in which serde_json emits object keys
    /// (relevant if the `preserve_order` feature is enabled) — confirm.
    pub fn new(name: &str, config: &serde_json::Value) -> Self {
        let serialized = serde_json::to_string(config).unwrap_or_default();
        Self {
            name: name.to_owned(),
            config_hash: compute_hash(&serialized),
        }
    }
}
/// Returns a short fingerprint of `s`: the first 8 bytes of its SHA-256
/// digest, hex-encoded (16 hex characters).
fn compute_hash(s: &str) -> String {
    // One-shot digest; equivalent to new() + update() + finalize().
    let digest = Sha256::digest(s.as_bytes());
    hex::encode(&digest[..8])
}
/// A compiled plugin module bundled with its configuration.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type, which is presumably why `dead_code` is silenced — confirm it is
/// still needed.
#[allow(dead_code)] pub struct ResolvedPlugin {
/// The compiled WASM module.
pub module: CompiledModule,
/// Plugin configuration as a JSON value.
pub config: serde_json::Value,
/// Configuration as raw bytes — presumably the serialized form of `config`;
/// verify against the code that builds this struct.
pub config_json: Vec<u8>,
}
/// Registry of compiled plugin modules and their configurations, plus the
/// shared services handed to every plugin instance it creates.
pub struct InstancePool {
/// Shared engine; `engine.engine()` is passed to each new instance.
engine: Arc<WasmEngine>,
/// Limits cloned into each new instance.
limits: PluginLimits,
/// Optional HTTP client, cloned into each new instance.
http_client: Option<Arc<HttpClient>>,
/// Optional secrets store, cloned into each new instance.
secrets: Option<SecretsStore>,
/// Optional rate limiter, cloned into each new instance.
rate_limiter: Option<RateLimiter>,
/// Optional response cache, cloned into each new instance.
response_cache: Option<ResponseCache>,
/// Optional NATS publisher, cloned into each new instance.
nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
/// Optional Kafka publisher, cloned into each new instance.
kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
/// Compiled modules keyed by `CompiledModule::name`.
modules: DashMap<String, CompiledModule>,
/// Set of registered instance keys (values are unit); within this file it
/// is only read by `instance_key_count`.
instances: DashMap<InstanceKey, ()>,
/// Configuration bytes per instance key, passed to the plugin's `init`.
configs: DashMap<InstanceKey, Vec<u8>>,
}
impl InstancePool {
pub fn new(engine: Arc<WasmEngine>, limits: PluginLimits) -> Self {
Self {
engine,
limits,
http_client: None,
secrets: None,
rate_limiter: None,
response_cache: None,
nats_publisher: None,
kafka_publisher: None,
modules: DashMap::new(),
instances: DashMap::new(),
configs: DashMap::new(),
}
}
pub fn with_http_client(
engine: Arc<WasmEngine>,
limits: PluginLimits,
http_client: Arc<HttpClient>,
) -> Self {
Self {
engine,
limits,
http_client: Some(http_client),
secrets: None,
rate_limiter: None,
response_cache: None,
nats_publisher: None,
kafka_publisher: None,
modules: DashMap::new(),
instances: DashMap::new(),
configs: DashMap::new(),
}
}
pub fn with_http_client_and_secrets(
engine: Arc<WasmEngine>,
limits: PluginLimits,
http_client: Arc<HttpClient>,
secrets: SecretsStore,
) -> Self {
Self {
engine,
limits,
http_client: Some(http_client),
secrets: Some(secrets),
rate_limiter: None,
response_cache: None,
nats_publisher: None,
kafka_publisher: None,
modules: DashMap::new(),
instances: DashMap::new(),
configs: DashMap::new(),
}
}
#[allow(clippy::too_many_arguments)]
pub fn with_all_options(
engine: Arc<WasmEngine>,
limits: PluginLimits,
http_client: Option<Arc<HttpClient>>,
secrets: Option<SecretsStore>,
rate_limiter: Option<RateLimiter>,
response_cache: Option<ResponseCache>,
nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
) -> Self {
Self {
engine,
limits,
http_client,
secrets,
rate_limiter,
response_cache,
nats_publisher,
kafka_publisher,
modules: DashMap::new(),
instances: DashMap::new(),
configs: DashMap::new(),
}
}
pub fn register_module(&self, module: CompiledModule) {
self.modules.insert(module.name.clone(), module);
}
pub fn register_config(&self, key: InstanceKey, config_json: Vec<u8>) {
self.configs.insert(key.clone(), config_json);
self.instances.insert(key, ());
}
pub fn get_instance(&self, key: &InstanceKey) -> Result<PluginInstance, WasmError> {
let module = self
.modules
.get(&key.name)
.ok_or_else(|| WasmError::InitFailed(format!("plugin not found: {}", key.name)))?;
let config_json = self
.configs
.get(key)
.ok_or_else(|| WasmError::InitFailed(format!("config not found for: {}", key.name)))?;
let mut instance = PluginInstance::new_with_all_options(
self.engine.engine(),
&module,
self.limits.clone(),
self.http_client.clone(),
self.secrets.clone(),
self.rate_limiter.clone(),
self.response_cache.clone(),
self.nats_publisher.clone(),
self.kafka_publisher.clone(),
)?;
let result = instance.init(&config_json)?;
if result != 0 {
return Err(WasmError::InitFailed(format!(
"plugin {} init returned {}",
key.name, result
)));
}
Ok(instance)
}
pub fn has_plugin(&self, name: &str) -> bool {
self.modules.contains_key(name)
}
pub fn body_access(&self, name: &str) -> bool {
self.modules
.get(name)
.map(|m| m.body_access)
.unwrap_or(false)
}
pub fn module_count(&self) -> usize {
self.modules.len()
}
pub fn instance_key_count(&self) -> usize {
self.instances.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Same plugin name and config yield equal keys; changing a config value
    /// yields a different key.
    #[test]
    fn instance_key_from_config() {
        let baseline = InstanceKey::new("rate-limit", &json!({"quota": 100, "window": 60}));
        let duplicate = InstanceKey::new("rate-limit", &json!({"quota": 100, "window": 60}));
        let other_quota = InstanceKey::new("rate-limit", &json!({"quota": 200, "window": 60}));

        assert_eq!(baseline, duplicate);
        assert_ne!(baseline, other_quota);
    }

    /// Identical configs under different plugin names must not collide.
    #[test]
    fn instance_key_different_plugins() {
        let empty_config = json!({});
        let for_a = InstanceKey::new("plugin-a", &empty_config);
        let for_b = InstanceKey::new("plugin-b", &empty_config);

        assert_ne!(for_a, for_b);
    }

    /// A freshly created pool has no modules and no instance keys.
    #[test]
    fn create_pool() {
        let pool = InstancePool::new(
            Arc::new(WasmEngine::new().unwrap()),
            PluginLimits::default(),
        );

        assert_eq!(pool.module_count(), 0);
        assert_eq!(pool.instance_key_count(), 0);
    }
}