Skip to main content

heliosdb_proxy/plugins/
runtime.rs

1//! WASM Plugin Runtime
2//!
3//! Real wasmtime-backed plugin executor.
4//!
5//! ## ABI
6//!
7//! Plugins export, at minimum:
8//!
9//! - `memory` (default linear memory).
10//! - `alloc(size: i32) -> i32` — host calls this to obtain a slot
11//!   in plugin memory it can write input bytes into.
12//! - `dealloc(ptr: i32, size: i32)` — host calls this to free either
13//!   the input slot (after the call) or the output slot (after the
14//!   host has read the result).
15//! - One function per declared hook, with one of two signatures:
16//!   - **Result-returning hooks** (`pre_query`, `route`,
17//!     `authenticate`, `rewrite`): `(ptr: i32, len: i32) -> i64`
18//!     where the i64 is `(result_ptr << 32) | result_len`.
19//!     `result_ptr == 0 && result_len == 0` is a valid "no result"
20//!     reply (host treats it as the default per-hook outcome).
21//!   - **Observer hooks** (`post_query`, `metrics`, `on_connect`,
22//!     `on_disconnect`): `(ptr: i32, len: i32)` with no return —
23//!     the host ignores any output the plugin may have written.
24//!
25//! The runtime tries the result-returning signature first; if the
26//! exported function has the no-return shape it falls back.
27
28use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, OnceLock};
32use std::time::{Duration, Instant};
33
34use parking_lot::RwLock;
35use wasmtime::{Engine, Instance, InstancePre, Linker, Module, Store, TypedFunc, Memory};
36
37use super::config::PluginRuntimeConfig;
38use super::host_functions::HostFunctionRegistry;
39use super::host_imports::{register_crypto_imports, register_kv_imports, KvBackend, StoreCtx};
40use super::sandbox::{PluginSandbox, SecurityPolicy, ResourceLimits};
41use super::{
42    AuthRequest, AuthResult, HookType, PluginMetadata, PreQueryResult,
43    QueryContext, RouteResult,
44};
45
46/// Error types for plugin operations
47#[derive(Debug, Clone)]
48pub enum PluginError {
49    /// Failed to load plugin
50    LoadError(String),
51
52    /// Failed to instantiate plugin
53    InstantiationError(String),
54
55    /// Plugin execution failed
56    ExecutionError(String),
57
58    /// Plugin timed out
59    Timeout(String),
60
61    /// Memory limit exceeded
62    MemoryExceeded(String),
63
64    /// Security policy violation
65    SecurityViolation(String),
66
67    /// Invalid plugin manifest
68    InvalidManifest(String),
69
70    /// Hook not found
71    HookNotFound(String),
72
73    /// Internal runtime error
74    RuntimeError(String),
75}
76
77impl std::fmt::Display for PluginError {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            PluginError::LoadError(msg) => write!(f, "Load error: {}", msg),
81            PluginError::InstantiationError(msg) => write!(f, "Instantiation error: {}", msg),
82            PluginError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
83            PluginError::Timeout(msg) => write!(f, "Timeout: {}", msg),
84            PluginError::MemoryExceeded(msg) => write!(f, "Memory exceeded: {}", msg),
85            PluginError::SecurityViolation(msg) => write!(f, "Security violation: {}", msg),
86            PluginError::InvalidManifest(msg) => write!(f, "Invalid manifest: {}", msg),
87            PluginError::HookNotFound(msg) => write!(f, "Hook not found: {}", msg),
88            PluginError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
89        }
90    }
91}
92
93impl std::error::Error for PluginError {}
94
95/// Plugin state
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub enum PluginState {
98    /// Plugin is loading
99    Loading,
100
101    /// Plugin is ready
102    Running,
103
104    /// Plugin is paused
105    Paused,
106
107    /// Plugin has errored
108    Error(String),
109
110    /// Plugin is unloading
111    Unloading,
112}
113
114/// A loaded and instantiated plugin
115pub struct LoadedPlugin {
116    /// Plugin metadata
117    pub metadata: PluginMetadata,
118
119    /// Current state
120    pub state: PluginState,
121
122    /// File path
123    pub path: PathBuf,
124
125    /// Compiled wasmtime module — cheap to clone (internally Arc'd)
126    /// and shared across invocations. Replaces the prior Vec<u8> stub.
127    module: Module,
128
129    /// Pre-resolved instantiation plan (module imports linked against the
130    /// runtime's shared `Linker`). Computed once on the first hook call and
131    /// reused for every subsequent call, so per-dispatch cost drops to
132    /// `Store::new` + `InstancePre::instantiate` — no per-call `Linker`
133    /// allocation, host-import re-registration, or import-name resolution.
134    instance_pre: OnceLock<InstancePre<StoreCtx>>,
135
136    /// Security sandbox
137    sandbox: PluginSandbox,
138
139    /// Instance data (mock for non-wasmtime builds)
140    instance_data: RwLock<PluginInstanceData>,
141
142    /// Creation timestamp
143    loaded_at: Instant,
144
145    /// Last invocation timestamp
146    last_invoked: RwLock<Option<Instant>>,
147
148    /// Invocation count
149    invocation_count: std::sync::atomic::AtomicU64,
150}
151
152/// Plugin instance data
153struct PluginInstanceData {
154    /// Plugin memory usage
155    memory_used: usize,
156
157    /// Fuel consumed (if metering enabled)
158    fuel_consumed: u64,
159
160    /// Custom state from plugin
161    state: HashMap<String, Vec<u8>>,
162}
163
164impl LoadedPlugin {
165    /// Create a new loaded plugin
166    pub fn new(
167        metadata: PluginMetadata,
168        path: PathBuf,
169        module: Module,
170        sandbox: PluginSandbox,
171    ) -> Self {
172        Self {
173            metadata,
174            state: PluginState::Running,
175            path,
176            module,
177            instance_pre: OnceLock::new(),
178            sandbox,
179            instance_data: RwLock::new(PluginInstanceData {
180                memory_used: 0,
181                fuel_consumed: 0,
182                state: HashMap::new(),
183            }),
184            loaded_at: Instant::now(),
185            last_invoked: RwLock::new(None),
186            invocation_count: std::sync::atomic::AtomicU64::new(0),
187        }
188    }
189
190    /// Borrow the compiled module (Arc-cheap clone available via
191    /// `plugin.module.clone()` if the caller needs to outlive the
192    /// borrow).
193    pub(crate) fn module(&self) -> &Module {
194        &self.module
195    }
196
197    /// Get memory usage
198    pub fn memory_used(&self) -> usize {
199        self.instance_data.read().memory_used
200    }
201
202    /// Get invocation count
203    pub fn invocation_count(&self) -> u64 {
204        self.invocation_count.load(std::sync::atomic::Ordering::Relaxed)
205    }
206
207    /// Get uptime
208    pub fn uptime(&self) -> Duration {
209        self.loaded_at.elapsed()
210    }
211
212    /// Get last invoked time
213    pub fn last_invoked(&self) -> Option<Instant> {
214        *self.last_invoked.read()
215    }
216
217    /// Record an invocation
218    pub fn record_invocation(&self) {
219        self.invocation_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
220        *self.last_invoked.write() = Some(Instant::now());
221    }
222}
223
224/// WASM plugin runtime
225pub struct WasmPluginRuntime {
226    /// Runtime configuration
227    config: PluginRuntimeConfig,
228
229    /// wasmtime engine — shared across all plugins. Compiles modules
230    /// once; modules cheaply share a reference to it.
231    engine: Engine,
232
233    /// Shared host-import linker, built once with the `env::*` KV and
234    /// crypto imports. Reused to pre-resolve every plugin's instantiation
235    /// plan instead of allocating a fresh `Linker` and re-registering host
236    /// functions on every hook call.
237    linker: Linker<StoreCtx>,
238
239    /// Stop flag for the background epoch ticker thread (see `new`).
240    epoch_stop: Arc<AtomicBool>,
241
242    /// Host function registry
243    host_functions: Arc<HostFunctionRegistry>,
244
245    /// Per-plugin KV backend bridged into wasmtime imports. Survives
246    /// across calls so plugins can persist state between hooks.
247    kv: KvBackend,
248
249    /// Module cache (path -> compiled module). Avoids re-compiling
250    /// the same `.wasm` on every load.
251    module_cache: RwLock<HashMap<PathBuf, Module>>,
252
253    /// Default security policy
254    default_policy: SecurityPolicy,
255
256    /// Creation timestamp
257    created_at: Instant,
258}
259
260impl WasmPluginRuntime {
261    /// Create a new WASM runtime
262    pub fn new(config: &PluginRuntimeConfig) -> Result<Self, PluginError> {
263        let host_functions = Arc::new(HostFunctionRegistry::new());
264
265        let mut engine_config = wasmtime::Config::new();
266        if config.fuel_metering {
267            engine_config.consume_fuel(true);
268        }
269        // Epoch-based interrupts let us bound execution time without
270        // polling fuel from inside the call.
271        engine_config.epoch_interruption(true);
272
273        let engine = Engine::new(&engine_config).map_err(|e| {
274            PluginError::RuntimeError(format!("wasmtime engine init: {}", e))
275        })?;
276
277        // Build the host-import linker once. The KV/crypto imports read
278        // their state from each call's `Store` data (Caller<StoreCtx>), so
279        // a single shared linker is correct across all plugins and calls.
280        let mut linker: Linker<StoreCtx> = Linker::new(&engine);
281        register_kv_imports(&mut linker)?;
282        register_crypto_imports(&mut linker)?;
283
284        // Background epoch ticker: bumps the engine epoch every ~1ms so
285        // that per-call epoch deadlines actually enforce a wall-clock
286        // timeout on plugin execution (previously the deadline was set to
287        // u64::MAX, so the configured timeout was never enforced). A
288        // std::thread is used so enforcement works with or without a tokio
289        // runtime; it exits within ~1ms of the runtime being dropped.
290        let epoch_stop = Arc::new(AtomicBool::new(false));
291        {
292            let engine = engine.clone();
293            let stop = epoch_stop.clone();
294            std::thread::Builder::new()
295                .name("wasm-epoch-ticker".into())
296                .spawn(move || {
297                    while !stop.load(std::sync::atomic::Ordering::Relaxed) {
298                        std::thread::sleep(Duration::from_millis(1));
299                        engine.increment_epoch();
300                    }
301                })
302                .ok();
303        }
304
305        let default_policy = SecurityPolicy {
306            allowed_hosts: vec!["localhost".to_string()],
307            allowed_paths: vec![config.plugin_dir.clone()],
308            max_memory: config.memory_limit,
309            max_execution_time: config.timeout,
310            allow_network: false,
311            allow_filesystem: false,
312        };
313
314        Ok(Self {
315            config: config.clone(),
316            engine,
317            linker,
318            epoch_stop,
319            host_functions,
320            kv: KvBackend::new(),
321            module_cache: RwLock::new(HashMap::new()),
322            default_policy,
323            created_at: Instant::now(),
324        })
325    }
326
327    /// Expose the per-plugin KV backend so admin/test code can seed
328    /// or inspect a plugin's state without going through WASM.
329    pub fn kv(&self) -> &KvBackend {
330        &self.kv
331    }
332
333    /// Borrow the shared host-import linker (used by the manager/tests to
334    /// pre-resolve modules against the same import set).
335    #[allow(dead_code)]
336    pub(crate) fn linker(&self) -> &Linker<StoreCtx> {
337        &self.linker
338    }
339
340    /// Expose the engine so tests + the plugin manager can build new
341    /// `Store`s against it.
342    pub(crate) fn engine(&self) -> &Engine {
343        &self.engine
344    }
345
346    /// Expose the runtime config so the plugin manager can consult
347    /// fields it owns (e.g. `trust_root`) without holding a separate
348    /// copy.
349    pub fn config(&self) -> &PluginRuntimeConfig {
350        &self.config
351    }
352
353    /// Instantiate a plugin from manifest and WASM bytes
354    pub fn instantiate(
355        &self,
356        manifest: &super::loader::PluginManifest,
357        wasm_bytes: &[u8],
358    ) -> Result<LoadedPlugin, PluginError> {
359        // Validate WASM module (basic magic number check)
360        if wasm_bytes.len() < 8 {
361            return Err(PluginError::LoadError("WASM module too small".to_string()));
362        }
363
364        // WASM magic number: 0x00 0x61 0x73 0x6d (\\0asm)
365        if &wasm_bytes[0..4] != b"\x00asm" {
366            return Err(PluginError::LoadError("Invalid WASM magic number".to_string()));
367        }
368
369        // Build metadata from manifest
370        let metadata = PluginMetadata {
371            name: manifest.name.clone(),
372            version: manifest.version.clone(),
373            description: manifest.description.clone(),
374            author: manifest.author.clone(),
375            hooks: manifest.hooks.clone(),
376            permissions: manifest.permissions.clone(),
377            min_memory: manifest.min_memory,
378            max_memory: manifest.max_memory.min(self.config.memory_limit),
379        };
380
381        // Build sandbox with merged policy
382        let resource_limits = ResourceLimits {
383            max_memory: metadata.max_memory,
384            max_execution_time: self.config.timeout,
385            max_fuel: if self.config.fuel_metering {
386                Some(self.config.fuel_limit)
387            } else {
388                None
389            },
390            max_table_elements: 10000,
391            max_instances: 1,
392        };
393
394        let sandbox = PluginSandbox::new(
395            self.default_policy.clone(),
396            resource_limits,
397            manifest.permissions.clone(),
398        );
399
400        // Compile via wasmtime — this validates the module and produces
401        // an Arc-wrapped Module ready for repeated instantiation.
402        let module = Module::from_binary(&self.engine, wasm_bytes).map_err(|e| {
403            PluginError::InstantiationError(format!("wasmtime compile: {}", e))
404        })?;
405
406        // Cache the compiled Module (cheap clone on hit).
407        {
408            let mut cache = self.module_cache.write();
409            cache.insert(manifest.path.clone(), module.clone());
410        }
411
412        Ok(LoadedPlugin::new(
413            metadata,
414            manifest.path.clone(),
415            module,
416            sandbox,
417        ))
418    }
419
420    /// Call a hook on a plugin via wasmtime.
421    ///
422    /// 1. Build a fresh `Store` (wasmtime stores are not Sync, so each
423    ///    invocation is isolated).
424    /// 2. Apply fuel metering (per-call fuel cap) when configured.
425    /// 3. Instantiate the module against an empty Linker — host
426    ///    functions are TODO; plugins that import them will fail at
427    ///    instantiation with a clear error message.
428    /// 4. Look up `memory`, `alloc`, `dealloc`, and the named hook
429    ///    function exports.
430    /// 5. Allocate a slot in plugin memory, write `args`, call the
431    ///    hook, decode `(result_ptr, result_len)`, copy the result
432    ///    bytes out.
433    /// 6. Free both input and output slots via `dealloc`.
434    /// 7. Drop the store; the plugin's per-call state is gone.
435    pub fn call_hook(
436        &self,
437        plugin: &LoadedPlugin,
438        hook: HookType,
439        args: &[u8],
440    ) -> Result<Vec<u8>, PluginError> {
441        // Check if plugin supports this hook
442        if !plugin.metadata.hooks.contains(&hook) {
443            return Err(PluginError::HookNotFound(format!(
444                "Plugin {} does not support hook {:?}",
445                plugin.metadata.name, hook
446            )));
447        }
448
449        // Check state
450        if plugin.state != PluginState::Running {
451            return Err(PluginError::ExecutionError(format!(
452                "Plugin {} is not running (state: {:?})",
453                plugin.metadata.name, plugin.state
454            )));
455        }
456
457        // Record invocation
458        plugin.record_invocation();
459
460        // Fresh per-call store; not Sync, so we never share across calls.
461        // The data carries the plugin's identity + a clone of the shared
462        // KV backend so host imports can route to the right namespace.
463        let store_ctx = StoreCtx {
464            plugin_name: plugin.metadata.name.clone(),
465            kv: self.kv.clone(),
466        };
467        let mut store: Store<StoreCtx> = Store::new(&self.engine, store_ctx);
468        if self.config.fuel_metering {
469            // wasmtime's set_fuel returns Result; cap is per-call.
470            store.set_fuel(self.config.fuel_limit).map_err(|e| {
471                PluginError::RuntimeError(format!("set_fuel: {}", e))
472            })?;
473        }
474        // Epoch interruption was enabled at engine init and a background
475        // ticker bumps the engine epoch every ~1ms. Set the deadline to
476        // `timeout` worth of ticks so a runaway plugin traps at its
477        // configured wall-clock timeout instead of blocking the caller
478        // indefinitely. (Set after Store::new, before the call.)
479        let deadline_ticks = self
480            .config
481            .timeout
482            .as_millis()
483            .max(1)
484            .min(u64::MAX as u128) as u64;
485        store.set_epoch_deadline(deadline_ticks);
486
487        // Instantiate from the plugin's pre-resolved plan, computed once
488        // against the runtime's shared host-import linker and cached on the
489        // plugin. Per call this is just a linear-memory/table init — no
490        // Linker allocation, no host-import re-registration, no import-name
491        // resolution.
492        let instance_pre = match plugin.instance_pre.get() {
493            Some(ip) => ip,
494            None => {
495                let ip = self.linker.instantiate_pre(&plugin.module).map_err(|e| {
496                    PluginError::InstantiationError(format!(
497                        "pre-instantiate {}: {}",
498                        plugin.metadata.name, e
499                    ))
500                })?;
501                // Race-tolerant: if another thread set it first, keep theirs.
502                let _ = plugin.instance_pre.set(ip);
503                plugin.instance_pre.get().expect("just set")
504            }
505        };
506        let instance = instance_pre.instantiate(&mut store).map_err(|e| {
507            PluginError::InstantiationError(format!(
508                "instantiate {}: {}",
509                plugin.metadata.name, e
510            ))
511        })?;
512
513        let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
514            PluginError::ExecutionError(format!(
515                "plugin {} does not export `memory`",
516                plugin.metadata.name
517            ))
518        })?;
519
520        let alloc = get_typed::<_, i32, i32>(&instance, &mut store, "alloc")?;
521        let dealloc = get_typed::<_, (i32, i32), ()>(&instance, &mut store, "dealloc")?;
522
523        // Allocate input slot inside the plugin's address space and
524        // copy `args` in.
525        let in_len = args.len() as i32;
526        let in_ptr = alloc.call(&mut store, in_len).map_err(|e| {
527            PluginError::ExecutionError(format!("alloc({}): {}", in_len, e))
528        })?;
529        if in_len > 0 {
530            write_memory(&memory, &mut store, in_ptr, args)?;
531        }
532
533        // Try the result-returning ABI first; if the export has the
534        // observer ABI (no return), fall back to that.
535        let export_name = hook.export_name();
536        let result_bytes = match get_typed::<_, (i32, i32), i64>(&instance, &mut store, export_name) {
537            Ok(hook_fn) => {
538                let packed = hook_fn.call(&mut store, (in_ptr, in_len)).map_err(|e| {
539                    PluginError::ExecutionError(format!(
540                        "hook {} call: {}",
541                        export_name, e
542                    ))
543                })?;
544                let out_ptr = (packed >> 32) as i32;
545                let out_len = (packed & 0xFFFF_FFFF) as i32;
546                if out_len > 0 {
547                    let bytes = read_memory(&memory, &store, out_ptr, out_len)?;
548                    // Free the plugin-allocated output slot.
549                    let _ = dealloc.call(&mut store, (out_ptr, out_len));
550                    bytes
551                } else {
552                    Vec::new()
553                }
554            }
555            Err(_) => {
556                // Observer ABI: (i32, i32) → ()
557                let observer = get_typed::<_, (i32, i32), ()>(
558                    &instance,
559                    &mut store,
560                    export_name,
561                )?;
562                observer.call(&mut store, (in_ptr, in_len)).map_err(|e| {
563                    PluginError::ExecutionError(format!(
564                        "observer hook {} call: {}",
565                        export_name, e
566                    ))
567                })?;
568                Vec::new()
569            }
570        };
571
572        // Free the input slot. Best-effort; failure here doesn't
573        // abort the call (the store is about to be dropped anyway).
574        let _ = dealloc.call(&mut store, (in_ptr, in_len));
575
576        // Update per-plugin instance accounting.
577        if self.config.fuel_metering {
578            if let Ok(remaining) = store.get_fuel() {
579                let consumed = self.config.fuel_limit.saturating_sub(remaining);
580                plugin.instance_data.write().fuel_consumed = consumed;
581            }
582        }
583        plugin.instance_data.write().memory_used =
584            (memory.data_size(&store)) as usize;
585
586        Ok(result_bytes)
587    }
588
589    /// Call pre-query hook
590    pub fn call_pre_query(
591        &self,
592        plugin: &LoadedPlugin,
593        ctx: &QueryContext,
594    ) -> Result<PreQueryResult, PluginError> {
595        // Serialize context
596        let args = serde_json::to_vec(ctx).map_err(|e| {
597            PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
598        })?;
599
600        // Call the hook
601        let result = self.call_hook(plugin, HookType::PreQuery, &args)?;
602
603        // Deserialize result (or return default)
604        if result.is_empty() {
605            return Ok(PreQueryResult::Continue);
606        }
607
608        serde_json::from_slice(&result).map_err(|e| {
609            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
610        })
611    }
612
613    /// Call authenticate hook
614    pub fn call_authenticate(
615        &self,
616        plugin: &LoadedPlugin,
617        request: &AuthRequest,
618    ) -> Result<AuthResult, PluginError> {
619        // Serialize request
620        let args = serde_json::to_vec(request).map_err(|e| {
621            PluginError::ExecutionError(format!("Failed to serialize request: {}", e))
622        })?;
623
624        // Call the hook
625        let result = self.call_hook(plugin, HookType::Authenticate, &args)?;
626
627        // Deserialize result (or return default)
628        if result.is_empty() {
629            return Ok(AuthResult::Defer);
630        }
631
632        serde_json::from_slice(&result).map_err(|e| {
633            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
634        })
635    }
636
637    /// Call route hook
638    pub fn call_route(
639        &self,
640        plugin: &LoadedPlugin,
641        ctx: &QueryContext,
642    ) -> Result<RouteResult, PluginError> {
643        // Serialize context
644        let args = serde_json::to_vec(ctx).map_err(|e| {
645            PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
646        })?;
647
648        // Call the hook
649        let result = self.call_hook(plugin, HookType::Route, &args)?;
650
651        // Deserialize result (or return default)
652        if result.is_empty() {
653            return Ok(RouteResult::Default);
654        }
655
656        serde_json::from_slice(&result).map_err(|e| {
657            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
658        })
659    }
660
661    /// Get runtime statistics
662    pub fn stats(&self) -> RuntimeStats {
663        RuntimeStats {
664            uptime: self.created_at.elapsed(),
665            cached_modules: self.module_cache.read().len(),
666            fuel_metering_enabled: self.config.fuel_metering,
667            memory_limit: self.config.memory_limit,
668            timeout: self.config.timeout,
669        }
670    }
671}
672
673impl Drop for WasmPluginRuntime {
674    fn drop(&mut self) {
675        // Signal the epoch ticker thread to exit; it polls the flag every
676        // ~1ms and then releases its Engine clone.
677        self.epoch_stop
678            .store(true, std::sync::atomic::Ordering::Relaxed);
679    }
680}
681
682/// Runtime statistics
683#[derive(Debug, Clone)]
684pub struct RuntimeStats {
685    /// Uptime
686    pub uptime: Duration,
687
688    /// Number of cached modules
689    pub cached_modules: usize,
690
691    /// Whether fuel metering is enabled
692    pub fuel_metering_enabled: bool,
693
694    /// Memory limit per plugin
695    pub memory_limit: usize,
696
697    /// Execution timeout
698    pub timeout: Duration,
699}
700
701// Serialization support for hook types — written by hand because the
702// types live in `super` and we can't derive without touching every
703// field's type chain. Includes hook_context so plugins can read
704// per-request attributes (tenant_id, agent_id, ai_traffic, etc).
705impl serde::Serialize for QueryContext {
706    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
707    where
708        S: serde::Serializer,
709    {
710        use serde::ser::SerializeStruct;
711        let mut state = serializer.serialize_struct("QueryContext", 5)?;
712        state.serialize_field("query", &self.query)?;
713        state.serialize_field("normalized", &self.normalized)?;
714        state.serialize_field("tables", &self.tables)?;
715        state.serialize_field("is_read_only", &self.is_read_only)?;
716        state.serialize_field("hook_context", &self.hook_context)?;
717        state.end()
718    }
719}
720
721/// Look up a typed exported function on an instance, with a uniform
722/// "missing/wrong-shape" error message.
723fn get_typed<T, P, R>(
724    instance: &Instance,
725    store: &mut Store<T>,
726    name: &str,
727) -> Result<TypedFunc<P, R>, PluginError>
728where
729    P: wasmtime::WasmParams,
730    R: wasmtime::WasmResults,
731{
732    instance
733        .get_typed_func::<P, R>(store, name)
734        .map_err(|e| PluginError::ExecutionError(format!("export `{}`: {}", name, e)))
735}
736
737/// Copy `bytes` into the plugin's linear memory at `ptr`. Bounds-
738/// checked via wasmtime's safe `Memory::write`.
739fn write_memory<T>(
740    memory: &Memory,
741    store: &mut Store<T>,
742    ptr: i32,
743    bytes: &[u8],
744) -> Result<(), PluginError> {
745    memory.write(store, ptr as usize, bytes).map_err(|e| {
746        PluginError::ExecutionError(format!("memory.write @ {}: {}", ptr, e))
747    })
748}
749
750/// Copy `len` bytes out of plugin memory starting at `ptr`.
751fn read_memory<T>(
752    memory: &Memory,
753    store: &Store<T>,
754    ptr: i32,
755    len: i32,
756) -> Result<Vec<u8>, PluginError> {
757    if len <= 0 {
758        return Ok(Vec::new());
759    }
760    let mut out = vec![0u8; len as usize];
761    memory.read(store, ptr as usize, &mut out).map_err(|e| {
762        PluginError::ExecutionError(format!("memory.read @ {}+{}: {}", ptr, len, e))
763    })?;
764    Ok(out)
765}
766
767impl serde::Serialize for AuthRequest {
768    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
769    where
770        S: serde::Serializer,
771    {
772        use serde::ser::SerializeStruct;
773        let mut state = serializer.serialize_struct("AuthRequest", 5)?;
774        state.serialize_field("headers", &self.headers)?;
775        state.serialize_field("username", &self.username)?;
776        state.serialize_field("password", &self.password)?;
777        state.serialize_field("client_ip", &self.client_ip)?;
778        state.serialize_field("database", &self.database)?;
779        state.end()
780    }
781}
782
783impl<'de> serde::Deserialize<'de> for PreQueryResult {
784    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
785    where
786        D: serde::Deserializer<'de>,
787    {
788        #[derive(serde::Deserialize)]
789        struct Helper {
790            action: String,
791            #[serde(default)]
792            value: Option<String>,
793            #[serde(default)]
794            data: Option<Vec<u8>>,
795        }
796
797        let helper = Helper::deserialize(deserializer)?;
798        match helper.action.as_str() {
799            "continue" => Ok(PreQueryResult::Continue),
800            "rewrite" => Ok(PreQueryResult::Rewrite(
801                helper.value.unwrap_or_default(),
802            )),
803            "block" => Ok(PreQueryResult::Block(
804                helper.value.unwrap_or_default(),
805            )),
806            "cached" => Ok(PreQueryResult::Cached(
807                helper.data.unwrap_or_default(),
808            )),
809            _ => Ok(PreQueryResult::Continue),
810        }
811    }
812}
813
814impl<'de> serde::Deserialize<'de> for AuthResult {
815    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
816    where
817        D: serde::Deserializer<'de>,
818    {
819        #[derive(serde::Deserialize)]
820        struct Helper {
821            action: String,
822            #[serde(default)]
823            identity: Option<IdentityHelper>,
824            #[serde(default)]
825            message: Option<String>,
826        }
827
828        #[derive(serde::Deserialize)]
829        struct IdentityHelper {
830            user_id: String,
831            username: String,
832            #[serde(default)]
833            roles: Vec<String>,
834            #[serde(default)]
835            tenant_id: Option<String>,
836        }
837
838        let helper = Helper::deserialize(deserializer)?;
839        match helper.action.as_str() {
840            "success" => {
841                let id = helper.identity.unwrap_or(IdentityHelper {
842                    user_id: String::new(),
843                    username: String::new(),
844                    roles: Vec::new(),
845                    tenant_id: None,
846                });
847                Ok(AuthResult::Success(super::Identity {
848                    user_id: id.user_id,
849                    username: id.username,
850                    roles: id.roles,
851                    tenant_id: id.tenant_id,
852                    claims: std::collections::HashMap::new(),
853                }))
854            }
855            "denied" => Ok(AuthResult::Denied(
856                helper.message.unwrap_or_default(),
857            )),
858            "defer" => Ok(AuthResult::Defer),
859            _ => Ok(AuthResult::Defer),
860        }
861    }
862}
863
864impl<'de> serde::Deserialize<'de> for RouteResult {
865    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
866    where
867        D: serde::Deserializer<'de>,
868    {
869        #[derive(serde::Deserialize)]
870        struct Helper {
871            action: String,
872            #[serde(default)]
873            target: Option<String>,
874            #[serde(default)]
875            reason: Option<String>,
876        }
877
878        let helper = Helper::deserialize(deserializer)?;
879        match helper.action.as_str() {
880            "default" => Ok(RouteResult::Default),
881            "node" => Ok(RouteResult::Node(helper.target.unwrap_or_default())),
882            "primary" => Ok(RouteResult::Primary),
883            "standby" => Ok(RouteResult::Standby),
884            "branch" => Ok(RouteResult::Branch(helper.target.unwrap_or_default())),
885            // Block carries a human-readable reason in its own field so
886            // it doesn't overload `target` (which is a node identifier
887            // for the other variants).
888            "block" => Ok(RouteResult::Block(
889                helper.reason.unwrap_or_else(|| "blocked by plugin".to_string()),
890            )),
891            _ => Ok(RouteResult::Default),
892        }
893    }
894}
895
896#[cfg(test)]
897mod tests {
898    use super::*;
899
900    /// Build a tiny WAT module for runtime tests against a specific
901    /// engine. wasmtime requires Module and instantiating Engine to
902    /// match — so this takes the runtime's engine rather than
903    /// constructing one locally.
904    ///
905    /// Exports `memory`, `alloc`, `dealloc`, a `pre_query` hook that
906    /// ignores its input and returns a fixed payload at offset 1024,
907    /// and a `post_query` observer hook.
908    fn build_test_module(engine: &Engine) -> Module {
909        const PAYLOAD: &[u8] = b"hello-from-wasm";
910        let payload_hex: String = PAYLOAD
911            .iter()
912            .map(|b| format!("\\{:02x}", b))
913            .collect();
914        let wat = format!(
915            r#"
916            (module
917              (memory (export "memory") 1)
918
919              ;; Trivial alloc: always returns offset 4096 (test inputs
920              ;; are tiny so non-overlapping reuse is fine here). Real
921              ;; plugins ship a real allocator; the runtime only cares
922              ;; that `alloc` returns a writable address.
923              (func (export "alloc") (param $size i32) (result i32)
924                (i32.const 4096))
925
926              (func (export "dealloc") (param $ptr i32) (param $size i32)
927                (drop (local.get $ptr))
928                (drop (local.get $size)))
929
930              ;; Result-returning hook: writes PAYLOAD at offset 1024 and
931              ;; returns (1024 << 32) | PAYLOAD.len.
932              (func (export "pre_query")
933                (param $in_ptr i32) (param $in_len i32) (result i64)
934                (i64.or
935                  (i64.shl (i64.const 1024) (i64.const 32))
936                  (i64.const {payload_len})))
937
938              ;; Observer hook: takes args, returns nothing.
939              (func (export "post_query")
940                (param $in_ptr i32) (param $in_len i32)
941                (drop (local.get $in_ptr)))
942
943              (data (i32.const 1024) "{payload}")
944            )
945            "#,
946            payload = payload_hex,
947            payload_len = PAYLOAD.len(),
948        );
949        let bytes = wat::parse_str(&wat).expect("wat parses");
950        Module::from_binary(engine, &bytes).expect("module compiles")
951    }
952
953    /// A module whose `pre_query` hook spins forever — used to prove the
954    /// epoch-deadline wall-clock timeout actually interrupts a runaway
955    /// plugin instead of blocking the caller indefinitely.
956    fn build_spin_module(engine: &Engine) -> Module {
957        let wat = r#"
958            (module
959              (memory (export "memory") 1)
960              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
961              (func (export "dealloc") (param i32) (param i32))
962              (func (export "pre_query") (param i32) (param i32) (result i64)
963                (loop $l (br $l))
964                (i64.const 0)))
965        "#;
966        let bytes = wat::parse_str(wat).expect("wat parses");
967        Module::from_binary(engine, &bytes).expect("module compiles")
968    }
969
970    /// A runaway plugin must trap at its configured timeout (enforced by
971    /// the background epoch ticker), not hang the caller. Guarded by a 5s
972    /// join so a regression fails fast instead of hanging the test binary.
973    #[test]
974    fn test_call_hook_enforces_timeout() {
975        let mut config = PluginRuntimeConfig::default();
976        config.fuel_metering = false; // isolate epoch enforcement from fuel
977        config.timeout = Duration::from_millis(100);
978        let runtime = Arc::new(WasmPluginRuntime::new(&config).unwrap());
979
980        let module = build_spin_module(runtime.engine());
981        let mut metadata = PluginMetadata::default();
982        metadata.name = "spin".to_string();
983        metadata.hooks = vec![HookType::PreQuery];
984        let plugin = Arc::new(LoadedPlugin::new(
985            metadata,
986            PathBuf::from("/test/spin.wasm"),
987            module,
988            PluginSandbox::default(),
989        ));
990
991        let (tx, rx) = std::sync::mpsc::channel();
992        {
993            let r = runtime.clone();
994            let p = plugin.clone();
995            std::thread::spawn(move || {
996                let res = r.call_hook(&p, HookType::PreQuery, b"{}");
997                let _ = tx.send(res.is_err());
998            });
999        }
1000        match rx.recv_timeout(Duration::from_secs(5)) {
1001            Ok(is_err) => assert!(is_err, "runaway plugin should trap with an error"),
1002            Err(_) => panic!("call_hook did not return within 5s — epoch timeout not enforced"),
1003        }
1004    }
1005
1006    #[test]
1007    fn test_plugin_error_display() {
1008        let err = PluginError::LoadError("test".to_string());
1009        assert!(err.to_string().contains("Load error"));
1010
1011        let err = PluginError::Timeout("plugin-a".to_string());
1012        assert!(err.to_string().contains("Timeout"));
1013    }
1014
1015    #[test]
1016    fn test_plugin_state() {
1017        assert_eq!(PluginState::Running, PluginState::Running);
1018        assert_ne!(PluginState::Running, PluginState::Paused);
1019    }
1020
1021    #[test]
1022    fn test_runtime_creation() {
1023        let config = PluginRuntimeConfig::default();
1024        let runtime = WasmPluginRuntime::new(&config);
1025        assert!(runtime.is_ok());
1026    }
1027
1028    #[test]
1029    fn test_runtime_stats() {
1030        let config = PluginRuntimeConfig::default();
1031        let runtime = WasmPluginRuntime::new(&config).unwrap();
1032        let stats = runtime.stats();
1033
1034        assert_eq!(stats.cached_modules, 0);
1035        assert!(stats.fuel_metering_enabled);
1036    }
1037
1038    #[test]
1039    fn test_loaded_plugin_invocation_count() {
1040        // Re-use the test module — its compiled `Module` is what
1041        // the LoadedPlugin needs now that the field is wasmtime-typed.
1042        let engine = Engine::default();
1043        let module = build_test_module(&engine);
1044        let metadata = PluginMetadata::default();
1045        let sandbox = PluginSandbox::default();
1046        let plugin = LoadedPlugin::new(
1047            metadata,
1048            PathBuf::from("/test/plugin.wasm"),
1049            module,
1050            sandbox,
1051        );
1052
1053        assert_eq!(plugin.invocation_count(), 0);
1054        plugin.record_invocation();
1055        assert_eq!(plugin.invocation_count(), 1);
1056        plugin.record_invocation();
1057        assert_eq!(plugin.invocation_count(), 2);
1058    }
1059
1060    /// End-to-end: load a WAT-built module, call `pre_query`, observe
1061    /// the plugin's payload comes back through the (ptr, len) ABI.
1062    /// This is the killer test that proves the wasmtime path is
1063    /// real, not a stub.
1064    #[test]
1065    fn test_call_hook_roundtrips_real_wasm() {
1066        let mut config = PluginRuntimeConfig::default();
1067        // Disable fuel metering — the test module is trivial and we
1068        // don't want to debug fuel exhaustion in unit tests.
1069        config.fuel_metering = false;
1070        let runtime = WasmPluginRuntime::new(&config).unwrap();
1071
1072        let module = build_test_module(runtime.engine());
1073        let mut metadata = PluginMetadata::default();
1074        metadata.name = "test-roundtrip".to_string();
1075        metadata.hooks = vec![HookType::PreQuery, HookType::PostQuery];
1076
1077        let plugin = LoadedPlugin::new(
1078            metadata,
1079            PathBuf::from("/test/roundtrip.wasm"),
1080            module,
1081            PluginSandbox::default(),
1082        );
1083        // Force into Running state — Loading would block.
1084        // (LoadedPlugin::new already sets Running by default.)
1085
1086        let bytes = runtime
1087            .call_hook(&plugin, HookType::PreQuery, b"ignored input")
1088            .expect("pre_query call");
1089        assert_eq!(bytes, b"hello-from-wasm");
1090        assert_eq!(plugin.invocation_count(), 1);
1091
1092        // Observer ABI: post_query has no return; should yield empty.
1093        let out = runtime
1094            .call_hook(&plugin, HookType::PostQuery, b"some bytes")
1095            .expect("post_query call");
1096        assert!(out.is_empty());
1097        assert_eq!(plugin.invocation_count(), 2);
1098    }
1099
1100    /// A plugin that doesn't declare a hook in its metadata cannot
1101    /// invoke that hook even if the WASM exports the function.
1102    #[test]
1103    fn test_call_hook_rejects_undeclared_hook() {
1104        let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1105        let module = build_test_module(runtime.engine());
1106        let mut metadata = PluginMetadata::default();
1107        metadata.hooks = vec![]; // declares nothing
1108        let plugin = LoadedPlugin::new(
1109            metadata,
1110            PathBuf::from("/test/empty.wasm"),
1111            module,
1112            PluginSandbox::default(),
1113        );
1114        let err = runtime
1115            .call_hook(&plugin, HookType::PreQuery, &[])
1116            .unwrap_err();
1117        assert!(matches!(err, PluginError::HookNotFound(_)));
1118    }
1119
1120    /// Calling a hook whose export name is missing surfaces as
1121    /// `ExecutionError`, not a panic.
1122    #[test]
1123    fn test_call_hook_missing_export_returns_error() {
1124        let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1125        let module = build_test_module(runtime.engine());
1126        let mut metadata = PluginMetadata::default();
1127        // Declare a hook the test module doesn't export.
1128        metadata.hooks = vec![HookType::Authenticate];
1129        let plugin = LoadedPlugin::new(
1130            metadata,
1131            PathBuf::from("/test/missing.wasm"),
1132            module,
1133            PluginSandbox::default(),
1134        );
1135        let err = runtime
1136            .call_hook(&plugin, HookType::Authenticate, &[])
1137            .unwrap_err();
1138        assert!(matches!(err, PluginError::ExecutionError(_)));
1139    }
1140
1141    /// Build a WAT module that imports kv_set + kv_get from `env` and
1142    /// calls kv_set on `pre_query`. Used to validate the host-import
1143    /// bridge end-to-end through wasmtime.
1144    fn build_kv_test_module(engine: &Engine) -> Module {
1145        // Layout:
1146        //   offset 100: 3 bytes "key"
1147        //   offset 200: 5 bytes "value"
1148        let wat = r#"
1149            (module
1150              (import "env" "kv_set"
1151                (func $kv_set (param i32 i32 i32 i32) (result i32)))
1152              (memory (export "memory") 1)
1153
1154              (data (i32.const 100) "key")
1155              (data (i32.const 200) "value")
1156
1157              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1158              (func (export "dealloc") (param i32 i32))
1159
1160              ;; pre_query: kv_set("key", "value"); return 0 (no payload).
1161              (func (export "pre_query")
1162                (param $in_ptr i32) (param $in_len i32) (result i64)
1163                (drop (call $kv_set
1164                  (i32.const 100) (i32.const 3)
1165                  (i32.const 200) (i32.const 5)))
1166                (i64.const 0))
1167            )
1168        "#;
1169        let bytes = wat::parse_str(wat).expect("kv-wat parses");
1170        Module::from_binary(engine, &bytes).expect("kv module compiles")
1171    }
1172
1173    /// Calls a WASM `pre_query` hook that invokes the host's kv_set
1174    /// import. Verifies the value lands in the runtime's KvBackend
1175    /// under the plugin's namespace and is readable from Rust.
1176    #[test]
1177    fn test_host_kv_import_persists_value() {
1178        let mut config = PluginRuntimeConfig::default();
1179        config.fuel_metering = false;
1180        let runtime = WasmPluginRuntime::new(&config).unwrap();
1181
1182        let module = build_kv_test_module(runtime.engine());
1183        let mut metadata = PluginMetadata::default();
1184        metadata.name = "kv-test-plugin".to_string();
1185        metadata.hooks = vec![HookType::PreQuery];
1186
1187        let plugin = LoadedPlugin::new(
1188            metadata,
1189            PathBuf::from("/test/kv.wasm"),
1190            module,
1191            PluginSandbox::default(),
1192        );
1193
1194        // Sanity: namespace empty before the call.
1195        assert_eq!(runtime.kv().get("kv-test-plugin", b"key"), None);
1196
1197        let _ = runtime
1198            .call_hook(&plugin, HookType::PreQuery, &[])
1199            .expect("pre_query call");
1200
1201        // The plugin called kv_set("key", "value") inside WASM; the
1202        // host should have stored it under this plugin's namespace.
1203        assert_eq!(
1204            runtime.kv().get("kv-test-plugin", b"key"),
1205            Some(b"value".to_vec())
1206        );
1207        // And nowhere else.
1208        assert_eq!(runtime.kv().get("other-plugin", b"key"), None);
1209    }
1210
1211    /// Build a WAT module that imports `env.sha256_hex` and exposes a
1212    /// `pre_query` hook which:
1213    ///   1. computes sha256_hex over an embedded "abc" payload at
1214    ///      offset 100 (3 bytes)
1215    ///   2. writes the 64-byte hex digest at offset 200
1216    ///   3. returns the packed (200 << 32) | 64
1217    /// so the host can read the digest out of plugin memory.
1218    fn build_sha256_test_module(engine: &Engine) -> Module {
1219        let wat = r#"
1220            (module
1221              (import "env" "sha256_hex"
1222                (func $sha256_hex (param i32 i32 i32) (result i32)))
1223              (memory (export "memory") 1)
1224
1225              (data (i32.const 100) "abc")
1226
1227              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1228              (func (export "dealloc") (param i32 i32))
1229
1230              (func (export "pre_query")
1231                (param $in_ptr i32) (param $in_len i32) (result i64)
1232                (drop (call $sha256_hex
1233                  (i32.const 100) (i32.const 3)
1234                  (i32.const 200)))
1235                (i64.or
1236                  (i64.shl (i64.const 200) (i64.const 32))
1237                  (i64.const 64)))
1238            )
1239        "#;
1240        let bytes = wat::parse_str(wat).expect("sha256-wat parses");
1241        Module::from_binary(engine, &bytes).expect("sha256 module compiles")
1242    }
1243
1244    /// RouteResult deserialiser handles the new Block variant via a
1245    /// `reason` field separate from `target` (which the other variants
1246    /// use as a node identifier).
1247    #[test]
1248    fn test_route_result_deserialises_block_with_reason() {
1249        let json = r#"{"action":"block","reason":"cross-region read forbidden"}"#;
1250        let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1251        match r {
1252            RouteResult::Block(reason) => {
1253                assert_eq!(reason, "cross-region read forbidden");
1254            }
1255            other => panic!("expected Block, got {:?}", other),
1256        }
1257    }
1258
1259    /// Block without a reason field falls back to a generic message —
1260    /// keeps the deserialiser permissive when plugins forget the field.
1261    #[test]
1262    fn test_route_result_block_defaults_reason_when_missing() {
1263        let json = r#"{"action":"block"}"#;
1264        let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1265        match r {
1266            RouteResult::Block(reason) => {
1267                assert!(!reason.is_empty(), "default reason should not be empty");
1268            }
1269            other => panic!("expected Block, got {:?}", other),
1270        }
1271    }
1272
1273    /// SHA-256 of "abc" is the canonical RFC 6234 test vector. Verifies
1274    /// the host-import bridge produces real cryptographic output, not
1275    /// the FNV-flavoured placeholder that audit-chain ships today.
1276    #[test]
1277    fn test_host_sha256_import_matches_rfc_6234_vector() {
1278        const SHA256_OF_ABC: &[u8; 64] =
1279            b"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
1280
1281        let mut config = PluginRuntimeConfig::default();
1282        config.fuel_metering = false;
1283        let runtime = WasmPluginRuntime::new(&config).unwrap();
1284
1285        let module = build_sha256_test_module(runtime.engine());
1286        let mut metadata = PluginMetadata::default();
1287        metadata.name = "sha256-test-plugin".to_string();
1288        metadata.hooks = vec![HookType::PreQuery];
1289
1290        let plugin = LoadedPlugin::new(
1291            metadata,
1292            PathBuf::from("/test/sha256.wasm"),
1293            module,
1294            PluginSandbox::default(),
1295        );
1296
1297        let out = runtime
1298            .call_hook(&plugin, HookType::PreQuery, &[])
1299            .expect("pre_query call");
1300        assert_eq!(out.len(), 64);
1301        assert_eq!(&out[..], SHA256_OF_ABC);
1302    }
1303}