Skip to main content

heliosdb_proxy/plugins/
runtime.rs

1//! WASM Plugin Runtime
2//!
3//! Real wasmtime-backed plugin executor.
4//!
5//! ## ABI
6//!
7//! Plugins export, at minimum:
8//!
9//! - `memory` (default linear memory).
10//! - `alloc(size: i32) -> i32` — host calls this to obtain a slot
11//!   in plugin memory it can write input bytes into.
12//! - `dealloc(ptr: i32, size: i32)` — host calls this to free either
13//!   the input slot (after the call) or the output slot (after the
14//!   host has read the result).
15//! - One function per declared hook, with one of two signatures:
16//!   - **Result-returning hooks** (`pre_query`, `route`,
17//!     `authenticate`, `rewrite`): `(ptr: i32, len: i32) -> i64`
18//!     where the i64 is `(result_ptr << 32) | result_len`.
19//!     `result_ptr == 0 && result_len == 0` is a valid "no result"
20//!     reply (host treats it as the default per-hook outcome).
21//!   - **Observer hooks** (`post_query`, `metrics`, `on_connect`,
22//!     `on_disconnect`): `(ptr: i32, len: i32)` with no return —
23//!     the host ignores any output the plugin may have written.
24//!
25//! The runtime tries the result-returning signature first; if the
26//! exported function has the no-return shape it falls back.
27
28use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, OnceLock};
32use std::time::{Duration, Instant};
33
34use parking_lot::RwLock;
35use wasmtime::{Engine, Instance, InstancePre, Linker, Memory, Module, Store, TypedFunc};
36
37use super::config::PluginRuntimeConfig;
38use super::host_functions::HostFunctionRegistry;
39use super::host_imports::{register_crypto_imports, register_kv_imports, KvBackend, StoreCtx};
40use super::sandbox::{PluginSandbox, ResourceLimits, SecurityPolicy};
41use super::{
42    AuthRequest, AuthResult, HookType, PluginMetadata, PreQueryResult, QueryContext, RouteResult,
43};
44
/// Errors produced while loading, instantiating, or running a plugin.
///
/// Every variant carries a free-form detail message. The `Display`
/// implementation prefixes that message with a short category label.
#[derive(Debug, Clone)]
pub enum PluginError {
    /// The plugin could not be loaded.
    LoadError(String),

    /// The plugin could not be instantiated.
    InstantiationError(String),

    /// The plugin failed while executing.
    ExecutionError(String),

    /// The plugin exceeded its time budget.
    Timeout(String),

    /// The plugin exceeded its memory budget.
    MemoryExceeded(String),

    /// The plugin violated the security policy.
    SecurityViolation(String),

    /// The plugin manifest is invalid.
    InvalidManifest(String),

    /// The requested hook is not available.
    HookNotFound(String),

    /// The runtime itself failed.
    RuntimeError(String),
}

impl std::fmt::Display for PluginError {
    /// Render the error as `"<category>: <detail>"`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::LoadError(detail) => write!(out, "Load error: {}", detail),
            Self::InstantiationError(detail) => write!(out, "Instantiation error: {}", detail),
            Self::ExecutionError(detail) => write!(out, "Execution error: {}", detail),
            Self::Timeout(detail) => write!(out, "Timeout: {}", detail),
            Self::MemoryExceeded(detail) => write!(out, "Memory exceeded: {}", detail),
            Self::SecurityViolation(detail) => write!(out, "Security violation: {}", detail),
            Self::InvalidManifest(detail) => write!(out, "Invalid manifest: {}", detail),
            Self::HookNotFound(detail) => write!(out, "Hook not found: {}", detail),
            Self::RuntimeError(detail) => write!(out, "Runtime error: {}", detail),
        }
    }
}

impl std::error::Error for PluginError {}
93
/// Lifecycle state of a loaded plugin.
///
/// `call_hook` only dispatches to plugins whose state is `Running`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PluginState {
    /// Plugin is loading
    Loading,

    /// Plugin is ready; the only state in which hooks are dispatched
    Running,

    /// Plugin is paused
    Paused,

    /// Plugin has errored; carries a description of the failure
    Error(String),

    /// Plugin is unloading
    Unloading,
}
112
/// A loaded plugin: its compiled module plus per-plugin bookkeeping.
///
/// Built by `WasmPluginRuntime::instantiate` and passed by shared
/// reference to `WasmPluginRuntime::call_hook`.
pub struct LoadedPlugin {
    /// Plugin metadata
    pub metadata: PluginMetadata,

    /// Current state
    pub state: PluginState,

    /// File path (taken from the manifest at instantiation time)
    pub path: PathBuf,

    /// Compiled wasmtime module — cheap to clone (internally Arc'd)
    /// and shared across invocations.
    module: Module,

    /// Pre-resolved instantiation plan (module imports linked against the
    /// runtime's shared `Linker`). Computed once on the first hook call and
    /// reused for every subsequent call, so per-dispatch cost drops to
    /// `Store::new` + `InstancePre::instantiate` — no per-call `Linker`
    /// allocation, host-import re-registration, or import-name resolution.
    instance_pre: OnceLock<InstancePre<StoreCtx>>,

    /// Security sandbox. Stored at construction; not read within this
    /// file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    sandbox: PluginSandbox,

    /// Per-plugin accounting (linear-memory size, fuel consumed) that
    /// `call_hook` refreshes after each successful invocation.
    instance_data: RwLock<PluginInstanceData>,

    /// Creation timestamp
    loaded_at: Instant,

    /// Last invocation timestamp (`None` until the first invocation)
    last_invoked: RwLock<Option<Instant>>,

    /// Invocation count
    invocation_count: std::sync::atomic::AtomicU64,
}
151
/// Mutable per-plugin accounting, guarded by the `RwLock` in
/// `LoadedPlugin::instance_data`.
struct PluginInstanceData {
    /// Plugin memory usage in bytes, as last recorded by `call_hook`
    memory_used: usize,

    /// Fuel consumed by the most recent call (if metering enabled)
    fuel_consumed: u64,

    /// Custom state from plugin. Initialised empty and not accessed
    /// within this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    state: HashMap<String, Vec<u8>>,
}
164
165impl LoadedPlugin {
166    /// Create a new loaded plugin
167    pub fn new(
168        metadata: PluginMetadata,
169        path: PathBuf,
170        module: Module,
171        sandbox: PluginSandbox,
172    ) -> Self {
173        Self {
174            metadata,
175            state: PluginState::Running,
176            path,
177            module,
178            instance_pre: OnceLock::new(),
179            sandbox,
180            instance_data: RwLock::new(PluginInstanceData {
181                memory_used: 0,
182                fuel_consumed: 0,
183                state: HashMap::new(),
184            }),
185            loaded_at: Instant::now(),
186            last_invoked: RwLock::new(None),
187            invocation_count: std::sync::atomic::AtomicU64::new(0),
188        }
189    }
190
191    /// Borrow the compiled module (Arc-cheap clone available via
192    /// `plugin.module.clone()` if the caller needs to outlive the
193    /// borrow).
194    #[allow(dead_code)]
195    pub(crate) fn module(&self) -> &Module {
196        &self.module
197    }
198
199    /// Get memory usage
200    pub fn memory_used(&self) -> usize {
201        self.instance_data.read().memory_used
202    }
203
204    /// Get invocation count
205    pub fn invocation_count(&self) -> u64 {
206        self.invocation_count
207            .load(std::sync::atomic::Ordering::Relaxed)
208    }
209
210    /// Get uptime
211    pub fn uptime(&self) -> Duration {
212        self.loaded_at.elapsed()
213    }
214
215    /// Get last invoked time
216    pub fn last_invoked(&self) -> Option<Instant> {
217        *self.last_invoked.read()
218    }
219
220    /// Record an invocation
221    pub fn record_invocation(&self) {
222        self.invocation_count
223            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
224        *self.last_invoked.write() = Some(Instant::now());
225    }
226}
227
/// WASM plugin runtime: owns the wasmtime engine, the shared host-import
/// linker, and the state shared by every plugin it instantiates.
pub struct WasmPluginRuntime {
    /// Runtime configuration (cloned from the caller's config in `new`)
    config: PluginRuntimeConfig,

    /// wasmtime engine — shared across all plugins. Compiles modules
    /// once; modules cheaply share a reference to it.
    engine: Engine,

    /// Shared host-import linker, built once with the `env::*` KV and
    /// crypto imports. Reused to pre-resolve every plugin's instantiation
    /// plan instead of allocating a fresh `Linker` and re-registering host
    /// functions on every hook call.
    linker: Linker<StoreCtx>,

    /// Stop flag for the background epoch ticker thread (see `new`).
    /// Set to `true` when the runtime is dropped.
    epoch_stop: Arc<AtomicBool>,

    /// Host function registry. Created in `new`; not read within this
    /// file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    host_functions: Arc<HostFunctionRegistry>,

    /// Per-plugin KV backend bridged into wasmtime imports. Survives
    /// across calls so plugins can persist state between hooks.
    kv: KvBackend,

    /// Module cache (path -> compiled module), populated by
    /// `instantiate`.
    /// NOTE(review): within this file the cache is written on every
    /// `instantiate` call and only read by `stats`; `instantiate` does
    /// not consult it before compiling — confirm whether a lookup is
    /// intended.
    module_cache: RwLock<HashMap<PathBuf, Module>>,

    /// Default security policy handed to every plugin's sandbox
    default_policy: SecurityPolicy,

    /// Creation timestamp
    created_at: Instant,
}
264
265impl WasmPluginRuntime {
266    /// Create a new WASM runtime
267    pub fn new(config: &PluginRuntimeConfig) -> Result<Self, PluginError> {
268        let host_functions = Arc::new(HostFunctionRegistry::new());
269
270        let mut engine_config = wasmtime::Config::new();
271        if config.fuel_metering {
272            engine_config.consume_fuel(true);
273        }
274        // Epoch-based interrupts let us bound execution time without
275        // polling fuel from inside the call.
276        engine_config.epoch_interruption(true);
277
278        let engine = Engine::new(&engine_config)
279            .map_err(|e| PluginError::RuntimeError(format!("wasmtime engine init: {}", e)))?;
280
281        // Build the host-import linker once. The KV/crypto imports read
282        // their state from each call's `Store` data (Caller<StoreCtx>), so
283        // a single shared linker is correct across all plugins and calls.
284        let mut linker: Linker<StoreCtx> = Linker::new(&engine);
285        register_kv_imports(&mut linker)?;
286        register_crypto_imports(&mut linker)?;
287
288        // Background epoch ticker: bumps the engine epoch every ~1ms so
289        // that per-call epoch deadlines actually enforce a wall-clock
290        // timeout on plugin execution (previously the deadline was set to
291        // u64::MAX, so the configured timeout was never enforced). A
292        // std::thread is used so enforcement works with or without a tokio
293        // runtime; it exits within ~1ms of the runtime being dropped.
294        let epoch_stop = Arc::new(AtomicBool::new(false));
295        {
296            let engine = engine.clone();
297            let stop = epoch_stop.clone();
298            std::thread::Builder::new()
299                .name("wasm-epoch-ticker".into())
300                .spawn(move || {
301                    while !stop.load(std::sync::atomic::Ordering::Relaxed) {
302                        std::thread::sleep(Duration::from_millis(1));
303                        engine.increment_epoch();
304                    }
305                })
306                .ok();
307        }
308
309        let default_policy = SecurityPolicy {
310            allowed_hosts: vec!["localhost".to_string()],
311            allowed_paths: vec![config.plugin_dir.clone()],
312            max_memory: config.memory_limit,
313            max_execution_time: config.timeout,
314            allow_network: false,
315            allow_filesystem: false,
316        };
317
318        Ok(Self {
319            config: config.clone(),
320            engine,
321            linker,
322            epoch_stop,
323            host_functions,
324            kv: KvBackend::new(),
325            module_cache: RwLock::new(HashMap::new()),
326            default_policy,
327            created_at: Instant::now(),
328        })
329    }
330
    /// Expose the per-plugin KV backend so admin/test code can seed
    /// or inspect a plugin's state without going through WASM.
    ///
    /// This is the same backend that `call_hook` clones into each
    /// call's store context.
    pub fn kv(&self) -> &KvBackend {
        &self.kv
    }
336
    /// Borrow the shared host-import linker (used by the manager/tests to
    /// pre-resolve modules against the same import set).
    ///
    /// This is the linker `call_hook` uses to build each plugin's
    /// cached instantiation plan.
    #[allow(dead_code)]
    pub(crate) fn linker(&self) -> &Linker<StoreCtx> {
        &self.linker
    }
343
    /// Expose the engine so tests + the plugin manager can build new
    /// `Store`s (and compile modules) against it. Modules and stores
    /// must be created from this same engine to be used together.
    #[allow(dead_code)]
    pub(crate) fn engine(&self) -> &Engine {
        &self.engine
    }
350
    /// Expose the runtime config so the plugin manager can consult
    /// fields it owns (e.g. `trust_root`) without holding a separate
    /// copy. Returns the runtime's own clone of the config passed to
    /// `new`.
    pub fn config(&self) -> &PluginRuntimeConfig {
        &self.config
    }
357
358    /// Instantiate a plugin from manifest and WASM bytes
359    pub fn instantiate(
360        &self,
361        manifest: &super::loader::PluginManifest,
362        wasm_bytes: &[u8],
363    ) -> Result<LoadedPlugin, PluginError> {
364        // Validate WASM module (basic magic number check)
365        if wasm_bytes.len() < 8 {
366            return Err(PluginError::LoadError("WASM module too small".to_string()));
367        }
368
369        // WASM magic number: 0x00 0x61 0x73 0x6d (\\0asm)
370        if &wasm_bytes[0..4] != b"\x00asm" {
371            return Err(PluginError::LoadError(
372                "Invalid WASM magic number".to_string(),
373            ));
374        }
375
376        // Build metadata from manifest
377        let metadata = PluginMetadata {
378            name: manifest.name.clone(),
379            version: manifest.version.clone(),
380            description: manifest.description.clone(),
381            author: manifest.author.clone(),
382            hooks: manifest.hooks.clone(),
383            permissions: manifest.permissions.clone(),
384            min_memory: manifest.min_memory,
385            max_memory: manifest.max_memory.min(self.config.memory_limit),
386        };
387
388        // Build sandbox with merged policy
389        let resource_limits = ResourceLimits {
390            max_memory: metadata.max_memory,
391            max_execution_time: self.config.timeout,
392            max_fuel: if self.config.fuel_metering {
393                Some(self.config.fuel_limit)
394            } else {
395                None
396            },
397            max_table_elements: 10000,
398            max_instances: 1,
399        };
400
401        let sandbox = PluginSandbox::new(
402            self.default_policy.clone(),
403            resource_limits,
404            manifest.permissions.clone(),
405        );
406
407        // Compile via wasmtime — this validates the module and produces
408        // an Arc-wrapped Module ready for repeated instantiation.
409        let module = Module::from_binary(&self.engine, wasm_bytes)
410            .map_err(|e| PluginError::InstantiationError(format!("wasmtime compile: {}", e)))?;
411
412        // Cache the compiled Module (cheap clone on hit).
413        {
414            let mut cache = self.module_cache.write();
415            cache.insert(manifest.path.clone(), module.clone());
416        }
417
418        Ok(LoadedPlugin::new(
419            metadata,
420            manifest.path.clone(),
421            module,
422            sandbox,
423        ))
424    }
425
426    /// Call a hook on a plugin via wasmtime.
427    ///
428    /// 1. Build a fresh `Store` (wasmtime stores are not Sync, so each
429    ///    invocation is isolated).
430    /// 2. Apply fuel metering (per-call fuel cap) when configured.
431    /// 3. Instantiate the module against an empty Linker — host
432    ///    functions are TODO; plugins that import them will fail at
433    ///    instantiation with a clear error message.
434    /// 4. Look up `memory`, `alloc`, `dealloc`, and the named hook
435    ///    function exports.
436    /// 5. Allocate a slot in plugin memory, write `args`, call the
437    ///    hook, decode `(result_ptr, result_len)`, copy the result
438    ///    bytes out.
439    /// 6. Free both input and output slots via `dealloc`.
440    /// 7. Drop the store; the plugin's per-call state is gone.
441    pub fn call_hook(
442        &self,
443        plugin: &LoadedPlugin,
444        hook: HookType,
445        args: &[u8],
446    ) -> Result<Vec<u8>, PluginError> {
447        // Check if plugin supports this hook
448        if !plugin.metadata.hooks.contains(&hook) {
449            return Err(PluginError::HookNotFound(format!(
450                "Plugin {} does not support hook {:?}",
451                plugin.metadata.name, hook
452            )));
453        }
454
455        // Check state
456        if plugin.state != PluginState::Running {
457            return Err(PluginError::ExecutionError(format!(
458                "Plugin {} is not running (state: {:?})",
459                plugin.metadata.name, plugin.state
460            )));
461        }
462
463        // Record invocation
464        plugin.record_invocation();
465
466        // Fresh per-call store; not Sync, so we never share across calls.
467        // The data carries the plugin's identity + a clone of the shared
468        // KV backend so host imports can route to the right namespace.
469        let store_ctx = StoreCtx {
470            plugin_name: plugin.metadata.name.clone(),
471            kv: self.kv.clone(),
472        };
473        let mut store: Store<StoreCtx> = Store::new(&self.engine, store_ctx);
474        if self.config.fuel_metering {
475            // wasmtime's set_fuel returns Result; cap is per-call.
476            store
477                .set_fuel(self.config.fuel_limit)
478                .map_err(|e| PluginError::RuntimeError(format!("set_fuel: {}", e)))?;
479        }
480        // Epoch interruption was enabled at engine init and a background
481        // ticker bumps the engine epoch every ~1ms. Set the deadline to
482        // `timeout` worth of ticks so a runaway plugin traps at its
483        // configured wall-clock timeout instead of blocking the caller
484        // indefinitely. (Set after Store::new, before the call.)
485        let deadline_ticks = self.config.timeout.as_millis().max(1).min(u64::MAX as u128) as u64;
486        store.set_epoch_deadline(deadline_ticks);
487
488        // Instantiate from the plugin's pre-resolved plan, computed once
489        // against the runtime's shared host-import linker and cached on the
490        // plugin. Per call this is just a linear-memory/table init — no
491        // Linker allocation, no host-import re-registration, no import-name
492        // resolution.
493        let instance_pre = match plugin.instance_pre.get() {
494            Some(ip) => ip,
495            None => {
496                let ip = self.linker.instantiate_pre(&plugin.module).map_err(|e| {
497                    PluginError::InstantiationError(format!(
498                        "pre-instantiate {}: {}",
499                        plugin.metadata.name, e
500                    ))
501                })?;
502                // Race-tolerant: if another thread set it first, keep theirs.
503                let _ = plugin.instance_pre.set(ip);
504                plugin.instance_pre.get().expect("just set")
505            }
506        };
507        let instance = instance_pre.instantiate(&mut store).map_err(|e| {
508            PluginError::InstantiationError(format!("instantiate {}: {}", plugin.metadata.name, e))
509        })?;
510
511        let memory = instance.get_memory(&mut store, "memory").ok_or_else(|| {
512            PluginError::ExecutionError(format!(
513                "plugin {} does not export `memory`",
514                plugin.metadata.name
515            ))
516        })?;
517
518        let alloc = get_typed::<_, i32, i32>(&instance, &mut store, "alloc")?;
519        let dealloc = get_typed::<_, (i32, i32), ()>(&instance, &mut store, "dealloc")?;
520
521        // Allocate input slot inside the plugin's address space and
522        // copy `args` in.
523        let in_len = args.len() as i32;
524        let in_ptr = alloc
525            .call(&mut store, in_len)
526            .map_err(|e| PluginError::ExecutionError(format!("alloc({}): {}", in_len, e)))?;
527        if in_len > 0 {
528            write_memory(&memory, &mut store, in_ptr, args)?;
529        }
530
531        // Try the result-returning ABI first; if the export has the
532        // observer ABI (no return), fall back to that.
533        let export_name = hook.export_name();
534        let result_bytes = match get_typed::<_, (i32, i32), i64>(&instance, &mut store, export_name)
535        {
536            Ok(hook_fn) => {
537                let packed = hook_fn.call(&mut store, (in_ptr, in_len)).map_err(|e| {
538                    PluginError::ExecutionError(format!("hook {} call: {}", export_name, e))
539                })?;
540                let out_ptr = (packed >> 32) as i32;
541                let out_len = (packed & 0xFFFF_FFFF) as i32;
542                if out_len > 0 {
543                    let bytes = read_memory(&memory, &store, out_ptr, out_len)?;
544                    // Free the plugin-allocated output slot.
545                    let _ = dealloc.call(&mut store, (out_ptr, out_len));
546                    bytes
547                } else {
548                    Vec::new()
549                }
550            }
551            Err(_) => {
552                // Observer ABI: (i32, i32) → ()
553                let observer = get_typed::<_, (i32, i32), ()>(&instance, &mut store, export_name)?;
554                observer.call(&mut store, (in_ptr, in_len)).map_err(|e| {
555                    PluginError::ExecutionError(format!(
556                        "observer hook {} call: {}",
557                        export_name, e
558                    ))
559                })?;
560                Vec::new()
561            }
562        };
563
564        // Free the input slot. Best-effort; failure here doesn't
565        // abort the call (the store is about to be dropped anyway).
566        let _ = dealloc.call(&mut store, (in_ptr, in_len));
567
568        // Update per-plugin instance accounting.
569        if self.config.fuel_metering {
570            if let Ok(remaining) = store.get_fuel() {
571                let consumed = self.config.fuel_limit.saturating_sub(remaining);
572                plugin.instance_data.write().fuel_consumed = consumed;
573            }
574        }
575        plugin.instance_data.write().memory_used = memory.data_size(&store);
576
577        Ok(result_bytes)
578    }
579
580    /// Call pre-query hook
581    pub fn call_pre_query(
582        &self,
583        plugin: &LoadedPlugin,
584        ctx: &QueryContext,
585    ) -> Result<PreQueryResult, PluginError> {
586        // Serialize context
587        let args = serde_json::to_vec(ctx).map_err(|e| {
588            PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
589        })?;
590
591        // Call the hook
592        let result = self.call_hook(plugin, HookType::PreQuery, &args)?;
593
594        // Deserialize result (or return default)
595        if result.is_empty() {
596            return Ok(PreQueryResult::Continue);
597        }
598
599        serde_json::from_slice(&result).map_err(|e| {
600            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
601        })
602    }
603
604    /// Call authenticate hook
605    pub fn call_authenticate(
606        &self,
607        plugin: &LoadedPlugin,
608        request: &AuthRequest,
609    ) -> Result<AuthResult, PluginError> {
610        // Serialize request
611        let args = serde_json::to_vec(request).map_err(|e| {
612            PluginError::ExecutionError(format!("Failed to serialize request: {}", e))
613        })?;
614
615        // Call the hook
616        let result = self.call_hook(plugin, HookType::Authenticate, &args)?;
617
618        // Deserialize result (or return default)
619        if result.is_empty() {
620            return Ok(AuthResult::Defer);
621        }
622
623        serde_json::from_slice(&result).map_err(|e| {
624            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
625        })
626    }
627
628    /// Call route hook
629    pub fn call_route(
630        &self,
631        plugin: &LoadedPlugin,
632        ctx: &QueryContext,
633    ) -> Result<RouteResult, PluginError> {
634        // Serialize context
635        let args = serde_json::to_vec(ctx).map_err(|e| {
636            PluginError::ExecutionError(format!("Failed to serialize context: {}", e))
637        })?;
638
639        // Call the hook
640        let result = self.call_hook(plugin, HookType::Route, &args)?;
641
642        // Deserialize result (or return default)
643        if result.is_empty() {
644            return Ok(RouteResult::Default);
645        }
646
647        serde_json::from_slice(&result).map_err(|e| {
648            PluginError::ExecutionError(format!("Failed to deserialize result: {}", e))
649        })
650    }
651
652    /// Get runtime statistics
653    pub fn stats(&self) -> RuntimeStats {
654        RuntimeStats {
655            uptime: self.created_at.elapsed(),
656            cached_modules: self.module_cache.read().len(),
657            fuel_metering_enabled: self.config.fuel_metering,
658            memory_limit: self.config.memory_limit,
659            timeout: self.config.timeout,
660        }
661    }
662}
663
/// Stops the background epoch ticker when the runtime goes away.
impl Drop for WasmPluginRuntime {
    fn drop(&mut self) {
        // Signal the epoch ticker thread to exit; it polls the flag every
        // ~1ms and then releases its Engine clone. The thread is not
        // joined here, so it may outlive this call briefly.
        self.epoch_stop
            .store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
672
/// Runtime statistics, as returned by `WasmPluginRuntime::stats`.
#[derive(Debug, Clone)]
pub struct RuntimeStats {
    /// Time elapsed since the runtime was created
    pub uptime: Duration,

    /// Number of entries in the module cache
    pub cached_modules: usize,

    /// Whether fuel metering is enabled
    pub fuel_metering_enabled: bool,

    /// Configured memory limit per plugin
    pub memory_limit: usize,

    /// Configured execution timeout
    pub timeout: Duration,
}
691
692// Serialization support for hook types — written by hand because the
693// types live in `super` and we can't derive without touching every
694// field's type chain. Includes hook_context so plugins can read
695// per-request attributes (tenant_id, agent_id, ai_traffic, etc).
696impl serde::Serialize for QueryContext {
697    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
698    where
699        S: serde::Serializer,
700    {
701        use serde::ser::SerializeStruct;
702        let mut state = serializer.serialize_struct("QueryContext", 5)?;
703        state.serialize_field("query", &self.query)?;
704        state.serialize_field("normalized", &self.normalized)?;
705        state.serialize_field("tables", &self.tables)?;
706        state.serialize_field("is_read_only", &self.is_read_only)?;
707        state.serialize_field("hook_context", &self.hook_context)?;
708        state.end()
709    }
710}
711
712/// Look up a typed exported function on an instance, with a uniform
713/// "missing/wrong-shape" error message.
714fn get_typed<T, P, R>(
715    instance: &Instance,
716    store: &mut Store<T>,
717    name: &str,
718) -> Result<TypedFunc<P, R>, PluginError>
719where
720    P: wasmtime::WasmParams,
721    R: wasmtime::WasmResults,
722{
723    instance
724        .get_typed_func::<P, R>(store, name)
725        .map_err(|e| PluginError::ExecutionError(format!("export `{}`: {}", name, e)))
726}
727
728/// Copy `bytes` into the plugin's linear memory at `ptr`. Bounds-
729/// checked via wasmtime's safe `Memory::write`.
730fn write_memory<T>(
731    memory: &Memory,
732    store: &mut Store<T>,
733    ptr: i32,
734    bytes: &[u8],
735) -> Result<(), PluginError> {
736    memory
737        .write(store, ptr as usize, bytes)
738        .map_err(|e| PluginError::ExecutionError(format!("memory.write @ {}: {}", ptr, e)))
739}
740
741/// Copy `len` bytes out of plugin memory starting at `ptr`.
742fn read_memory<T>(
743    memory: &Memory,
744    store: &Store<T>,
745    ptr: i32,
746    len: i32,
747) -> Result<Vec<u8>, PluginError> {
748    if len <= 0 {
749        return Ok(Vec::new());
750    }
751    let mut out = vec![0u8; len as usize];
752    memory.read(store, ptr as usize, &mut out).map_err(|e| {
753        PluginError::ExecutionError(format!("memory.read @ {}+{}: {}", ptr, len, e))
754    })?;
755    Ok(out)
756}
757
758impl serde::Serialize for AuthRequest {
759    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
760    where
761        S: serde::Serializer,
762    {
763        use serde::ser::SerializeStruct;
764        let mut state = serializer.serialize_struct("AuthRequest", 5)?;
765        state.serialize_field("headers", &self.headers)?;
766        state.serialize_field("username", &self.username)?;
767        state.serialize_field("password", &self.password)?;
768        state.serialize_field("client_ip", &self.client_ip)?;
769        state.serialize_field("database", &self.database)?;
770        state.end()
771    }
772}
773
774impl<'de> serde::Deserialize<'de> for PreQueryResult {
775    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
776    where
777        D: serde::Deserializer<'de>,
778    {
779        #[derive(serde::Deserialize)]
780        struct Helper {
781            action: String,
782            #[serde(default)]
783            value: Option<String>,
784            #[serde(default)]
785            data: Option<Vec<u8>>,
786        }
787
788        let helper = Helper::deserialize(deserializer)?;
789        match helper.action.as_str() {
790            "continue" => Ok(PreQueryResult::Continue),
791            "rewrite" => Ok(PreQueryResult::Rewrite(helper.value.unwrap_or_default())),
792            "block" => Ok(PreQueryResult::Block(helper.value.unwrap_or_default())),
793            "cached" => Ok(PreQueryResult::Cached(helper.data.unwrap_or_default())),
794            _ => Ok(PreQueryResult::Continue),
795        }
796    }
797}
798
799impl<'de> serde::Deserialize<'de> for AuthResult {
800    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
801    where
802        D: serde::Deserializer<'de>,
803    {
804        #[derive(serde::Deserialize)]
805        struct Helper {
806            action: String,
807            #[serde(default)]
808            identity: Option<IdentityHelper>,
809            #[serde(default)]
810            message: Option<String>,
811        }
812
813        #[derive(serde::Deserialize)]
814        struct IdentityHelper {
815            user_id: String,
816            username: String,
817            #[serde(default)]
818            roles: Vec<String>,
819            #[serde(default)]
820            tenant_id: Option<String>,
821        }
822
823        let helper = Helper::deserialize(deserializer)?;
824        match helper.action.as_str() {
825            "success" => {
826                let id = helper.identity.unwrap_or(IdentityHelper {
827                    user_id: String::new(),
828                    username: String::new(),
829                    roles: Vec::new(),
830                    tenant_id: None,
831                });
832                Ok(AuthResult::Success(super::Identity {
833                    user_id: id.user_id,
834                    username: id.username,
835                    roles: id.roles,
836                    tenant_id: id.tenant_id,
837                    claims: std::collections::HashMap::new(),
838                }))
839            }
840            "denied" => Ok(AuthResult::Denied(helper.message.unwrap_or_default())),
841            "defer" => Ok(AuthResult::Defer),
842            _ => Ok(AuthResult::Defer),
843        }
844    }
845}
846
847impl<'de> serde::Deserialize<'de> for RouteResult {
848    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
849    where
850        D: serde::Deserializer<'de>,
851    {
852        #[derive(serde::Deserialize)]
853        struct Helper {
854            action: String,
855            #[serde(default)]
856            target: Option<String>,
857            #[serde(default)]
858            reason: Option<String>,
859        }
860
861        let helper = Helper::deserialize(deserializer)?;
862        match helper.action.as_str() {
863            "default" => Ok(RouteResult::Default),
864            "node" => Ok(RouteResult::Node(helper.target.unwrap_or_default())),
865            "primary" => Ok(RouteResult::Primary),
866            "standby" => Ok(RouteResult::Standby),
867            "branch" => Ok(RouteResult::Branch(helper.target.unwrap_or_default())),
868            // Block carries a human-readable reason in its own field so
869            // it doesn't overload `target` (which is a node identifier
870            // for the other variants).
871            "block" => Ok(RouteResult::Block(
872                helper
873                    .reason
874                    .unwrap_or_else(|| "blocked by plugin".to_string()),
875            )),
876            _ => Ok(RouteResult::Default),
877        }
878    }
879}
880
881#[cfg(test)]
882mod tests {
883    use super::*;
884
885    /// Build a tiny WAT module for runtime tests against a specific
886    /// engine. wasmtime requires Module and instantiating Engine to
887    /// match — so this takes the runtime's engine rather than
888    /// constructing one locally.
889    ///
890    /// Exports `memory`, `alloc`, `dealloc`, a `pre_query` hook that
891    /// ignores its input and returns a fixed payload at offset 1024,
892    /// and a `post_query` observer hook.
893    fn build_test_module(engine: &Engine) -> Module {
894        const PAYLOAD: &[u8] = b"hello-from-wasm";
895        let payload_hex: String = PAYLOAD.iter().map(|b| format!("\\{:02x}", b)).collect();
896        let wat = format!(
897            r#"
898            (module
899              (memory (export "memory") 1)
900
901              ;; Trivial alloc: always returns offset 4096 (test inputs
902              ;; are tiny so non-overlapping reuse is fine here). Real
903              ;; plugins ship a real allocator; the runtime only cares
904              ;; that `alloc` returns a writable address.
905              (func (export "alloc") (param $size i32) (result i32)
906                (i32.const 4096))
907
908              (func (export "dealloc") (param $ptr i32) (param $size i32)
909                (drop (local.get $ptr))
910                (drop (local.get $size)))
911
912              ;; Result-returning hook: writes PAYLOAD at offset 1024 and
913              ;; returns (1024 << 32) | PAYLOAD.len.
914              (func (export "pre_query")
915                (param $in_ptr i32) (param $in_len i32) (result i64)
916                (i64.or
917                  (i64.shl (i64.const 1024) (i64.const 32))
918                  (i64.const {payload_len})))
919
920              ;; Observer hook: takes args, returns nothing.
921              (func (export "post_query")
922                (param $in_ptr i32) (param $in_len i32)
923                (drop (local.get $in_ptr)))
924
925              (data (i32.const 1024) "{payload}")
926            )
927            "#,
928            payload = payload_hex,
929            payload_len = PAYLOAD.len(),
930        );
931        let bytes = wat::parse_str(&wat).expect("wat parses");
932        Module::from_binary(engine, &bytes).expect("module compiles")
933    }
934
935    /// A module whose `pre_query` hook spins forever — used to prove the
936    /// epoch-deadline wall-clock timeout actually interrupts a runaway
937    /// plugin instead of blocking the caller indefinitely.
938    fn build_spin_module(engine: &Engine) -> Module {
939        let wat = r#"
940            (module
941              (memory (export "memory") 1)
942              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
943              (func (export "dealloc") (param i32) (param i32))
944              (func (export "pre_query") (param i32) (param i32) (result i64)
945                (loop $l (br $l))
946                (i64.const 0)))
947        "#;
948        let bytes = wat::parse_str(wat).expect("wat parses");
949        Module::from_binary(engine, &bytes).expect("module compiles")
950    }
951
952    /// A runaway plugin must trap at its configured timeout (enforced by
953    /// the background epoch ticker), not hang the caller. Guarded by a 5s
954    /// join so a regression fails fast instead of hanging the test binary.
955    #[test]
956    fn test_call_hook_enforces_timeout() {
957        let mut config = PluginRuntimeConfig::default();
958        config.fuel_metering = false; // isolate epoch enforcement from fuel
959        config.timeout = Duration::from_millis(100);
960        let runtime = Arc::new(WasmPluginRuntime::new(&config).unwrap());
961
962        let module = build_spin_module(runtime.engine());
963        let mut metadata = PluginMetadata::default();
964        metadata.name = "spin".to_string();
965        metadata.hooks = vec![HookType::PreQuery];
966        let plugin = Arc::new(LoadedPlugin::new(
967            metadata,
968            PathBuf::from("/test/spin.wasm"),
969            module,
970            PluginSandbox::default(),
971        ));
972
973        let (tx, rx) = std::sync::mpsc::channel();
974        {
975            let r = runtime.clone();
976            let p = plugin.clone();
977            std::thread::spawn(move || {
978                let res = r.call_hook(&p, HookType::PreQuery, b"{}");
979                let _ = tx.send(res.is_err());
980            });
981        }
982        match rx.recv_timeout(Duration::from_secs(5)) {
983            Ok(is_err) => assert!(is_err, "runaway plugin should trap with an error"),
984            Err(_) => panic!("call_hook did not return within 5s — epoch timeout not enforced"),
985        }
986    }
987
988    #[test]
989    fn test_plugin_error_display() {
990        let err = PluginError::LoadError("test".to_string());
991        assert!(err.to_string().contains("Load error"));
992
993        let err = PluginError::Timeout("plugin-a".to_string());
994        assert!(err.to_string().contains("Timeout"));
995    }
996
997    #[test]
998    fn test_plugin_state() {
999        assert_eq!(PluginState::Running, PluginState::Running);
1000        assert_ne!(PluginState::Running, PluginState::Paused);
1001    }
1002
1003    #[test]
1004    fn test_runtime_creation() {
1005        let config = PluginRuntimeConfig::default();
1006        let runtime = WasmPluginRuntime::new(&config);
1007        assert!(runtime.is_ok());
1008    }
1009
1010    #[test]
1011    fn test_runtime_stats() {
1012        let config = PluginRuntimeConfig::default();
1013        let runtime = WasmPluginRuntime::new(&config).unwrap();
1014        let stats = runtime.stats();
1015
1016        assert_eq!(stats.cached_modules, 0);
1017        assert!(stats.fuel_metering_enabled);
1018    }
1019
1020    #[test]
1021    fn test_loaded_plugin_invocation_count() {
1022        // Re-use the test module — its compiled `Module` is what
1023        // the LoadedPlugin needs now that the field is wasmtime-typed.
1024        let engine = Engine::default();
1025        let module = build_test_module(&engine);
1026        let metadata = PluginMetadata::default();
1027        let sandbox = PluginSandbox::default();
1028        let plugin = LoadedPlugin::new(
1029            metadata,
1030            PathBuf::from("/test/plugin.wasm"),
1031            module,
1032            sandbox,
1033        );
1034
1035        assert_eq!(plugin.invocation_count(), 0);
1036        plugin.record_invocation();
1037        assert_eq!(plugin.invocation_count(), 1);
1038        plugin.record_invocation();
1039        assert_eq!(plugin.invocation_count(), 2);
1040    }
1041
1042    /// End-to-end: load a WAT-built module, call `pre_query`, observe
1043    /// the plugin's payload comes back through the (ptr, len) ABI.
1044    /// This is the killer test that proves the wasmtime path is
1045    /// real, not a stub.
1046    #[test]
1047    fn test_call_hook_roundtrips_real_wasm() {
1048        let mut config = PluginRuntimeConfig::default();
1049        // Disable fuel metering — the test module is trivial and we
1050        // don't want to debug fuel exhaustion in unit tests.
1051        config.fuel_metering = false;
1052        let runtime = WasmPluginRuntime::new(&config).unwrap();
1053
1054        let module = build_test_module(runtime.engine());
1055        let mut metadata = PluginMetadata::default();
1056        metadata.name = "test-roundtrip".to_string();
1057        metadata.hooks = vec![HookType::PreQuery, HookType::PostQuery];
1058
1059        let plugin = LoadedPlugin::new(
1060            metadata,
1061            PathBuf::from("/test/roundtrip.wasm"),
1062            module,
1063            PluginSandbox::default(),
1064        );
1065        // Force into Running state — Loading would block.
1066        // (LoadedPlugin::new already sets Running by default.)
1067
1068        let bytes = runtime
1069            .call_hook(&plugin, HookType::PreQuery, b"ignored input")
1070            .expect("pre_query call");
1071        assert_eq!(bytes, b"hello-from-wasm");
1072        assert_eq!(plugin.invocation_count(), 1);
1073
1074        // Observer ABI: post_query has no return; should yield empty.
1075        let out = runtime
1076            .call_hook(&plugin, HookType::PostQuery, b"some bytes")
1077            .expect("post_query call");
1078        assert!(out.is_empty());
1079        assert_eq!(plugin.invocation_count(), 2);
1080    }
1081
1082    /// A plugin that doesn't declare a hook in its metadata cannot
1083    /// invoke that hook even if the WASM exports the function.
1084    #[test]
1085    fn test_call_hook_rejects_undeclared_hook() {
1086        let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1087        let module = build_test_module(runtime.engine());
1088        let mut metadata = PluginMetadata::default();
1089        metadata.hooks = vec![]; // declares nothing
1090        let plugin = LoadedPlugin::new(
1091            metadata,
1092            PathBuf::from("/test/empty.wasm"),
1093            module,
1094            PluginSandbox::default(),
1095        );
1096        let err = runtime
1097            .call_hook(&plugin, HookType::PreQuery, &[])
1098            .unwrap_err();
1099        assert!(matches!(err, PluginError::HookNotFound(_)));
1100    }
1101
1102    /// Calling a hook whose export name is missing surfaces as
1103    /// `ExecutionError`, not a panic.
1104    #[test]
1105    fn test_call_hook_missing_export_returns_error() {
1106        let runtime = WasmPluginRuntime::new(&PluginRuntimeConfig::default()).unwrap();
1107        let module = build_test_module(runtime.engine());
1108        let mut metadata = PluginMetadata::default();
1109        // Declare a hook the test module doesn't export.
1110        metadata.hooks = vec![HookType::Authenticate];
1111        let plugin = LoadedPlugin::new(
1112            metadata,
1113            PathBuf::from("/test/missing.wasm"),
1114            module,
1115            PluginSandbox::default(),
1116        );
1117        let err = runtime
1118            .call_hook(&plugin, HookType::Authenticate, &[])
1119            .unwrap_err();
1120        assert!(matches!(err, PluginError::ExecutionError(_)));
1121    }
1122
1123    /// Build a WAT module that imports kv_set + kv_get from `env` and
1124    /// calls kv_set on `pre_query`. Used to validate the host-import
1125    /// bridge end-to-end through wasmtime.
1126    fn build_kv_test_module(engine: &Engine) -> Module {
1127        // Layout:
1128        //   offset 100: 3 bytes "key"
1129        //   offset 200: 5 bytes "value"
1130        let wat = r#"
1131            (module
1132              (import "env" "kv_set"
1133                (func $kv_set (param i32 i32 i32 i32) (result i32)))
1134              (memory (export "memory") 1)
1135
1136              (data (i32.const 100) "key")
1137              (data (i32.const 200) "value")
1138
1139              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1140              (func (export "dealloc") (param i32 i32))
1141
1142              ;; pre_query: kv_set("key", "value"); return 0 (no payload).
1143              (func (export "pre_query")
1144                (param $in_ptr i32) (param $in_len i32) (result i64)
1145                (drop (call $kv_set
1146                  (i32.const 100) (i32.const 3)
1147                  (i32.const 200) (i32.const 5)))
1148                (i64.const 0))
1149            )
1150        "#;
1151        let bytes = wat::parse_str(wat).expect("kv-wat parses");
1152        Module::from_binary(engine, &bytes).expect("kv module compiles")
1153    }
1154
1155    /// Calls a WASM `pre_query` hook that invokes the host's kv_set
1156    /// import. Verifies the value lands in the runtime's KvBackend
1157    /// under the plugin's namespace and is readable from Rust.
1158    #[test]
1159    fn test_host_kv_import_persists_value() {
1160        let mut config = PluginRuntimeConfig::default();
1161        config.fuel_metering = false;
1162        let runtime = WasmPluginRuntime::new(&config).unwrap();
1163
1164        let module = build_kv_test_module(runtime.engine());
1165        let mut metadata = PluginMetadata::default();
1166        metadata.name = "kv-test-plugin".to_string();
1167        metadata.hooks = vec![HookType::PreQuery];
1168
1169        let plugin = LoadedPlugin::new(
1170            metadata,
1171            PathBuf::from("/test/kv.wasm"),
1172            module,
1173            PluginSandbox::default(),
1174        );
1175
1176        // Sanity: namespace empty before the call.
1177        assert_eq!(runtime.kv().get("kv-test-plugin", b"key"), None);
1178
1179        let _ = runtime
1180            .call_hook(&plugin, HookType::PreQuery, &[])
1181            .expect("pre_query call");
1182
1183        // The plugin called kv_set("key", "value") inside WASM; the
1184        // host should have stored it under this plugin's namespace.
1185        assert_eq!(
1186            runtime.kv().get("kv-test-plugin", b"key"),
1187            Some(b"value".to_vec())
1188        );
1189        // And nowhere else.
1190        assert_eq!(runtime.kv().get("other-plugin", b"key"), None);
1191    }
1192
1193    /// Build a WAT module that imports `env.sha256_hex` and exposes a
1194    /// `pre_query` hook which:
1195    ///   1. computes sha256_hex over an embedded "abc" payload at
1196    ///      offset 100 (3 bytes)
1197    ///   2. writes the 64-byte hex digest at offset 200
1198    ///   3. returns the packed (200 << 32) | 64
1199    /// so the host can read the digest out of plugin memory.
1200    fn build_sha256_test_module(engine: &Engine) -> Module {
1201        let wat = r#"
1202            (module
1203              (import "env" "sha256_hex"
1204                (func $sha256_hex (param i32 i32 i32) (result i32)))
1205              (memory (export "memory") 1)
1206
1207              (data (i32.const 100) "abc")
1208
1209              (func (export "alloc") (param i32) (result i32) (i32.const 4096))
1210              (func (export "dealloc") (param i32 i32))
1211
1212              (func (export "pre_query")
1213                (param $in_ptr i32) (param $in_len i32) (result i64)
1214                (drop (call $sha256_hex
1215                  (i32.const 100) (i32.const 3)
1216                  (i32.const 200)))
1217                (i64.or
1218                  (i64.shl (i64.const 200) (i64.const 32))
1219                  (i64.const 64)))
1220            )
1221        "#;
1222        let bytes = wat::parse_str(wat).expect("sha256-wat parses");
1223        Module::from_binary(engine, &bytes).expect("sha256 module compiles")
1224    }
1225
1226    /// RouteResult deserialiser handles the new Block variant via a
1227    /// `reason` field separate from `target` (which the other variants
1228    /// use as a node identifier).
1229    #[test]
1230    fn test_route_result_deserialises_block_with_reason() {
1231        let json = r#"{"action":"block","reason":"cross-region read forbidden"}"#;
1232        let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1233        match r {
1234            RouteResult::Block(reason) => {
1235                assert_eq!(reason, "cross-region read forbidden");
1236            }
1237            other => panic!("expected Block, got {:?}", other),
1238        }
1239    }
1240
1241    /// Block without a reason field falls back to a generic message —
1242    /// keeps the deserialiser permissive when plugins forget the field.
1243    #[test]
1244    fn test_route_result_block_defaults_reason_when_missing() {
1245        let json = r#"{"action":"block"}"#;
1246        let r: RouteResult = serde_json::from_str(json).expect("block deserialises");
1247        match r {
1248            RouteResult::Block(reason) => {
1249                assert!(!reason.is_empty(), "default reason should not be empty");
1250            }
1251            other => panic!("expected Block, got {:?}", other),
1252        }
1253    }
1254
1255    /// SHA-256 of "abc" is the canonical RFC 6234 test vector. Verifies
1256    /// the host-import bridge produces real cryptographic output, not
1257    /// the FNV-flavoured placeholder that audit-chain ships today.
1258    #[test]
1259    fn test_host_sha256_import_matches_rfc_6234_vector() {
1260        const SHA256_OF_ABC: &[u8; 64] =
1261            b"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
1262
1263        let mut config = PluginRuntimeConfig::default();
1264        config.fuel_metering = false;
1265        let runtime = WasmPluginRuntime::new(&config).unwrap();
1266
1267        let module = build_sha256_test_module(runtime.engine());
1268        let mut metadata = PluginMetadata::default();
1269        metadata.name = "sha256-test-plugin".to_string();
1270        metadata.hooks = vec![HookType::PreQuery];
1271
1272        let plugin = LoadedPlugin::new(
1273            metadata,
1274            PathBuf::from("/test/sha256.wasm"),
1275            module,
1276            PluginSandbox::default(),
1277        );
1278
1279        let out = runtime
1280            .call_hook(&plugin, HookType::PreQuery, &[])
1281            .expect("pre_query call");
1282        assert_eq!(out.len(), 64);
1283        assert_eq!(&out[..], SHA256_OF_ABC);
1284    }
1285}