use std::collections::{HashMap, HashSet};
use anyhow::Result;
use bytes::Bytes;
use cel_cxx::Env;
use http_body::Body;
use http_body_util::{Full, combinators::UnsyncBoxBody};
use hyper::{Request, Response, body::Incoming};
use tracing::{debug, info, warn};
use wasmtime::Store;
use wasmtime_wasi_http::p3::{
Request as WasiRequest, WasiHttpView, bindings::http::types::ErrorCode,
};
use crate::{
db::{Db, Insert},
events::{Event, connect::Connect, content::InboundContent, response::ContextualResponse},
plugins::WitmPlugin,
wasm::{
CapabilityProvider, Host, Runtime,
bindgen::{
Plugin, UserInput,
witmproxy::plugin::capabilities::{Event as WasmEvent, EventKind},
},
},
};
pub struct PluginRegistry {
plugins: HashMap<String, WitmPlugin>,
pub db: Db,
pub runtime: Runtime,
env: &'static Env<'static>,
}
pub enum HostHandleRequestResult<T = Incoming>
where
T: Body<Data = Bytes> + Send + Sync + 'static,
{
None,
Noop(Request<T>),
Request(Request<UnsyncBoxBody<Bytes, ErrorCode>>),
Response(Response<UnsyncBoxBody<Bytes, ErrorCode>>),
}
pub enum HostHandleResponseResult<T = Full<Bytes>>
where
T: Body<Data = Bytes> + Send + Sync + 'static,
{
None,
Noop(Response<T>),
Response(Response<UnsyncBoxBody<Bytes, ErrorCode>>),
}
impl WasmEvent {
pub fn register<'a>(env: cel_cxx::EnvBuilder<'a>) -> Result<cel_cxx::EnvBuilder<'a>> {
let env = WasiRequest::register_cel_env(env)?;
let env = ContextualResponse::register_cel_env(env)?;
let env = InboundContent::register_cel_env(env)?;
let env = Connect::register_cel_env(env)?;
let env = crate::events::timer::TimerEvent::register_cel_env(env)?;
let env = crate::plugins::cel::CelTime::register_cel_env(env)?;
Ok(env)
}
}
impl PluginRegistry {
pub fn new(db: Db, runtime: Runtime) -> Result<Self> {
let env = WasmEvent::register(Env::builder().with_standard(true))?.build()?;
let env: &'static Env<'static> = Box::leak(Box::new(env));
Ok(Self {
plugins: HashMap::new(),
db,
runtime,
env,
})
}
pub fn plugins(&self) -> &HashMap<String, WitmPlugin> {
&self.plugins
}
pub async fn load_plugins(&mut self) -> Result<()> {
let plugins = WitmPlugin::all(&mut self.db, &self.runtime.engine, self.env).await?;
for plugin in plugins.into_iter() {
self.plugins.insert(plugin.id(), plugin);
}
Ok(())
}
pub async fn plugin_from_component(&self, component_bytes: Vec<u8>) -> Result<WitmPlugin> {
let component =
wasmtime::component::Component::from_binary(&self.runtime.engine, &component_bytes)?;
let mut store = wasmtime::Store::new(&self.runtime.engine, Host::default());
let instance = self
.runtime
.linker
.instantiate_async(&mut store, &component)
.await?;
let plugin_instance = Plugin::new(&mut store, &instance)?;
let guest_result = store
.run_concurrent(async move |store| {
let (manifest, task) = match plugin_instance
.witmproxy_plugin_witm_plugin()
.call_manifest(store)
.await
{
Ok(ok) => ok,
Err(e) => {
warn!("Error calling manifest: {}", e);
return Err(e);
}
};
task.block(store).await;
Ok(manifest)
})
.await??;
let public_key_bytes = &guest_result.publickey;
if !public_key_bytes.is_empty() {
let public_key = wasmsign2::PublicKey::from_bytes(public_key_bytes)
.map_err(|e| anyhow::anyhow!("Failed to parse public key: {}", e))?;
let mut reader = std::io::Cursor::new(&component_bytes);
match public_key.verify(&mut reader, None) {
Ok(()) => {
info!(
"WASM component signature verified successfully for plugin: {}",
guest_result.name
);
}
Err(e) => {
anyhow::bail!(
"WASM component signature verification failed for plugin {}: {}",
guest_result.name,
e
);
}
}
} else {
anyhow::bail!(
"Plugin {} does not have a public key for signature verification",
guest_result.name
);
}
let plugin = WitmPlugin::from(guest_result)
.with_component(component, component_bytes)
.compile_capability_scope_expressions(self.env)?;
Ok(plugin)
}
pub async fn register_plugin(&mut self, plugin: WitmPlugin) -> Result<()> {
plugin.insert(&mut self.db).await?;
self.plugins.insert(plugin.id(), plugin);
Ok(())
}
pub async fn remove_plugin(
&mut self,
name: &str,
namespace: Option<&str>,
) -> Result<Vec<String>> {
let deleted_plugins: Vec<(String, String)> = if let Some(namespace) = namespace {
sqlx::query_as(
"DELETE FROM plugins WHERE namespace = ? AND name = ? RETURNING namespace, name",
)
.bind(namespace)
.bind(name)
.fetch_all(&self.db.pool)
.await?
} else {
sqlx::query_as("DELETE FROM plugins WHERE name = ? RETURNING namespace, name")
.bind(name)
.fetch_all(&self.db.pool)
.await?
};
let mut removed_plugin_ids = Vec::new();
for (ns, n) in deleted_plugins {
let plugin_id = WitmPlugin::make_id(&ns, &n);
if self.plugins.remove(&plugin_id).is_some() {
removed_plugin_ids.push(plugin_id);
}
}
Ok(removed_plugin_ids)
}
fn new_store(&self) -> Store<Host> {
self.runtime.new_store()
}
pub fn find_first_unexecuted_plugin(
&self,
event: &dyn Event,
executed_plugins: &HashSet<String>,
) -> Option<&WitmPlugin> {
self.plugins
.values()
.find(|p| !executed_plugins.contains(&p.id()) && p.can_handle(event))
}
fn find_first_unexecuted_in_set<'a>(
&'a self,
event: &dyn Event,
executed_plugins: &HashSet<String>,
effective_set: &HashSet<String>,
) -> Option<&'a WitmPlugin> {
self.plugins.values().find(|p| {
effective_set.contains(&p.id())
&& !executed_plugins.contains(&p.id())
&& p.can_handle(event)
})
}
pub fn can_handle(&self, event: &dyn Event) -> bool {
self.plugins.values().any(|p| p.can_handle(event))
}
pub fn effective_plugins_for_tenant(
&self,
overrides: &[crate::db::tenants::TenantPluginOverride],
) -> HashSet<String> {
let mut effective = HashSet::new();
for (id, plugin) in &self.plugins {
let mut enabled = plugin.enabled;
for ov in overrides {
if ov.plugin_namespace == plugin.namespace && ov.plugin_name == plugin.name
&& let Some(ov_enabled) = ov.enabled {
enabled = ov_enabled;
}
}
if enabled {
effective.insert(id.clone());
}
}
effective
}
pub fn resolve_config(
&self,
plugin: &WitmPlugin,
tenant_config: &[crate::db::tenants::TenantPluginConfig],
) -> Vec<UserInput> {
use crate::wasm::bindgen::exports::witmproxy::plugin::witm_plugin::ActualInput;
let mut config = plugin.configuration.clone();
for tc in tenant_config {
if tc.plugin_namespace == plugin.namespace && tc.plugin_name == plugin.name {
let value = match serde_json::from_str::<ActualInput>(&tc.input_value) {
Ok(v) => v,
Err(e) => {
warn!(
"Failed to deserialize tenant config value for {}/{} input '{}': {}. Falling back to string.",
tc.plugin_namespace, tc.plugin_name, tc.input_name, e
);
ActualInput::Str(tc.input_value.clone())
}
};
if let Some(existing) = config.iter_mut().find(|c| c.name == tc.input_name) {
existing.value = value;
} else {
config.push(UserInput {
name: tc.input_name.clone(),
value,
});
}
}
}
config
}
pub async fn handle_event(&self, event: Box<dyn Event>) -> Result<(WasmEvent, Store<Host>)> {
let any_plugins = self.plugins.values().any(|p| p.can_handle(&*event));
if !any_plugins {
debug!(
"No plugins with matching capability and scope; skipping plugin processing for event of kind: {:?}",
event.kind()
);
let mut store = self.new_store();
let event_data = event.into_event_data(&mut store)?;
return Ok((event_data, store));
}
debug!(
"Found plugins with matching capability and scope; processing event of kind: {:?} through plugin chain",
event.kind()
);
let mut current_event = event;
let mut store = self.new_store();
let mut executed_plugins = HashSet::new();
while let Some(plugin) =
{ self.find_first_unexecuted_plugin(&*current_event, &executed_plugins) }
{
debug!("Executing handle_event for plugin: {}", plugin.id(),);
executed_plugins.insert(plugin.id());
let kind = current_event.kind();
let component = if let Some(c) = &plugin.component {
c
} else {
warn!(
target: "plugins",
plugin_id = %plugin.id(),
event_kind = kind.to_string(),
"Plugin component missing; skipping"
);
continue;
};
let (plugin_instance, component_store) =
match self.runtime.instantiate_plugin_component(component).await {
Ok(pi) => pi,
Err(e) => {
warn!(
target: "plugins",
plugin_id = %plugin.id(),
event_kind = kind.to_string(),
error = %e,
"Failed to instantiate plugin component; skipping"
);
continue;
}
};
store = component_store;
let event_data = current_event.into_event_data(&mut store)?;
let provider = CapabilityProvider::from(&plugin.capabilities);
let cap_resource = store.data_mut().table.push(provider)?;
let config = plugin.configuration.clone();
let guest_result = store
.run_concurrent(async move |store| {
let (create_result, task) = match plugin_instance
.witmproxy_plugin_witm_plugin()
.plugin()
.call_create(store, config)
.await
{
Ok(ok) => ok,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = %e,
"Error calling plugin create"
);
return Err(e);
}
};
task.block(store).await;
let plugin_resource = match create_result {
Ok(resource) => resource,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = ?e,
"Plugin create returned configure error"
);
return Ok(None);
}
};
let (result, task) = match plugin_instance
.witmproxy_plugin_witm_plugin()
.plugin()
.call_handle(store, plugin_resource, event_data, cap_resource)
.await
{
Ok(ok) => ok,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = %e,
"Error calling handle"
);
return Err(e);
}
};
task.block(store).await;
Ok(result)
})
.await??;
match guest_result {
Some(new_event_data) => {
current_event = match new_event_data {
WasmEvent::Request(r) => {
let req = store.data_mut().http().table.delete(r)?;
Box::new(req)
}
WasmEvent::Response(r) => {
let response = store.data_mut().http().table.delete(r.response)?;
let request_ctx = r.request;
Box::new(ContextualResponse {
request: request_ctx,
response,
})
}
WasmEvent::InboundContent(c) => {
let content = store.data_mut().table.delete(c)?;
Box::new(content)
}
WasmEvent::Timer(ctx) => Box::new(crate::events::timer::TimerEvent {
timestamp: ctx.timestamp,
}),
};
}
None => {
if kind == EventKind::Timer {
debug!("Timer plugin returned None; stopping timer chain");
let timer_event = crate::events::timer::TimerEvent::now();
let event_data = Box::new(timer_event).into_event_data(&mut store)?;
return Ok((event_data, store));
}
anyhow::bail!("Plugin returned no event data; cannot continue processing");
}
}
}
let kind = current_event.kind();
let event_data = current_event.into_event_data(&mut store)?;
kind.validate_output(&event_data)?;
Ok((event_data, store))
}
pub async fn handle_event_for_tenant(
&self,
event: Box<dyn Event>,
effective_set: &HashSet<String>,
tenant_config: &[crate::db::tenants::TenantPluginConfig],
) -> Result<(WasmEvent, Store<Host>)> {
let any_plugins = self
.plugins
.values()
.any(|p| effective_set.contains(&p.id()) && p.can_handle(&*event));
if !any_plugins {
debug!(
"No effective tenant plugins for event of kind: {:?}",
event.kind()
);
let mut store = self.new_store();
let event_data = event.into_event_data(&mut store)?;
return Ok((event_data, store));
}
let mut current_event = event;
let mut store = self.new_store();
let mut executed_plugins = HashSet::new();
while let Some(plugin) =
{ self.find_first_unexecuted_in_set(&*current_event, &executed_plugins, effective_set) }
{
debug!(
"Executing handle_event for plugin: {} (tenant-scoped)",
plugin.id()
);
executed_plugins.insert(plugin.id());
let kind = current_event.kind();
let component = if let Some(c) = &plugin.component {
c
} else {
warn!(
target: "plugins",
plugin_id = %plugin.id(),
event_kind = kind.to_string(),
"Plugin component missing; skipping"
);
continue;
};
let (plugin_instance, component_store) =
match self.runtime.instantiate_plugin_component(component).await {
Ok(pi) => pi,
Err(e) => {
warn!(
target: "plugins",
plugin_id = %plugin.id(),
event_kind = kind.to_string(),
error = %e,
"Failed to instantiate plugin component; skipping"
);
continue;
}
};
store = component_store;
let event_data = current_event.into_event_data(&mut store)?;
let provider = CapabilityProvider::from(&plugin.capabilities);
let cap_resource = store.data_mut().table.push(provider)?;
let config = self.resolve_config(plugin, tenant_config);
let guest_result = store
.run_concurrent(async move |store| {
let (create_result, task) = match plugin_instance
.witmproxy_plugin_witm_plugin()
.plugin()
.call_create(store, config)
.await
{
Ok(ok) => ok,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = %e,
"Error calling plugin create"
);
return Err(e);
}
};
task.block(store).await;
let plugin_resource = match create_result {
Ok(resource) => resource,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = ?e,
"Plugin create returned configure error"
);
return Ok(None);
}
};
let (result, task) = match plugin_instance
.witmproxy_plugin_witm_plugin()
.plugin()
.call_handle(store, plugin_resource, event_data, cap_resource)
.await
{
Ok(ok) => ok,
Err(e) => {
warn!(
target: "plugins",
event_kind = kind.to_string(),
error = %e,
"Error calling handle"
);
return Err(e);
}
};
task.block(store).await;
Ok(result)
})
.await??;
match guest_result {
Some(new_event_data) => {
current_event = match new_event_data {
WasmEvent::Request(r) => {
let req = store.data_mut().http().table.delete(r)?;
Box::new(req)
}
WasmEvent::Response(r) => {
let response = store.data_mut().http().table.delete(r.response)?;
let request_ctx = r.request;
Box::new(ContextualResponse {
request: request_ctx,
response,
})
}
WasmEvent::InboundContent(c) => {
let content = store.data_mut().table.delete(c)?;
Box::new(content)
}
WasmEvent::Timer(ctx) => Box::new(crate::events::timer::TimerEvent {
timestamp: ctx.timestamp,
}),
};
}
None => {
if kind == EventKind::Timer {
debug!("Timer plugin returned None; stopping timer chain");
let timer_event = crate::events::timer::TimerEvent::now();
let event_data = Box::new(timer_event).into_event_data(&mut store)?;
return Ok((event_data, store));
}
anyhow::bail!("Plugin returned no event data; cannot continue processing");
}
}
}
let kind = current_event.kind();
let event_data = current_event.into_event_data(&mut store)?;
kind.validate_output(&event_data)?;
Ok((event_data, store))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{create_plugin_registry, test_component_path};
use crate::wasm::bindgen::witmproxy::plugin::capabilities::{
CapabilityKind, CapabilityScope, EventKind,
};
use crate::{
plugins::{WitmPlugin, capabilities::Capability},
wasm::bindgen::witmproxy::plugin::capabilities::Capability as WitCapability,
};
use bytes::Bytes;
use http_body_util::Full;
use hyper::{Method, Request};
async fn register_test_plugin_with_cel_filter(
registry: &mut PluginRegistry,
cel_expression: &str,
) -> Result<(), anyhow::Error> {
let wasm_path = test_component_path()?;
let component_bytes = std::fs::read(&wasm_path)?;
let component = Some(wasmtime::component::Component::from_binary(
®istry.runtime.engine,
&component_bytes,
)?);
let capabilities = vec![
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Connect),
scope: CapabilityScope {
expression: cel_expression.into(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Request),
scope: CapabilityScope {
expression: cel_expression.into(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Response),
scope: CapabilityScope {
expression: cel_expression.into(),
},
},
cel: None,
},
];
let plugin = WitmPlugin {
name: "test_plugin_with_filter".into(),
component_bytes,
namespace: "test".into(),
version: "0.0.0".into(),
author: "author".into(),
description: "description".into(),
license: "mit".into(),
enabled: true,
url: "https://example.com".into(),
publickey: vec![],
capabilities,
configuration: vec![],
metadata: std::collections::HashMap::new(),
component,
}
.compile_capability_scope_expressions(registry.env)?;
registry.register_plugin(plugin).await
}
#[tokio::test]
async fn test_find_first_unexecuted_plugin_with_cel_filter() -> Result<(), anyhow::Error> {
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let cel_expression = "request.host() != 'donotprocess.com' && !('skipthis' in request.headers() && 'true' in request.headers()['skipthis'])";
register_test_plugin_with_cel_filter(&mut registry, cel_expression).await?;
let executed_plugins = HashSet::new();
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_some(),
"Request to example.com should return one plugin"
);
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.header("skipthis", "false")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_some(),
"Request to example.com with skipthis=false should return one plugin"
);
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.header("skipthis", "true")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Request to example.com with skipthis=true should not match"
);
let req = Request::builder()
.method(Method::GET)
.uri("https://donotprocess.com/test")
.header("host", "donotprocess.com")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Request to donotprocess.com should not match"
);
let req = Request::builder()
.method(Method::GET)
.uri("https://donotprocess.com/test")
.header("host", "donotprocess.com")
.header("skipthis", "false")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Request to donotprocess.com with skipthis=false should not match"
);
let req = Request::builder()
.method(Method::GET)
.uri("https://donotprocess.com/test")
.header("host", "donotprocess.com")
.header("skipthis", "true")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Request to donotprocess.com with skipthis=true should not match"
);
Ok(())
}
#[tokio::test]
async fn test_find_first_unexecuted_plugin_no_plugins() -> Result<(), anyhow::Error> {
let (registry, _temp_dir) = create_plugin_registry().await?;
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let executed_plugins = HashSet::new();
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Should return no plugins when none are registered"
);
Ok(())
}
#[tokio::test]
async fn test_find_first_unexecuted_plugin_no_request_capability() -> Result<(), anyhow::Error>
{
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let wasm_path = test_component_path()?;
let component_bytes = std::fs::read(&wasm_path)?;
let component = Some(
wasmtime::component::Component::from_binary(®istry.runtime.engine, &component_bytes)
.unwrap(),
);
let capabilities = vec![
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Connect),
scope: CapabilityScope {
expression: "true".into(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Response),
scope: CapabilityScope {
expression: "true".to_string(),
},
},
cel: None,
},
];
let plugin = WitmPlugin {
name: "response_only_plugin".into(),
component_bytes,
namespace: "test".into(),
version: "0.0.0".into(),
author: "author".into(),
description: "description".into(),
license: "mit".into(),
enabled: true,
url: "https://example.com".into(),
publickey: vec![],
capabilities,
configuration: vec![],
metadata: std::collections::HashMap::new(),
component,
};
registry.register_plugin(plugin).await?;
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let executed_plugins = HashSet::new();
let event: Box<dyn Event> = Box::new(wasi_req);
let matching_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
matching_plugin.is_none(),
"Should return no plugins when plugin doesn't have Request capability"
);
Ok(())
}
#[tokio::test]
async fn test_find_first_unexecuted_plugin_excludes_executed_plugins()
-> Result<(), anyhow::Error> {
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let cel_expression1 = "true";
register_test_plugin_with_cel_filter(&mut registry, cel_expression1).await?;
let wasm_path = test_component_path()?;
let component_bytes = std::fs::read(&wasm_path)?;
let component = Some(
wasmtime::component::Component::from_binary(®istry.runtime.engine, &component_bytes)
.unwrap(),
);
let capabilities = vec![
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Connect),
scope: CapabilityScope {
expression: "true".into(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Request),
scope: CapabilityScope {
expression: "true".to_string(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Response),
scope: CapabilityScope {
expression: "true".to_string(),
},
},
cel: None,
},
];
let plugin2 = WitmPlugin {
name: "second_test_plugin".into(),
component_bytes,
namespace: "test".into(),
version: "0.0.0".into(),
author: "author".into(),
description: "description".into(),
license: "mit".into(),
enabled: true,
url: "https://example.com".into(),
publickey: vec![],
capabilities,
configuration: vec![],
metadata: std::collections::HashMap::new(),
component,
}
.compile_capability_scope_expressions(registry.env)?;
registry.register_plugin(plugin2).await?;
let req = Request::builder()
.method(Method::GET)
.uri("https://example.com/test")
.header("host", "example.com")
.body(Full::new(Bytes::from("test body")))
.unwrap();
let (wasi_req, _io) = WasiRequest::from_http(req);
let mut executed_plugins = HashSet::new();
let event: Box<dyn Event> = Box::new(wasi_req);
let first_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
first_plugin.is_some(),
"Should find a plugin when none are executed"
);
executed_plugins.insert(first_plugin.unwrap().id());
let second_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
if let Some(second_plugin) = second_plugin {
assert_ne!(
first_plugin.unwrap().id(),
second_plugin.id(),
"Should return a different plugin"
);
executed_plugins.insert(second_plugin.id());
let third_plugin = registry.find_first_unexecuted_plugin(&*event, &executed_plugins);
assert!(
third_plugin.is_none(),
"Should return None when all plugins have been executed"
);
}
Ok(())
}
#[tokio::test]
async fn test_remove_plugin_with_namespace() -> Result<(), anyhow::Error> {
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let cel_expression = "true";
register_test_plugin_with_cel_filter(&mut registry, cel_expression).await?;
assert_eq!(registry.plugins().len(), 1);
assert!(
registry
.plugins()
.contains_key("test/test_plugin_with_filter")
);
let removed = registry
.remove_plugin("test_plugin_with_filter", Some("test"))
.await?;
assert_eq!(removed.len(), 1);
assert_eq!(removed[0], "test/test_plugin_with_filter");
assert_eq!(registry.plugins().len(), 0);
Ok(())
}
#[tokio::test]
async fn test_remove_plugin_without_namespace() -> Result<(), anyhow::Error> {
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let wasm_path = test_component_path()?;
let component_bytes = std::fs::read(&wasm_path)?;
for (namespace, name) in [("ns1", "common_plugin"), ("ns2", "common_plugin")] {
let component = Some(
wasmtime::component::Component::from_binary(
®istry.runtime.engine,
&component_bytes,
)
.unwrap(),
);
let capabilities = vec![
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Connect),
scope: CapabilityScope {
expression: "true".into(),
},
},
cel: None,
},
Capability {
granted: true,
inner: WitCapability {
kind: CapabilityKind::HandleEvent(EventKind::Request),
scope: CapabilityScope {
expression: "true".into(),
},
},
cel: None,
},
];
let plugin = WitmPlugin {
name: name.to_string(),
component_bytes: component_bytes.clone(),
namespace: namespace.to_string(),
version: "0.0.0".into(),
author: "author".into(),
description: "description".into(),
license: "mit".into(),
enabled: true,
url: "https://example.com".into(),
publickey: vec![],
capabilities,
configuration: vec![],
metadata: std::collections::HashMap::new(),
component,
};
registry.register_plugin(plugin).await?;
}
assert_eq!(registry.plugins().len(), 2);
assert!(registry.plugins().contains_key("ns1/common_plugin"));
assert!(registry.plugins().contains_key("ns2/common_plugin"));
let removed = registry.remove_plugin("common_plugin", None).await?;
assert_eq!(removed.len(), 2);
assert!(removed.contains(&"ns1/common_plugin".to_string()));
assert!(removed.contains(&"ns2/common_plugin".to_string()));
assert_eq!(registry.plugins().len(), 0);
Ok(())
}
#[tokio::test]
async fn test_remove_nonexistent_plugin() -> Result<(), anyhow::Error> {
let (mut registry, _temp_dir) = create_plugin_registry().await?;
let removed = registry
.remove_plugin("nonexistent_plugin", Some("test"))
.await?;
assert_eq!(removed.len(), 0);
assert_eq!(registry.plugins().len(), 0);
Ok(())
}
}