Skip to main content

mofa_plugins/wasm_runtime/
plugin.rs

1//! WASM Plugin Wrapper
2//!
3//! Wraps WASM modules as plugins with lifecycle management
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Instant;
9use tokio::sync::RwLock;
10use tracing::{debug, error, info};
11use wasmtime::*;
12
13use super::host::{DefaultHostFunctions, HostContext};
14use super::types::{
15    ExecutionConfig, PluginCapability, PluginManifest, ResourceLimits, WasmError, WasmResult,
16    WasmValue,
17};
18
19/// WASM plugin state
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
21pub enum WasmPluginState {
22    /// Plugin is created but not initialized
23    #[default]
24    Created,
25    /// Plugin is initializing
26    Initializing,
27    /// Plugin is ready to execute
28    Ready,
29    /// Plugin is currently executing
30    Running,
31    /// Plugin is paused
32    Paused,
33    /// Plugin encountered an error
34    Error,
35    /// Plugin is stopped
36    Stopped,
37}
38
39/// WASM plugin configuration
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct WasmPluginConfig {
42    /// Plugin ID
43    pub id: String,
44    /// Resource limits
45    pub resource_limits: ResourceLimits,
46    /// Execution configuration
47    pub execution_config: ExecutionConfig,
48    /// Allowed capabilities
49    pub allowed_capabilities: Vec<PluginCapability>,
50    /// Initial configuration values
51    pub initial_config: HashMap<String, WasmValue>,
52    /// Enable caching
53    pub enable_caching: bool,
54}
55
56impl Default for WasmPluginConfig {
57    fn default() -> Self {
58        Self {
59            id: uuid::Uuid::now_v7().to_string(),
60            resource_limits: ResourceLimits::default(),
61            execution_config: ExecutionConfig::default(),
62            allowed_capabilities: vec![PluginCapability::ReadConfig, PluginCapability::SendMessage],
63            initial_config: HashMap::new(),
64            enable_caching: true,
65        }
66    }
67}
68
69impl WasmPluginConfig {
70    pub fn new(id: &str) -> Self {
71        Self {
72            id: id.to_string(),
73            ..Default::default()
74        }
75    }
76
77    pub fn with_capability(mut self, cap: PluginCapability) -> Self {
78        if !self.allowed_capabilities.contains(&cap) {
79            self.allowed_capabilities.push(cap);
80        }
81        self
82    }
83
84    pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
85        self.resource_limits = limits;
86        self
87    }
88
89    pub fn with_config(mut self, key: &str, value: WasmValue) -> Self {
90        self.initial_config.insert(key.to_string(), value);
91        self
92    }
93}
94
95/// Plugin execution metrics
96#[derive(Debug, Clone, Default, Serialize, Deserialize)]
97pub struct PluginMetrics {
98    /// Total number of calls
99    pub call_count: u64,
100    /// Successful calls
101    pub success_count: u64,
102    /// Failed calls
103    pub error_count: u64,
104    /// Total execution time in nanoseconds
105    pub total_execution_time_ns: u64,
106    /// Average execution time in nanoseconds
107    pub avg_execution_time_ns: u64,
108    /// Peak memory usage in bytes
109    pub peak_memory_bytes: u64,
110    /// Current memory usage in bytes
111    pub current_memory_bytes: u64,
112    /// Fuel consumed (if metering enabled)
113    pub fuel_consumed: u64,
114    /// Last execution timestamp
115    pub last_execution: u64,
116}
117
118impl PluginMetrics {
119    pub fn record_execution(&mut self, duration_ns: u64, success: bool) {
120        self.call_count += 1;
121        if success {
122            self.success_count += 1;
123        } else {
124            self.error_count += 1;
125        }
126        self.total_execution_time_ns += duration_ns;
127        self.avg_execution_time_ns = self.total_execution_time_ns / self.call_count;
128        self.last_execution = std::time::SystemTime::now()
129            .duration_since(std::time::UNIX_EPOCH)
130            .unwrap_or_default()
131            .as_secs();
132    }
133}
134
135/// Plugin instance wrapping wasmtime Instance
136pub struct PluginInstance {
137    /// Wasmtime store
138    store: Store<PluginState>,
139    /// Wasmtime instance
140    instance: Instance,
141    /// Plugin manifest
142    manifest: PluginManifest,
143}
144
145/// Plugin state stored in wasmtime Store
146pub struct PluginState {
147    /// Host context
148    pub host_context: Arc<HostContext>,
149    /// Host functions implementation
150    pub host_functions: Arc<DefaultHostFunctions>,
151    /// Limits configuration
152    pub limits: StoreLimits,
153    /// Execution start time
154    pub execution_start: Option<Instant>,
155    /// Fuel limit
156    pub fuel_limit: Option<u64>,
157}
158
159/// Store limits for resource control
160pub struct StoreLimits {
161    pub max_memory_bytes: u64,
162    pub max_table_elements: u32,
163    pub max_instances: u32,
164}
165
166impl Default for StoreLimits {
167    fn default() -> Self {
168        Self {
169            max_memory_bytes: 16 * 1024 * 1024, // 16MB
170            max_table_elements: 10000,
171            max_instances: 10,
172        }
173    }
174}
175
176impl ResourceLimiter for StoreLimits {
177    fn memory_growing(
178        &mut self,
179        _current: usize,
180        desired: usize,
181        maximum: Option<usize>,
182    ) -> Result<bool> {
183        let max = maximum.unwrap_or(self.max_memory_bytes as usize);
184        Ok(desired <= max && desired <= self.max_memory_bytes as usize)
185    }
186
187    fn table_growing(
188        &mut self,
189        _current: usize,
190        desired: usize,
191        maximum: Option<usize>,
192    ) -> Result<bool> {
193        let max = maximum.unwrap_or(self.max_table_elements as usize);
194        Ok(desired <= max && desired <= self.max_table_elements as usize)
195    }
196}
197
198/// WASM Plugin
199pub struct WasmPlugin {
200    /// Plugin ID
201    id: String,
202    /// Plugin configuration
203    config: WasmPluginConfig,
204    /// Plugin manifest
205    manifest: PluginManifest,
206    /// Current state
207    state: RwLock<WasmPluginState>,
208    /// Compiled module
209    module: Module,
210    /// Wasmtime engine
211    engine: Engine,
212    /// Host context
213    host_context: Arc<HostContext>,
214    /// Execution metrics
215    metrics: RwLock<PluginMetrics>,
216    /// Instance (created on demand)
217    instance: RwLock<Option<PluginInstance>>,
218    /// Whether async support is enabled in the engine
219    async_support: bool,
220}
221
222impl WasmPlugin {
223    /// Create a new WASM plugin from module bytes
224    pub fn from_bytes(engine: &Engine, bytes: &[u8], config: WasmPluginConfig) -> WasmResult<Self> {
225        Self::from_bytes_with_async(engine, bytes, config, true)
226    }
227
228    /// Create a new WASM plugin from module bytes with async support flag
229    pub fn from_bytes_with_async(
230        engine: &Engine,
231        bytes: &[u8],
232        config: WasmPluginConfig,
233        async_support: bool,
234    ) -> WasmResult<Self> {
235        let module =
236            Module::new(engine, bytes).map_err(|e| WasmError::CompilationError(e.to_string()))?;
237
238        // Extract manifest from module (could be custom section or export)
239        let manifest = Self::extract_manifest(&module, &config);
240
241        let host_context = Arc::new(HostContext::new(
242            &config.id,
243            config.allowed_capabilities.clone(),
244        ));
245
246        // Set initial config values
247        for (key, value) in &config.initial_config {
248            // Spawn a task to set config since we can't use async here
249            let ctx = host_context.clone();
250            let k = key.clone();
251            let v = value.clone();
252            tokio::spawn(async move {
253                ctx.set_config(&k, v).await;
254            });
255        }
256
257        Ok(Self {
258            id: config.id.clone(),
259            config,
260            manifest,
261            state: RwLock::new(WasmPluginState::Created),
262            module,
263            engine: engine.clone(),
264            host_context,
265            metrics: RwLock::new(PluginMetrics::default()),
266            instance: RwLock::new(None),
267            async_support,
268        })
269    }
270
271    /// Create from WAT (WebAssembly Text format)
272    pub fn from_wat(engine: &Engine, wat: &str, config: WasmPluginConfig) -> WasmResult<Self> {
273        Self::from_wat_with_async(engine, wat, config, true)
274    }
275
276    /// Create from WAT with async support flag
277    pub fn from_wat_with_async(
278        engine: &Engine,
279        wat: &str,
280        config: WasmPluginConfig,
281        async_support: bool,
282    ) -> WasmResult<Self> {
283        let bytes = wat.to_string().into_bytes();
284        Self::from_bytes_with_async(engine, &bytes, config, async_support)
285    }
286
287    /// Create from file path
288    pub fn from_file(
289        engine: &Engine,
290        path: &std::path::Path,
291        config: WasmPluginConfig,
292    ) -> WasmResult<Self> {
293        let bytes = std::fs::read(path)?;
294        Self::from_bytes(engine, &bytes, config)
295    }
296
297    fn extract_manifest(module: &Module, config: &WasmPluginConfig) -> PluginManifest {
298        // Try to extract from custom section or use defaults
299        let mut manifest = PluginManifest::new(&config.id, "1.0.0");
300
301        // List exports from module
302        for export in module.exports() {
303            match export.ty() {
304                ExternType::Func(_) => {
305                    manifest.exports.push(super::types::PluginExport::function(
306                        export.name(),
307                        vec![],
308                        vec![],
309                    ));
310                }
311                ExternType::Memory(_) => {
312                    manifest
313                        .exports
314                        .push(super::types::PluginExport::memory(export.name()));
315                }
316                _ => {}
317            }
318        }
319
320        manifest
321    }
322
323    /// Get plugin ID
324    pub fn id(&self) -> &str {
325        &self.id
326    }
327
328    /// Get plugin manifest
329    pub fn manifest(&self) -> &PluginManifest {
330        &self.manifest
331    }
332
333    /// Get current state
334    pub async fn state(&self) -> WasmPluginState {
335        *self.state.read().await
336    }
337
338    /// Get metrics
339    pub async fn metrics(&self) -> PluginMetrics {
340        self.metrics.read().await.clone()
341    }
342
343    /// Initialize the plugin
344    pub async fn initialize(&self) -> WasmResult<()> {
345        let mut state = self.state.write().await;
346        if *state != WasmPluginState::Created && *state != WasmPluginState::Stopped {
347            return Err(WasmError::ExecutionError(format!(
348                "Cannot initialize plugin in state {:?}",
349                *state
350            )));
351        }
352
353        *state = WasmPluginState::Initializing;
354        drop(state);
355
356        // Create instance
357        self.create_instance().await?;
358
359        // Call _initialize if exported
360        if self.has_export("_initialize").await {
361            self.call_void("_initialize", &[]).await?;
362        }
363
364        *self.state.write().await = WasmPluginState::Ready;
365        info!("Plugin {} initialized", self.id);
366        Ok(())
367    }
368
369    /// Check if export exists
370    pub async fn has_export(&self, name: &str) -> bool {
371        // Check if the module has this export
372        for export in self.module.exports() {
373            if export.name() == name {
374                return true;
375            }
376        }
377        false
378    }
379
380    /// Create a new instance
381    async fn create_instance(&self) -> WasmResult<()> {
382        let host_functions = Arc::new(DefaultHostFunctions::new(self.host_context.clone()));
383
384        let limits = StoreLimits {
385            max_memory_bytes: self.config.resource_limits.max_memory_pages as u64 * 65536,
386            max_table_elements: self.config.resource_limits.max_table_elements,
387            max_instances: self.config.resource_limits.max_instances,
388        };
389
390        let plugin_state = PluginState {
391            host_context: self.host_context.clone(),
392            host_functions: host_functions.clone(),
393            limits,
394            execution_start: None,
395            fuel_limit: self.config.resource_limits.max_fuel,
396        };
397
398        let mut store = Store::new(&self.engine, plugin_state);
399
400        // Set fuel if metering enabled
401        if let Some(fuel) = self.config.resource_limits.max_fuel {
402            store
403                .set_fuel(fuel)
404                .map_err(|e| WasmError::Internal(e.to_string()))?;
405        }
406
407        // Create linker with host functions
408        let mut linker = Linker::new(&self.engine);
409        Self::add_host_functions(&mut linker, host_functions)?;
410
411        // Use async or sync instantiation based on engine configuration
412        let instance = if self.async_support {
413            linker
414                .instantiate_async(&mut store, &self.module)
415                .await
416                .map_err(|e| WasmError::InstantiationError(e.to_string()))?
417        } else {
418            linker
419                .instantiate(&mut store, &self.module)
420                .map_err(|e| WasmError::InstantiationError(e.to_string()))?
421        };
422
423        let plugin_instance = PluginInstance {
424            store,
425            instance,
426            manifest: self.manifest.clone(),
427        };
428
429        *self.instance.write().await = Some(plugin_instance);
430        Ok(())
431    }
432
433    fn add_host_functions(
434        linker: &mut Linker<PluginState>,
435        _host_functions: Arc<DefaultHostFunctions>,
436    ) -> WasmResult<()> {
437        // Add host_log function
438        linker
439            .func_wrap(
440                "env",
441                "host_log",
442                |_caller: Caller<'_, PluginState>, level: i32, ptr: i32, len: i32| {
443                    // In real implementation, read string from memory and call host_functions.log()
444                    debug!("host_log called: level={}, ptr={}, len={}", level, ptr, len);
445                    0i32 // Success
446                },
447            )
448            .map_err(|e| WasmError::Internal(e.to_string()))?;
449
450        // Add host_now_ms function
451        linker
452            .func_wrap(
453                "env",
454                "host_now_ms",
455                |_caller: Caller<'_, PluginState>| -> i64 {
456                    std::time::SystemTime::now()
457                        .duration_since(std::time::UNIX_EPOCH)
458                        .unwrap_or_default()
459                        .as_millis() as i64
460                },
461            )
462            .map_err(|e| WasmError::Internal(e.to_string()))?;
463
464        // Add host_alloc function
465        linker
466            .func_wrap(
467                "env",
468                "host_alloc",
469                |_caller: Caller<'_, PluginState>, size: i32| -> i32 {
470                    // Simple bump allocator simulation
471                    debug!("host_alloc called: size={}", size);
472                    0 // Return null for now
473                },
474            )
475            .map_err(|e| WasmError::Internal(e.to_string()))?;
476
477        // Add host_free function
478        linker
479            .func_wrap(
480                "env",
481                "host_free",
482                |_caller: Caller<'_, PluginState>, ptr: i32| {
483                    debug!("host_free called: ptr={}", ptr);
484                },
485            )
486            .map_err(|e| WasmError::Internal(e.to_string()))?;
487
488        // Add abort function (used by AssemblyScript and others)
489        linker
490            .func_wrap(
491                "env",
492                "abort",
493                |_caller: Caller<'_, PluginState>, msg: i32, file: i32, line: i32, col: i32| {
494                    error!(
495                        "WASM abort: msg={}, file={}, line={}, col={}",
496                        msg, file, line, col
497                    );
498                },
499            )
500            .map_err(|e| WasmError::Internal(e.to_string()))?;
501
502        Ok(())
503    }
504
505    /// Call a function with i32 return value
506    pub async fn call_i32(&self, name: &str, args: &[Val]) -> WasmResult<i32> {
507        let start = Instant::now();
508        let result = self.call_internal(name, args).await;
509        let duration = start.elapsed();
510
511        let success = result.is_ok();
512        self.metrics
513            .write()
514            .await
515            .record_execution(duration.as_nanos() as u64, success);
516
517        match result {
518            Ok(vals) => {
519                if let Some(Val::I32(v)) = vals.first() {
520                    Ok(*v)
521                } else {
522                    Err(WasmError::TypeMismatch {
523                        expected: "i32".to_string(),
524                        actual: format!("{:?}", vals),
525                    })
526                }
527            }
528            Err(e) => Err(e),
529        }
530    }
531
532    /// Call a function with i64 return value
533    pub async fn call_i64(&self, name: &str, args: &[Val]) -> WasmResult<i64> {
534        let start = Instant::now();
535        let result = self.call_internal(name, args).await;
536        let duration = start.elapsed();
537
538        let success = result.is_ok();
539        self.metrics
540            .write()
541            .await
542            .record_execution(duration.as_nanos() as u64, success);
543
544        match result {
545            Ok(vals) => {
546                if let Some(Val::I64(v)) = vals.first() {
547                    Ok(*v)
548                } else {
549                    Err(WasmError::TypeMismatch {
550                        expected: "i64".to_string(),
551                        actual: format!("{:?}", vals),
552                    })
553                }
554            }
555            Err(e) => Err(e),
556        }
557    }
558
559    /// Call a void function
560    pub async fn call_void(&self, name: &str, args: &[Val]) -> WasmResult<()> {
561        let start = Instant::now();
562        let result = self.call_internal(name, args).await;
563        let duration = start.elapsed();
564
565        let success = result.is_ok();
566        self.metrics
567            .write()
568            .await
569            .record_execution(duration.as_nanos() as u64, success);
570
571        result.map(|_| ())
572    }
573
574    async fn call_internal(&self, name: &str, args: &[Val]) -> WasmResult<Vec<Val>> {
575        let state = self.state().await;
576        if state != WasmPluginState::Ready && state != WasmPluginState::Running {
577            return Err(WasmError::ExecutionError(format!(
578                "Plugin not ready, current state: {:?}",
579                state
580            )));
581        }
582
583        let mut instance_guard = self.instance.write().await;
584        let instance = instance_guard
585            .as_mut()
586            .ok_or_else(|| WasmError::ExecutionError("Instance not created".to_string()))?;
587
588        let func = instance
589            .instance
590            .get_func(&mut instance.store, name)
591            .ok_or_else(|| WasmError::ExportNotFound(name.to_string()))?;
592
593        let ty = func.ty(&instance.store);
594        let mut results = vec![Val::I32(0); ty.results().len()];
595
596        // Use async or sync call based on engine configuration
597        if self.async_support {
598            func.call_async(&mut instance.store, args, &mut results)
599                .await
600                .map_err(|e| WasmError::ExecutionError(e.to_string()))?;
601        } else {
602            func.call(&mut instance.store, args, &mut results)
603                .map_err(|e| WasmError::ExecutionError(e.to_string()))?;
604        }
605
606        Ok(results)
607    }
608
609    /// Stop the plugin
610    pub async fn stop(&self) -> WasmResult<()> {
611        let mut state = self.state.write().await;
612
613        // Call _cleanup if exported
614        if *state == WasmPluginState::Ready || *state == WasmPluginState::Running {
615            drop(state);
616            if self.has_export("_cleanup").await {
617                let _ = self.call_void("_cleanup", &[]).await;
618            }
619            state = self.state.write().await;
620        }
621
622        *state = WasmPluginState::Stopped;
623        *self.instance.write().await = None;
624
625        info!("Plugin {} stopped", self.id);
626        Ok(())
627    }
628
629    /// Create plugin from pre-existing parts (used by runtime)
630    pub fn from_parts(
631        id: String,
632        config: WasmPluginConfig,
633        manifest: PluginManifest,
634        module: Module,
635        engine: Engine,
636        host_context: Arc<HostContext>,
637    ) -> Self {
638        Self::from_parts_with_async(id, config, manifest, module, engine, host_context, true)
639    }
640
641    /// Create plugin from pre-existing parts with async support flag
642    pub fn from_parts_with_async(
643        id: String,
644        config: WasmPluginConfig,
645        manifest: PluginManifest,
646        module: Module,
647        engine: Engine,
648        host_context: Arc<HostContext>,
649        async_support: bool,
650    ) -> Self {
651        Self {
652            id,
653            config,
654            manifest,
655            state: RwLock::new(WasmPluginState::Created),
656            module,
657            engine,
658            host_context,
659            metrics: RwLock::new(PluginMetrics::default()),
660            instance: RwLock::new(None),
661            async_support,
662        }
663    }
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    fn create_test_engine() -> Engine {
671        let mut config = Config::new();
672        config.async_support(false);
673        Engine::new(&config).unwrap()
674    }
675
676    #[test]
677    fn test_plugin_config() {
678        let config = WasmPluginConfig::new("test-plugin")
679            .with_capability(PluginCapability::Storage)
680            .with_config("key", WasmValue::String("value".into()));
681
682        assert_eq!(config.id, "test-plugin");
683        assert!(
684            config
685                .allowed_capabilities
686                .contains(&PluginCapability::Storage)
687        );
688        assert!(config.initial_config.contains_key("key"));
689    }
690
691    #[test]
692    fn test_plugin_metrics() {
693        let mut metrics = PluginMetrics::default();
694
695        metrics.record_execution(1000, true);
696        assert_eq!(metrics.call_count, 1);
697        assert_eq!(metrics.success_count, 1);
698
699        metrics.record_execution(2000, false);
700        assert_eq!(metrics.call_count, 2);
701        assert_eq!(metrics.error_count, 1);
702        assert_eq!(metrics.avg_execution_time_ns, 1500);
703    }
704
705    #[test]
706    fn test_plugin_state_default() {
707        let state = WasmPluginState::default();
708        assert_eq!(state, WasmPluginState::Created);
709    }
710
711    #[tokio::test]
712    async fn test_wasm_plugin_from_wat() {
713        let engine = create_test_engine();
714
715        let wat = r#"
716            (module
717                (func (export "add") (param i32 i32) (result i32)
718                    local.get 0
719                    local.get 1
720                    i32.add
721                )
722                (func (export "double") (param i32) (result i32)
723                    local.get 0
724                    i32.const 2
725                    i32.mul
726                )
727            )
728        "#;
729
730        // Use config without fuel metering since test engine doesn't support it
731        let mut config = WasmPluginConfig::new("test-math");
732        config.resource_limits.max_fuel = None;
733
734        // Use non-async mode since test engine has async_support disabled
735        let plugin = WasmPlugin::from_wat_with_async(&engine, wat, config, false).unwrap();
736
737        assert_eq!(plugin.id(), "test-math");
738        assert_eq!(plugin.state().await, WasmPluginState::Created);
739
740        // Initialize
741        plugin.initialize().await.unwrap();
742        assert_eq!(plugin.state().await, WasmPluginState::Ready);
743
744        // Call functions
745        let result = plugin
746            .call_i32("add", &[Val::I32(3), Val::I32(4)])
747            .await
748            .unwrap();
749        assert_eq!(result, 7);
750
751        let result = plugin.call_i32("double", &[Val::I32(21)]).await.unwrap();
752        assert_eq!(result, 42);
753
754        // Check metrics
755        let metrics = plugin.metrics().await;
756        assert_eq!(metrics.call_count, 2);
757        assert_eq!(metrics.success_count, 2);
758
759        // Stop
760        plugin.stop().await.unwrap();
761        assert_eq!(plugin.state().await, WasmPluginState::Stopped);
762    }
763}