1use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Instant;
9use tokio::sync::RwLock;
10use tracing::{debug, error, info};
11use wasmtime::*;
12
13use super::host::{DefaultHostFunctions, HostContext};
14use super::types::{
15 ExecutionConfig, PluginCapability, PluginManifest, ResourceLimits, WasmError, WasmResult,
16 WasmValue,
17};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
21pub enum WasmPluginState {
22 #[default]
24 Created,
25 Initializing,
27 Ready,
29 Running,
31 Paused,
33 Error,
35 Stopped,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct WasmPluginConfig {
42 pub id: String,
44 pub resource_limits: ResourceLimits,
46 pub execution_config: ExecutionConfig,
48 pub allowed_capabilities: Vec<PluginCapability>,
50 pub initial_config: HashMap<String, WasmValue>,
52 pub enable_caching: bool,
54}
55
56impl Default for WasmPluginConfig {
57 fn default() -> Self {
58 Self {
59 id: uuid::Uuid::now_v7().to_string(),
60 resource_limits: ResourceLimits::default(),
61 execution_config: ExecutionConfig::default(),
62 allowed_capabilities: vec![PluginCapability::ReadConfig, PluginCapability::SendMessage],
63 initial_config: HashMap::new(),
64 enable_caching: true,
65 }
66 }
67}
68
69impl WasmPluginConfig {
70 pub fn new(id: &str) -> Self {
71 Self {
72 id: id.to_string(),
73 ..Default::default()
74 }
75 }
76
77 pub fn with_capability(mut self, cap: PluginCapability) -> Self {
78 if !self.allowed_capabilities.contains(&cap) {
79 self.allowed_capabilities.push(cap);
80 }
81 self
82 }
83
84 pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
85 self.resource_limits = limits;
86 self
87 }
88
89 pub fn with_config(mut self, key: &str, value: WasmValue) -> Self {
90 self.initial_config.insert(key.to_string(), value);
91 self
92 }
93}
94
95#[derive(Debug, Clone, Default, Serialize, Deserialize)]
97pub struct PluginMetrics {
98 pub call_count: u64,
100 pub success_count: u64,
102 pub error_count: u64,
104 pub total_execution_time_ns: u64,
106 pub avg_execution_time_ns: u64,
108 pub peak_memory_bytes: u64,
110 pub current_memory_bytes: u64,
112 pub fuel_consumed: u64,
114 pub last_execution: u64,
116}
117
118impl PluginMetrics {
119 pub fn record_execution(&mut self, duration_ns: u64, success: bool) {
120 self.call_count += 1;
121 if success {
122 self.success_count += 1;
123 } else {
124 self.error_count += 1;
125 }
126 self.total_execution_time_ns += duration_ns;
127 self.avg_execution_time_ns = self.total_execution_time_ns / self.call_count;
128 self.last_execution = std::time::SystemTime::now()
129 .duration_since(std::time::UNIX_EPOCH)
130 .unwrap_or_default()
131 .as_secs();
132 }
133}
134
135pub struct PluginInstance {
137 store: Store<PluginState>,
139 instance: Instance,
141 manifest: PluginManifest,
143}
144
145pub struct PluginState {
147 pub host_context: Arc<HostContext>,
149 pub host_functions: Arc<DefaultHostFunctions>,
151 pub limits: StoreLimits,
153 pub execution_start: Option<Instant>,
155 pub fuel_limit: Option<u64>,
157}
158
159pub struct StoreLimits {
161 pub max_memory_bytes: u64,
162 pub max_table_elements: u32,
163 pub max_instances: u32,
164}
165
166impl Default for StoreLimits {
167 fn default() -> Self {
168 Self {
169 max_memory_bytes: 16 * 1024 * 1024, max_table_elements: 10000,
171 max_instances: 10,
172 }
173 }
174}
175
176impl ResourceLimiter for StoreLimits {
177 fn memory_growing(
178 &mut self,
179 _current: usize,
180 desired: usize,
181 maximum: Option<usize>,
182 ) -> Result<bool> {
183 let max = maximum.unwrap_or(self.max_memory_bytes as usize);
184 Ok(desired <= max && desired <= self.max_memory_bytes as usize)
185 }
186
187 fn table_growing(
188 &mut self,
189 _current: usize,
190 desired: usize,
191 maximum: Option<usize>,
192 ) -> Result<bool> {
193 let max = maximum.unwrap_or(self.max_table_elements as usize);
194 Ok(desired <= max && desired <= self.max_table_elements as usize)
195 }
196}
197
198pub struct WasmPlugin {
200 id: String,
202 config: WasmPluginConfig,
204 manifest: PluginManifest,
206 state: RwLock<WasmPluginState>,
208 module: Module,
210 engine: Engine,
212 host_context: Arc<HostContext>,
214 metrics: RwLock<PluginMetrics>,
216 instance: RwLock<Option<PluginInstance>>,
218 async_support: bool,
220}
221
222impl WasmPlugin {
223 pub fn from_bytes(engine: &Engine, bytes: &[u8], config: WasmPluginConfig) -> WasmResult<Self> {
225 Self::from_bytes_with_async(engine, bytes, config, true)
226 }
227
228 pub fn from_bytes_with_async(
230 engine: &Engine,
231 bytes: &[u8],
232 config: WasmPluginConfig,
233 async_support: bool,
234 ) -> WasmResult<Self> {
235 let module =
236 Module::new(engine, bytes).map_err(|e| WasmError::CompilationError(e.to_string()))?;
237
238 let manifest = Self::extract_manifest(&module, &config);
240
241 let host_context = Arc::new(HostContext::new(
242 &config.id,
243 config.allowed_capabilities.clone(),
244 ));
245
246 for (key, value) in &config.initial_config {
248 let ctx = host_context.clone();
250 let k = key.clone();
251 let v = value.clone();
252 tokio::spawn(async move {
253 ctx.set_config(&k, v).await;
254 });
255 }
256
257 Ok(Self {
258 id: config.id.clone(),
259 config,
260 manifest,
261 state: RwLock::new(WasmPluginState::Created),
262 module,
263 engine: engine.clone(),
264 host_context,
265 metrics: RwLock::new(PluginMetrics::default()),
266 instance: RwLock::new(None),
267 async_support,
268 })
269 }
270
271 pub fn from_wat(engine: &Engine, wat: &str, config: WasmPluginConfig) -> WasmResult<Self> {
273 Self::from_wat_with_async(engine, wat, config, true)
274 }
275
276 pub fn from_wat_with_async(
278 engine: &Engine,
279 wat: &str,
280 config: WasmPluginConfig,
281 async_support: bool,
282 ) -> WasmResult<Self> {
283 let bytes = wat.to_string().into_bytes();
284 Self::from_bytes_with_async(engine, &bytes, config, async_support)
285 }
286
287 pub fn from_file(
289 engine: &Engine,
290 path: &std::path::Path,
291 config: WasmPluginConfig,
292 ) -> WasmResult<Self> {
293 let bytes = std::fs::read(path)?;
294 Self::from_bytes(engine, &bytes, config)
295 }
296
297 fn extract_manifest(module: &Module, config: &WasmPluginConfig) -> PluginManifest {
298 let mut manifest = PluginManifest::new(&config.id, "1.0.0");
300
301 for export in module.exports() {
303 match export.ty() {
304 ExternType::Func(_) => {
305 manifest.exports.push(super::types::PluginExport::function(
306 export.name(),
307 vec![],
308 vec![],
309 ));
310 }
311 ExternType::Memory(_) => {
312 manifest
313 .exports
314 .push(super::types::PluginExport::memory(export.name()));
315 }
316 _ => {}
317 }
318 }
319
320 manifest
321 }
322
323 pub fn id(&self) -> &str {
325 &self.id
326 }
327
328 pub fn manifest(&self) -> &PluginManifest {
330 &self.manifest
331 }
332
333 pub async fn state(&self) -> WasmPluginState {
335 *self.state.read().await
336 }
337
338 pub async fn metrics(&self) -> PluginMetrics {
340 self.metrics.read().await.clone()
341 }
342
343 pub async fn initialize(&self) -> WasmResult<()> {
345 let mut state = self.state.write().await;
346 if *state != WasmPluginState::Created && *state != WasmPluginState::Stopped {
347 return Err(WasmError::ExecutionError(format!(
348 "Cannot initialize plugin in state {:?}",
349 *state
350 )));
351 }
352
353 *state = WasmPluginState::Initializing;
354 drop(state);
355
356 self.create_instance().await?;
358
359 if self.has_export("_initialize").await {
361 self.call_void("_initialize", &[]).await?;
362 }
363
364 *self.state.write().await = WasmPluginState::Ready;
365 info!("Plugin {} initialized", self.id);
366 Ok(())
367 }
368
369 pub async fn has_export(&self, name: &str) -> bool {
371 for export in self.module.exports() {
373 if export.name() == name {
374 return true;
375 }
376 }
377 false
378 }
379
380 async fn create_instance(&self) -> WasmResult<()> {
382 let host_functions = Arc::new(DefaultHostFunctions::new(self.host_context.clone()));
383
384 let limits = StoreLimits {
385 max_memory_bytes: self.config.resource_limits.max_memory_pages as u64 * 65536,
386 max_table_elements: self.config.resource_limits.max_table_elements,
387 max_instances: self.config.resource_limits.max_instances,
388 };
389
390 let plugin_state = PluginState {
391 host_context: self.host_context.clone(),
392 host_functions: host_functions.clone(),
393 limits,
394 execution_start: None,
395 fuel_limit: self.config.resource_limits.max_fuel,
396 };
397
398 let mut store = Store::new(&self.engine, plugin_state);
399
400 if let Some(fuel) = self.config.resource_limits.max_fuel {
402 store
403 .set_fuel(fuel)
404 .map_err(|e| WasmError::Internal(e.to_string()))?;
405 }
406
407 let mut linker = Linker::new(&self.engine);
409 Self::add_host_functions(&mut linker, host_functions)?;
410
411 let instance = if self.async_support {
413 linker
414 .instantiate_async(&mut store, &self.module)
415 .await
416 .map_err(|e| WasmError::InstantiationError(e.to_string()))?
417 } else {
418 linker
419 .instantiate(&mut store, &self.module)
420 .map_err(|e| WasmError::InstantiationError(e.to_string()))?
421 };
422
423 let plugin_instance = PluginInstance {
424 store,
425 instance,
426 manifest: self.manifest.clone(),
427 };
428
429 *self.instance.write().await = Some(plugin_instance);
430 Ok(())
431 }
432
433 fn add_host_functions(
434 linker: &mut Linker<PluginState>,
435 _host_functions: Arc<DefaultHostFunctions>,
436 ) -> WasmResult<()> {
437 linker
439 .func_wrap(
440 "env",
441 "host_log",
442 |_caller: Caller<'_, PluginState>, level: i32, ptr: i32, len: i32| {
443 debug!("host_log called: level={}, ptr={}, len={}", level, ptr, len);
445 0i32 },
447 )
448 .map_err(|e| WasmError::Internal(e.to_string()))?;
449
450 linker
452 .func_wrap(
453 "env",
454 "host_now_ms",
455 |_caller: Caller<'_, PluginState>| -> i64 {
456 std::time::SystemTime::now()
457 .duration_since(std::time::UNIX_EPOCH)
458 .unwrap_or_default()
459 .as_millis() as i64
460 },
461 )
462 .map_err(|e| WasmError::Internal(e.to_string()))?;
463
464 linker
466 .func_wrap(
467 "env",
468 "host_alloc",
469 |_caller: Caller<'_, PluginState>, size: i32| -> i32 {
470 debug!("host_alloc called: size={}", size);
472 0 },
474 )
475 .map_err(|e| WasmError::Internal(e.to_string()))?;
476
477 linker
479 .func_wrap(
480 "env",
481 "host_free",
482 |_caller: Caller<'_, PluginState>, ptr: i32| {
483 debug!("host_free called: ptr={}", ptr);
484 },
485 )
486 .map_err(|e| WasmError::Internal(e.to_string()))?;
487
488 linker
490 .func_wrap(
491 "env",
492 "abort",
493 |_caller: Caller<'_, PluginState>, msg: i32, file: i32, line: i32, col: i32| {
494 error!(
495 "WASM abort: msg={}, file={}, line={}, col={}",
496 msg, file, line, col
497 );
498 },
499 )
500 .map_err(|e| WasmError::Internal(e.to_string()))?;
501
502 Ok(())
503 }
504
505 pub async fn call_i32(&self, name: &str, args: &[Val]) -> WasmResult<i32> {
507 let start = Instant::now();
508 let result = self.call_internal(name, args).await;
509 let duration = start.elapsed();
510
511 let success = result.is_ok();
512 self.metrics
513 .write()
514 .await
515 .record_execution(duration.as_nanos() as u64, success);
516
517 match result {
518 Ok(vals) => {
519 if let Some(Val::I32(v)) = vals.first() {
520 Ok(*v)
521 } else {
522 Err(WasmError::TypeMismatch {
523 expected: "i32".to_string(),
524 actual: format!("{:?}", vals),
525 })
526 }
527 }
528 Err(e) => Err(e),
529 }
530 }
531
532 pub async fn call_i64(&self, name: &str, args: &[Val]) -> WasmResult<i64> {
534 let start = Instant::now();
535 let result = self.call_internal(name, args).await;
536 let duration = start.elapsed();
537
538 let success = result.is_ok();
539 self.metrics
540 .write()
541 .await
542 .record_execution(duration.as_nanos() as u64, success);
543
544 match result {
545 Ok(vals) => {
546 if let Some(Val::I64(v)) = vals.first() {
547 Ok(*v)
548 } else {
549 Err(WasmError::TypeMismatch {
550 expected: "i64".to_string(),
551 actual: format!("{:?}", vals),
552 })
553 }
554 }
555 Err(e) => Err(e),
556 }
557 }
558
559 pub async fn call_void(&self, name: &str, args: &[Val]) -> WasmResult<()> {
561 let start = Instant::now();
562 let result = self.call_internal(name, args).await;
563 let duration = start.elapsed();
564
565 let success = result.is_ok();
566 self.metrics
567 .write()
568 .await
569 .record_execution(duration.as_nanos() as u64, success);
570
571 result.map(|_| ())
572 }
573
574 async fn call_internal(&self, name: &str, args: &[Val]) -> WasmResult<Vec<Val>> {
575 let state = self.state().await;
576 if state != WasmPluginState::Ready && state != WasmPluginState::Running {
577 return Err(WasmError::ExecutionError(format!(
578 "Plugin not ready, current state: {:?}",
579 state
580 )));
581 }
582
583 let mut instance_guard = self.instance.write().await;
584 let instance = instance_guard
585 .as_mut()
586 .ok_or_else(|| WasmError::ExecutionError("Instance not created".to_string()))?;
587
588 let func = instance
589 .instance
590 .get_func(&mut instance.store, name)
591 .ok_or_else(|| WasmError::ExportNotFound(name.to_string()))?;
592
593 let ty = func.ty(&instance.store);
594 let mut results = vec![Val::I32(0); ty.results().len()];
595
596 if self.async_support {
598 func.call_async(&mut instance.store, args, &mut results)
599 .await
600 .map_err(|e| WasmError::ExecutionError(e.to_string()))?;
601 } else {
602 func.call(&mut instance.store, args, &mut results)
603 .map_err(|e| WasmError::ExecutionError(e.to_string()))?;
604 }
605
606 Ok(results)
607 }
608
609 pub async fn stop(&self) -> WasmResult<()> {
611 let mut state = self.state.write().await;
612
613 if *state == WasmPluginState::Ready || *state == WasmPluginState::Running {
615 drop(state);
616 if self.has_export("_cleanup").await {
617 let _ = self.call_void("_cleanup", &[]).await;
618 }
619 state = self.state.write().await;
620 }
621
622 *state = WasmPluginState::Stopped;
623 *self.instance.write().await = None;
624
625 info!("Plugin {} stopped", self.id);
626 Ok(())
627 }
628
629 pub fn from_parts(
631 id: String,
632 config: WasmPluginConfig,
633 manifest: PluginManifest,
634 module: Module,
635 engine: Engine,
636 host_context: Arc<HostContext>,
637 ) -> Self {
638 Self::from_parts_with_async(id, config, manifest, module, engine, host_context, true)
639 }
640
641 pub fn from_parts_with_async(
643 id: String,
644 config: WasmPluginConfig,
645 manifest: PluginManifest,
646 module: Module,
647 engine: Engine,
648 host_context: Arc<HostContext>,
649 async_support: bool,
650 ) -> Self {
651 Self {
652 id,
653 config,
654 manifest,
655 state: RwLock::new(WasmPluginState::Created),
656 module,
657 engine,
658 host_context,
659 metrics: RwLock::new(PluginMetrics::default()),
660 instance: RwLock::new(None),
661 async_support,
662 }
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 fn create_test_engine() -> Engine {
671 let mut config = Config::new();
672 config.async_support(false);
673 Engine::new(&config).unwrap()
674 }
675
676 #[test]
677 fn test_plugin_config() {
678 let config = WasmPluginConfig::new("test-plugin")
679 .with_capability(PluginCapability::Storage)
680 .with_config("key", WasmValue::String("value".into()));
681
682 assert_eq!(config.id, "test-plugin");
683 assert!(
684 config
685 .allowed_capabilities
686 .contains(&PluginCapability::Storage)
687 );
688 assert!(config.initial_config.contains_key("key"));
689 }
690
691 #[test]
692 fn test_plugin_metrics() {
693 let mut metrics = PluginMetrics::default();
694
695 metrics.record_execution(1000, true);
696 assert_eq!(metrics.call_count, 1);
697 assert_eq!(metrics.success_count, 1);
698
699 metrics.record_execution(2000, false);
700 assert_eq!(metrics.call_count, 2);
701 assert_eq!(metrics.error_count, 1);
702 assert_eq!(metrics.avg_execution_time_ns, 1500);
703 }
704
705 #[test]
706 fn test_plugin_state_default() {
707 let state = WasmPluginState::default();
708 assert_eq!(state, WasmPluginState::Created);
709 }
710
711 #[tokio::test]
712 async fn test_wasm_plugin_from_wat() {
713 let engine = create_test_engine();
714
715 let wat = r#"
716 (module
717 (func (export "add") (param i32 i32) (result i32)
718 local.get 0
719 local.get 1
720 i32.add
721 )
722 (func (export "double") (param i32) (result i32)
723 local.get 0
724 i32.const 2
725 i32.mul
726 )
727 )
728 "#;
729
730 let mut config = WasmPluginConfig::new("test-math");
732 config.resource_limits.max_fuel = None;
733
734 let plugin = WasmPlugin::from_wat_with_async(&engine, wat, config, false).unwrap();
736
737 assert_eq!(plugin.id(), "test-math");
738 assert_eq!(plugin.state().await, WasmPluginState::Created);
739
740 plugin.initialize().await.unwrap();
742 assert_eq!(plugin.state().await, WasmPluginState::Ready);
743
744 let result = plugin
746 .call_i32("add", &[Val::I32(3), Val::I32(4)])
747 .await
748 .unwrap();
749 assert_eq!(result, 7);
750
751 let result = plugin.call_i32("double", &[Val::I32(21)]).await.unwrap();
752 assert_eq!(result, 42);
753
754 let metrics = plugin.metrics().await;
756 assert_eq!(metrics.call_count, 2);
757 assert_eq!(metrics.success_count, 2);
758
759 plugin.stop().await.unwrap();
761 assert_eq!(plugin.state().await, WasmPluginState::Stopped);
762 }
763}