1use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, OnceLock};
32use std::time::{Duration, Instant};
33
34use parking_lot::RwLock;
35use wasmtime::{Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc, Memory};
36
37use super::config::PluginRuntimeConfig;
38use super::host_functions::HostFunctionRegistry;
39use super::host_imports::{register_crypto_imports, register_kv_imports, KvBackend, StoreCtx};
40use super::sandbox::{PluginSandbox, SecurityPolicy, ResourceLimits};
41use super::{
42 AuthRequest, AuthResult, HookType, PluginMetadata, PreQueryResult,
43 QueryContext, RouteResult,
44};
45
46#[derive(Debug, Clone)]
48pub enum PluginError {
49 LoadError(String),
51
52 InstantiationError(String),
54
55 ExecutionError(String),
57
58 Timeout(String),
60
61 MemoryExceeded(String),
63
64 SecurityViolation(String),
66
67 InvalidManifest(String),
69
70 HookNotFound(String),
72
73 RuntimeError(String),
75}
76
77impl std::fmt::Display for PluginError {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 PluginError::LoadError(msg) => write!(f, "Load error: {}", msg),
81 PluginError::InstantiationError(msg) => write!(f, "Instantiation error: {}", msg),
82 PluginError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
83 PluginError::Timeout(msg) => write!(f, "Timeout: {}", msg),
84 PluginError::MemoryExceeded(msg) => write!(f, "Memory exceeded: {}", msg),
85 PluginError::SecurityViolation(msg) => write!(f, "Security violation: {}", msg),
86 PluginError::InvalidManifest(msg) => write!(f, "Invalid manifest: {}", msg),
87 PluginError::HookNotFound(msg) => write!(f, "Hook not found: {}", msg),
88 PluginError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
89 }
90 }
91}
92
93impl std::error::Error for PluginError {}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum PluginState {
98 Loading,
100
101 Running,
103
104 Paused,
106
107 Error(String),
109
110 Unloading,
112}
113
114pub struct LoadedPlugin {
116 pub metadata: PluginMetadata,
118
119 pub state: PluginState,
121
122 pub path: PathBuf,
124
125 module: Module,
128
129 instance_pre: OnceLock<InstancePre<StoreCtx>>,
135
136 sandbox: PluginSandbox,
138
139 instance_data: RwLock<PluginInstanceData>,
141
142 loaded_at: Instant,
144
145 last_invoked: RwLock<Option<Instant>>,
147
148 invocation_count: std::sync::atomic::AtomicU64,
150}
151
152struct PluginInstanceData {
154 memory_used: usize,
156
157 fuel_consumed: u64,
159
160 state: HashMap<String, Vec<u8>>,
162}
163
164impl LoadedPlugin {
165 pub fn new(
167 metadata: PluginMetadata,
168 path: PathBuf,
169 module: Module,
170 sandbox: PluginSandbox,
171 ) -> Self {
172 Self {
173 metadata,
174 state: PluginState::Running,
175 path,
176 module,
177 instance_pre: OnceLock::new(),
178 sandbox,
179 instance_data: RwLock::new(PluginInstanceData {
180 memory_used: 0,
181 fuel_consumed: 0,
182 state: HashMap::new(),
183 }),
184 loaded_at: Instant::now(),
185 last_invoked: RwLock::new(None),
186 invocation_count: std::sync::atomic::AtomicU64::new(0),
187 }
188 }
189
190 pub(crate) fn module(&self) -> &Module {
194 &self.module
195 }
196
197 pub fn memory_used(&self) -> usize {
199 self.instance_data.read().memory_used
200 }
201
202 pub fn invocation_count(&self) -> u64 {
204 self.invocation_count.load(std::sync::atomic::Ordering::Relaxed)
205 }
206
207 pub fn uptime(&self) -> Duration {
209 self.loaded_at.elapsed()
210 }
211
212 pub fn last_invoked(&self) -> Option<Instant> {
214 *self.last_invoked.read()
215 }
216
217 pub fn record_invocation(&self) {
219 self.invocation_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
220 *self.last_invoked.write() = Some(Instant::now());
221 }
222}
223
224pub struct WasmPluginRuntime {
226 config: PluginRuntimeConfig,
228
229 engine: Engine,
232
233 linker: Linker<StoreCtx>,
238
239 epoch_stop: Arc<AtomicBool>,
241
242 host_functions: Arc<HostFunctionRegistry>,
244
245 kv: KvBackend,
248
249 module_cache: RwLock<HashMap<PathBuf, Module>>,
252
253 default_policy: SecurityPolicy,
255
256 created_at: Instant,
258}
259
260impl WasmPluginRuntime {
261 pub fn new(config: &PluginRuntimeConfig) -> Result<Self, PluginError> {
263 let host_functions = Arc::new(HostFunctionRegistry::new());
264
265 let mut engine_config = wasmtime::Config::new();
266 if config.fuel_metering {
267 engine_config.consume_fuel(true);
268 }
269 engine_config.epoch_interruption(true);
272
273 let engine = Engine::new(&engine_config).map_err(|e| {
274 PluginError::RuntimeError(format!("wasmtime engine init: {}", e))
275 })?;
276
277 let mut linker: Linker<StoreCtx> = Linker::new(&engine);
281 register_kv_imports(&mut linker)?;
282 register_crypto_imports(&mut linker)?;
283
284 let epoch_stop = Arc::new(AtomicBool::new(false));
291 {
292 let engine = engine.clone();
293 let stop = epoch_stop.clone();
294 std::thread::Builder::new()
295 .name("wasm-epoch-ticker".into())
296 .spawn(move || {
297 while !stop.load(std::sync::atomic::Ordering::Relaxed) {
298 std::thread::sleep(Duration::from_millis(1));
299 engine.increment_epoch();
300 }
301 })
302 .ok();
303 }
304
305 let default_policy = SecurityPolicy {
306 allowed_hosts: vec!["localhost".to_string()],
307 allowed_paths: vec![config.plugin_dir.clone()],
308 max_memory: config.memory_limit,
309 max_execution_time: config.timeout,
310 allow_network: false,
311 allow_filesystem: false,
312 };
313
314 Ok(Self {
315 config: config.clone(),
316 engine,
317 linker,
318 epoch_stop,
319 host_functions,
320 kv: KvBackend::new(),
321 module_cache: RwLock::new(HashMap::new()),
322 default_policy,
323 created_at: Instant::now(),
324 })
325 }
326
327 pub fn kv(&self) -> &KvBackend {
330 &self.kv
331 }
332
333 #[allow(dead_code)]
336 pub(crate) fn linker(&self) -> &Linker<StoreCtx> {
337 &self.linker
338 }
339
340 pub(crate) fn engine(&self) -> &Engine {
343 &self.engine
344 }
345
346 pub fn config(&self) -> &PluginRuntimeConfig {
350 &self.config
351 }
352
353 pub fn instantiate(
355 &self,
356 manifest: &super::loader::PluginManifest,
357 wasm_bytes: &[u8],
358 ) -> Result<LoadedPlugin, PluginError> {
359 if wasm_bytes.len() < 8 {
361 return Err(PluginError::LoadError("WASM module too small".to_string()));
362 }
363
364 if &wasm_bytes[0..4] != b"\x00asm" {
366 return Err(PluginError::LoadError("Invalid WASM magic number".to_string()));
367 }
368
369 let metadata = PluginMetadata {
371 name: manifest.name.clone(),
372 version: manifest.version.clone(),
373 description: manifest.description.clone(),
374 author: manifest.author.clone(),
375 hooks: manifest.hooks.clone(),
376 permissions: manifest.permissions.clone(),
377 min_memory: manifest.min_memory,
378 max_memory: manifest.max_memory.min(self.config.memory_limit),
379 };
380
381 let resource_limits = ResourceLimits {
383 max_memory: metadata.max_memory,
384 max_execution_time: self.config.timeout,
385 max_fuel: if self.config.fuel_metering {
386 Some(self.config.fuel_limit)
387 } else {
388 None
389 },
390 max_table_elements: 10000,
391 max_instances: 1,
392 };
393
394 let sandbox = PluginSandbox::new(
395 self.default_policy.clone(),
396 resource_limits,
397 manifest.permissions.clone(),
398 );
399
400 let module = Module::from_binary(&self.engine, wasm_bytes).map_err(|e| {
403 PluginError::InstantiationError(format!("wasmtime compile: {}", e))
404 })?;
405
406 {
408 let mut cache = self.module_cache.write();
409 cache.insert(manifest.path.clone(), module.clone());
410 }
411
412 Ok(LoadedPlugin::new(
413 metadata,
414 manifest.path.clone(),
415 module,
416 sandbox,
417 ))
418 }
419
420 pub fn call_hook(
436 &self,
437 plugin: &LoadedPlugin,
438 hook: HookType,
439 args: &[u8],
440 ) -> Result<Vec<u8>, PluginError> {
441 if !plugin.metadata.hooks.contains(&hook) {
443 return Err(PluginError::HookNotFound(format!(
444 "Plugin {} does not support hook {:?}",
445 plugin.metadata.name, hook
446 )));
447 }
448
449 if plugin.state != PluginState::Running {
451 return Err(PluginError::ExecutionError(format!(
452 "Plugin {} is not running (state: {:?})",
453 plugin.metadata.name, plugin.state
454 )));
455 }
456
457 plugin.record_invocation();
459
460 let store_ctx = StoreCtx {
464 plugin_name: plugin.metadata.name.clone(),
465 kv: self.kv.clone(),
466 };
467 let mut store: Store<StoreCtx> = Store::new(&self.engine, store_ctx);
468 if self.config.fuel_metering {
469 store.set_fuel(self.config.fuel_limit).map_err(|e| {
471 PluginError::RuntimeError(format!("set_fuel: {}", e))
472 })?;
473 }
474 let deadline_ticks = self
480 .config
481 .timeout
482 .as_millis()
483 .max(1)
484 .min(u64::MAX as u128) as u64;
485 store.set_epoch_deadline(deadline_ticks);
486
487 let instance_pre = match plugin.instance_pre.get() {
493 Some(ip) => ip,
494 None => {
495 let ip = self.linker.instantiate_pre(&plugin.module).map_err(|e| {
496 PluginError::InstantiationError(format!(
497 "pre-instantiate {}: {}",
498 plugin.metadata.name, e
499 ))
500 })?;
501 let _ = plugin.instance_pre.set(ip);
503 plugin.instance_pre.get().expect("just set")
504 }
505 };
506 let instance = instance_pre.instantiate(&mut store).map_err(|e| {
507 PluginError::InstantiationError(format!(
508 "instantiate {}: {}",
509 plugin.metadata.name, e
510 ))
511 })?;
512
513 let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
514 PluginError::ExecutionError(format!(
515 "plugin {} does not export `memory`",
516 plugin.metadata.name
517 ))
518 })?;
519
520 let alloc = get_typed::<_, i32, i32>(&instance, &mut store, "alloc")?;
521 let dealloc = get_typed::<_, (i32, i32), ()>(&instance, &mut store, "dealloc")?;
522
523 let in_len = args.len() as i32;
526 let in_ptr = alloc.call(&mut store, in_len).map_err(|e| {
527 PluginError::ExecutionError(format!("alloc({}): {}", in_len, e))
528 })?;
529 if in_len > 0 {
530 write_memory(&memory, &mut store, in_ptr, args)?;
531 }
532
533 let export_name = hook.export_name();
536 let result_bytes = match get_typed::<_, (i32, i32), i64>(&instance, &mut store, export_name) {
537 Ok(hook_fn) => {
538 let packed = hook_fn.call(&mut store, (in_ptr, in_len)).map_err(|e| {
539 PluginError::ExecutionError(format!(
540 "hook {} call: {}",
541 export_name, e
542 ))
543 })?;
544 let out_ptr = (packed >> 32) as i32;
545 let out_len = (packed & 0xFFFF_FFFF) as i32;
546 if out_len > 0 {
547 let bytes = read_memory(&memory, &store, out_ptr, out_len)?;
548 let _ = dealloc.call(&mut store, (out_ptr, out_len));
550 bytes
551 } else {
552 Vec::new()
553 }
554 }
555 Err(_) => {
556 let observer = get_typed::<_, (i32, i32), ()>(
558 &instance,
559 &mut store,
560 export_name,
561 )?;
562 observer.call(&mut store, (in_ptr, in_len)).map_err(|e| {
563 PluginError::ExecutionError(format!(
564 "observer hook {} call: {}",
565 export_name, e
566 ))
567 })?;
568 Vec::new()
569 }
570 };
571
572 let _ = dealloc.call(&mut store, (in_ptr, in_len));
575
576 if self.config.fuel_metering {
578 if let Ok(remaining) = store.get_fuel() {
579 let consumed = self.config.fuel_limit.saturating_sub(remaining);
580 plugin.instance_data.write().fuel_consumed = consumed;
581 }
582 }
583 plugin.instance_data.write().memory_used =
584 (memory.data_size(&store)) as usize;
585
586 Ok(result_bytes)
587 }
588
589 pub fn call_pre_query(
591 &self,
592 plugin: &LoadedPlugin,
593 ctx: &QueryContext,
594 ) -> Result<PreQueryResult, PluginError> {
595 let args = serde_json::to_vec(ctx).map_err(|e| {
597 PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
598 })?;
599
600 let result = self.call_hook(plugin, HookType::PreQuery, &args)?;
602
603 if result.is_empty() {
605 return Ok(PreQueryResult::Continue);
606 }
607
608 serde_json::from_slice(&result).map_err(|e| {
609 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
610 })
611 }
612
613 pub fn call_authenticate(
615 &self,
616 plugin: &LoadedPlugin,
617 request: &AuthRequest,
618 ) -> Result<AuthResult, PluginError> {
619 let args = serde_json::to_vec(request).map_err(|e| {
621 PluginError::ExecutionError(format!("Failed to serialize request: {}", e))
622 })?;
623
624 let result = self.call_hook(plugin, HookType::Authenticate, &args)?;
626
627 if result.is_empty() {
629 return Ok(AuthResult::Defer);
630 }
631
632 serde_json::from_slice(&result).map_err(|e| {
633 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
634 })
635 }
636
637 pub fn call_route(
639 &self,
640 plugin: &LoadedPlugin,
641 ctx: &QueryContext,
642 ) -> Result<RouteResult, PluginError> {
643 let args = serde_json::to_vec(ctx).map_err(|e| {
645 PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
646 })?;
647
648 let result = self.call_hook(plugin, HookType::Route, &args)?;
650
651 if result.is_empty() {
653 return Ok(RouteResult::Default);
654 }
655
656 serde_json::from_slice(&result).map_err(|e| {
657 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
658 })
659 }
660
661 pub fn stats(&self) -> RuntimeStats {
663 RuntimeStats {
664 uptime: self.created_at.elapsed(),
665 cached_modules: self.module_cache.read().len(),
666 fuel_metering_enabled: self.config.fuel_metering,
667 memory_limit: self.config.memory_limit,
668 timeout: self.config.timeout,
669 }
670 }
671}
672
673impl Drop for WasmPluginRuntime {
674 fn drop(&mut self) {
675 self.epoch_stop
678 .store(true, std::sync::atomic::Ordering::Relaxed);
679 }
680}
681
682#[derive(Debug, Clone)]
684pub struct RuntimeStats {
685 pub uptime: Duration,
687
688 pub cached_modules: usize,
690
691 pub fuel_metering_enabled: bool,
693
694 pub memory_limit: usize,
696
697 pub timeout: Duration,
699}
700
701impl serde::Serialize for QueryContext {
706 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
707 where
708 S: serde::Serializer,
709 {
710 use serde::ser::SerializeStruct;
711 let mut state = serializer.serialize_struct("QueryContext", 5)?;
712 state.serialize_field("query", &self.query)?;
713 state.serialize_field("normalized", &self.normalized)?;
714 state.serialize_field("tables", &self.tables)?;
715 state.serialize_field("is_read_only", &self.is_read_only)?;
716 state.serialize_field("hook_context", &self.hook_context)?;
717 state.end()
718 }
719}
720
721fn get_typed<T, P, R>(
724 instance: &Instance,
725 store: &mut Store<T>,
726 name: &str,
727) -> Result<TypedFunc<P, R>, PluginError>
728where
729 P: wasmtime::WasmParams,
730 R: wasmtime::WasmResults,
731{
732 instance
733 .get_typed_func::<P, R>(store, name)
734 .map_err(|e| PluginError::ExecutionError(format!("export `{}`: {}", name, e)))
735}
736
737fn write_memory<T>(
740 memory: &Memory,
741 store: &mut Store<T>,
742 ptr: i32,
743 bytes: &[u8],
744) -> Result<(), PluginError> {
745 memory.write(store, ptr as usize, bytes).map_err(|e| {
746 PluginError::ExecutionError(format!("memory.write @ {}: {}", ptr, e))
747 })
748}
749
750fn read_memory<T>(
752 memory: &Memory,
753 store: &Store<T>,
754 ptr: i32,
755 len: i32,
756) -> Result<Vec<u8>, PluginError> {
757 if len <= 0 {
758 return Ok(Vec::new());
759 }
760 let mut out = vec![0u8; len as usize];
761 memory.read(store, ptr as usize, &mut out).map_err(|e| {
762 PluginError::ExecutionError(format!("memory.read @ {}+{}: {}", ptr, len, e))
763 })?;
764 Ok(out)
765}
766
767impl serde::Serialize for AuthRequest {
768 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
769 where
770 S: serde::Serializer,
771 {
772 use serde::ser::SerializeStruct;
773 let mut state = serializer.serialize_struct("AuthRequest", 5)?;
774 state.serialize_field("headers", &self.headers)?;
775 state.serialize_field("username", &self.username)?;
776 state.serialize_field("password", &self.password)?;
777 state.serialize_field("client_ip", &self.client_ip)?;
778 state.serialize_field("database", &self.database)?;
779 state.end()
780 }
781}
782
783impl<'de> serde::Deserialize<'de> for PreQueryResult {
784 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
785 where
786 D: serde::Deserializer<'de>,
787 {
788 #[derive(serde::Deserialize)]
789 struct Helper {
790 action: String,
791 #[serde(default)]
792 value: Option<String>,
793 #[serde(default)]
794 data: Option<Vec<u8>>,
795 }
796
797 let helper = Helper::deserialize(deserializer)?;
798 match helper.action.as_str() {
799 "continue" => Ok(PreQueryResult::Continue),
800 "rewrite" => Ok(PreQueryResult::Rewrite(
801 helper.value.unwrap_or_default(),
802 )),
803 "block" => Ok(PreQueryResult::Block(
804 helper.value.unwrap_or_default(),
805 )),
806 "cached" => Ok(PreQueryResult::Cached(
807 helper.data.unwrap_or_default(),
808 )),
809 _ => Ok(PreQueryResult::Continue),
810 }
811 }
812}
813
814impl<'de> serde::Deserialize<'de> for AuthResult {
815 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
816 where
817 D: serde::Deserializer<'de>,
818 {
819 #[derive(serde::Deserialize)]
820 struct Helper {
821 action: String,
822 #[serde(default)]
823 identity: Option<IdentityHelper>,
824 #[serde(default)]
825 message: Option<String>,
826 }
827
828 #[derive(serde::Deserialize)]
829 struct IdentityHelper {
830 user_id: String,
831 username: String,
832 #[serde(default)]
833 roles: Vec<String>,
834 #[serde(default)]
835 tenant_id: Option<String>,
836 }
837
838 let helper = Helper::deserialize(deserializer)?;
839 match helper.action.as_str() {
840 "success" => {
841 let id = helper.identity.unwrap_or(IdentityHelper {
842 user_id: String::new(),
843 username: String::new(),
844 roles: Vec::new(),
845 tenant_id: None,
846 });
847 Ok(AuthResult::Success(super::Identity {
848 user_id: id.user_id,
849 username: id.username,
850 roles: id.roles,
851 tenant_id: id.tenant_id,
852 claims: std::collections::HashMap::new(),
853 }))
854 }
855 "denied" => Ok(AuthResult::Denied(
856 helper.message.unwrap_or_default(),
857 )),
858 "defer" => Ok(AuthResult::Defer),
859 _ => Ok(AuthResult::Defer),
860 }
861 }
862}
863
864impl<'de> serde::Deserialize<'de> for RouteResult {
865 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
866 where
867 D: serde::Deserializer<'de>,
868 {
869 #[derive(serde::Deserialize)]
870 struct Helper {
871 action: String,
872 #[serde(default)]
873 target: Option<String>,
874 #[serde(default)]
875 reason: Option<String>,
876 }
877
878 let helper = Helper::deserialize(deserializer)?;
879 match helper.action.as_str() {
880 "default" => Ok(RouteResult::Default),
881 "node" => Ok(RouteResult::Node(helper.target.unwrap_or_default())),
882 "primary" => Ok(RouteResult::Primary),
883 "standby" => Ok(RouteResult::Standby),
884 "branch" => Ok(RouteResult::Branch(helper.target.unwrap_or_default())),
885 "block" => Ok(RouteResult::Block(
889 helper.reason.unwrap_or_else(|| "blocked by plugin".to_string()),
890 )),
891 _ => Ok(RouteResult::Default),
892 }
893 }
894}
895
896#[cfg(test)]
897mod tests {
898 use super::*;
899
900 fn build_test_module(engine: &Engine) -> Module {
909 const PAYLOAD: &[u8] = b"hello-from-wasm";
910 let payload_hex: String = PAYLOAD
911 .iter()
912 .map(|b| format!("\\{:02x}", b))
913 .collect();
914 let wat = format!(
915 r#"
916 (module
917 (memory (export "memory") 1)
918
919 ;; Trivial alloc: always returns offset 4096 (test inputs
920 ;; are tiny so non-overlapping reuse is fine here). Real
921 ;; plugins ship a real allocator; the runtime only cares
922 ;; that `alloc` returns a writable address.
923 (func (export "alloc") (param $size i32) (result i32)
924 (i32.const 4096))
925
926 (func (export "dealloc") (param $ptr i32) (param $size i32)
927 (drop (local.get $ptr))
928 (drop (local.get $size)))
929
930 ;; Result-returning hook: writes PAYLOAD at offset 1024 and
931 ;; returns (1024 << 32) | PAYLOAD.len.
932 (func (export "pre_query")
933 (param $in_ptr i32) (param $in_len i32) (result i64)
934 (i64.or
935 (i64.shl (i64.const 1024) (i64.const 32))
936 (i64.const {payload_len})))
937
938 ;; Observer hook: takes args, returns nothing.
939 (func (export "post_query")
940 (param $in_ptr i32) (param $in_len i32)
941 (drop (local.get $in_ptr)))
942
943 (data (i32.const 1024) "{payload}")
944 )
945 "#,
946 payload = payload_hex,
947 payload_len = PAYLOAD.len(),
948 );
949 let bytes = wat::parse_str(&wat).expect("wat parses");
950 Module::from_binary(engine, &bytes).expect("module compiles")
951 }
952
953 fn build_spin_module(engine: &Engine) -> Module {
957 let wat = r#"
958 (module
959 (memory (export "memory") 1)
960 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
961 (func (export "dealloc") (param i32) (param i32))
962 (func (export "pre_query") (param i32) (param i32) (result i64)
963 (loop $l (br $l))
964 (i64.const 0)))
965 "#;
966 let bytes = wat::parse_str(wat).expect("wat parses");
967 Module::from_binary(engine, &bytes).expect("module compiles")
968 }
969
970 #[test]
974 fn test_call_hook_enforces_timeout() {
975 let mut config = PluginRuntimeConfig::default();
976 config.fuel_metering = false; config.timeout = Duration::from_millis(100);
978 let runtime = Arc::new(WasmPluginRuntime::new(&config).unwrap());
979
980 let module = build_spin_module(runtime.engine());
981 let mut metadata = PluginMetadata::default();
982 metadata.name = "spin".to_string();
983 metadata.hooks = vec![HookType::PreQuery];
984 let plugin = Arc::new(LoadedPlugin::new(
985 metadata,
986 PathBuf::from("/test/spin.wasm"),
987 module,
988 PluginSandbox::default(),
989 ));
990
991 let (tx, rx) = std::sync::mpsc::channel();
992 {
993 let r = runtime.clone();
994 let p = plugin.clone();
995 std::thread::spawn(move || {
996 let res = r.call_hook(&p, HookType::PreQuery, b"{}");
997 let _ = tx.send(res.is_err());
998 });
999 }
1000 match rx.recv_timeout(Duration::from_secs(5)) {
1001 Ok(is_err) => assert!(is_err, "runaway plugin should trap with an error"),
1002 Err(_) => panic!("call_hook did not return within 5s — epoch timeout not enforced"),
1003 }
1004 }
1005
1006 #[test]
1007 fn test_plugin_error_display() {
1008 let err = PluginError::LoadError("test".to_string());
1009 assert!(err.to_string().contains("Load error"));
1010
1011 let err = PluginError::Timeout("plugin-a".to_string());
1012 assert!(err.to_string().contains("Timeout"));
1013 }
1014
1015 #[test]
1016 fn test_plugin_state() {
1017 assert_eq!(PluginState::Running, PluginState::Running);
1018 assert_ne!(PluginState::Running, PluginState::Paused);
1019 }
1020
1021 #[test]
1022 fn test_runtime_creation() {
1023 let config = PluginRuntimeConfig::default();
1024 let runtime = WasmPluginRuntime::new(&config);
1025 assert!(runtime.is_ok());
1026 }
1027
1028 #[test]
1029 fn test_runtime_stats() {
1030 let config = PluginRuntimeConfig::default();
1031 let runtime = WasmPluginRuntime::new(&config).unwrap();
1032 let stats = runtime.stats();
1033
1034 assert_eq!(stats.cached_modules, 0);
1035 assert!(stats.fuel_metering_enabled);
1036 }
1037
1038 #[test]
1039 fn test_loaded_plugin_invocation_count() {
1040 let engine = Engine::default();
1043 let module = build_test_module(&engine);
1044 let metadata = PluginMetadata::default();
1045 let sandbox = PluginSandbox::default();
1046 let plugin = LoadedPlugin::new(
1047 metadata,
1048 PathBuf::from("/test/plugin.wasm"),
1049 module,
1050 sandbox,
1051 );
1052
1053 assert_eq!(plugin.invocation_count(), 0);
1054 plugin.record_invocation();
1055 assert_eq!(plugin.invocation_count(), 1);
1056 plugin.record_invocation();
1057 assert_eq!(plugin.invocation_count(), 2);
1058 }
1059
1060 #[test]
1065 fn test_call_hook_roundtrips_real_wasm() {
1066 let mut config = PluginRuntimeConfig::default();
1067 config.fuel_metering = false;
1070 let runtime = WasmPluginRuntime::new(&config).unwrap();
1071
1072 let module = build_test_module(runtime.engine());
1073 let mut metadata = PluginMetadata::default();
1074 metadata.name = "test-roundtrip".to_string();
1075 metadata.hooks = vec![HookType::PreQuery, HookType::PostQuery];
1076
1077 let plugin = LoadedPlugin::new(
1078 metadata,
1079 PathBuf::from("/test/roundtrip.wasm"),
1080 module,
1081 PluginSandbox::default(),
1082 );
1083 let bytes = runtime
1087 .call_hook(&plugin, HookType::PreQuery, b"ignored input")
1088 .expect("pre_query call");
1089 assert_eq!(bytes, b"hello-from-wasm");
1090 assert_eq!(plugin.invocation_count(), 1);
1091
1092 let out = runtime
1094 .call_hook(&plugin, HookType::PostQuery, b"some bytes")
1095 .expect("post_query call");
1096 assert!(out.is_empty());
1097 assert_eq!(plugin.invocation_count(), 2);
1098 }
1099
1100 #[test]
1103 fn test_call_hook_rejects_undeclared_hook() {
1104 let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1105 let module = build_test_module(runtime.engine());
1106 let mut metadata = PluginMetadata::default();
1107 metadata.hooks = vec![]; let plugin = LoadedPlugin::new(
1109 metadata,
1110 PathBuf::from("/test/empty.wasm"),
1111 module,
1112 PluginSandbox::default(),
1113 );
1114 let err = runtime
1115 .call_hook(&plugin, HookType::PreQuery, &[])
1116 .unwrap_err();
1117 assert!(matches!(err, PluginError::HookNotFound(_)));
1118 }
1119
1120 #[test]
1123 fn test_call_hook_missing_export_returns_error() {
1124 let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1125 let module = build_test_module(runtime.engine());
1126 let mut metadata = PluginMetadata::default();
1127 metadata.hooks = vec![HookType::Authenticate];
1129 let plugin = LoadedPlugin::new(
1130 metadata,
1131 PathBuf::from("/test/missing.wasm"),
1132 module,
1133 PluginSandbox::default(),
1134 );
1135 let err = runtime
1136 .call_hook(&plugin, HookType::Authenticate, &[])
1137 .unwrap_err();
1138 assert!(matches!(err, PluginError::ExecutionError(_)));
1139 }
1140
1141 fn build_kv_test_module(engine: &Engine) -> Module {
1145 let wat = r#"
1149 (module
1150 (import "env" "kv_set"
1151 (func $kv_set (param i32 i32 i32 i32) (result i32)))
1152 (memory (export "memory") 1)
1153
1154 (data (i32.const 100) "key")
1155 (data (i32.const 200) "value")
1156
1157 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1158 (func (export "dealloc") (param i32 i32))
1159
1160 ;; pre_query: kv_set("key", "value"); return 0 (no payload).
1161 (func (export "pre_query")
1162 (param $in_ptr i32) (param $in_len i32) (result i64)
1163 (drop (call $kv_set
1164 (i32.const 100) (i32.const 3)
1165 (i32.const 200) (i32.const 5)))
1166 (i64.const 0))
1167 )
1168 "#;
1169 let bytes = wat::parse_str(wat).expect("kv-wat parses");
1170 Module::from_binary(engine, &bytes).expect("kv module compiles")
1171 }
1172
1173 #[test]
1177 fn test_host_kv_import_persists_value() {
1178 let mut config = PluginRuntimeConfig::default();
1179 config.fuel_metering = false;
1180 let runtime = WasmPluginRuntime::new(&config).unwrap();
1181
1182 let module = build_kv_test_module(runtime.engine());
1183 let mut metadata = PluginMetadata::default();
1184 metadata.name = "kv-test-plugin".to_string();
1185 metadata.hooks = vec![HookType::PreQuery];
1186
1187 let plugin = LoadedPlugin::new(
1188 metadata,
1189 PathBuf::from("/test/kv.wasm"),
1190 module,
1191 PluginSandbox::default(),
1192 );
1193
1194 assert_eq!(runtime.kv().get("kv-test-plugin", b"key"), None);
1196
1197 let _ = runtime
1198 .call_hook(&plugin, HookType::PreQuery, &[])
1199 .expect("pre_query call");
1200
1201 assert_eq!(
1204 runtime.kv().get("kv-test-plugin", b"key"),
1205 Some(b"value".to_vec())
1206 );
1207 assert_eq!(runtime.kv().get("other-plugin", b"key"), None);
1209 }
1210
1211 fn build_sha256_test_module(engine: &Engine) -> Module {
1219 let wat = r#"
1220 (module
1221 (import "env" "sha256_hex"
1222 (func $sha256_hex (param i32 i32 i32) (result i32)))
1223 (memory (export "memory") 1)
1224
1225 (data (i32.const 100) "abc")
1226
1227 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1228 (func (export "dealloc") (param i32 i32))
1229
1230 (func (export "pre_query")
1231 (param $in_ptr i32) (param $in_len i32) (result i64)
1232 (drop (call $sha256_hex
1233 (i32.const 100) (i32.const 3)
1234 (i32.const 200)))
1235 (i64.or
1236 (i64.shl (i64.const 200) (i64.const 32))
1237 (i64.const 64)))
1238 )
1239 "#;
1240 let bytes = wat::parse_str(wat).expect("sha256-wat parses");
1241 Module::from_binary(engine, &bytes).expect("sha256 module compiles")
1242 }
1243
1244 #[test]
1248 fn test_route_result_deserialises_block_with_reason() {
1249 let json = r#"{"action":"block","reason":"cross-region read forbidden"}"#;
1250 let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1251 match r {
1252 RouteResult::Block(reason) => {
1253 assert_eq!(reason, "cross-region read forbidden");
1254 }
1255 other => panic!("expected Block, got {:?}", other),
1256 }
1257 }
1258
1259 #[test]
1262 fn test_route_result_block_defaults_reason_when_missing() {
1263 let json = r#"{"action":"block"}"#;
1264 let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1265 match r {
1266 RouteResult::Block(reason) => {
1267 assert!(!reason.is_empty(), "default reason should not be empty");
1268 }
1269 other => panic!("expected Block, got {:?}", other),
1270 }
1271 }
1272
1273 #[test]
1277 fn test_host_sha256_import_matches_rfc_6234_vector() {
1278 const SHA256_OF_ABC: &[u8; 64] =
1279 b"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
1280
1281 let mut config = PluginRuntimeConfig::default();
1282 config.fuel_metering = false;
1283 let runtime = WasmPluginRuntime::new(&config).unwrap();
1284
1285 let module = build_sha256_test_module(runtime.engine());
1286 let mut metadata = PluginMetadata::default();
1287 metadata.name = "sha256-test-plugin".to_string();
1288 metadata.hooks = vec![HookType::PreQuery];
1289
1290 let plugin = LoadedPlugin::new(
1291 metadata,
1292 PathBuf::from("/test/sha256.wasm"),
1293 module,
1294 PluginSandbox::default(),
1295 );
1296
1297 let out = runtime
1298 .call_hook(&plugin, HookType::PreQuery, &[])
1299 .expect("pre_query call");
1300 assert_eq!(out.len(), 64);
1301 assert_eq!(&out[..], SHA256_OF_ABC);
1302 }
1303}