use super::capabilities::CapabilitySet;
use super::sandbox::PluginSandbox;
use super::{Plugin, PluginContext};
use crate::types::Layer4Result;
use anyhow::{anyhow, Context};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use wasmtime::*;
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder};
/// Resource limits and feature switches for WASM plugins.
#[derive(Debug, Clone)]
pub struct WasmConfig {
    /// Memory budget in bytes.
    pub max_memory_bytes: u64,
    /// Execution time budget in milliseconds.
    pub max_cpu_time_ms: u64,
    /// Table element budget.
    pub max_table_elements: u32,
    /// Whether WASI support is requested.
    pub enable_wasi: bool,
    /// Whether async support is requested.
    pub enable_async: bool,
}

impl Default for WasmConfig {
    /// Defaults: 16 MiB of memory, 5000 ms of execution time, 10000 table
    /// elements, with both WASI and async enabled.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        WasmConfig {
            enable_async: true,
            enable_wasi: true,
            max_table_elements: 10_000,
            max_cpu_time_ms: 5_000,
            max_memory_bytes: 16 * MIB,
        }
    }
}
/// Host-side state stored inside each plugin's `Store`.
pub struct PluginState {
/// Sandbox handle; `WasmPlugin::create_store` fills this with a clone of the plugin's sandbox.
pub sandbox: PluginSandbox,
/// Directory path supplied by the caller of `WasmPlugin::create_store`.
pub data_dir: PathBuf,
/// Instant recorded by `WasmPlugin::instantiate`; `None` until then.
pub start_time: Option<Instant>,
/// Budget in milliseconds compared against the time elapsed since `start_time`; `0` disables the check.
pub cpu_limit_ms: u64,
/// Running total of bytes requested through `WasmPlugin::allocate_in_memory`.
pub memory_used: u64,
/// Optional WASI context; `None` unless attached with `with_wasi`.
pub wasi_ctx: Option<WasiCtx>,
}
impl PluginState {
fn new(sandbox: PluginSandbox, data_dir: PathBuf, cpu_limit_ms: u64) -> Self {
Self {
sandbox,
data_dir,
start_time: None,
cpu_limit_ms,
memory_used: 0,
wasi_ctx: None,
}
}
fn with_wasi(mut self, wasi_ctx: WasiCtx) -> Self {
self.wasi_ctx = Some(wasi_ctx);
self
}
fn check_cpu_limit(&self) -> Result<()> {
if self.cpu_limit_ms == 0 {
return Ok(());
}
if let Some(start) = self.start_time {
let elapsed = start.elapsed().as_millis() as u64;
if elapsed > self.cpu_limit_ms {
return Err(anyhow!(
"CPU time limit exceeded: {}ms > {}ms",
elapsed,
self.cpu_limit_ms
));
}
}
Ok(())
}
}
/// A compiled WASM module bundled with the sandbox, engine, and limits used to run it.
pub struct WasmPlugin {
/// Name returned by `Plugin::name`.
name: String,
/// Version string returned by `Plugin::version`.
version: String,
/// Sandbox cloned into every store created by `create_store`.
sandbox: PluginSandbox,
/// Compiled module, shared behind an `Arc`.
module: Arc<Module>,
/// Engine used to create stores for this plugin.
engine: Engine,
/// Limits for this plugin; `max_cpu_time_ms` seeds each store's CPU budget.
config: WasmConfig,
}
impl WasmPlugin {
    /// Size in bytes of one WebAssembly linear-memory page.
    const WASM_PAGE_BYTES: u64 = 65_536;

    /// Bundles an already-compiled module with the sandbox, engine, and limits it runs under.
    fn new(
        name: String,
        version: String,
        sandbox: PluginSandbox,
        module: Module,
        engine: Engine,
        config: WasmConfig,
    ) -> Self {
        Self {
            name,
            version,
            sandbox,
            module: Arc::new(module),
            engine,
            config,
        }
    }

    /// Returns the compiled module.
    pub fn module(&self) -> &Module {
        &self.module
    }

    /// Returns the engine this plugin was compiled with.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }

    /// Creates a fresh store whose state carries a clone of the plugin sandbox,
    /// `data_dir`, and the configured CPU budget.
    pub fn create_store(&self, data_dir: PathBuf) -> Store<PluginState> {
        let cpu_limit = self.config.max_cpu_time_ms;
        let state = PluginState::new(self.sandbox.clone(), data_dir, cpu_limit);
        Store::new(&self.engine, state)
    }

    /// Records the start time in the store and instantiates the module with no imports.
    ///
    /// # Errors
    /// Returns the instantiation error, annotated with the plugin name.
    pub fn instantiate(&self, store: &mut Store<PluginState>) -> Result<Instance> {
        store.data_mut().start_time = Some(Instant::now());
        Instance::new(store, &self.module, &[])
            .with_context(|| format!("Failed to instantiate WASM plugin: {}", self.name))
    }

    /// Calls the exported function `func_name` using a JSON-over-linear-memory convention.
    ///
    /// `input` is serialized to JSON, copied into freshly grown guest memory, and the
    /// export is called as `(ptr: i32, len: i32) -> (ptr: i32, len: i32)`. The returned
    /// region is parsed as JSON; output that is not valid JSON is reported as
    /// `{"error": "Invalid JSON output"}` rather than as an `Err`.
    ///
    /// # Errors
    /// Fails when the CPU budget is exhausted (checked before and after the call), the
    /// export is missing or has a different signature, the instance exports no
    /// `memory`, the input does not fit the 32-bit ABI, memory cannot be grown, the
    /// call itself fails, or the returned region lies outside guest memory.
    pub fn execute_func(
        &self,
        store: &mut Store<PluginState>,
        instance: &Instance,
        func_name: &str,
        input: &serde_json::Value,
    ) -> Result<serde_json::Value> {
        store.data().check_cpu_limit()?;
        let func = instance
            .get_typed_func::<(i32, i32), (i32, i32)>(&mut *store, func_name)
            .with_context(|| format!("Function '{}' not found in plugin", func_name))?;

        let input_bytes = serde_json::to_vec(input)?;
        // The ABI passes the length as an i32, so reject inputs that would be truncated.
        let input_len = i32::try_from(input_bytes.len()).map_err(|_| {
            anyhow!(
                "Plugin input of {} bytes does not fit the 32-bit plugin ABI",
                input_bytes.len()
            )
        })?;

        let memory = instance
            .get_memory(&mut *store, "memory")
            .ok_or_else(|| anyhow!("No memory export in plugin"))?;
        let input_ptr = self.allocate_in_memory(store, memory, input_len)?;
        let input_offset =
            usize::try_from(input_ptr).context("Allocator returned a negative pointer")?;
        // `Memory::write` bounds-checks the destination instead of panicking.
        memory
            .write(&mut *store, input_offset, &input_bytes)
            .context("Failed to copy input into plugin memory")?;

        let (output_ptr, output_len) = func.call(&mut *store, (input_ptr, input_len))?;

        // The guest chooses these values, so they must never be trusted as indices.
        // Wasm i32 pointers are unsigned 32-bit offsets; reinterpret them instead of
        // sign-extending, then bounds-check against the current memory size.
        let out_start = output_ptr as u32 as usize;
        let out_len = output_len as u32 as usize;
        let output_slice = memory
            .data(&*store)
            .get(out_start..)
            .and_then(|tail| tail.get(..out_len))
            .ok_or_else(|| {
                anyhow!(
                    "Plugin function '{}' returned an out-of-bounds output region (ptr={}, len={})",
                    func_name,
                    output_ptr,
                    output_len
                )
            })?;

        // Deliberate best effort: malformed output becomes an error object, not an `Err`.
        let output: serde_json::Value = serde_json::from_slice(output_slice)
            .unwrap_or_else(|_| serde_json::json!({"error": "Invalid JSON output"}));
        store.data().check_cpu_limit()?;
        Ok(output)
    }

    /// Reserves `size` bytes at the current end of `memory`, growing it by whole pages
    /// as needed, and returns the offset of the reserved region.
    ///
    /// The requested byte count is added to `PluginState::memory_used`.
    ///
    /// # Errors
    /// Fails when `size` is negative, when the current memory size cannot be expressed
    /// as an `i32` offset, or when the memory cannot be grown.
    fn allocate_in_memory(
        &self,
        store: &mut Store<PluginState>,
        memory: Memory,
        size: i32,
    ) -> Result<i32> {
        let size_bytes = u64::try_from(size)
            .map_err(|_| anyhow!("Invalid negative allocation size: {}", size))?;

        // The new region starts where the existing memory ends.
        let base = memory.data_size(&*store);
        let ptr = i32::try_from(base).map_err(|_| {
            anyhow!(
                "WASM memory of {} bytes is too large for a 32-bit signed pointer",
                base
            )
        })?;

        // `base` fits in i32 and `size_bytes` in 31 bits, so this sum cannot overflow u64.
        let end_bytes = u64::try_from(base)? + size_bytes;
        // Round up to whole pages; an exact multiple needs no extra page.
        let needed_pages = end_bytes.div_ceil(Self::WASM_PAGE_BYTES);
        let current_pages = memory.size(&*store);
        if needed_pages > current_pages {
            memory
                .grow(&mut *store, needed_pages - current_pages)
                .with_context(|| "Failed to grow WASM memory")?;
        }

        store.data_mut().memory_used += size_bytes;
        Ok(ptr)
    }
}
#[async_trait::async_trait]
impl Plugin for WasmPlugin {
    /// Returns the plugin name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Returns the plugin version string.
    fn version(&self) -> &str {
        &self.version
    }

    /// No-op; a fresh instance is created on every `execute` call instead.
    async fn initialize(&self, _context: &PluginContext) -> Layer4Result<()> {
        Ok(())
    }

    /// Instantiates the module in a fresh store and runs its entry point with `input`.
    ///
    /// The exports `execute`, `run`, `_start`, and `main` are tried in that order and
    /// the first successful result is returned. Only exports with the
    /// `(i32, i32) -> (i32, i32)` signature are considered callable.
    ///
    /// # Errors
    /// Fails when the sandbox CPU check fails, when instantiation fails, or when no
    /// entry point succeeds. If a callable entry point exists but fails, the first
    /// such failure is reported instead of being masked by the generic
    /// "no callable entry point" message.
    async fn execute(&self, input: &serde_json::Value) -> Layer4Result<serde_json::Value> {
        self.sandbox.check_cpu_limit()?;
        let data_dir = std::env::temp_dir();
        let mut store = self.create_store(data_dir);
        let instance = self
            .instantiate(&mut store)
            .map_err(|e| anyhow!("WASM instantiation failed: {}", e))?;

        let mut first_failure = None;
        for func_name in ["execute", "run", "_start", "main"] {
            // Skip names that are not exported with the expected signature, so that
            // "missing" can be told apart from "present but failed".
            let callable = instance
                .get_typed_func::<(i32, i32), (i32, i32)>(&mut store, func_name)
                .is_ok();
            if !callable {
                continue;
            }
            match self.execute_func(&mut store, &instance, func_name, input) {
                Ok(output) => return Ok(output),
                Err(err) => {
                    if first_failure.is_none() {
                        first_failure = Some((func_name, err));
                    }
                }
            }
        }

        if let Some((func_name, err)) = first_failure {
            return Err(anyhow!(
                "WASM plugin '{}' entry point '{}' failed: {:#}",
                self.name,
                func_name,
                err
            ));
        }
        Err(anyhow!(
            "WASM plugin '{}' has no callable entry point. Expected one of: execute, run, _start, main",
            self.name
        ))
    }

    /// No-op; the plugin holds no per-run resources to release.
    async fn shutdown(&self) -> Layer4Result<()> {
        Ok(())
    }
}
/// Compiles WASM files into plugins and keeps them in a name-keyed registry.
pub struct WasmLoader {
/// Engine shared by every module this loader compiles.
engine: Engine,
/// Map from plugin name to compiled module; entries are removed by `unload`.
modules: RwLock<HashMap<String, Module>>,
/// Map from plugin name to loaded plugin; filled by `load`, read by `get` and `list`.
plugins: RwLock<HashMap<String, Arc<WasmPlugin>>>,
/// Configuration cloned into each plugin created by `load`.
config: WasmConfig,
}
impl WasmLoader {
    /// Creates a loader using `WasmConfig::default()`.
    ///
    /// # Errors
    /// Fails when the wasmtime engine cannot be created.
    pub fn new() -> Layer4Result<Self> {
        Self::with_config(WasmConfig::default())
    }

    /// Creates a loader whose engine is configured from `config`.
    ///
    /// Backtrace details are enabled and Cranelift optimizes for speed. When
    /// `config.max_memory_bytes` is non-zero, the memory64 proposal is enabled and the
    /// static-memory size is set to the budget's whole-page count plus one page.
    ///
    /// NOTE(review): going by the wasmtime documentation, `static_memory_maximum_size`
    /// is an allocation-strategy tunable rather than a hard cap on guest memory;
    /// confirm whether a store limiter is also needed to enforce the budget.
    ///
    /// # Errors
    /// Fails when the wasmtime engine cannot be created.
    pub fn with_config(config: WasmConfig) -> Layer4Result<Self> {
        let mut engine_config = Config::new();
        engine_config.wasm_backtrace_details(WasmBacktraceDetails::Enable);
        engine_config.cranelift_opt_level(OptLevel::Speed);
        if config.max_memory_bytes > 0 {
            let max_pages = (config.max_memory_bytes / 65536) + 1;
            engine_config.wasm_memory64(true);
            engine_config.static_memory_maximum_size(max_pages * 65536);
        }
        let engine = Engine::new(&engine_config).context("Failed to create Wasmtime engine")?;
        Ok(Self {
            engine,
            modules: RwLock::new(HashMap::new()),
            plugins: RwLock::new(HashMap::new()),
            config,
        })
    }

    /// Returns `true` when `path` is an existing regular file with the exact extension
    /// `wasm`. The file contents are not inspected.
    pub fn is_valid_wasm(path: &Path) -> bool {
        // `is_file` is already false for paths that do not exist.
        path.is_file() && path.extension().is_some_and(|ext| ext == "wasm")
    }

    /// Compiles the file at `path` and registers it under its file stem.
    ///
    /// The plugin name is the file stem (`"unknown"` when it is missing or not valid
    /// UTF-8); an existing plugin with the same name is replaced. The compiled module
    /// is stored in the module cache alongside the plugin.
    ///
    /// # Errors
    /// Fails when the file cannot be read or compiled.
    pub fn load(&self, path: &Path, capabilities: CapabilitySet) -> Layer4Result<String> {
        let name = path
            .file_stem()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown")
            .to_string();
        let module = Module::from_file(&self.engine, path)
            .with_context(|| format!("Failed to compile WASM: {:?}", path))?;
        let version = self
            .extract_version(&module)
            .unwrap_or_else(|| "0.1.0".to_string());
        let plugin = WasmPlugin::new(
            name.clone(),
            version,
            PluginSandbox::new(capabilities),
            // `Module` is a cheap handle to clone; the original is kept for the cache.
            module.clone(),
            self.engine.clone(),
            self.config.clone(),
        );
        // Cache the module that was just compiled. Previously only an entry that was
        // already present was re-inserted, so the cache never received new modules.
        self.modules.write().insert(name.clone(), module);
        self.plugins.write().insert(name.clone(), Arc::new(plugin));
        tracing::info!("Loaded WASM plugin: {} from {:?}", name, path);
        Ok(name)
    }

    /// Placeholder for reading a version from the module; currently always `None`.
    fn extract_version(&self, _module: &Module) -> Option<String> {
        None
    }

    /// Looks up a loaded plugin by name.
    pub fn get(&self, name: &str) -> Option<Arc<WasmPlugin>> {
        self.plugins.read().get(name).cloned()
    }

    /// Removes the module and plugin registered under `name`, if any.
    ///
    /// Succeeds even when nothing was registered under that name.
    pub fn unload(&self, name: &str) -> Layer4Result<()> {
        self.modules.write().remove(name);
        self.plugins.write().remove(name);
        tracing::info!("Unloaded WASM plugin: {}", name);
        Ok(())
    }

    /// Returns the names of all loaded plugins, in no particular order.
    pub fn list(&self) -> Vec<String> {
        self.plugins.read().keys().cloned().collect()
    }

    /// Returns the engine used to compile modules.
    pub fn engine(&self) -> &Engine {
        &self.engine
    }

    /// Loads the plugin at `path` and runs it once with `input`, blocking until done.
    ///
    /// A new tokio runtime is created for the call.
    ///
    /// NOTE(review): going by the tokio documentation, blocking on a runtime from
    /// inside another runtime's async context panics; confirm callers only use this
    /// from synchronous code.
    ///
    /// # Errors
    /// Fails when loading fails, the runtime cannot be created, or execution fails.
    pub fn load_and_execute(
        &self,
        path: &Path,
        input: &serde_json::Value,
        capabilities: CapabilitySet,
    ) -> Layer4Result<serde_json::Value> {
        let name = self.load(path, capabilities)?;
        let plugin = self
            .get(&name)
            .ok_or_else(|| anyhow!("Plugin not found after loading: {}", name))?;
        let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
        rt.block_on(async { plugin.execute(input).await })
    }
}
impl Default for WasmLoader {
fn default() -> Self {
Self::new().expect("Failed to create WasmLoader")
}
}
/// Collects WASI settings and turns them into a `WasiCtx` on `build`.
pub struct WasiContextBuilder {
/// Preopened directories as (guest path, host path, directory permissions, file permissions).
preopens: Vec<(String, PathBuf, DirPerms, FilePerms)>,
/// Environment variables set explicitly for the guest.
env: HashMap<String, String>,
/// Arguments passed to the guest, in insertion order.
args: Vec<String>,
/// Whether `build` calls `inherit_stdio` on the underlying builder; `true` from `new`.
inherit_stdio: bool,
/// Whether `build` calls `inherit_env` on the underlying builder; `false` from `new`.
inherit_env: bool,
}
impl WasiContextBuilder {
pub fn new() -> Self {
Self {
preopens: Vec::new(),
env: HashMap::new(),
args: Vec::new(),
inherit_stdio: true,
inherit_env: false,
}
}
pub fn preopen(&mut self, guest_path: &str, host_path: PathBuf) -> &mut Self {
self.preopens.push((
guest_path.to_string(),
host_path,
DirPerms::all(),
FilePerms::all(),
));
self
}
pub fn preopen_readonly(&mut self, guest_path: &str, host_path: PathBuf) -> &mut Self {
self.preopens.push((
guest_path.to_string(),
host_path,
DirPerms::READ,
FilePerms::READ,
));
self
}
pub fn preopen_with_perms(
&mut self,
guest_path: &str,
host_path: PathBuf,
dir_perms: DirPerms,
file_perms: FilePerms,
) -> &mut Self {
self.preopens
.push((guest_path.to_string(), host_path, dir_perms, file_perms));
self
}
pub fn env(&mut self, key: &str, value: &str) -> &mut Self {
self.env.insert(key.to_string(), value.to_string());
self
}
pub fn envs(&mut self, envs: &[(impl AsRef<str>, impl AsRef<str>)]) -> &mut Self {
for (k, v) in envs {
self.env
.insert(k.as_ref().to_string(), v.as_ref().to_string());
}
self
}
pub fn arg(&mut self, arg: &str) -> &mut Self {
self.args.push(arg.to_string());
self
}
pub fn args(&mut self, args: &[impl AsRef<str>]) -> &mut Self {
for arg in args {
self.args.push(arg.as_ref().to_string());
}
self
}
pub fn inherit_stdio(&mut self, inherit: bool) -> &mut Self {
self.inherit_stdio = inherit;
self
}
pub fn inherit_env(&mut self, inherit: bool) -> &mut Self {
self.inherit_env = inherit;
self
}
pub fn build(&self) -> WasiCtx {
let mut builder = WasiCtxBuilder::new();
if self.inherit_stdio {
builder.inherit_stdio();
}
if self.inherit_env {
builder.inherit_env();
}
for (key, value) in &self.env {
builder.env(key, value);
}
for arg in &self.args {
builder.arg(arg);
}
for (guest_path, host_path, dir_perms, file_perms) in &self.preopens {
builder
.preopened_dir(host_path, guest_path, *dir_perms, *file_perms)
.expect("Failed to preopen directory");
}
builder.build()
}
}
impl Default for WasiContextBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plugin named "test" (version "0.1.0") around an empty module.
    fn empty_plugin() -> WasmPlugin {
        let loader = WasmLoader::new().unwrap();
        let sandbox = PluginSandbox::sandboxed();
        let engine = loader.engine().clone();
        let module = Module::new(&engine, "(module)").unwrap();
        WasmPlugin::new(
            String::from("test"),
            String::from("0.1.0"),
            sandbox,
            module,
            engine,
            WasmConfig::default(),
        )
    }

    #[test]
    fn test_wasm_loader_creation() {
        let created = WasmLoader::new();
        assert!(created.is_ok());
        assert!(created.unwrap().list().is_empty());
    }

    #[test]
    fn test_wasm_config_default() {
        let defaults = WasmConfig::default();
        assert_eq!(defaults.max_memory_bytes, 16 * 1024 * 1024);
        assert_eq!(defaults.max_cpu_time_ms, 5000);
        assert!(defaults.enable_wasi);
    }

    #[test]
    fn test_is_valid_wasm() {
        // Only the extension matters here: both temp files are empty.
        let wasm_file = tempfile::NamedTempFile::with_suffix(".wasm").unwrap();
        let text_file = tempfile::NamedTempFile::with_suffix(".txt").unwrap();
        assert!(WasmLoader::is_valid_wasm(wasm_file.path()));
        assert!(!WasmLoader::is_valid_wasm(text_file.path()));
    }

    #[test]
    fn test_wasm_plugin_creation() {
        let plugin = empty_plugin();
        assert_eq!(plugin.name(), "test");
        assert_eq!(plugin.version(), "0.1.0");
    }

    #[test]
    fn test_wasi_context_builder() {
        let mut builder = WasiContextBuilder::new();
        builder.env("TEST", "value").arg("--help");
        let _ctx = builder.build();
    }

    #[test]
    fn test_wasi_context_builder_with_preopen() {
        let mut builder = WasiContextBuilder::new();
        builder
            .env("HOME", "/home/user")
            .arg("--test")
            .preopen("/tmp", std::env::temp_dir());
        let _ctx = builder.build();
    }

    #[test]
    fn test_wasi_context_builder_readonly() {
        let mut builder = WasiContextBuilder::new();
        builder
            .preopen_readonly("/data", std::env::temp_dir())
            .inherit_stdio(true)
            .inherit_env(false);
        let _ctx = builder.build();
    }

    #[tokio::test]
    async fn test_plugin_execute_without_entry_point_returns_error() {
        // An empty module exports nothing, so every entry-point lookup must fail.
        let plugin = empty_plugin();
        let input = serde_json::json!({"test": "input"});
        let result = plugin.execute(&input).await;
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("no callable entry point"));
        assert!(message.contains("execute, run, _start, main"));
    }
}