use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use wasmtime::{Config, Engine, OptLevel};
use crate::channels::wasm::error::WasmChannelError;
use crate::tools::wasm::{FuelConfig, ResourceLimits};
#[derive(Debug, Clone)]
pub struct WasmChannelRuntimeConfig {
pub default_limits: ResourceLimits,
pub fuel_config: FuelConfig,
pub cache_compiled: bool,
pub cache_dir: Option<PathBuf>,
pub optimization_level: OptLevel,
pub callback_timeout: Duration,
}
impl Default for WasmChannelRuntimeConfig {
fn default() -> Self {
Self {
default_limits: ResourceLimits {
memory_bytes: 50 * 1024 * 1024, fuel: 10_000_000,
timeout: Duration::from_secs(60),
},
fuel_config: FuelConfig::default(),
cache_compiled: true,
cache_dir: None,
optimization_level: OptLevel::Speed,
callback_timeout: Duration::from_secs(30),
}
}
}
impl WasmChannelRuntimeConfig {
pub fn for_testing() -> Self {
Self {
default_limits: ResourceLimits {
memory_bytes: 5 * 1024 * 1024, fuel: 1_000_000,
timeout: Duration::from_secs(5),
},
fuel_config: FuelConfig::with_limit(1_000_000),
cache_compiled: false,
cache_dir: None,
optimization_level: OptLevel::None, callback_timeout: Duration::from_secs(5),
}
}
}
#[derive(Debug)]
pub struct PreparedChannelModule {
pub name: String,
pub description: String,
pub(crate) component_bytes: Vec<u8>,
pub limits: ResourceLimits,
}
impl PreparedChannelModule {
pub fn component_bytes(&self) -> &[u8] {
&self.component_bytes
}
pub fn for_testing(name: impl Into<String>, description: impl Into<String>) -> Self {
Self {
name: name.into(),
description: description.into(),
component_bytes: Vec::new(),
limits: ResourceLimits::default(),
}
}
}
pub struct WasmChannelRuntime {
engine: Engine,
config: WasmChannelRuntimeConfig,
modules: RwLock<HashMap<String, Arc<PreparedChannelModule>>>,
}
impl WasmChannelRuntime {
pub fn new(config: WasmChannelRuntimeConfig) -> Result<Self, WasmChannelError> {
let mut wasmtime_config = Config::new();
if config.fuel_config.enabled {
wasmtime_config.consume_fuel(true);
}
wasmtime_config.epoch_interruption(true);
wasmtime_config.wasm_component_model(true);
wasmtime_config.wasm_threads(false);
wasmtime_config.cranelift_opt_level(config.optimization_level);
wasmtime_config.debug_info(false);
let engine = Engine::new(&wasmtime_config).map_err(|e| {
WasmChannelError::Config(format!("Failed to create Wasmtime engine: {}", e))
})?;
Ok(Self {
engine,
config,
modules: RwLock::new(HashMap::new()),
})
}
pub fn engine(&self) -> &Engine {
&self.engine
}
pub fn config(&self) -> &WasmChannelRuntimeConfig {
&self.config
}
pub async fn prepare(
&self,
name: &str,
wasm_bytes: &[u8],
limits: Option<ResourceLimits>,
description: Option<String>,
) -> Result<Arc<PreparedChannelModule>, WasmChannelError> {
if let Some(module) = self.modules.read().await.get(name) {
return Ok(Arc::clone(module));
}
let name = name.to_string();
let wasm_bytes = wasm_bytes.to_vec();
let engine = self.engine.clone();
let default_limits = self.config.default_limits.clone();
let desc = description.unwrap_or_else(|| format!("WASM channel: {}", name));
let prepared = tokio::task::spawn_blocking(move || {
let _component = wasmtime::component::Component::new(&engine, &wasm_bytes)
.map_err(|e| WasmChannelError::Compilation(e.to_string()))?;
Ok::<_, WasmChannelError>(PreparedChannelModule {
name: name.clone(),
description: desc,
component_bytes: wasm_bytes,
limits: limits.unwrap_or(default_limits),
})
})
.await
.map_err(|e| {
WasmChannelError::Compilation(format!("Preparation task panicked: {}", e))
})??;
let prepared = Arc::new(prepared);
if self.config.cache_compiled {
self.modules
.write()
.await
.insert(prepared.name.clone(), Arc::clone(&prepared));
}
tracing::info!(
name = %prepared.name,
"Prepared WASM channel for execution"
);
Ok(prepared)
}
pub async fn get(&self, name: &str) -> Option<Arc<PreparedChannelModule>> {
self.modules.read().await.get(name).cloned()
}
pub async fn remove(&self, name: &str) -> Option<Arc<PreparedChannelModule>> {
self.modules.write().await.remove(name)
}
pub async fn list(&self) -> Vec<String> {
self.modules.read().await.keys().cloned().collect()
}
pub async fn clear(&self) {
self.modules.write().await.clear();
}
}
impl std::fmt::Debug for WasmChannelRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WasmChannelRuntime")
.field("config", &self.config)
.field("modules", &"<RwLock<HashMap>>")
.finish()
}
}
#[cfg(test)]
mod tests {
use crate::channels::wasm::runtime::{WasmChannelRuntime, WasmChannelRuntimeConfig};
#[test]
fn test_runtime_config_default() {
let config = WasmChannelRuntimeConfig::default();
assert!(config.cache_compiled);
assert!(config.fuel_config.enabled);
assert_eq!(config.default_limits.memory_bytes, 50 * 1024 * 1024);
}
#[test]
fn test_runtime_config_for_testing() {
let config = WasmChannelRuntimeConfig::for_testing();
assert!(!config.cache_compiled);
assert_eq!(config.default_limits.memory_bytes, 5 * 1024 * 1024);
}
#[test]
fn test_runtime_creation() {
let config = WasmChannelRuntimeConfig::for_testing();
let runtime = WasmChannelRuntime::new(config).unwrap();
assert!(runtime.config().fuel_config.enabled);
}
#[tokio::test]
async fn test_module_cache_operations() {
let config = WasmChannelRuntimeConfig::for_testing();
let runtime = WasmChannelRuntime::new(config).unwrap();
assert!(runtime.list().await.is_empty());
assert!(runtime.get("test").await.is_none());
}
}