// Source: mofa_plugins/wasm_runtime/runtime.rs

1//! WASM Runtime Core
2//!
3//! Core runtime management for WASM plugin execution
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12use wasmtime::*;
13
14use super::plugin::{WasmPlugin, WasmPluginConfig};
15use super::types::{ExecutionConfig, MemoryConfig, ResourceLimits, WasmError, WasmResult};
16
/// WASM runtime configuration
///
/// Bundles the settings that `RuntimeConfig::to_wasmtime_config` translates
/// into a wasmtime `Config`, together with module-cache related settings.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Resource limits
    pub resource_limits: ResourceLimits,
    /// Execution configuration (async support, fuel metering, epoch
    /// interruption, debug info and the WASM feature toggles)
    pub execution_config: ExecutionConfig,
    /// Memory configuration
    pub memory_config: MemoryConfig,
    /// Enable module caching
    pub enable_cache: bool,
    /// Cache directory
    pub cache_dir: Option<String>,
    /// Maximum cached modules (capacity handed to the runtime's `ModuleCache`)
    pub max_cached_modules: usize,
    /// Enable parallel compilation
    pub parallel_compilation: bool,
    /// Cranelift optimization level
    pub optimization_level: OptimizationLevel,
}
37
/// Optimization level for compilation
///
/// Each variant is translated to the `wasmtime::OptLevel` variant of the same
/// name when the engine configuration is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OptimizationLevel {
    /// Maps to `wasmtime::OptLevel::None`
    None,
    /// Maps to `wasmtime::OptLevel::Speed`
    Speed,
    /// Maps to `wasmtime::OptLevel::SpeedAndSize`
    SpeedAndSize,
}
45
46impl Default for RuntimeConfig {
47    fn default() -> Self {
48        Self {
49            resource_limits: ResourceLimits::default(),
50            execution_config: ExecutionConfig::default(),
51            memory_config: MemoryConfig::default(),
52            enable_cache: true,
53            cache_dir: None,
54            max_cached_modules: 100,
55            parallel_compilation: true,
56            optimization_level: OptimizationLevel::Speed,
57        }
58    }
59}
60
61impl RuntimeConfig {
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
67        self.resource_limits = limits;
68        self
69    }
70
71    pub fn with_cache_dir(mut self, dir: &str) -> Self {
72        self.cache_dir = Some(dir.to_string());
73        self
74    }
75
76    pub fn with_optimization(mut self, level: OptimizationLevel) -> Self {
77        self.optimization_level = level;
78        self
79    }
80
81    /// Convert to wasmtime Config
82    fn to_wasmtime_config(&self) -> Config {
83        let mut config = Config::new();
84
85        // Async support
86        config.async_support(self.execution_config.async_support);
87
88        // Fuel metering
89        config.consume_fuel(self.execution_config.fuel_metering);
90
91        // Epoch interruption
92        config.epoch_interruption(self.execution_config.epoch_interruption);
93
94        // Debug info
95        config.debug_info(self.execution_config.debug_info);
96
97        // WASM features
98        config.wasm_reference_types(self.execution_config.reference_types);
99        config.wasm_simd(self.execution_config.simd);
100        config.wasm_bulk_memory(self.execution_config.bulk_memory);
101        config.wasm_multi_value(self.execution_config.multi_value);
102        config.wasm_threads(self.execution_config.threads);
103
104        // Optimization level
105        match self.optimization_level {
106            OptimizationLevel::None => {
107                config.cranelift_opt_level(wasmtime::OptLevel::None);
108            }
109            OptimizationLevel::Speed => {
110                config.cranelift_opt_level(wasmtime::OptLevel::Speed);
111            }
112            OptimizationLevel::SpeedAndSize => {
113                config.cranelift_opt_level(wasmtime::OptLevel::SpeedAndSize);
114            }
115        }
116
117        // Parallel compilation
118        config.parallel_compilation(self.parallel_compilation);
119
120        config
121    }
122}
123
/// Compiled module with metadata
///
/// The notes on derived fields describe what `CompiledModule::new` stores;
/// all fields are public, so other code may construct values differently.
pub struct CompiledModule {
    /// Module name/ID
    pub name: String,
    /// Compiled wasmtime module
    pub module: Module,
    /// Compilation time in milliseconds
    pub compile_time_ms: u64,
    /// Module size in bytes (length of the source bytes, not of the compiled code)
    pub size_bytes: usize,
    /// Hash of source bytes (lower-case hex of the 64-bit `md5_hash` value)
    pub source_hash: String,
    /// Compilation timestamp (seconds since the Unix epoch)
    pub compiled_at: u64,
}
139
140impl CompiledModule {
141    pub fn new(name: &str, module: Module, source_bytes: &[u8], compile_time_ms: u64) -> Self {
142        let source_hash = format!("{:x}", md5_hash(source_bytes));
143
144        Self {
145            name: name.to_string(),
146            module,
147            compile_time_ms,
148            size_bytes: source_bytes.len(),
149            source_hash,
150            compiled_at: std::time::SystemTime::now()
151                .duration_since(std::time::UNIX_EPOCH)
152                .unwrap_or_default()
153                .as_secs(),
154        }
155    }
156}
157
/// Module cache for compiled WASM modules
///
/// Modules are stored by name; a secondary index maps a source hash to the
/// name under which the module compiled from that source is stored. Every
/// piece of state sits behind its own async `RwLock`.
pub struct ModuleCache {
    /// Cached modules by name
    modules: RwLock<HashMap<String, Arc<CompiledModule>>>,
    /// Cache by source hash (source hash -> module name)
    by_hash: RwLock<HashMap<String, String>>,
    /// Maximum entries
    max_entries: usize,
    /// Cache hits
    hits: RwLock<u64>,
    /// Cache misses
    misses: RwLock<u64>,
}
171
172impl ModuleCache {
173    pub fn new(max_entries: usize) -> Self {
174        Self {
175            modules: RwLock::new(HashMap::new()),
176            by_hash: RwLock::new(HashMap::new()),
177            max_entries,
178            hits: RwLock::new(0),
179            misses: RwLock::new(0),
180        }
181    }
182
183    /// Get module by name
184    pub async fn get(&self, name: &str) -> Option<Arc<CompiledModule>> {
185        let modules = self.modules.read().await;
186        if let Some(module) = modules.get(name).cloned() {
187            *self.hits.write().await += 1;
188            Some(module)
189        } else {
190            *self.misses.write().await += 1;
191            None
192        }
193    }
194
195    /// Get module by source hash
196    pub async fn get_by_hash(&self, hash: &str) -> Option<Arc<CompiledModule>> {
197        let by_hash = self.by_hash.read().await;
198        if let Some(name) = by_hash.get(hash) {
199            self.get(name).await
200        } else {
201            *self.misses.write().await += 1;
202            None
203        }
204    }
205
206    /// Insert a compiled module
207    pub async fn insert(&self, module: CompiledModule) {
208        let name = module.name.clone();
209        let hash = module.source_hash.clone();
210        let arc = Arc::new(module);
211
212        let mut modules = self.modules.write().await;
213
214        // Evict if at capacity (simple LRU-ish: remove oldest)
215        if modules.len() >= self.max_entries
216            && let Some(oldest) = modules.keys().next().cloned()
217        {
218            modules.remove(&oldest);
219        }
220
221        modules.insert(name.clone(), arc);
222        drop(modules);
223
224        self.by_hash.write().await.insert(hash, name);
225    }
226
227    /// Remove a module from cache
228    pub async fn remove(&self, name: &str) {
229        let mut modules = self.modules.write().await;
230        if let Some(module) = modules.remove(name) {
231            self.by_hash.write().await.remove(&module.source_hash);
232        }
233    }
234
235    /// Clear the cache
236    pub async fn clear(&self) {
237        self.modules.write().await.clear();
238        self.by_hash.write().await.clear();
239    }
240
241    /// Get cache statistics
242    pub async fn stats(&self) -> CacheStats {
243        let modules = self.modules.read().await;
244        CacheStats {
245            entries: modules.len(),
246            total_size_bytes: modules.values().map(|m| m.size_bytes).sum(),
247            hits: *self.hits.read().await,
248            misses: *self.misses.read().await,
249        }
250    }
251}
252
253impl Default for ModuleCache {
254    fn default() -> Self {
255        Self::new(100)
256    }
257}
258
/// Cache statistics
///
/// A point-in-time snapshot produced by `ModuleCache::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheStats {
    /// Number of modules currently cached
    pub entries: usize,
    /// Sum of the cached modules' `size_bytes`
    pub total_size_bytes: usize,
    /// Number of lookups that found a module
    pub hits: u64,
    /// Number of lookups that found nothing
    pub misses: u64,
}
267
268impl CacheStats {
269    pub fn hit_rate(&self) -> f64 {
270        let total = self.hits + self.misses;
271        if total == 0 {
272            0.0
273        } else {
274            self.hits as f64 / total as f64
275        }
276    }
277}
278
/// Runtime statistics
///
/// Counters kept by `WasmRuntime`; all of them start at zero / `None` via
/// the derived `Default`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RuntimeStats {
    /// Total modules compiled
    pub modules_compiled: u64,
    /// Total compilation time in milliseconds
    pub total_compile_time_ms: u64,
    /// Total plugins created
    pub plugins_created: u64,
    /// Currently active plugins
    // NOTE(review): nothing in this file ever decrements this counter —
    // confirm whether plugin teardown elsewhere is expected to do so.
    pub active_plugins: u64,
    /// Total executions
    pub total_executions: u64,
    /// Failed executions
    pub failed_executions: u64,
    /// Cache statistics (filled in by `WasmRuntime::stats` when a snapshot is taken)
    pub cache_stats: Option<CacheStats>,
}
297
/// WASM Runtime
///
/// Owns the wasmtime `Engine`, the compiled-module cache and the runtime
/// counters used to compile modules and create plugins.
pub struct WasmRuntime {
    /// Configuration
    config: RuntimeConfig,
    /// Wasmtime engine
    engine: Engine,
    /// Module cache
    cache: ModuleCache,
    /// Runtime statistics
    stats: RwLock<RuntimeStats>,
    /// Created time (monotonic; basis for `uptime_secs`)
    created_at: Instant,
}
311
312impl WasmRuntime {
313    /// Create a new WASM runtime
314    pub fn new(config: RuntimeConfig) -> WasmResult<Self> {
315        let wasmtime_config = config.to_wasmtime_config();
316        let engine = Engine::new(&wasmtime_config)
317            .map_err(|e| WasmError::Internal(format!("Failed to create engine: {}", e)))?;
318
319        let cache = ModuleCache::new(config.max_cached_modules);
320
321        info!(
322            "WASM runtime created with config: {:?}",
323            config.optimization_level
324        );
325
326        Ok(Self {
327            config,
328            engine,
329            cache,
330            stats: RwLock::new(RuntimeStats::default()),
331            created_at: Instant::now(),
332        })
333    }
334
335    /// Create with default configuration
336    pub fn default_runtime() -> WasmResult<Self> {
337        Self::new(RuntimeConfig::default())
338    }
339
340    /// Get the wasmtime engine
341    pub fn engine(&self) -> &Engine {
342        &self.engine
343    }
344
345    /// Get runtime configuration
346    pub fn config(&self) -> &RuntimeConfig {
347        &self.config
348    }
349
350    /// Compile a WASM module from bytes
351    pub async fn compile(&self, name: &str, bytes: &[u8]) -> WasmResult<Arc<CompiledModule>> {
352        // Check cache first
353        let hash = format!("{:x}", md5_hash(bytes));
354        if let Some(cached) = self.cache.get_by_hash(&hash).await {
355            debug!("Using cached module for {}", name);
356            return Ok(cached);
357        }
358
359        // Compile
360        let start = Instant::now();
361        let module = Module::new(&self.engine, bytes)
362            .map_err(|e| WasmError::CompilationError(e.to_string()))?;
363        let compile_time = start.elapsed().as_millis() as u64;
364
365        let compiled = CompiledModule::new(name, module, bytes, compile_time);
366
367        // Update stats
368        {
369            let mut stats = self.stats.write().await;
370            stats.modules_compiled += 1;
371            stats.total_compile_time_ms += compile_time;
372        }
373
374        // Cache it
375        self.cache.insert(compiled).await;
376
377        info!(
378            "Compiled module {} in {}ms ({} bytes)",
379            name,
380            compile_time,
381            bytes.len()
382        );
383
384        self.cache
385            .get(name)
386            .await
387            .ok_or_else(|| WasmError::Internal("Failed to retrieve compiled module".to_string()))
388    }
389
390    /// Compile from WAT format
391    pub async fn compile_wat(&self, name: &str, wat: &str) -> WasmResult<Arc<CompiledModule>> {
392        let bytes = wat.as_bytes().to_vec();
393        self.compile(name, &bytes).await
394    }
395
396    /// Compile from file
397    pub async fn compile_file(&self, name: &str, path: &Path) -> WasmResult<Arc<CompiledModule>> {
398        let bytes = tokio::fs::read(path).await?;
399        self.compile(name, &bytes).await
400    }
401
402    /// Create a plugin from compiled module
403    pub async fn create_plugin(
404        &self,
405        compiled: &CompiledModule,
406        config: WasmPluginConfig,
407    ) -> WasmResult<WasmPlugin> {
408        // Create module clone since WasmPlugin needs ownership
409        let module_bytes = compiled
410            .module
411            .serialize()
412            .map_err(|e| WasmError::Internal(e.to_string()))?;
413
414        let module = unsafe {
415            Module::deserialize(&self.engine, &module_bytes)
416                .map_err(|e| WasmError::LoadError(e.to_string()))?
417        };
418
419        // Update stats
420        {
421            let mut stats = self.stats.write().await;
422            stats.plugins_created += 1;
423            stats.active_plugins += 1;
424        }
425
426        // Create plugin directly using internal constructor with async support flag
427        let plugin = create_plugin_from_module(
428            &self.engine,
429            module,
430            config,
431            self.config.execution_config.async_support,
432        )?;
433
434        info!(
435            "Created plugin {} from module {}",
436            plugin.id(),
437            compiled.name
438        );
439        Ok(plugin)
440    }
441
442    /// Create a plugin directly from bytes
443    pub async fn create_plugin_from_bytes(
444        &self,
445        bytes: &[u8],
446        config: WasmPluginConfig,
447    ) -> WasmResult<WasmPlugin> {
448        let compiled = self.compile(&config.id, bytes).await?;
449        self.create_plugin(&compiled, config).await
450    }
451
452    /// Create a plugin directly from WAT
453    pub async fn create_plugin_from_wat(
454        &self,
455        wat: &str,
456        config: WasmPluginConfig,
457    ) -> WasmResult<WasmPlugin> {
458        let bytes = wat.as_bytes().to_vec();
459        self.create_plugin_from_bytes(&bytes, config).await
460    }
461
462    /// Get runtime statistics
463    pub async fn stats(&self) -> RuntimeStats {
464        let mut stats = self.stats.read().await.clone();
465        stats.cache_stats = Some(self.cache.stats().await);
466        stats
467    }
468
469    /// Get cache statistics
470    pub async fn cache_stats(&self) -> CacheStats {
471        self.cache.stats().await
472    }
473
474    /// Clear the module cache
475    pub async fn clear_cache(&self) {
476        self.cache.clear().await;
477        info!("Module cache cleared");
478    }
479
480    /// Get uptime in seconds
481    pub fn uptime_secs(&self) -> u64 {
482        self.created_at.elapsed().as_secs()
483    }
484
485    /// Increment epoch for epoch-based interruption
486    pub fn increment_epoch(&self) {
487        self.engine.increment_epoch();
488    }
489
490    /// Start epoch ticker for timeout support
491    pub fn start_epoch_ticker(&self) -> tokio::task::JoinHandle<()> {
492        let engine = self.engine.clone();
493        let tick_ms = self.config.execution_config.epoch_tick_ms;
494
495        tokio::spawn(async move {
496            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(tick_ms));
497            loop {
498                interval.tick().await;
499                engine.increment_epoch();
500            }
501        })
502    }
503}
504
505/// Helper function to create plugin from module
506fn create_plugin_from_module(
507    engine: &Engine,
508    module: Module,
509    config: WasmPluginConfig,
510    async_support: bool,
511) -> WasmResult<WasmPlugin> {
512    use super::host::HostContext;
513    use super::plugin::*;
514
515    let manifest = {
516        let mut manifest = super::types::PluginManifest::new(&config.id, "1.0.0");
517        for export in module.exports() {
518            if let ExternType::Func(_) = export.ty() {
519                manifest.exports.push(super::types::PluginExport::function(
520                    export.name(),
521                    vec![],
522                    vec![],
523                ));
524            }
525        }
526        manifest
527    };
528
529    let host_context = Arc::new(HostContext::new(
530        &config.id,
531        config.allowed_capabilities.clone(),
532    ));
533
534    Ok(WasmPlugin::from_parts_with_async(
535        config.id.clone(),
536        config,
537        manifest,
538        module,
539        engine.clone(),
540        host_context,
541        async_support,
542    ))
543}
544
/// Fast 64-bit hash of `data`, used to build cache keys.
///
/// Despite the name this is neither MD5 nor SHA-2: it feeds the byte slice
/// through the standard library's `DefaultHasher`. The result is not
/// cryptographic and different inputs can collide.
fn md5_hash(data: &[u8]) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::default();
    Hash::hash(data, &mut state);
    Hasher::finish(&state)
}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods keep the defaults they do not touch.
    #[test]
    fn test_runtime_config() {
        let config = RuntimeConfig::new().with_optimization(OptimizationLevel::Speed);

        assert_eq!(config.optimization_level, OptimizationLevel::Speed);
        assert!(config.enable_cache);
    }

    /// A fresh cache is empty and a failed lookup is counted as a miss.
    #[tokio::test]
    async fn test_module_cache() {
        let cache = ModuleCache::new(10);

        let stats = cache.stats().await;
        assert_eq!(stats.entries, 0);
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);

        // Miss
        let _ = cache.get("nonexistent").await;
        assert_eq!(*cache.misses.read().await, 1);
    }

    /// A new runtime starts with zeroed counters.
    #[tokio::test]
    async fn test_wasm_runtime_creation() {
        let config = RuntimeConfig::default();
        let runtime = WasmRuntime::new(config).unwrap();

        let stats = runtime.stats().await;
        assert_eq!(stats.modules_compiled, 0);
        assert_eq!(stats.plugins_created, 0);
    }

    /// Compiling WAT text yields a named module and bumps the compile counter.
    #[tokio::test]
    async fn test_compile_wat() {
        let runtime = WasmRuntime::default_runtime().unwrap();

        let wat = r#"
            (module
                (func (export "answer") (result i32)
                    i32.const 42
                )
            )
        "#;

        let compiled = runtime.compile_wat("test", wat).await.unwrap();
        assert_eq!(compiled.name, "test");
        // `compile_time_ms` is unsigned, so `>= 0` would always hold; check
        // something that can actually fail instead.
        assert_eq!(compiled.size_bytes, wat.len());

        let stats = runtime.stats().await;
        assert_eq!(stats.modules_compiled, 1);
    }

    /// The hit rate is hits divided by all lookups.
    #[test]
    fn test_cache_stats_hit_rate() {
        let stats = CacheStats {
            entries: 10,
            total_size_bytes: 1000,
            hits: 80,
            misses: 20,
        };

        // Compare with a tolerance rather than exact float equality.
        assert!((stats.hit_rate() - 0.8).abs() < f64::EPSILON);
    }

    /// With no lookups recorded the hit rate is defined as zero.
    #[test]
    fn test_cache_stats_hit_rate_without_lookups() {
        let stats = CacheStats {
            entries: 0,
            total_size_bytes: 0,
            hits: 0,
            misses: 0,
        };

        assert_eq!(stats.hit_rate(), 0.0);
    }
}