1use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, OnceLock};
32use std::time::{Duration, Instant};
33
34use parking_lot::RwLock;
35use wasmtime::{Engine, Instance, InstancePre, Linker, Memory, Module, Store, TypedFunc};
36
37use super::config::PluginRuntimeConfig;
38use super::host_functions::HostFunctionRegistry;
39use super::host_imports::{register_crypto_imports, register_kv_imports, KvBackend, StoreCtx};
40use super::sandbox::{PluginSandbox, ResourceLimits, SecurityPolicy};
41use super::{
42 AuthRequest, AuthResult, HookType, PluginMetadata, PreQueryResult, QueryContext, RouteResult,
43};
44
45#[derive(Debug, Clone)]
47pub enum PluginError {
48 LoadError(String),
50
51 InstantiationError(String),
53
54 ExecutionError(String),
56
57 Timeout(String),
59
60 MemoryExceeded(String),
62
63 SecurityViolation(String),
65
66 InvalidManifest(String),
68
69 HookNotFound(String),
71
72 RuntimeError(String),
74}
75
76impl std::fmt::Display for PluginError {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 match self {
79 PluginError::LoadError(msg) => write!(f, "Load error: {}", msg),
80 PluginError::InstantiationError(msg) => write!(f, "Instantiation error: {}", msg),
81 PluginError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
82 PluginError::Timeout(msg) => write!(f, "Timeout: {}", msg),
83 PluginError::MemoryExceeded(msg) => write!(f, "Memory exceeded: {}", msg),
84 PluginError::SecurityViolation(msg) => write!(f, "Security violation: {}", msg),
85 PluginError::InvalidManifest(msg) => write!(f, "Invalid manifest: {}", msg),
86 PluginError::HookNotFound(msg) => write!(f, "Hook not found: {}", msg),
87 PluginError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
88 }
89 }
90}
91
92impl std::error::Error for PluginError {}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
96pub enum PluginState {
97 Loading,
99
100 Running,
102
103 Paused,
105
106 Error(String),
108
109 Unloading,
111}
112
113pub struct LoadedPlugin {
115 pub metadata: PluginMetadata,
117
118 pub state: PluginState,
120
121 pub path: PathBuf,
123
124 module: Module,
127
128 instance_pre: OnceLock<InstancePre<StoreCtx>>,
134
135 #[allow(dead_code)]
137 sandbox: PluginSandbox,
138
139 instance_data: RwLock<PluginInstanceData>,
141
142 loaded_at: Instant,
144
145 last_invoked: RwLock<Option<Instant>>,
147
148 invocation_count: std::sync::atomic::AtomicU64,
150}
151
152struct PluginInstanceData {
154 memory_used: usize,
156
157 fuel_consumed: u64,
159
160 #[allow(dead_code)]
162 state: HashMap<String, Vec<u8>>,
163}
164
165impl LoadedPlugin {
166 pub fn new(
168 metadata: PluginMetadata,
169 path: PathBuf,
170 module: Module,
171 sandbox: PluginSandbox,
172 ) -> Self {
173 Self {
174 metadata,
175 state: PluginState::Running,
176 path,
177 module,
178 instance_pre: OnceLock::new(),
179 sandbox,
180 instance_data: RwLock::new(PluginInstanceData {
181 memory_used: 0,
182 fuel_consumed: 0,
183 state: HashMap::new(),
184 }),
185 loaded_at: Instant::now(),
186 last_invoked: RwLock::new(None),
187 invocation_count: std::sync::atomic::AtomicU64::new(0),
188 }
189 }
190
191 #[allow(dead_code)]
195 pub(crate) fn module(&self) -> &Module {
196 &self.module
197 }
198
199 pub fn memory_used(&self) -> usize {
201 self.instance_data.read().memory_used
202 }
203
204 pub fn invocation_count(&self) -> u64 {
206 self.invocation_count
207 .load(std::sync::atomic::Ordering::Relaxed)
208 }
209
210 pub fn uptime(&self) -> Duration {
212 self.loaded_at.elapsed()
213 }
214
215 pub fn last_invoked(&self) -> Option<Instant> {
217 *self.last_invoked.read()
218 }
219
220 pub fn record_invocation(&self) {
222 self.invocation_count
223 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
224 *self.last_invoked.write() = Some(Instant::now());
225 }
226}
227
228pub struct WasmPluginRuntime {
230 config: PluginRuntimeConfig,
232
233 engine: Engine,
236
237 linker: Linker<StoreCtx>,
242
243 epoch_stop: Arc<AtomicBool>,
245
246 #[allow(dead_code)]
248 host_functions: Arc<HostFunctionRegistry>,
249
250 kv: KvBackend,
253
254 module_cache: RwLock<HashMap<PathBuf, Module>>,
257
258 default_policy: SecurityPolicy,
260
261 created_at: Instant,
263}
264
265impl WasmPluginRuntime {
266 pub fn new(config: &PluginRuntimeConfig) -> Result<Self, PluginError> {
268 let host_functions = Arc::new(HostFunctionRegistry::new());
269
270 let mut engine_config = wasmtime::Config::new();
271 if config.fuel_metering {
272 engine_config.consume_fuel(true);
273 }
274 engine_config.epoch_interruption(true);
277
278 let engine = Engine::new(&engine_config)
279 .map_err(|e| PluginError::RuntimeError(format!("wasmtime engine init: {}", e)))?;
280
281 let mut linker: Linker<StoreCtx> = Linker::new(&engine);
285 register_kv_imports(&mut linker)?;
286 register_crypto_imports(&mut linker)?;
287
288 let epoch_stop = Arc::new(AtomicBool::new(false));
295 {
296 let engine = engine.clone();
297 let stop = epoch_stop.clone();
298 std::thread::Builder::new()
299 .name("wasm-epoch-ticker".into())
300 .spawn(move || {
301 while !stop.load(std::sync::atomic::Ordering::Relaxed) {
302 std::thread::sleep(Duration::from_millis(1));
303 engine.increment_epoch();
304 }
305 })
306 .ok();
307 }
308
309 let default_policy = SecurityPolicy {
310 allowed_hosts: vec!["localhost".to_string()],
311 allowed_paths: vec![config.plugin_dir.clone()],
312 max_memory: config.memory_limit,
313 max_execution_time: config.timeout,
314 allow_network: false,
315 allow_filesystem: false,
316 };
317
318 Ok(Self {
319 config: config.clone(),
320 engine,
321 linker,
322 epoch_stop,
323 host_functions,
324 kv: KvBackend::new(),
325 module_cache: RwLock::new(HashMap::new()),
326 default_policy,
327 created_at: Instant::now(),
328 })
329 }
330
331 pub fn kv(&self) -> &KvBackend {
334 &self.kv
335 }
336
337 #[allow(dead_code)]
340 pub(crate) fn linker(&self) -> &Linker<StoreCtx> {
341 &self.linker
342 }
343
344 #[allow(dead_code)]
347 pub(crate) fn engine(&self) -> &Engine {
348 &self.engine
349 }
350
351 pub fn config(&self) -> &PluginRuntimeConfig {
355 &self.config
356 }
357
358 pub fn instantiate(
360 &self,
361 manifest: &super::loader::PluginManifest,
362 wasm_bytes: &[u8],
363 ) -> Result<LoadedPlugin, PluginError> {
364 if wasm_bytes.len() < 8 {
366 return Err(PluginError::LoadError("WASM module too small".to_string()));
367 }
368
369 if &wasm_bytes[0..4] != b"\x00asm" {
371 return Err(PluginError::LoadError(
372 "Invalid WASM magic number".to_string(),
373 ));
374 }
375
376 let metadata = PluginMetadata {
378 name: manifest.name.clone(),
379 version: manifest.version.clone(),
380 description: manifest.description.clone(),
381 author: manifest.author.clone(),
382 hooks: manifest.hooks.clone(),
383 permissions: manifest.permissions.clone(),
384 min_memory: manifest.min_memory,
385 max_memory: manifest.max_memory.min(self.config.memory_limit),
386 };
387
388 let resource_limits = ResourceLimits {
390 max_memory: metadata.max_memory,
391 max_execution_time: self.config.timeout,
392 max_fuel: if self.config.fuel_metering {
393 Some(self.config.fuel_limit)
394 } else {
395 None
396 },
397 max_table_elements: 10000,
398 max_instances: 1,
399 };
400
401 let sandbox = PluginSandbox::new(
402 self.default_policy.clone(),
403 resource_limits,
404 manifest.permissions.clone(),
405 );
406
407 let module = Module::from_binary(&self.engine, wasm_bytes)
410 .map_err(|e| PluginError::InstantiationError(format!("wasmtime compile: {}", e)))?;
411
412 {
414 let mut cache = self.module_cache.write();
415 cache.insert(manifest.path.clone(), module.clone());
416 }
417
418 Ok(LoadedPlugin::new(
419 metadata,
420 manifest.path.clone(),
421 module,
422 sandbox,
423 ))
424 }
425
426 pub fn call_hook(
442 &self,
443 plugin: &LoadedPlugin,
444 hook: HookType,
445 args: &[u8],
446 ) -> Result<Vec<u8>, PluginError> {
447 if !plugin.metadata.hooks.contains(&hook) {
449 return Err(PluginError::HookNotFound(format!(
450 "Plugin {} does not support hook {:?}",
451 plugin.metadata.name, hook
452 )));
453 }
454
455 if plugin.state != PluginState::Running {
457 return Err(PluginError::ExecutionError(format!(
458 "Plugin {} is not running (state: {:?})",
459 plugin.metadata.name, plugin.state
460 )));
461 }
462
463 plugin.record_invocation();
465
466 let store_ctx = StoreCtx {
470 plugin_name: plugin.metadata.name.clone(),
471 kv: self.kv.clone(),
472 };
473 let mut store: Store<StoreCtx> = Store::new(&self.engine, store_ctx);
474 if self.config.fuel_metering {
475 store
477 .set_fuel(self.config.fuel_limit)
478 .map_err(|e| PluginError::RuntimeError(format!("set_fuel: {}", e)))?;
479 }
480 let deadline_ticks = self.config.timeout.as_millis().max(1).min(u64::MAX as u128) as u64;
486 store.set_epoch_deadline(deadline_ticks);
487
488 let instance_pre = match plugin.instance_pre.get() {
494 Some(ip) => ip,
495 None => {
496 let ip = self.linker.instantiate_pre(&plugin.module).map_err(|e| {
497 PluginError::InstantiationError(format!(
498 "pre-instantiate {}: {}",
499 plugin.metadata.name, e
500 ))
501 })?;
502 let _ = plugin.instance_pre.set(ip);
504 plugin.instance_pre.get().expect("just set")
505 }
506 };
507 let instance = instance_pre.instantiate(&mut store).map_err(|e| {
508 PluginError::InstantiationError(format!("instantiate {}: {}", plugin.metadata.name, e))
509 })?;
510
511 let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
512 PluginError::ExecutionError(format!(
513 "plugin {} does not export `memory`",
514 plugin.metadata.name
515 ))
516 })?;
517
518 let alloc = get_typed::<_, i32, i32>(&instance, &mut store, "alloc")?;
519 let dealloc = get_typed::<_, (i32, i32), ()>(&instance, &mut store, "dealloc")?;
520
521 let in_len = args.len() as i32;
524 let in_ptr = alloc
525 .call(&mut store, in_len)
526 .map_err(|e| PluginError::ExecutionError(format!("alloc({}): {}", in_len, e)))?;
527 if in_len > 0 {
528 write_memory(&memory, &mut store, in_ptr, args)?;
529 }
530
531 let export_name = hook.export_name();
534 let result_bytes = match get_typed::<_, (i32, i32), i64>(&instance, &mut store, export_name)
535 {
536 Ok(hook_fn) => {
537 let packed = hook_fn.call(&mut store, (in_ptr, in_len)).map_err(|e| {
538 PluginError::ExecutionError(format!("hook {} call: {}", export_name, e))
539 })?;
540 let out_ptr = (packed >> 32) as i32;
541 let out_len = (packed & 0xFFFF_FFFF) as i32;
542 if out_len > 0 {
543 let bytes = read_memory(&memory, &store, out_ptr, out_len)?;
544 let _ = dealloc.call(&mut store, (out_ptr, out_len));
546 bytes
547 } else {
548 Vec::new()
549 }
550 }
551 Err(_) => {
552 let observer = get_typed::<_, (i32, i32), ()>(&instance, &mut store, export_name)?;
554 observer.call(&mut store, (in_ptr, in_len)).map_err(|e| {
555 PluginError::ExecutionError(format!(
556 "observer hook {} call: {}",
557 export_name, e
558 ))
559 })?;
560 Vec::new()
561 }
562 };
563
564 let _ = dealloc.call(&mut store, (in_ptr, in_len));
567
568 if self.config.fuel_metering {
570 if let Ok(remaining) = store.get_fuel() {
571 let consumed = self.config.fuel_limit.saturating_sub(remaining);
572 plugin.instance_data.write().fuel_consumed = consumed;
573 }
574 }
575 plugin.instance_data.write().memory_used = memory.data_size(&store);
576
577 Ok(result_bytes)
578 }
579
580 pub fn call_pre_query(
582 &self,
583 plugin: &LoadedPlugin,
584 ctx: &QueryContext,
585 ) -> Result<PreQueryResult, PluginError> {
586 let args = serde_json::to_vec(ctx).map_err(|e| {
588 PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
589 })?;
590
591 let result = self.call_hook(plugin, HookType::PreQuery, &args)?;
593
594 if result.is_empty() {
596 return Ok(PreQueryResult::Continue);
597 }
598
599 serde_json::from_slice(&result).map_err(|e| {
600 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
601 })
602 }
603
604 pub fn call_authenticate(
606 &self,
607 plugin: &LoadedPlugin,
608 request: &AuthRequest,
609 ) -> Result<AuthResult, PluginError> {
610 let args = serde_json::to_vec(request).map_err(|e| {
612 PluginError::ExecutionError(format!("Failed to serialize request: {}", e))
613 })?;
614
615 let result = self.call_hook(plugin, HookType::Authenticate, &args)?;
617
618 if result.is_empty() {
620 return Ok(AuthResult::Defer);
621 }
622
623 serde_json::from_slice(&result).map_err(|e| {
624 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
625 })
626 }
627
628 pub fn call_route(
630 &self,
631 plugin: &LoadedPlugin,
632 ctx: &QueryContext,
633 ) -> Result<RouteResult, PluginError> {
634 let args = serde_json::to_vec(ctx).map_err(|e| {
636 PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
637 })?;
638
639 let result = self.call_hook(plugin, HookType::Route, &args)?;
641
642 if result.is_empty() {
644 return Ok(RouteResult::Default);
645 }
646
647 serde_json::from_slice(&result).map_err(|e| {
648 PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
649 })
650 }
651
652 pub fn stats(&self) -> RuntimeStats {
654 RuntimeStats {
655 uptime: self.created_at.elapsed(),
656 cached_modules: self.module_cache.read().len(),
657 fuel_metering_enabled: self.config.fuel_metering,
658 memory_limit: self.config.memory_limit,
659 timeout: self.config.timeout,
660 }
661 }
662}
663
664impl Drop for WasmPluginRuntime {
665 fn drop(&mut self) {
666 self.epoch_stop
669 .store(true, std::sync::atomic::Ordering::Relaxed);
670 }
671}
672
673#[derive(Debug, Clone)]
675pub struct RuntimeStats {
676 pub uptime: Duration,
678
679 pub cached_modules: usize,
681
682 pub fuel_metering_enabled: bool,
684
685 pub memory_limit: usize,
687
688 pub timeout: Duration,
690}
691
692impl serde::Serialize for QueryContext {
697 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
698 where
699 S: serde::Serializer,
700 {
701 use serde::ser::SerializeStruct;
702 let mut state = serializer.serialize_struct("QueryContext", 5)?;
703 state.serialize_field("query", &self.query)?;
704 state.serialize_field("normalized", &self.normalized)?;
705 state.serialize_field("tables", &self.tables)?;
706 state.serialize_field("is_read_only", &self.is_read_only)?;
707 state.serialize_field("hook_context", &self.hook_context)?;
708 state.end()
709 }
710}
711
712fn get_typed<T, P, R>(
715 instance: &Instance,
716 store: &mut Store<T>,
717 name: &str,
718) -> Result<TypedFunc<P, R>, PluginError>
719where
720 P: wasmtime::WasmParams,
721 R: wasmtime::WasmResults,
722{
723 instance
724 .get_typed_func::<P, R>(store, name)
725 .map_err(|e| PluginError::ExecutionError(format!("export `{}`: {}", name, e)))
726}
727
728fn write_memory<T>(
731 memory: &Memory,
732 store: &mut Store<T>,
733 ptr: i32,
734 bytes: &[u8],
735) -> Result<(), PluginError> {
736 memory
737 .write(store, ptr as usize, bytes)
738 .map_err(|e| PluginError::ExecutionError(format!("memory.write @ {}: {}", ptr, e)))
739}
740
741fn read_memory<T>(
743 memory: &Memory,
744 store: &Store<T>,
745 ptr: i32,
746 len: i32,
747) -> Result<Vec<u8>, PluginError> {
748 if len <= 0 {
749 return Ok(Vec::new());
750 }
751 let mut out = vec![0u8; len as usize];
752 memory.read(store, ptr as usize, &mut out).map_err(|e| {
753 PluginError::ExecutionError(format!("memory.read @ {}+{}: {}", ptr, len, e))
754 })?;
755 Ok(out)
756}
757
758impl serde::Serialize for AuthRequest {
759 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
760 where
761 S: serde::Serializer,
762 {
763 use serde::ser::SerializeStruct;
764 let mut state = serializer.serialize_struct("AuthRequest", 5)?;
765 state.serialize_field("headers", &self.headers)?;
766 state.serialize_field("username", &self.username)?;
767 state.serialize_field("password", &self.password)?;
768 state.serialize_field("client_ip", &self.client_ip)?;
769 state.serialize_field("database", &self.database)?;
770 state.end()
771 }
772}
773
774impl<'de> serde::Deserialize<'de> for PreQueryResult {
775 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
776 where
777 D: serde::Deserializer<'de>,
778 {
779 #[derive(serde::Deserialize)]
780 struct Helper {
781 action: String,
782 #[serde(default)]
783 value: Option<String>,
784 #[serde(default)]
785 data: Option<Vec<u8>>,
786 }
787
788 let helper = Helper::deserialize(deserializer)?;
789 match helper.action.as_str() {
790 "continue" => Ok(PreQueryResult::Continue),
791 "rewrite" => Ok(PreQueryResult::Rewrite(helper.value.unwrap_or_default())),
792 "block" => Ok(PreQueryResult::Block(helper.value.unwrap_or_default())),
793 "cached" => Ok(PreQueryResult::Cached(helper.data.unwrap_or_default())),
794 _ => Ok(PreQueryResult::Continue),
795 }
796 }
797}
798
799impl<'de> serde::Deserialize<'de> for AuthResult {
800 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
801 where
802 D: serde::Deserializer<'de>,
803 {
804 #[derive(serde::Deserialize)]
805 struct Helper {
806 action: String,
807 #[serde(default)]
808 identity: Option<IdentityHelper>,
809 #[serde(default)]
810 message: Option<String>,
811 }
812
813 #[derive(serde::Deserialize)]
814 struct IdentityHelper {
815 user_id: String,
816 username: String,
817 #[serde(default)]
818 roles: Vec<String>,
819 #[serde(default)]
820 tenant_id: Option<String>,
821 }
822
823 let helper = Helper::deserialize(deserializer)?;
824 match helper.action.as_str() {
825 "success" => {
826 let id = helper.identity.unwrap_or(IdentityHelper {
827 user_id: String::new(),
828 username: String::new(),
829 roles: Vec::new(),
830 tenant_id: None,
831 });
832 Ok(AuthResult::Success(super::Identity {
833 user_id: id.user_id,
834 username: id.username,
835 roles: id.roles,
836 tenant_id: id.tenant_id,
837 claims: std::collections::HashMap::new(),
838 }))
839 }
840 "denied" => Ok(AuthResult::Denied(helper.message.unwrap_or_default())),
841 "defer" => Ok(AuthResult::Defer),
842 _ => Ok(AuthResult::Defer),
843 }
844 }
845}
846
847impl<'de> serde::Deserialize<'de> for RouteResult {
848 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
849 where
850 D: serde::Deserializer<'de>,
851 {
852 #[derive(serde::Deserialize)]
853 struct Helper {
854 action: String,
855 #[serde(default)]
856 target: Option<String>,
857 #[serde(default)]
858 reason: Option<String>,
859 }
860
861 let helper = Helper::deserialize(deserializer)?;
862 match helper.action.as_str() {
863 "default" => Ok(RouteResult::Default),
864 "node" => Ok(RouteResult::Node(helper.target.unwrap_or_default())),
865 "primary" => Ok(RouteResult::Primary),
866 "standby" => Ok(RouteResult::Standby),
867 "branch" => Ok(RouteResult::Branch(helper.target.unwrap_or_default())),
868 "block" => Ok(RouteResult::Block(
872 helper
873 .reason
874 .unwrap_or_else(|| "blocked by plugin".to_string()),
875 )),
876 _ => Ok(RouteResult::Default),
877 }
878 }
879}
880
881#[cfg(test)]
882mod tests {
883 use super::*;
884
885 fn build_test_module(engine: &Engine) -> Module {
894 const PAYLOAD: &[u8] = b"hello-from-wasm";
895 let payload_hex: String = PAYLOAD.iter().map(|b| format!("\\{:02x}", b)).collect();
896 let wat = format!(
897 r#"
898 (module
899 (memory (export "memory") 1)
900
901 ;; Trivial alloc: always returns offset 4096 (test inputs
902 ;; are tiny so non-overlapping reuse is fine here). Real
903 ;; plugins ship a real allocator; the runtime only cares
904 ;; that `alloc` returns a writable address.
905 (func (export "alloc") (param $size i32) (result i32)
906 (i32.const 4096))
907
908 (func (export "dealloc") (param $ptr i32) (param $size i32)
909 (drop (local.get $ptr))
910 (drop (local.get $size)))
911
912 ;; Result-returning hook: writes PAYLOAD at offset 1024 and
913 ;; returns (1024 << 32) | PAYLOAD.len.
914 (func (export "pre_query")
915 (param $in_ptr i32) (param $in_len i32) (result i64)
916 (i64.or
917 (i64.shl (i64.const 1024) (i64.const 32))
918 (i64.const {payload_len})))
919
920 ;; Observer hook: takes args, returns nothing.
921 (func (export "post_query")
922 (param $in_ptr i32) (param $in_len i32)
923 (drop (local.get $in_ptr)))
924
925 (data (i32.const 1024) "{payload}")
926 )
927 "#,
928 payload = payload_hex,
929 payload_len = PAYLOAD.len(),
930 );
931 let bytes = wat::parse_str(&wat).expect("wat parses");
932 Module::from_binary(engine, &bytes).expect("module compiles")
933 }
934
935 fn build_spin_module(engine: &Engine) -> Module {
939 let wat = r#"
940 (module
941 (memory (export "memory") 1)
942 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
943 (func (export "dealloc") (param i32) (param i32))
944 (func (export "pre_query") (param i32) (param i32) (result i64)
945 (loop $l (br $l))
946 (i64.const 0)))
947 "#;
948 let bytes = wat::parse_str(wat).expect("wat parses");
949 Module::from_binary(engine, &bytes).expect("module compiles")
950 }
951
952 #[test]
956 fn test_call_hook_enforces_timeout() {
957 let mut config = PluginRuntimeConfig::default();
958 config.fuel_metering = false; config.timeout = Duration::from_millis(100);
960 let runtime = Arc::new(WasmPluginRuntime::new(&config).unwrap());
961
962 let module = build_spin_module(runtime.engine());
963 let mut metadata = PluginMetadata::default();
964 metadata.name = "spin".to_string();
965 metadata.hooks = vec![HookType::PreQuery];
966 let plugin = Arc::new(LoadedPlugin::new(
967 metadata,
968 PathBuf::from("/test/spin.wasm"),
969 module,
970 PluginSandbox::default(),
971 ));
972
973 let (tx, rx) = std::sync::mpsc::channel();
974 {
975 let r = runtime.clone();
976 let p = plugin.clone();
977 std::thread::spawn(move || {
978 let res = r.call_hook(&p, HookType::PreQuery, b"{}");
979 let _ = tx.send(res.is_err());
980 });
981 }
982 match rx.recv_timeout(Duration::from_secs(5)) {
983 Ok(is_err) => assert!(is_err, "runaway plugin should trap with an error"),
984 Err(_) => panic!("call_hook did not return within 5s — epoch timeout not enforced"),
985 }
986 }
987
988 #[test]
989 fn test_plugin_error_display() {
990 let err = PluginError::LoadError("test".to_string());
991 assert!(err.to_string().contains("Load error"));
992
993 let err = PluginError::Timeout("plugin-a".to_string());
994 assert!(err.to_string().contains("Timeout"));
995 }
996
997 #[test]
998 fn test_plugin_state() {
999 assert_eq!(PluginState::Running, PluginState::Running);
1000 assert_ne!(PluginState::Running, PluginState::Paused);
1001 }
1002
1003 #[test]
1004 fn test_runtime_creation() {
1005 let config = PluginRuntimeConfig::default();
1006 let runtime = WasmPluginRuntime::new(&config);
1007 assert!(runtime.is_ok());
1008 }
1009
1010 #[test]
1011 fn test_runtime_stats() {
1012 let config = PluginRuntimeConfig::default();
1013 let runtime = WasmPluginRuntime::new(&config).unwrap();
1014 let stats = runtime.stats();
1015
1016 assert_eq!(stats.cached_modules, 0);
1017 assert!(stats.fuel_metering_enabled);
1018 }
1019
1020 #[test]
1021 fn test_loaded_plugin_invocation_count() {
1022 let engine = Engine::default();
1025 let module = build_test_module(&engine);
1026 let metadata = PluginMetadata::default();
1027 let sandbox = PluginSandbox::default();
1028 let plugin = LoadedPlugin::new(
1029 metadata,
1030 PathBuf::from("/test/plugin.wasm"),
1031 module,
1032 sandbox,
1033 );
1034
1035 assert_eq!(plugin.invocation_count(), 0);
1036 plugin.record_invocation();
1037 assert_eq!(plugin.invocation_count(), 1);
1038 plugin.record_invocation();
1039 assert_eq!(plugin.invocation_count(), 2);
1040 }
1041
1042 #[test]
1047 fn test_call_hook_roundtrips_real_wasm() {
1048 let mut config = PluginRuntimeConfig::default();
1049 config.fuel_metering = false;
1052 let runtime = WasmPluginRuntime::new(&config).unwrap();
1053
1054 let module = build_test_module(runtime.engine());
1055 let mut metadata = PluginMetadata::default();
1056 metadata.name = "test-roundtrip".to_string();
1057 metadata.hooks = vec![HookType::PreQuery, HookType::PostQuery];
1058
1059 let plugin = LoadedPlugin::new(
1060 metadata,
1061 PathBuf::from("/test/roundtrip.wasm"),
1062 module,
1063 PluginSandbox::default(),
1064 );
1065 let bytes = runtime
1069 .call_hook(&plugin, HookType::PreQuery, b"ignored input")
1070 .expect("pre_query call");
1071 assert_eq!(bytes, b"hello-from-wasm");
1072 assert_eq!(plugin.invocation_count(), 1);
1073
1074 let out = runtime
1076 .call_hook(&plugin, HookType::PostQuery, b"some bytes")
1077 .expect("post_query call");
1078 assert!(out.is_empty());
1079 assert_eq!(plugin.invocation_count(), 2);
1080 }
1081
1082 #[test]
1085 fn test_call_hook_rejects_undeclared_hook() {
1086 let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1087 let module = build_test_module(runtime.engine());
1088 let mut metadata = PluginMetadata::default();
1089 metadata.hooks = vec![]; let plugin = LoadedPlugin::new(
1091 metadata,
1092 PathBuf::from("/test/empty.wasm"),
1093 module,
1094 PluginSandbox::default(),
1095 );
1096 let err = runtime
1097 .call_hook(&plugin, HookType::PreQuery, &[])
1098 .unwrap_err();
1099 assert!(matches!(err, PluginError::HookNotFound(_)));
1100 }
1101
1102 #[test]
1105 fn test_call_hook_missing_export_returns_error() {
1106 let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1107 let module = build_test_module(runtime.engine());
1108 let mut metadata = PluginMetadata::default();
1109 metadata.hooks = vec![HookType::Authenticate];
1111 let plugin = LoadedPlugin::new(
1112 metadata,
1113 PathBuf::from("/test/missing.wasm"),
1114 module,
1115 PluginSandbox::default(),
1116 );
1117 let err = runtime
1118 .call_hook(&plugin, HookType::Authenticate, &[])
1119 .unwrap_err();
1120 assert!(matches!(err, PluginError::ExecutionError(_)));
1121 }
1122
1123 fn build_kv_test_module(engine: &Engine) -> Module {
1127 let wat = r#"
1131 (module
1132 (import "env" "kv_set"
1133 (func $kv_set (param i32 i32 i32 i32) (result i32)))
1134 (memory (export "memory") 1)
1135
1136 (data (i32.const 100) "key")
1137 (data (i32.const 200) "value")
1138
1139 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1140 (func (export "dealloc") (param i32 i32))
1141
1142 ;; pre_query: kv_set("key", "value"); return 0 (no payload).
1143 (func (export "pre_query")
1144 (param $in_ptr i32) (param $in_len i32) (result i64)
1145 (drop (call $kv_set
1146 (i32.const 100) (i32.const 3)
1147 (i32.const 200) (i32.const 5)))
1148 (i64.const 0))
1149 )
1150 "#;
1151 let bytes = wat::parse_str(wat).expect("kv-wat parses");
1152 Module::from_binary(engine, &bytes).expect("kv module compiles")
1153 }
1154
1155 #[test]
1159 fn test_host_kv_import_persists_value() {
1160 let mut config = PluginRuntimeConfig::default();
1161 config.fuel_metering = false;
1162 let runtime = WasmPluginRuntime::new(&config).unwrap();
1163
1164 let module = build_kv_test_module(runtime.engine());
1165 let mut metadata = PluginMetadata::default();
1166 metadata.name = "kv-test-plugin".to_string();
1167 metadata.hooks = vec![HookType::PreQuery];
1168
1169 let plugin = LoadedPlugin::new(
1170 metadata,
1171 PathBuf::from("/test/kv.wasm"),
1172 module,
1173 PluginSandbox::default(),
1174 );
1175
1176 assert_eq!(runtime.kv().get("kv-test-plugin", b"key"), None);
1178
1179 let _ = runtime
1180 .call_hook(&plugin, HookType::PreQuery, &[])
1181 .expect("pre_query call");
1182
1183 assert_eq!(
1186 runtime.kv().get("kv-test-plugin", b"key"),
1187 Some(b"value".to_vec())
1188 );
1189 assert_eq!(runtime.kv().get("other-plugin", b"key"), None);
1191 }
1192
1193 fn build_sha256_test_module(engine: &Engine) -> Module {
1201 let wat = r#"
1202 (module
1203 (import "env" "sha256_hex"
1204 (func $sha256_hex (param i32 i32 i32) (result i32)))
1205 (memory (export "memory") 1)
1206
1207 (data (i32.const 100) "abc")
1208
1209 (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1210 (func (export "dealloc") (param i32 i32))
1211
1212 (func (export "pre_query")
1213 (param $in_ptr i32) (param $in_len i32) (result i64)
1214 (drop (call $sha256_hex
1215 (i32.const 100) (i32.const 3)
1216 (i32.const 200)))
1217 (i64.or
1218 (i64.shl (i64.const 200) (i64.const 32))
1219 (i64.const 64)))
1220 )
1221 "#;
1222 let bytes = wat::parse_str(wat).expect("sha256-wat parses");
1223 Module::from_binary(engine, &bytes).expect("sha256 module compiles")
1224 }
1225
1226 #[test]
1230 fn test_route_result_deserialises_block_with_reason() {
1231 let json = r#"{"action":"block","reason":"cross-region read forbidden"}"#;
1232 let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1233 match r {
1234 RouteResult::Block(reason) => {
1235 assert_eq!(reason, "cross-region read forbidden");
1236 }
1237 other => panic!("expected Block, got {:?}", other),
1238 }
1239 }
1240
1241 #[test]
1244 fn test_route_result_block_defaults_reason_when_missing() {
1245 let json = r#"{"action":"block"}"#;
1246 let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1247 match r {
1248 RouteResult::Block(reason) => {
1249 assert!(!reason.is_empty(), "default reason should not be empty");
1250 }
1251 other => panic!("expected Block, got {:?}", other),
1252 }
1253 }
1254
1255 #[test]
1259 fn test_host_sha256_import_matches_rfc_6234_vector() {
1260 const SHA256_OF_ABC: &[u8; 64] =
1261 b"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
1262
1263 let mut config = PluginRuntimeConfig::default();
1264 config.fuel_metering = false;
1265 let runtime = WasmPluginRuntime::new(&config).unwrap();
1266
1267 let module = build_sha256_test_module(runtime.engine());
1268 let mut metadata = PluginMetadata::default();
1269 metadata.name = "sha256-test-plugin".to_string();
1270 metadata.hooks = vec![HookType::PreQuery];
1271
1272 let plugin = LoadedPlugin::new(
1273 metadata,
1274 PathBuf::from("/test/sha256.wasm"),
1275 module,
1276 PluginSandbox::default(),
1277 );
1278
1279 let out = runtime
1280 .call_hook(&plugin, HookType::PreQuery, &[])
1281 .expect("pre_query call");
1282 assert_eq!(out.len(), 64);
1283 assert_eq!(&out[..], SHA256_OF_ABC);
1284 }
1285}