1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::path::Path;
8use std::sync::Arc;
9use std::time::Instant;
10use tokio::sync::{RwLock, broadcast};
11use tracing::{error, info};
12
13use super::plugin::{PluginMetrics, WasmPlugin, WasmPluginConfig, WasmPluginState};
14use super::runtime::{RuntimeConfig, WasmRuntime};
15use super::types::{PluginCapability, PluginManifest, WasmError, WasmResult};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub struct PluginHandle(pub String);
20
21impl PluginHandle {
22 pub fn new(id: &str) -> Self {
23 Self(id.to_string())
24 }
25
26 pub fn id(&self) -> &str {
27 &self.0
28 }
29}
30
31impl From<&str> for PluginHandle {
32 fn from(s: &str) -> Self {
33 Self::new(s)
34 }
35}
36
37impl From<String> for PluginHandle {
38 fn from(s: String) -> Self {
39 Self(s)
40 }
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct LoadedPlugin {
46 pub id: String,
48 pub manifest: PluginManifest,
50 pub state: WasmPluginState,
52 pub loaded_at: u64,
54 pub last_activity: u64,
56 pub metrics: PluginMetrics,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub enum PluginEvent {
63 Loaded {
65 plugin_id: String,
66 manifest: PluginManifest,
67 },
68 Initialized { plugin_id: String },
70 StateChanged {
72 plugin_id: String,
73 old_state: WasmPluginState,
74 new_state: WasmPluginState,
75 },
76 Executed {
78 plugin_id: String,
79 function: String,
80 duration_ms: u64,
81 success: bool,
82 },
83 Unloaded { plugin_id: String },
85 Error { plugin_id: String, error: String },
87}
88
89pub struct PluginRegistry {
91 plugins: RwLock<HashMap<String, PluginInfo>>,
93 by_capability: RwLock<HashMap<PluginCapability, Vec<String>>>,
95}
96
97struct PluginInfo {
99 manifest: PluginManifest,
100 source_hash: String,
101 registered_at: u64,
102}
103
104impl PluginRegistry {
105 pub fn new() -> Self {
106 Self {
107 plugins: RwLock::new(HashMap::new()),
108 by_capability: RwLock::new(HashMap::new()),
109 }
110 }
111
112 pub async fn register(&self, plugin_id: &str, manifest: PluginManifest, source_hash: &str) {
114 let info = PluginInfo {
115 manifest: manifest.clone(),
116 source_hash: source_hash.to_string(),
117 registered_at: now_secs(),
118 };
119
120 self.plugins
122 .write()
123 .await
124 .insert(plugin_id.to_string(), info);
125
126 let mut by_cap = self.by_capability.write().await;
128 for cap in &manifest.capabilities {
129 by_cap
130 .entry(cap.clone())
131 .or_insert_with(Vec::new)
132 .push(plugin_id.to_string());
133 }
134 }
135
136 pub async fn unregister(&self, plugin_id: &str) {
138 if let Some(info) = self.plugins.write().await.remove(plugin_id) {
139 let mut by_cap = self.by_capability.write().await;
141 for cap in &info.manifest.capabilities {
142 if let Some(ids) = by_cap.get_mut(cap) {
143 ids.retain(|id| id != plugin_id);
144 }
145 }
146 }
147 }
148
149 pub async fn with_capability(&self, cap: &PluginCapability) -> Vec<String> {
151 self.by_capability
152 .read()
153 .await
154 .get(cap)
155 .cloned()
156 .unwrap_or_default()
157 }
158
159 pub async fn is_registered(&self, plugin_id: &str) -> bool {
161 self.plugins.read().await.contains_key(plugin_id)
162 }
163
164 pub async fn get_manifest(&self, plugin_id: &str) -> Option<PluginManifest> {
166 self.plugins
167 .read()
168 .await
169 .get(plugin_id)
170 .map(|info| info.manifest.clone())
171 }
172
173 pub async fn list(&self) -> Vec<String> {
175 self.plugins.read().await.keys().cloned().collect()
176 }
177}
178
179impl Default for PluginRegistry {
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185pub struct WasmPluginManager {
187 runtime: Arc<WasmRuntime>,
189 plugins: RwLock<HashMap<String, Arc<WasmPlugin>>>,
191 registry: PluginRegistry,
193 event_tx: broadcast::Sender<PluginEvent>,
195 default_config: WasmPluginConfig,
197 stats: RwLock<ManagerStats>,
199}
200
201#[derive(Debug, Clone, Default, Serialize, Deserialize)]
203pub struct ManagerStats {
204 pub total_loaded: u64,
206 pub total_unloaded: u64,
208 pub active_plugins: usize,
210 pub total_calls: u64,
212 pub failed_calls: u64,
214 pub total_execution_time_ms: u64,
216}
217
218impl WasmPluginManager {
219 pub fn new(runtime: Arc<WasmRuntime>) -> Self {
221 let (event_tx, _) = broadcast::channel(1024);
222
223 Self {
224 runtime,
225 plugins: RwLock::new(HashMap::new()),
226 registry: PluginRegistry::new(),
227 event_tx,
228 default_config: WasmPluginConfig::default(),
229 stats: RwLock::new(ManagerStats::default()),
230 }
231 }
232
233 pub fn with_runtime_config(config: RuntimeConfig) -> WasmResult<Self> {
235 let runtime = Arc::new(WasmRuntime::new(config)?);
236 Ok(Self::new(runtime))
237 }
238
239 pub fn runtime(&self) -> &Arc<WasmRuntime> {
241 &self.runtime
242 }
243
244 pub fn subscribe(&self) -> broadcast::Receiver<PluginEvent> {
246 self.event_tx.subscribe()
247 }
248
249 pub fn set_default_config(&mut self, config: WasmPluginConfig) {
251 self.default_config = config;
252 }
253
254 pub async fn load_bytes(
256 &self,
257 bytes: &[u8],
258 config: Option<WasmPluginConfig>,
259 ) -> WasmResult<PluginHandle> {
260 let config = config.unwrap_or_else(|| self.default_config.clone());
261 let plugin_id = config.id.clone();
262
263 if self.plugins.read().await.contains_key(&plugin_id) {
265 return Err(WasmError::PluginAlreadyLoaded(plugin_id));
266 }
267
268 let plugin = self.runtime.create_plugin_from_bytes(bytes, config).await?;
270 let manifest = plugin.manifest().clone();
271
272 let plugin = Arc::new(plugin);
274 self.plugins
275 .write()
276 .await
277 .insert(plugin_id.clone(), plugin.clone());
278
279 self.registry
281 .register(&plugin_id, manifest.clone(), "")
282 .await;
283
284 {
286 let mut stats = self.stats.write().await;
287 stats.total_loaded += 1;
288 stats.active_plugins = self.plugins.read().await.len();
289 }
290
291 let _ = self.event_tx.send(PluginEvent::Loaded {
293 plugin_id: plugin_id.clone(),
294 manifest,
295 });
296
297 info!("Loaded plugin: {}", plugin_id);
298 Ok(PluginHandle::new(&plugin_id))
299 }
300
301 pub async fn load_wat(
303 &self,
304 wat: &str,
305 config: Option<WasmPluginConfig>,
306 ) -> WasmResult<PluginHandle> {
307 let bytes = wat.to_string().into_bytes();
308 self.load_bytes(&bytes, config).await
309 }
310
311 pub async fn load_file(
313 &self,
314 path: &Path,
315 config: Option<WasmPluginConfig>,
316 ) -> WasmResult<PluginHandle> {
317 let bytes = tokio::fs::read(path).await?;
318 self.load_bytes(&bytes, config).await
319 }
320
321 pub async fn initialize(&self, handle: &PluginHandle) -> WasmResult<()> {
323 let plugin = self.get_plugin(handle).await?;
324
325 let old_state = plugin.state().await;
326 plugin.initialize().await?;
327 let new_state = plugin.state().await;
328
329 let _ = self.event_tx.send(PluginEvent::StateChanged {
330 plugin_id: handle.id().to_string(),
331 old_state,
332 new_state,
333 });
334
335 let _ = self.event_tx.send(PluginEvent::Initialized {
336 plugin_id: handle.id().to_string(),
337 });
338
339 Ok(())
340 }
341
342 pub async fn unload(&self, handle: &PluginHandle) -> WasmResult<()> {
344 let plugin_id = handle.id();
345
346 if let Some(plugin) = self.plugins.write().await.remove(plugin_id) {
348 plugin.stop().await?;
349 }
350
351 self.registry.unregister(plugin_id).await;
353
354 {
356 let mut stats = self.stats.write().await;
357 stats.total_unloaded += 1;
358 stats.active_plugins = self.plugins.read().await.len();
359 }
360
361 let _ = self.event_tx.send(PluginEvent::Unloaded {
362 plugin_id: plugin_id.to_string(),
363 });
364
365 info!("Unloaded plugin: {}", plugin_id);
366 Ok(())
367 }
368
369 pub async fn get_plugin(&self, handle: &PluginHandle) -> WasmResult<Arc<WasmPlugin>> {
371 self.plugins
372 .read()
373 .await
374 .get(handle.id())
375 .cloned()
376 .ok_or_else(|| WasmError::PluginNotFound(handle.id().to_string()))
377 }
378
379 pub async fn call_i32(
381 &self,
382 handle: &PluginHandle,
383 function: &str,
384 args: &[wasmtime::Val],
385 ) -> WasmResult<i32> {
386 let plugin = self.get_plugin(handle).await?;
387 let start = Instant::now();
388
389 let result = plugin.call_i32(function, args).await;
390 let duration = start.elapsed();
391
392 {
394 let mut stats = self.stats.write().await;
395 stats.total_calls += 1;
396 stats.total_execution_time_ms += duration.as_millis() as u64;
397 if result.is_err() {
398 stats.failed_calls += 1;
399 }
400 }
401
402 let _ = self.event_tx.send(PluginEvent::Executed {
404 plugin_id: handle.id().to_string(),
405 function: function.to_string(),
406 duration_ms: duration.as_millis() as u64,
407 success: result.is_ok(),
408 });
409
410 result
411 }
412
413 pub async fn call_void(
415 &self,
416 handle: &PluginHandle,
417 function: &str,
418 args: &[wasmtime::Val],
419 ) -> WasmResult<()> {
420 let plugin = self.get_plugin(handle).await?;
421 let start = Instant::now();
422
423 let result = plugin.call_void(function, args).await;
424 let duration = start.elapsed();
425
426 {
428 let mut stats = self.stats.write().await;
429 stats.total_calls += 1;
430 stats.total_execution_time_ms += duration.as_millis() as u64;
431 if result.is_err() {
432 stats.failed_calls += 1;
433 }
434 }
435
436 let _ = self.event_tx.send(PluginEvent::Executed {
438 plugin_id: handle.id().to_string(),
439 function: function.to_string(),
440 duration_ms: duration.as_millis() as u64,
441 success: result.is_ok(),
442 });
443
444 result
445 }
446
447 pub async fn get_state(&self, handle: &PluginHandle) -> WasmResult<WasmPluginState> {
449 let plugin = self.get_plugin(handle).await?;
450 Ok(plugin.state().await)
451 }
452
453 pub async fn get_metrics(&self, handle: &PluginHandle) -> WasmResult<PluginMetrics> {
455 let plugin = self.get_plugin(handle).await?;
456 Ok(plugin.metrics().await)
457 }
458
459 pub async fn get_info(&self, handle: &PluginHandle) -> WasmResult<LoadedPlugin> {
461 let plugin = self.get_plugin(handle).await?;
462
463 Ok(LoadedPlugin {
464 id: plugin.id().to_string(),
465 manifest: plugin.manifest().clone(),
466 state: plugin.state().await,
467 loaded_at: now_secs(),
468 last_activity: now_secs(),
469 metrics: plugin.metrics().await,
470 })
471 }
472
473 pub async fn list_plugins(&self) -> Vec<PluginHandle> {
475 self.plugins
476 .read()
477 .await
478 .keys()
479 .map(|id| PluginHandle::new(id))
480 .collect()
481 }
482
483 pub async fn plugins_with_capability(&self, cap: &PluginCapability) -> Vec<PluginHandle> {
485 self.registry
486 .with_capability(cap)
487 .await
488 .into_iter()
489 .map(PluginHandle)
490 .collect()
491 }
492
493 pub async fn stats(&self) -> ManagerStats {
495 let mut stats = self.stats.read().await.clone();
496 stats.active_plugins = self.plugins.read().await.len();
497 stats
498 }
499
500 pub async fn unload_all(&self) -> WasmResult<()> {
502 let handles: Vec<_> = self.list_plugins().await;
503 for handle in handles {
504 if let Err(e) = self.unload(&handle).await {
505 error!("Failed to unload plugin {}: {}", handle.id(), e);
506 }
507 }
508 Ok(())
509 }
510}
511
512fn now_secs() -> u64 {
513 std::time::SystemTime::now()
514 .duration_since(std::time::UNIX_EPOCH)
515 .unwrap_or_default()
516 .as_secs()
517}
518
519#[cfg(test)]
520mod tests {
521 use super::super::types::ExecutionConfig;
522 use super::*;
523
524 fn create_test_runtime() -> WasmResult<WasmRuntime> {
526 let mut config = RuntimeConfig::default();
527 config.execution_config = ExecutionConfig {
528 async_support: false,
529 fuel_metering: false,
530 epoch_interruption: false,
531 ..ExecutionConfig::default()
532 };
533 WasmRuntime::new(config)
534 }
535
536 #[test]
537 fn test_plugin_handle() {
538 let handle = PluginHandle::new("test-plugin");
539 assert_eq!(handle.id(), "test-plugin");
540
541 let handle2: PluginHandle = "another".into();
542 assert_eq!(handle2.id(), "another");
543 }
544
545 #[tokio::test]
546 async fn test_plugin_registry() {
547 let registry = PluginRegistry::new();
548
549 let manifest = PluginManifest::new("test", "1.0.0")
550 .with_capability(PluginCapability::ReadConfig)
551 .with_capability(PluginCapability::SendMessage);
552
553 registry.register("test", manifest, "hash123").await;
554
555 assert!(registry.is_registered("test").await);
556 assert!(!registry.is_registered("other").await);
557
558 let with_read = registry
559 .with_capability(&PluginCapability::ReadConfig)
560 .await;
561 assert!(with_read.contains(&"test".to_string()));
562
563 registry.unregister("test").await;
564 assert!(!registry.is_registered("test").await);
565 }
566
567 #[tokio::test]
568 async fn test_plugin_manager_creation() {
569 let runtime = Arc::new(create_test_runtime().unwrap());
570 let manager = WasmPluginManager::new(runtime);
571
572 let stats = manager.stats().await;
573 assert_eq!(stats.active_plugins, 0);
574 assert_eq!(stats.total_loaded, 0);
575 }
576
577 #[tokio::test]
578 async fn test_plugin_manager_load_wat() {
579 let runtime = Arc::new(create_test_runtime().unwrap());
580 let manager = WasmPluginManager::new(runtime);
581
582 let wat = r#"
583 (module
584 (func (export "greet") (result i32)
585 i32.const 42
586 )
587 )
588 "#;
589
590 let mut config = WasmPluginConfig::new("greet-plugin");
591 config.resource_limits.max_fuel = None;
592 let handle = manager.load_wat(wat, Some(config)).await.unwrap();
593
594 assert_eq!(handle.id(), "greet-plugin");
595
596 let plugins = manager.list_plugins().await;
597 assert_eq!(plugins.len(), 1);
598
599 manager.initialize(&handle).await.unwrap();
601
602 let result = manager.call_i32(&handle, "greet", &[]).await.unwrap();
604 assert_eq!(result, 42);
605
606 let stats = manager.stats().await;
608 assert_eq!(stats.total_calls, 1);
609 assert_eq!(stats.active_plugins, 1);
610
611 manager.unload(&handle).await.unwrap();
613 assert_eq!(manager.list_plugins().await.len(), 0);
614 }
615
616 #[tokio::test]
617 async fn test_plugin_manager_events() {
618 let runtime = Arc::new(create_test_runtime().unwrap());
619 let manager = WasmPluginManager::new(runtime);
620
621 let mut rx = manager.subscribe();
622
623 let wat = r#"(module (func (export "test")))"#;
624 let mut config = WasmPluginConfig::new("event-test");
625 config.resource_limits.max_fuel = None;
626
627 let handle = manager.load_wat(wat, Some(config)).await.unwrap();
628
629 if let Ok(event) = rx.try_recv() {
631 match event {
632 PluginEvent::Loaded { plugin_id, .. } => {
633 assert_eq!(plugin_id, "event-test");
634 }
635 _ => panic!("Expected Loaded event"),
636 }
637 }
638
639 manager.unload(&handle).await.unwrap();
640 }
641}