1use crate::error::{KernelError, KernelResult};
57use crate::plugin::{Extension, ExtensionCapability, ExtensionInfo, ObservabilityExtension};
58use parking_lot::RwLock;
59use std::any::Any;
60use std::collections::HashMap;
61use std::path::Path;
62use std::sync::Arc;
63use std::sync::atomic::{AtomicU64, Ordering};
64use std::time::Instant;
65
/// Permissions and resource limits granted to a WASM plugin.
///
/// Table grants are lists of patterns. A pattern ending in `*` matches every table name
/// that starts with the text before the `*` (so a bare `"*"` matches everything); any
/// other pattern must equal the table name exactly.
#[derive(Debug, Clone)]
pub struct WasmPluginCapabilities {
    /// Patterns naming the tables the plugin may read.
    pub can_read_table: Vec<String>,
    /// Patterns naming the tables the plugin may write.
    pub can_write_table: Vec<String>,
    /// Whether index search is permitted.
    pub can_index_search: bool,
    /// Whether vector search is permitted.
    pub can_vector_search: bool,
    /// Names of other plugins this plugin may call.
    /// NOTE(review): not consulted by any method of this type — confirm where it is enforced.
    pub can_call_plugin: Vec<String>,
    /// Memory ceiling in bytes.
    pub memory_limit_bytes: u64,
    /// Fuel budget.
    pub fuel_limit: u64,
    /// Execution timeout in milliseconds.
    pub timeout_ms: u64,
}

impl Default for WasmPluginCapabilities {
    /// No table, search or plugin-call access; 16 MiB memory, 1,000,000 fuel, 100 ms timeout.
    fn default() -> Self {
        Self {
            can_read_table: Vec::new(),
            can_write_table: Vec::new(),
            can_index_search: false,
            can_vector_search: false,
            can_call_plugin: Vec::new(),
            memory_limit_bytes: 16 * 1024 * 1024,
            fuel_limit: 1_000_000,
            timeout_ms: 100,
        }
    }
}

impl WasmPluginCapabilities {
    /// Preset with no data access and tight limits: 4 MiB memory, 100,000 fuel, 10 ms timeout.
    pub fn observability_only() -> Self {
        Self {
            memory_limit_bytes: 4 * 1024 * 1024,
            fuel_limit: 100_000,
            timeout_ms: 10,
            ..Self::default()
        }
    }

    /// Preset that may read the given table patterns and use both search kinds, but
    /// cannot write: 64 MiB memory, 10,000,000 fuel, 1000 ms timeout.
    pub fn read_only(tables: Vec<String>) -> Self {
        Self {
            can_read_table: tables,
            can_index_search: true,
            can_vector_search: true,
            memory_limit_bytes: 64 * 1024 * 1024,
            fuel_limit: 10_000_000,
            timeout_ms: 1000,
            ..Self::default()
        }
    }

    /// Returns `true` when any read pattern matches `table_name`.
    pub fn can_read(&self, table_name: &str) -> bool {
        Self::matches_any(&self.can_read_table, table_name)
    }

    /// Returns `true` when any write pattern matches `table_name`.
    pub fn can_write(&self, table_name: &str) -> bool {
        Self::matches_any(&self.can_write_table, table_name)
    }

    /// Shared matcher for the read and write grant lists.
    ///
    /// Stripping a trailing `*` from the bare pattern `"*"` leaves the empty prefix, which
    /// every table name starts with, so the match-all case needs no separate branch.
    fn matches_any(patterns: &[String], table_name: &str) -> bool {
        patterns.iter().any(|pattern| match pattern.strip_suffix('*') {
            Some(prefix) => table_name.starts_with(prefix),
            None => pattern.as_str() == table_name,
        })
    }
}
164
/// Lifecycle state of a WASM plugin instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WasmPluginState {
    /// Initial state assigned by `WasmPluginInstance::new`.
    Loading,
    /// The instance accepts calls; set by `WasmPluginInstance::init` and after a call ends.
    Ready,
    /// A call is in progress.
    Executing,
    /// Entered when a call is refused because the fuel budget is exhausted.
    Trapped,
    /// Set by `WasmPluginRegistry::unload` while the plugin is being removed.
    Unloading,
    /// The plugin is no longer loaded (presumably the end of the lifecycle).
    Unloaded,
}
185
/// Counters accumulated by a plugin instance; all start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct WasmPluginStats {
    /// Number of calls that reached execution.
    pub total_calls: u64,
    /// Sum of the fuel charged across calls.
    pub total_fuel_consumed: u64,
    /// Sum of measured call durations, in microseconds.
    pub total_execution_us: u64,
    /// Number of traps recorded for the plugin.
    pub trap_count: u64,
    /// Peak memory use in bytes.
    /// NOTE(review): nothing in the code shown here writes this field — confirm its producer.
    pub peak_memory_bytes: u64,
}
200
201#[derive(Debug, Clone)]
203pub struct WasmInstanceConfig {
204 pub capabilities: WasmPluginCapabilities,
206 pub debug_mode: bool,
208 pub enable_fuel: bool,
210 pub enable_epochs: bool,
212 pub epoch_interval_ms: u64,
214}
215
216impl Default for WasmInstanceConfig {
217 fn default() -> Self {
218 Self {
219 capabilities: WasmPluginCapabilities::default(),
220 debug_mode: false,
221 enable_fuel: true,
222 enable_epochs: true,
223 epoch_interval_ms: 10, }
225 }
226}
227
228pub struct WasmPluginInstance {
237 name: String,
239 state: RwLock<WasmPluginState>,
241 config: WasmInstanceConfig,
243 stats: RwLock<WasmPluginStats>,
245 fuel_remaining: AtomicU64,
247 info: ExtensionInfo,
249 module_hash: [u8; 32],
251}
252
253impl WasmPluginInstance {
254 pub fn new(name: &str, _wasm_bytes: &[u8], config: WasmInstanceConfig) -> KernelResult<Self> {
256 let module_hash = Self::compute_hash(_wasm_bytes);
261
262 Ok(Self {
263 name: name.to_string(),
264 state: RwLock::new(WasmPluginState::Loading),
265 config: config.clone(),
266 stats: RwLock::new(WasmPluginStats::default()),
267 fuel_remaining: AtomicU64::new(config.capabilities.fuel_limit),
268 info: ExtensionInfo {
269 name: name.to_string(),
270 version: "1.0.0".to_string(),
271 description: format!("WASM plugin: {}", name),
272 author: "SochDB".to_string(),
273 capabilities: vec![ExtensionCapability::Custom("wasm".to_string())],
274 },
275 module_hash,
276 })
277 }
278
279 pub fn init(&self) -> KernelResult<()> {
281 *self.state.write() = WasmPluginState::Ready;
282 Ok(())
283 }
284
285 pub fn call(&self, func_name: &str, args: &[WasmValue]) -> KernelResult<Vec<WasmValue>> {
287 {
289 let state = self.state.read();
290 if *state != WasmPluginState::Ready {
291 return Err(KernelError::Plugin {
292 message: format!("plugin {} not ready, state: {:?}", self.name, *state),
293 });
294 }
295 }
296
297 *self.state.write() = WasmPluginState::Executing;
299 let start = Instant::now();
300
301 if self.config.enable_fuel {
303 let remaining = self.fuel_remaining.load(Ordering::Acquire);
304 if remaining == 0 {
305 *self.state.write() = WasmPluginState::Trapped;
306 return Err(KernelError::Plugin {
307 message: format!("plugin {} exhausted fuel limit", self.name),
308 });
309 }
310 }
311
312 let result = self.simulate_call(func_name, args);
315
316 {
318 let mut stats = self.stats.write();
319 stats.total_calls += 1;
320 stats.total_execution_us += start.elapsed().as_micros() as u64;
321
322 let fuel_used = 100 + args.len() as u64 * 10;
324 stats.total_fuel_consumed += fuel_used;
325 self.fuel_remaining.fetch_sub(
326 fuel_used.min(self.fuel_remaining.load(Ordering::Acquire)),
327 Ordering::AcqRel,
328 );
329 }
330
331 *self.state.write() = WasmPluginState::Ready;
333
334 result
335 }
336
337 pub fn refuel(&self) {
339 self.fuel_remaining
340 .store(self.config.capabilities.fuel_limit, Ordering::Release);
341 }
342
343 pub fn stats(&self) -> WasmPluginStats {
345 self.stats.read().clone()
346 }
347
348 pub fn state(&self) -> WasmPluginState {
350 *self.state.read()
351 }
352
353 pub fn name(&self) -> &str {
355 &self.name
356 }
357
358 pub fn capabilities(&self) -> &WasmPluginCapabilities {
360 &self.config.capabilities
361 }
362
363 pub fn module_hash(&self) -> &[u8; 32] {
365 &self.module_hash
366 }
367
368 fn simulate_call(&self, func_name: &str, args: &[WasmValue]) -> KernelResult<Vec<WasmValue>> {
370 match func_name {
372 "on_insert" | "on_update" | "on_delete" => {
373 Ok(vec![WasmValue::I32(0)])
375 }
376 "get_metrics" => {
377 Ok(vec![WasmValue::F64(42.0)])
379 }
380 "transform" => {
381 if args.is_empty() {
383 Ok(vec![WasmValue::I32(0)])
384 } else {
385 Ok(vec![args[0].clone()])
386 }
387 }
388 _ => Err(KernelError::Plugin {
389 message: format!("unknown function: {}", func_name),
390 }),
391 }
392 }
393
394 fn compute_hash(bytes: &[u8]) -> [u8; 32] {
396 let mut hash = [0u8; 32];
399 let crc = crc32fast::hash(bytes);
400 for i in 0..8 {
401 hash[i * 4..(i + 1) * 4].copy_from_slice(&crc.to_le_bytes());
402 }
403 hash
404 }
405}
406
/// A value passed to, or returned from, a plugin function.
#[derive(Clone, Debug, PartialEq)]
pub enum WasmValue {
    /// 32-bit integer.
    I32(i32),
    /// 64-bit integer.
    I64(i64),
    /// 32-bit float.
    F32(f32),
    /// 64-bit float.
    F64(f64),
    /// External reference, carried as a `u64`.
    ExternRef(u64),
}

impl WasmValue {
    /// Returns the payload if this is an `I32`, otherwise `None`.
    pub fn as_i32(&self) -> Option<i32> {
        if let Self::I32(inner) = self {
            Some(*inner)
        } else {
            None
        }
    }

    /// Returns the payload if this is an `I64`, otherwise `None`.
    pub fn as_i64(&self) -> Option<i64> {
        if let Self::I64(inner) = self {
            Some(*inner)
        } else {
            None
        }
    }

    /// Returns the payload if this is an `F32`, otherwise `None`.
    pub fn as_f32(&self) -> Option<f32> {
        if let Self::F32(inner) = self {
            Some(*inner)
        } else {
            None
        }
    }

    /// Returns the payload if this is an `F64`, otherwise `None`.
    pub fn as_f64(&self) -> Option<f64> {
        if let Self::F64(inner) = self {
            Some(*inner)
        } else {
            None
        }
    }
}
459
460pub struct WasmPluginRegistry {
469 plugins: RwLock<HashMap<String, Arc<WasmPluginInstance>>>,
471 load_order: RwLock<Vec<String>>,
473 total_calls: AtomicU64,
475 total_traps: AtomicU64,
476}
477
478impl Default for WasmPluginRegistry {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484impl WasmPluginRegistry {
485 pub fn new() -> Self {
487 Self {
488 plugins: RwLock::new(HashMap::new()),
489 load_order: RwLock::new(Vec::new()),
490 total_calls: AtomicU64::new(0),
491 total_traps: AtomicU64::new(0),
492 }
493 }
494
495 pub fn load(
497 &self,
498 name: &str,
499 wasm_bytes: &[u8],
500 config: WasmInstanceConfig,
501 ) -> KernelResult<()> {
502 if self.plugins.read().contains_key(name) {
504 return Err(KernelError::Plugin {
505 message: format!("plugin '{}' already registered", name),
506 });
507 }
508
509 let instance = WasmPluginInstance::new(name, wasm_bytes, config)?;
511 instance.init()?;
512
513 let arc = Arc::new(instance);
515 self.plugins.write().insert(name.to_string(), arc);
516 self.load_order.write().push(name.to_string());
517
518 Ok(())
519 }
520
521 pub fn load_from_file(
523 &self,
524 name: &str,
525 path: &Path,
526 config: WasmInstanceConfig,
527 ) -> KernelResult<()> {
528 let wasm_bytes = std::fs::read(path).map_err(|e| KernelError::Plugin {
529 message: format!("failed to read WASM file: {}", e),
530 })?;
531 self.load(name, &wasm_bytes, config)
532 }
533
534 pub fn unload(&self, name: &str) -> KernelResult<()> {
536 let mut plugins = self.plugins.write();
537
538 if !plugins.contains_key(name) {
539 return Err(KernelError::Plugin {
540 message: format!("plugin '{}' not found", name),
541 });
542 }
543
544 if let Some(plugin) = plugins.get(name) {
546 *plugin.state.write() = WasmPluginState::Unloading;
547 }
548
549 plugins.remove(name);
551 self.load_order.write().retain(|n| n != name);
552
553 Ok(())
554 }
555
556 pub fn get(&self, name: &str) -> Option<Arc<WasmPluginInstance>> {
558 self.plugins.read().get(name).cloned()
559 }
560
561 pub fn call(
563 &self,
564 plugin_name: &str,
565 func_name: &str,
566 args: &[WasmValue],
567 ) -> KernelResult<Vec<WasmValue>> {
568 let plugin = self.get(plugin_name).ok_or_else(|| KernelError::Plugin {
569 message: format!("plugin '{}' not found", plugin_name),
570 })?;
571
572 self.total_calls.fetch_add(1, Ordering::Relaxed);
573
574 match plugin.call(func_name, args) {
575 Ok(result) => Ok(result),
576 Err(e) => {
577 self.total_traps.fetch_add(1, Ordering::Relaxed);
578 Err(e)
579 }
580 }
581 }
582
583 pub fn list(&self) -> Vec<String> {
585 self.load_order.read().clone()
586 }
587
588 pub fn count(&self) -> usize {
590 self.plugins.read().len()
591 }
592
593 pub fn global_stats(&self) -> (u64, u64) {
595 (
596 self.total_calls.load(Ordering::Relaxed),
597 self.total_traps.load(Ordering::Relaxed),
598 )
599 }
600
601 pub fn shutdown_all(&self) -> KernelResult<()> {
603 let order = self.load_order.read().clone();
604 for name in order.iter().rev() {
605 if let Err(e) = self.unload(name) {
606 eprintln!("warning: failed to unload plugin {}: {}", name, e);
608 }
609 }
610 Ok(())
611 }
612}
613
614pub struct WasmObservabilityPlugin {
620 instance: Arc<WasmPluginInstance>,
622}
623
624impl WasmObservabilityPlugin {
625 pub fn new(instance: Arc<WasmPluginInstance>) -> Self {
627 Self { instance }
628 }
629}
630
631impl Extension for WasmObservabilityPlugin {
632 fn info(&self) -> ExtensionInfo {
633 self.instance.info.clone()
634 }
635
636 fn as_any(&self) -> &dyn Any {
637 self
638 }
639
640 fn as_any_mut(&mut self) -> &mut dyn Any {
641 self
642 }
643}
644
645impl ObservabilityExtension for WasmObservabilityPlugin {
646 fn counter_inc(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
647 let _ = self.instance.call(
649 "counter_inc",
650 &[
651 WasmValue::I64(name.as_ptr() as i64),
652 WasmValue::I32(name.len() as i32),
653 WasmValue::I64(value as i64),
654 WasmValue::I32(labels.len() as i32),
655 ],
656 );
657 }
658
659 fn gauge_set(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
660 let _ = self.instance.call(
661 "gauge_set",
662 &[
663 WasmValue::I64(name.as_ptr() as i64),
664 WasmValue::I32(name.len() as i32),
665 WasmValue::F64(value),
666 WasmValue::I32(labels.len() as i32),
667 ],
668 );
669 }
670
671 fn histogram_observe(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
672 let _ = self.instance.call(
673 "histogram_observe",
674 &[
675 WasmValue::I64(name.as_ptr() as i64),
676 WasmValue::I32(name.len() as i32),
677 WasmValue::F64(value),
678 WasmValue::I32(labels.len() as i32),
679 ],
680 );
681 }
682
683 fn span_start(&self, name: &str, parent: Option<u64>) -> u64 {
684 match self.instance.call(
685 "span_start",
686 &[
687 WasmValue::I64(name.as_ptr() as i64),
688 WasmValue::I32(name.len() as i32),
689 WasmValue::I64(parent.unwrap_or(0) as i64),
690 ],
691 ) {
692 Ok(results) => results.first().and_then(|v| v.as_i64()).unwrap_or(0) as u64,
693 Err(_) => 0,
694 }
695 }
696
697 fn span_end(&self, span_id: u64) {
698 let _ = self
699 .instance
700 .call("span_end", &[WasmValue::I64(span_id as i64)]);
701 }
702
703 fn span_event(&self, span_id: u64, name: &str, _attributes: &[(&str, &str)]) {
704 let _ = self.instance.call(
705 "span_event",
706 &[
707 WasmValue::I64(span_id as i64),
708 WasmValue::I64(name.as_ptr() as i64),
709 WasmValue::I32(name.len() as i32),
710 ],
711 );
712 }
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config with fuel accounting on and the given fuel budget.
    fn fuel_limited(fuel_limit: u64) -> WasmInstanceConfig {
        WasmInstanceConfig {
            capabilities: WasmPluginCapabilities {
                fuel_limit,
                ..Default::default()
            },
            enable_fuel: true,
            ..Default::default()
        }
    }

    /// Creates an instance from placeholder module bytes and moves it to `Ready`.
    fn ready_instance(name: &str, bytes: &[u8], config: WasmInstanceConfig) -> WasmPluginInstance {
        let plugin = WasmPluginInstance::new(name, bytes, config).unwrap();
        plugin.init().unwrap();
        plugin
    }

    #[test]
    fn test_capabilities_default() {
        let defaults = WasmPluginCapabilities::default();

        assert_eq!(defaults.memory_limit_bytes, 16 * 1024 * 1024);
        assert_eq!(defaults.fuel_limit, 1_000_000);
        // With no patterns configured nothing is readable or writable.
        assert!(!defaults.can_read("any_table"));
        assert!(!defaults.can_write("any_table"));
    }

    #[test]
    fn test_capabilities_read_patterns() {
        let granted = WasmPluginCapabilities {
            can_read_table: vec!["users".to_string(), "logs_*".to_string()],
            ..Default::default()
        };

        // Exact name, prefix match, and the bare prefix itself are all allowed.
        for table in ["users", "logs_2024", "logs_"].iter() {
            assert!(granted.can_read(table));
        }
        assert!(!granted.can_read("orders"));
    }

    #[test]
    fn test_capabilities_wildcard() {
        let everything = WasmPluginCapabilities {
            can_read_table: vec!["*".to_string()],
            ..Default::default()
        };

        for table in ["any_table", "another_table"].iter() {
            assert!(everything.can_read(table));
        }
    }

    #[test]
    fn test_wasm_instance_creation() {
        let plugin = WasmPluginInstance::new(
            "test_plugin",
            b"fake wasm bytes",
            WasmInstanceConfig::default(),
        )
        .unwrap();

        assert_eq!(plugin.name(), "test_plugin");
        assert_eq!(plugin.state(), WasmPluginState::Loading);

        plugin.init().unwrap();
        assert_eq!(plugin.state(), WasmPluginState::Ready);
    }

    #[test]
    fn test_wasm_instance_call() {
        let plugin = ready_instance(
            "test_plugin",
            b"fake wasm bytes",
            WasmInstanceConfig::default(),
        );

        let returned = plugin.call("on_insert", &[WasmValue::I32(42)]).unwrap();
        assert_eq!(returned.len(), 1);
        assert_eq!(returned[0], WasmValue::I32(0));

        let snapshot = plugin.stats();
        assert_eq!(snapshot.total_calls, 1);
        assert!(snapshot.total_fuel_consumed > 0);
    }

    #[test]
    fn test_wasm_registry() {
        let registry = WasmPluginRegistry::new();

        registry
            .load("plugin1", b"fake wasm", WasmInstanceConfig::default())
            .unwrap();
        assert_eq!(registry.count(), 1);

        let returned = registry.call("plugin1", "on_insert", &[]).unwrap();
        assert_eq!(returned[0], WasmValue::I32(0));

        // One successful call: counted as a call, not as a trap.
        let (call_total, trap_total) = registry.global_stats();
        assert_eq!(call_total, 1);
        assert_eq!(trap_total, 0);

        registry.unload("plugin1").unwrap();
        assert_eq!(registry.count(), 0);
    }

    #[test]
    fn test_wasm_registry_duplicate() {
        let registry = WasmPluginRegistry::new();

        registry
            .load("plugin1", b"fake wasm", WasmInstanceConfig::default())
            .unwrap();

        // Loading the same name a second time must be rejected.
        let second = registry.load("plugin1", b"fake wasm", WasmInstanceConfig::default());
        assert!(second.is_err());
    }

    #[test]
    fn test_wasm_value_conversions() {
        let int_value = WasmValue::I32(42);
        assert_eq!(int_value.as_i32(), Some(42));
        assert_eq!(int_value.as_i64(), None);

        let float_value = WasmValue::F64(2.5);
        assert_eq!(float_value.as_f64(), Some(2.5));
        assert_eq!(float_value.as_f32(), None);
    }

    #[test]
    fn test_fuel_exhaustion() {
        // A call without arguments costs 100 fuel, so this budget covers exactly one call.
        let plugin = ready_instance("test", b"fake wasm", fuel_limited(100));

        let _ = plugin.call("on_insert", &[]);

        let second = plugin.call("on_insert", &[]);
        assert!(second.is_err());
    }

    #[test]
    fn test_refuel() {
        // The first call leaves 50 fuel; refuelling restores the full budget of 150.
        let plugin = ready_instance("test", b"fake wasm", fuel_limited(150));

        let _ = plugin.call("on_insert", &[]);

        plugin.refuel();

        let second = plugin.call("on_insert", &[]);
        assert!(second.is_ok());
    }

    #[test]
    fn test_observability_wrapper() {
        let plugin = Arc::new(ready_instance(
            "obs_plugin",
            b"fake wasm",
            WasmInstanceConfig::default(),
        ));
        let wrapper = WasmObservabilityPlugin::new(Arc::clone(&plugin));

        // Smoke test: none of the forwarding methods may panic.
        wrapper.counter_inc("test_counter", 1, &[]);
        wrapper.gauge_set("test_gauge", 42.0, &[]);
        wrapper.histogram_observe("test_histogram", 0.5, &[]);

        let span_id = wrapper.span_start("test_span", None);
        wrapper.span_event(span_id, "event1", &[]);
        wrapper.span_end(span_id);
    }
}