use std::collections::HashMap;
use anyhow::Result;
use cel_cxx::Activation;
use serde::{Deserialize, Serialize};
use sqlx::{QueryBuilder, Row, Sqlite, Transaction, query, sqlite::SqliteRow};
use tracing::error;
use wasmtime::Engine;
use wasmtime::component::Component;
pub use wasmtime_wasi_http::body::{HostIncomingBody, HyperIncomingBody};
use crate::events::Event;
use crate::{
Runtime,
db::{Db, Insert},
plugins::capabilities::Capability,
wasm::{
Host,
bindgen::{
Plugin, PluginManifest, exports::witmproxy::plugin::witm_plugin::Tag,
witmproxy::plugin::capabilities::Capability as WitCapability,
},
},
};
pub mod capabilities;
pub mod cel;
pub mod registry;
/// A plugin known to the proxy: the descriptive fields taken from its manifest,
/// its capabilities, and the WebAssembly component that implements it.
#[derive(Serialize, Deserialize)]
pub struct WitmPlugin {
/// First half of the plugin identifier (`namespace/name`, see `id`).
pub namespace: String,
/// Second half of the plugin identifier (`namespace/name`, see `id`).
pub name: String,
/// Version string copied from the plugin manifest.
pub version: String,
/// Author string copied from the plugin manifest.
pub author: String,
/// Description copied from the plugin manifest.
pub description: String,
/// License string copied from the plugin manifest.
pub license: String,
/// URL copied from the plugin manifest.
pub url: String,
/// Public key bytes copied from the plugin manifest.
/// NOTE(review): encoding/format of the key is not visible here — confirm.
pub publickey: Vec<u8>,
/// Stored in the `enabled` column of `plugins`; `true` when built from a manifest.
pub enabled: bool,
/// Capabilities of the plugin together with their grant state.
pub capabilities: Vec<Capability>,
/// Key/value pairs built from the manifest's metadata tags.
pub metadata: HashMap<String, String>,
/// Compiled component; excluded from (de)serialization and `None` until
/// `with_component` is called.
#[serde(skip)]
pub component: Option<Component>,
/// Raw component bytes; this is what gets written to the `component` column.
pub component_bytes: Vec<u8>,
}
impl WitmPlugin {
    /// Joins a namespace and a name into the `namespace/name` identifier form.
    fn make_id(namespace: &str, name: &str) -> String {
        format!("{}/{}", namespace, name)
    }

    /// Returns this plugin's identifier, formatted as `namespace/name`.
    pub fn id(&self) -> String {
        Self::make_id(&self.namespace, &self.name)
    }

    /// Attaches a compiled component and the bytes it was compiled from,
    /// returning the updated plugin.
    pub fn with_component(mut self, component: Component, component_bytes: Vec<u8>) -> Self {
        self.component = Some(component);
        self.component_bytes = component_bytes;
        self
    }

    /// Compiles the scope expression of every capability against `env`.
    ///
    /// # Errors
    ///
    /// Returns the first compilation error; capabilities after the failing one
    /// are not processed.
    pub fn compile_capability_scope_expressions(
        mut self,
        env: &'static cel_cxx::Env,
    ) -> Result<Self> {
        for capability in &mut self.capabilities {
            capability.compile_scope_expression(env)?;
        }
        Ok(self)
    }

    /// Rebuilds a plugin from a row of the `plugins` table.
    ///
    /// The row's `component` column is compiled and instantiated, the guest's
    /// `manifest` export is called to recover the descriptive fields, and the
    /// capabilities (with their grant state) are then loaded from
    /// `plugin_capabilities` and compiled against `env`.
    ///
    /// The persisted capabilities *replace* the ones derived from the manifest.
    /// The manifest conversion marks every capability as granted, so appending
    /// the stored rows would both duplicate each capability and make a stored
    /// `granted = false` ineffective.
    ///
    /// # Errors
    ///
    /// Fails if the column cannot be read, the component cannot be compiled or
    /// instantiated, the manifest call fails, a capability row cannot be
    /// decoded, or a scope expression does not compile.
    pub async fn from_db_row(
        plugin_row: SqliteRow,
        db: &mut Db,
        engine: &Engine,
        env: &'static cel_cxx::Env<'static>,
    ) -> Result<Self> {
        let component_bytes: Vec<u8> = plugin_row.try_get("component")?;
        let component = Component::from_binary(engine, &component_bytes)?;

        // The manifest is only available by instantiating the component and
        // calling into the guest.
        let runtime = Runtime::try_default()?;
        let mut store = wasmtime::Store::new(engine, Host::default());
        let instance = runtime
            .linker
            .instantiate_async(&mut store, &component)
            .await?;
        let plugin_instance = Plugin::new(&mut store, &instance)?;
        let guest_result = store
            .run_concurrent(async move |store| {
                let (manifest, task) = match plugin_instance
                    .witmproxy_plugin_witm_plugin()
                    .call_manifest(store)
                    .await
                {
                    Ok(ok) => ok,
                    Err(e) => {
                        error!("Error calling manifest: {}", e);
                        return Err(e);
                    }
                };
                // NOTE(review): presumably waits for the guest task to finish
                // before the manifest is handed back — confirm.
                task.block(store).await;
                Ok(manifest)
            })
            .await??;

        let mut plugin = WitmPlugin::from(guest_result).with_component(component, component_bytes);

        let capability_rows = query(
            "
SELECT capability, config, granted
FROM plugin_capabilities
WHERE namespace = ? AND name = ?
",
        )
        .bind(&plugin.namespace)
        .bind(&plugin.name)
        .fetch_all(&db.pool)
        .await?;

        let mut persisted = Vec::with_capacity(capability_rows.len());
        for row in capability_rows {
            let config_str: String = row.try_get("config")?;
            let granted: bool = row.try_get("granted")?;
            let inner: WitCapability = serde_json::from_str(&config_str)?;
            persisted.push(Capability {
                inner,
                granted,
                cel: None,
            });
        }
        // The database is the source of truth for capabilities and grants.
        plugin.capabilities = persisted;

        plugin.compile_capability_scope_expressions(env)
    }

    /// Loads every plugin stored in the `plugins` table.
    ///
    /// Rows that fail to load are logged and skipped, so one broken plugin
    /// does not prevent the others from loading.
    ///
    /// NOTE(review): only the `component` column is selected, so the persisted
    /// `enabled` value is not restored; loaded plugins keep the `enabled: true`
    /// set by the manifest conversion. Confirm whether that is intended.
    ///
    /// # Errors
    ///
    /// Fails only if the initial query against `plugins` fails.
    pub async fn all(
        db: &mut Db,
        engine: &wasmtime::Engine,
        env: &'static cel_cxx::Env<'static>,
    ) -> Result<Vec<Self>> {
        let rows = query(
            "
SELECT component
FROM plugins
",
        )
        .fetch_all(&db.pool)
        .await?;

        let mut plugins = Vec::new();
        for row in rows {
            match WitmPlugin::from_db_row(row, db, engine, env).await {
                Ok(plugin) => plugins.push(plugin),
                Err(e) => {
                    error!(
                        "Failed to load plugin from database row, dropping and continuing: {}",
                        e
                    );
                }
            }
        }
        Ok(plugins)
    }

    /// Deletes this plugin's row from the `plugins` table inside a transaction.
    ///
    /// NOTE(review): rows in `plugin_capabilities` and `plugin_metadata` are
    /// not deleted here; presumably the schema cascades — confirm.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be opened, executed or committed.
    pub async fn delete(&self, db: &mut Db) -> Result<()> {
        let mut tx = db.pool.begin().await?;
        sqlx::query("DELETE FROM plugins WHERE namespace = ? AND name = ?")
            .bind(&self.namespace)
            .bind(&self.name)
            .execute(&mut *tx)
            .await?;
        tx.commit().await?;
        Ok(())
    }

    /// Reports whether this plugin may handle `event`.
    ///
    /// True when at least one capability matches the event's capability kind,
    /// is granted, has a compiled CEL program, and that program evaluates to
    /// boolean `true` for the event. Evaluation errors are logged and count as
    /// "no match".
    pub fn can_handle(&self, event: &Box<dyn Event>) -> bool {
        self.capabilities
            .iter()
            .filter(|cap| cap.inner.kind == event.capability() && cap.granted)
            .filter_map(|cap| {
                // Capabilities without a compiled program can never match.
                let program: &cel_cxx::Program<'_> = cap.cel.as_ref()?;
                Some(program)
            })
            .any(|program| {
                let Some(activation) = event.bind_cel_activation(Activation::new()) else {
                    return false;
                };
                match program.evaluate(activation) {
                    Ok(cel_cxx::Value::Bool(true)) => true,
                    Ok(_) => false,
                    Err(e) => {
                        error!("Error evaluating CEL filter: {}", e);
                        false
                    }
                }
            })
    }
}
impl From<PluginManifest> for WitmPlugin {
    /// Builds a plugin from the manifest reported by the guest component.
    ///
    /// Metadata tags become a key/value map, every declared capability starts
    /// out granted with no compiled CEL program, the plugin is enabled, and no
    /// component is attached yet (see `with_component`).
    fn from(manifest: PluginManifest) -> Self {
        // `manifest` is owned, so its collections are consumed rather than cloned.
        let metadata = manifest
            .metadata
            .into_iter()
            .map(|Tag { key, value }| (key, value))
            .collect::<HashMap<String, String>>();
        let capabilities = manifest
            .capabilities
            .into_iter()
            .map(|inner| Capability {
                inner,
                granted: true,
                cel: None,
            })
            .collect::<Vec<Capability>>();

        WitmPlugin {
            namespace: manifest.namespace,
            name: manifest.name,
            version: manifest.version,
            author: manifest.author,
            description: manifest.description,
            license: manifest.license,
            url: manifest.url,
            publickey: manifest.publickey,
            enabled: true,
            component: None,
            component_bytes: vec![],
            metadata,
            capabilities,
        }
    }
}
impl Insert for WitmPlugin {
    /// Writes this plugin, its capabilities and its metadata inside a new
    /// transaction and returns that transaction uncommitted, leaving the
    /// commit to the caller.
    ///
    /// The plugin row is upserted (`INSERT OR REPLACE`). Capability and
    /// metadata rows are inserted only when there is at least one of them: a
    /// bulk `INSERT ... VALUES` with no tuples is not valid SQL and would
    /// otherwise fail the whole insert for a plugin without capabilities.
    ///
    /// # Errors
    ///
    /// Fails if the transaction cannot be opened, if any statement fails, or
    /// if a capability cannot be serialized to JSON. In the last case the
    /// error is logged and returned instead of storing an empty `config`,
    /// which could never be deserialized when the plugin is loaded again.
    /// On error the transaction is dropped without being committed.
    async fn insert_tx(&self, db: &mut Db) -> Result<Transaction<'_, Sqlite>> {
        let mut tx: Transaction<'_, Sqlite> = db.pool.begin().await?;

        query(
            "
INSERT OR REPLACE INTO plugins (namespace, name, version, author, description, license, url, publickey, enabled, component)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
",
        )
        .bind(&self.namespace)
        .bind(&self.name)
        .bind(&self.version)
        .bind(&self.author)
        .bind(&self.description)
        .bind(&self.license)
        .bind(&self.url)
        .bind(self.publickey.as_slice())
        .bind(self.enabled)
        // Bound as a slice so the component blob is not copied just to bind it.
        .bind(self.component_bytes.as_slice())
        .execute(&mut *tx)
        .await?;

        if !self.capabilities.is_empty() {
            // (capability kind, JSON config, granted) for each capability.
            let mut capability_rows: Vec<(String, String, bool)> =
                Vec::with_capacity(self.capabilities.len());
            for cap in &self.capabilities {
                let cap_str = cap.inner.kind.to_string();
                let config_str = serde_json::to_string(&cap.inner).map_err(|e| {
                    error!(
                        "Failed to serialize capability config for plugin {}/{} capability {}: {}",
                        self.namespace, self.name, cap_str, e
                    );
                    e
                })?;
                capability_rows.push((cap_str, config_str, cap.granted));
            }

            let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
                "INSERT INTO plugin_capabilities (namespace, name, capability, config, granted) ",
            );
            query_builder.push_values(&capability_rows, |mut b, (capability, config, granted)| {
                b.push_bind(&self.namespace)
                    .push_bind(&self.name)
                    .push_bind(capability)
                    .push_bind(config)
                    .push_bind(granted);
            });
            query_builder.build().execute(&mut *tx).await?;
        }

        if !self.metadata.is_empty() {
            let mut query_builder: QueryBuilder<Sqlite> =
                QueryBuilder::new("INSERT INTO plugin_metadata (namespace, name, key, value) ");
            query_builder.push_values(&self.metadata, |mut b, (key, value)| {
                b.push_bind(&self.namespace)
                    .push_bind(&self.name)
                    .push_bind(key)
                    .push_bind(value);
            });
            query_builder.build().execute(&mut *tx).await?;
        }

        Ok(tx)
    }
}