Skip to main content

mofa_plugins/wasm_runtime/
manager.rs

1//! WASM Plugin Manager
2//!
3//! Manages multiple WASM plugins with lifecycle management
4
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::{RwLock, broadcast};
11use tracing::{error, info};
12
13use super::plugin::{PluginMetrics, WasmPlugin, WasmPluginConfig, WasmPluginState};
14use super::runtime::{RuntimeConfig, WasmRuntime};
15use super::types::{PluginCapability, PluginManifest, WasmError, WasmResult};
16
/// Identifier that external callers use to refer to a plugin.
///
/// The wrapped string is the plugin ID. The field is public, so the tuple
/// struct can also be constructed or destructured directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PluginHandle(pub String);

impl PluginHandle {
    /// Builds a handle by copying the given plugin ID.
    pub fn new(id: &str) -> Self {
        PluginHandle(String::from(id))
    }

    /// Returns the plugin ID this handle refers to.
    pub fn id(&self) -> &str {
        self.0.as_str()
    }
}

impl From<&str> for PluginHandle {
    /// Copies the borrowed ID into a new handle.
    fn from(s: &str) -> Self {
        PluginHandle(s.to_owned())
    }
}

impl From<String> for PluginHandle {
    /// Wraps an already-owned ID as a handle.
    fn from(s: String) -> Self {
        PluginHandle(s)
    }
}
42
/// Loaded plugin information
///
/// Serializable description of a single plugin: its identity, manifest,
/// current state, timestamps and execution metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadedPlugin {
    /// Plugin ID
    pub id: String,
    /// Plugin manifest
    pub manifest: PluginManifest,
    /// Current state
    pub state: WasmPluginState,
    /// Load timestamp, in whole seconds since the Unix epoch
    pub loaded_at: u64,
    /// Last activity timestamp, in whole seconds since the Unix epoch
    pub last_activity: u64,
    /// Execution metrics
    pub metrics: PluginMetrics,
}
59
/// Plugin event
///
/// Notification broadcast by the plugin manager to every receiver obtained
/// from `WasmPluginManager::subscribe`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PluginEvent {
    /// Plugin loaded
    Loaded {
        /// ID of the plugin that was loaded
        plugin_id: String,
        /// Manifest of the plugin that was loaded
        manifest: PluginManifest,
    },
    /// Plugin initialized
    Initialized { plugin_id: String },
    /// Plugin state changed
    StateChanged {
        /// ID of the plugin whose state was sampled
        plugin_id: String,
        /// State observed before the transition
        old_state: WasmPluginState,
        /// State observed after the transition
        new_state: WasmPluginState,
    },
    /// Plugin executed function
    Executed {
        /// ID of the plugin that ran the function
        plugin_id: String,
        /// Name of the function that was called
        function: String,
        /// Wall-clock duration of the call in milliseconds
        duration_ms: u64,
        /// Whether the call returned `Ok`
        success: bool,
    },
    /// Plugin unloaded
    Unloaded { plugin_id: String },
    /// Plugin error
    // NOTE(review): nothing in the visible part of this file sends this
    // variant; confirm whether another module emits it.
    Error { plugin_id: String, error: String },
}
88
/// Plugin registry for tracking loaded plugins
///
/// Keeps two views of the same data behind separate async locks: the primary
/// map keyed by plugin ID, and a reverse index from capability to the IDs of
/// the plugins that declare it.
pub struct PluginRegistry {
    /// Registered plugins by ID
    plugins: RwLock<HashMap<String, PluginInfo>>,
    /// Plugins by capability (IDs of the plugins declaring each capability)
    by_capability: RwLock<HashMap<PluginCapability, Vec<String>>>,
}
96
/// Plugin info stored in registry
struct PluginInfo {
    /// Manifest supplied when the plugin was registered
    manifest: PluginManifest,
    /// Source hash string supplied by the caller of `PluginRegistry::register`
    source_hash: String,
    /// Registration time, in whole seconds since the Unix epoch
    registered_at: u64,
}
103
104impl PluginRegistry {
105    pub fn new() -> Self {
106        Self {
107            plugins: RwLock::new(HashMap::new()),
108            by_capability: RwLock::new(HashMap::new()),
109        }
110    }
111
112    /// Register a plugin
113    pub async fn register(&self, plugin_id: &str, manifest: PluginManifest, source_hash: &str) {
114        let info = PluginInfo {
115            manifest: manifest.clone(),
116            source_hash: source_hash.to_string(),
117            registered_at: now_secs(),
118        };
119
120        // Register in main map
121        self.plugins
122            .write()
123            .await
124            .insert(plugin_id.to_string(), info);
125
126        // Index by capability
127        let mut by_cap = self.by_capability.write().await;
128        for cap in &manifest.capabilities {
129            by_cap
130                .entry(cap.clone())
131                .or_insert_with(Vec::new)
132                .push(plugin_id.to_string());
133        }
134    }
135
136    /// Unregister a plugin
137    pub async fn unregister(&self, plugin_id: &str) {
138        if let Some(info) = self.plugins.write().await.remove(plugin_id) {
139            // Remove from capability index
140            let mut by_cap = self.by_capability.write().await;
141            for cap in &info.manifest.capabilities {
142                if let Some(ids) = by_cap.get_mut(cap) {
143                    ids.retain(|id| id != plugin_id);
144                }
145            }
146        }
147    }
148
149    /// Get plugins with a specific capability
150    pub async fn with_capability(&self, cap: &PluginCapability) -> Vec<String> {
151        self.by_capability
152            .read()
153            .await
154            .get(cap)
155            .cloned()
156            .unwrap_or_default()
157    }
158
159    /// Check if plugin is registered
160    pub async fn is_registered(&self, plugin_id: &str) -> bool {
161        self.plugins.read().await.contains_key(plugin_id)
162    }
163
164    /// Get plugin manifest
165    pub async fn get_manifest(&self, plugin_id: &str) -> Option<PluginManifest> {
166        self.plugins
167            .read()
168            .await
169            .get(plugin_id)
170            .map(|info| info.manifest.clone())
171    }
172
173    /// List all registered plugins
174    pub async fn list(&self) -> Vec<String> {
175        self.plugins.read().await.keys().cloned().collect()
176    }
177}
178
179impl Default for PluginRegistry {
180    fn default() -> Self {
181        Self::new()
182    }
183}
184
/// WASM Plugin Manager
///
/// Owns the set of loaded plugins, keeps the registry in step with it,
/// broadcasts `PluginEvent`s and accumulates call statistics.
pub struct WasmPluginManager {
    /// WASM runtime used to create plugins
    runtime: Arc<WasmRuntime>,
    /// Loaded plugins, keyed by plugin ID
    plugins: RwLock<HashMap<String, Arc<WasmPlugin>>>,
    /// Plugin registry (manifests and capability index)
    registry: PluginRegistry,
    /// Event broadcaster; receivers are created through `subscribe`
    event_tx: broadcast::Sender<PluginEvent>,
    /// Default plugin config, used when a load call passes `None`
    default_config: WasmPluginConfig,
    /// Manager statistics
    stats: RwLock<ManagerStats>,
}
200
/// Manager statistics
///
/// Counters accumulated by `WasmPluginManager`; all start at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ManagerStats {
    /// Total plugins loaded
    pub total_loaded: u64,
    /// Total plugins unloaded
    pub total_unloaded: u64,
    /// Currently active plugins (`WasmPluginManager::stats` overwrites this
    /// with the live plugin count before returning)
    pub active_plugins: usize,
    /// Total function calls
    pub total_calls: u64,
    /// Failed function calls
    pub failed_calls: u64,
    /// Total execution time in milliseconds
    pub total_execution_time_ms: u64,
}
217
218impl WasmPluginManager {
219    /// Create a new plugin manager
220    pub fn new(runtime: Arc<WasmRuntime>) -> Self {
221        let (event_tx, _) = broadcast::channel(1024);
222
223        Self {
224            runtime,
225            plugins: RwLock::new(HashMap::new()),
226            registry: PluginRegistry::new(),
227            event_tx,
228            default_config: WasmPluginConfig::default(),
229            stats: RwLock::new(ManagerStats::default()),
230        }
231    }
232
233    /// Create with custom runtime config
234    pub fn with_runtime_config(config: RuntimeConfig) -> WasmResult<Self> {
235        let runtime = Arc::new(WasmRuntime::new(config)?);
236        Ok(Self::new(runtime))
237    }
238
239    /// Get the runtime
240    pub fn runtime(&self) -> &Arc<WasmRuntime> {
241        &self.runtime
242    }
243
244    /// Subscribe to plugin events
245    pub fn subscribe(&self) -> broadcast::Receiver<PluginEvent> {
246        self.event_tx.subscribe()
247    }
248
249    /// Set default plugin configuration
250    pub fn set_default_config(&mut self, config: WasmPluginConfig) {
251        self.default_config = config;
252    }
253
254    /// Load a plugin from bytes
255    pub async fn load_bytes(
256        &self,
257        bytes: &[u8],
258        config: Option<WasmPluginConfig>,
259    ) -> WasmResult<PluginHandle> {
260        let config = config.unwrap_or_else(|| self.default_config.clone());
261        let plugin_id = config.id.clone();
262
263        // Check if already loaded
264        if self.plugins.read().await.contains_key(&plugin_id) {
265            return Err(WasmError::PluginAlreadyLoaded(plugin_id));
266        }
267
268        // Create plugin via runtime
269        let plugin = self.runtime.create_plugin_from_bytes(bytes, config).await?;
270        let manifest = plugin.manifest().clone();
271
272        // Store plugin
273        let plugin = Arc::new(plugin);
274        self.plugins
275            .write()
276            .await
277            .insert(plugin_id.clone(), plugin.clone());
278
279        // Register in registry
280        self.registry
281            .register(&plugin_id, manifest.clone(), "")
282            .await;
283
284        // Update stats
285        {
286            let mut stats = self.stats.write().await;
287            stats.total_loaded += 1;
288            stats.active_plugins = self.plugins.read().await.len();
289        }
290
291        // Emit event
292        let _ = self.event_tx.send(PluginEvent::Loaded {
293            plugin_id: plugin_id.clone(),
294            manifest,
295        });
296
297        info!("Loaded plugin: {}", plugin_id);
298        Ok(PluginHandle::new(&plugin_id))
299    }
300
301    /// Load a plugin from WAT
302    pub async fn load_wat(
303        &self,
304        wat: &str,
305        config: Option<WasmPluginConfig>,
306    ) -> WasmResult<PluginHandle> {
307        let bytes = wat.to_string().into_bytes();
308        self.load_bytes(&bytes, config).await
309    }
310
311    /// Load a plugin from file
312    pub async fn load_file(
313        &self,
314        path: &Path,
315        config: Option<WasmPluginConfig>,
316    ) -> WasmResult<PluginHandle> {
317        let bytes = tokio::fs::read(path).await?;
318        self.load_bytes(&bytes, config).await
319    }
320
321    /// Initialize a plugin
322    pub async fn initialize(&self, handle: &PluginHandle) -> WasmResult<()> {
323        let plugin = self.get_plugin(handle).await?;
324
325        let old_state = plugin.state().await;
326        plugin.initialize().await?;
327        let new_state = plugin.state().await;
328
329        let _ = self.event_tx.send(PluginEvent::StateChanged {
330            plugin_id: handle.id().to_string(),
331            old_state,
332            new_state,
333        });
334
335        let _ = self.event_tx.send(PluginEvent::Initialized {
336            plugin_id: handle.id().to_string(),
337        });
338
339        Ok(())
340    }
341
342    /// Unload a plugin
343    pub async fn unload(&self, handle: &PluginHandle) -> WasmResult<()> {
344        let plugin_id = handle.id();
345
346        // Get and stop plugin
347        if let Some(plugin) = self.plugins.write().await.remove(plugin_id) {
348            plugin.stop().await?;
349        }
350
351        // Unregister
352        self.registry.unregister(plugin_id).await;
353
354        // Update stats
355        {
356            let mut stats = self.stats.write().await;
357            stats.total_unloaded += 1;
358            stats.active_plugins = self.plugins.read().await.len();
359        }
360
361        let _ = self.event_tx.send(PluginEvent::Unloaded {
362            plugin_id: plugin_id.to_string(),
363        });
364
365        info!("Unloaded plugin: {}", plugin_id);
366        Ok(())
367    }
368
369    /// Get a plugin by handle
370    pub async fn get_plugin(&self, handle: &PluginHandle) -> WasmResult<Arc<WasmPlugin>> {
371        self.plugins
372            .read()
373            .await
374            .get(handle.id())
375            .cloned()
376            .ok_or_else(|| WasmError::PluginNotFound(handle.id().to_string()))
377    }
378
379    /// Call a function on a plugin
380    pub async fn call_i32(
381        &self,
382        handle: &PluginHandle,
383        function: &str,
384        args: &[wasmtime::Val],
385    ) -> WasmResult<i32> {
386        let plugin = self.get_plugin(handle).await?;
387        let start = Instant::now();
388
389        let result = plugin.call_i32(function, args).await;
390        let duration = start.elapsed();
391
392        // Update stats
393        {
394            let mut stats = self.stats.write().await;
395            stats.total_calls += 1;
396            stats.total_execution_time_ms += duration.as_millis() as u64;
397            if result.is_err() {
398                stats.failed_calls += 1;
399            }
400        }
401
402        // Emit event
403        let _ = self.event_tx.send(PluginEvent::Executed {
404            plugin_id: handle.id().to_string(),
405            function: function.to_string(),
406            duration_ms: duration.as_millis() as u64,
407            success: result.is_ok(),
408        });
409
410        result
411    }
412
413    /// Call a void function on a plugin
414    pub async fn call_void(
415        &self,
416        handle: &PluginHandle,
417        function: &str,
418        args: &[wasmtime::Val],
419    ) -> WasmResult<()> {
420        let plugin = self.get_plugin(handle).await?;
421        let start = Instant::now();
422
423        let result = plugin.call_void(function, args).await;
424        let duration = start.elapsed();
425
426        // Update stats
427        {
428            let mut stats = self.stats.write().await;
429            stats.total_calls += 1;
430            stats.total_execution_time_ms += duration.as_millis() as u64;
431            if result.is_err() {
432                stats.failed_calls += 1;
433            }
434        }
435
436        // Emit event
437        let _ = self.event_tx.send(PluginEvent::Executed {
438            plugin_id: handle.id().to_string(),
439            function: function.to_string(),
440            duration_ms: duration.as_millis() as u64,
441            success: result.is_ok(),
442        });
443
444        result
445    }
446
447    /// Get plugin state
448    pub async fn get_state(&self, handle: &PluginHandle) -> WasmResult<WasmPluginState> {
449        let plugin = self.get_plugin(handle).await?;
450        Ok(plugin.state().await)
451    }
452
453    /// Get plugin metrics
454    pub async fn get_metrics(&self, handle: &PluginHandle) -> WasmResult<PluginMetrics> {
455        let plugin = self.get_plugin(handle).await?;
456        Ok(plugin.metrics().await)
457    }
458
459    /// Get plugin info
460    pub async fn get_info(&self, handle: &PluginHandle) -> WasmResult<LoadedPlugin> {
461        let plugin = self.get_plugin(handle).await?;
462
463        Ok(LoadedPlugin {
464            id: plugin.id().to_string(),
465            manifest: plugin.manifest().clone(),
466            state: plugin.state().await,
467            loaded_at: now_secs(),
468            last_activity: now_secs(),
469            metrics: plugin.metrics().await,
470        })
471    }
472
473    /// List all loaded plugins
474    pub async fn list_plugins(&self) -> Vec<PluginHandle> {
475        self.plugins
476            .read()
477            .await
478            .keys()
479            .map(|id| PluginHandle::new(id))
480            .collect()
481    }
482
483    /// Get plugins with specific capability
484    pub async fn plugins_with_capability(&self, cap: &PluginCapability) -> Vec<PluginHandle> {
485        self.registry
486            .with_capability(cap)
487            .await
488            .into_iter()
489            .map(PluginHandle)
490            .collect()
491    }
492
493    /// Get manager statistics
494    pub async fn stats(&self) -> ManagerStats {
495        let mut stats = self.stats.read().await.clone();
496        stats.active_plugins = self.plugins.read().await.len();
497        stats
498    }
499
500    /// Unload all plugins
501    pub async fn unload_all(&self) -> WasmResult<()> {
502        let handles: Vec<_> = self.list_plugins().await;
503        for handle in handles {
504            if let Err(e) = self.unload(&handle).await {
505                error!("Failed to unload plugin {}: {}", handle.id(), e);
506            }
507        }
508        Ok(())
509    }
510}
511
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
518
#[cfg(test)]
mod tests {
    use super::super::types::ExecutionConfig;
    use super::*;

    /// Create a test runtime without async support for synchronous tests
    ///
    /// Fuel metering and epoch interruption are disabled as well.
    fn create_test_runtime() -> WasmResult<WasmRuntime> {
        let mut config = RuntimeConfig::default();
        config.execution_config = ExecutionConfig {
            async_support: false,
            fuel_metering: false,
            epoch_interruption: false,
            ..ExecutionConfig::default()
        };
        WasmRuntime::new(config)
    }

    /// Handles can be built with `new` and through `From<&str>`.
    #[test]
    fn test_plugin_handle() {
        let handle = PluginHandle::new("test-plugin");
        assert_eq!(handle.id(), "test-plugin");

        let handle2: PluginHandle = "another".into();
        assert_eq!(handle2.id(), "another");
    }

    /// Registering indexes the plugin by capability; unregistering removes it
    /// from both the main map and the capability index.
    #[tokio::test]
    async fn test_plugin_registry() {
        let registry = PluginRegistry::new();

        let manifest = PluginManifest::new("test", "1.0.0")
            .with_capability(PluginCapability::ReadConfig)
            .with_capability(PluginCapability::SendMessage);

        registry.register("test", manifest, "hash123").await;

        assert!(registry.is_registered("test").await);
        assert!(!registry.is_registered("other").await);

        let with_read = registry
            .with_capability(&PluginCapability::ReadConfig)
            .await;
        assert!(with_read.contains(&"test".to_string()));

        registry.unregister("test").await;
        assert!(!registry.is_registered("test").await);
        assert!(
            registry
                .with_capability(&PluginCapability::ReadConfig)
                .await
                .is_empty()
        );
    }

    /// A freshly created manager reports no plugins and no loads.
    #[tokio::test]
    async fn test_plugin_manager_creation() {
        let runtime = Arc::new(create_test_runtime().unwrap());
        let manager = WasmPluginManager::new(runtime);

        let stats = manager.stats().await;
        assert_eq!(stats.active_plugins, 0);
        assert_eq!(stats.total_loaded, 0);
    }

    /// Full lifecycle: load from WAT, initialize, call, check stats, unload.
    #[tokio::test]
    async fn test_plugin_manager_load_wat() {
        let runtime = Arc::new(create_test_runtime().unwrap());
        let manager = WasmPluginManager::new(runtime);

        let wat = r#"
            (module
                (func (export "greet") (result i32)
                    i32.const 42
                )
            )
        "#;

        let mut config = WasmPluginConfig::new("greet-plugin");
        config.resource_limits.max_fuel = None;
        let handle = manager.load_wat(wat, Some(config)).await.unwrap();

        assert_eq!(handle.id(), "greet-plugin");

        let plugins = manager.list_plugins().await;
        assert_eq!(plugins.len(), 1);

        // Initialize
        manager.initialize(&handle).await.unwrap();

        // Call function
        let result = manager.call_i32(&handle, "greet", &[]).await.unwrap();
        assert_eq!(result, 42);

        // Check stats
        let stats = manager.stats().await;
        assert_eq!(stats.total_calls, 1);
        assert_eq!(stats.active_plugins, 1);

        // Unload
        manager.unload(&handle).await.unwrap();
        assert_eq!(manager.list_plugins().await.len(), 0);
    }

    /// Loading a plugin must broadcast a `Loaded` event to existing
    /// subscribers.
    #[tokio::test]
    async fn test_plugin_manager_events() {
        let runtime = Arc::new(create_test_runtime().unwrap());
        let manager = WasmPluginManager::new(runtime);

        let mut rx = manager.subscribe();

        let wat = r#"(module (func (export "test")))"#;
        let mut config = WasmPluginConfig::new("event-test");
        config.resource_limits.max_fuel = None;

        let handle = manager.load_wat(wat, Some(config)).await.unwrap();

        // Should receive Loaded event. The receiver was subscribed before the
        // load, so a missing event is a failure rather than something to skip.
        let event = rx
            .try_recv()
            .expect("a Loaded event should have been broadcast");
        match event {
            PluginEvent::Loaded { plugin_id, .. } => {
                assert_eq!(plugin_id, "event-test");
            }
            _ => panic!("Expected Loaded event"),
        }

        manager.unload(&handle).await.unwrap();
    }
}