Skip to main content

barbacane_wasm/
instance.rs

1//! WASM plugin instance management.
2//!
3//! Each plugin instance wraps a wasmtime Store and Instance with the
4//! plugin state required for host function calls.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use wasmtime::{Caller, Engine, Instance, Linker, Memory, Store, TypedFunc};
10
11use barbacane_plugin_sdk::types::base64_body;
12use serde::Deserialize;
13use std::collections::BTreeMap;
14
15use crate::broker::BrokerMessage;
16use crate::engine::CompiledModule;
17use crate::error::WasmError;
18use crate::http_client::{
19    HttpClient, HttpRequest as HttpClientRequest, HttpResponse as HttpClientResponse,
20};
21use crate::limits::PluginLimits;
22
23/// Events sent through the streaming channel by `host_http_stream` (ADR-0023).
24///
25/// The host function sends `Headers` once (before any chunks), then zero or
26/// more `Chunk` events. The sender is dropped when the upstream stream ends,
27/// signalling the receiver that the body is complete.
28#[derive(Debug)]
29pub enum StreamEvent {
30    /// Response status and headers from the upstream (sent before body chunks).
31    Headers {
32        status: u16,
33        headers: BTreeMap<String, String>,
34    },
35    /// Body chunk forwarded from the upstream streaming response.
36    Chunk(bytes::Bytes),
37}
38
39/// HTTP request format from WASM plugins.
40/// This matches the format used by http-upstream plugin.
41///
42/// The `body` field uses base64 encoding over JSON to support binary payloads
43/// (e.g. multipart/form-data with file uploads).
44#[derive(Debug, Deserialize)]
45pub(crate) struct PluginHttpRequest {
46    pub(crate) method: String,
47    pub(crate) url: String,
48    #[serde(default)]
49    pub(crate) headers: BTreeMap<String, String>,
50    #[serde(default, with = "base64_body")]
51    pub(crate) body: Option<Vec<u8>>,
52    #[serde(default)]
53    pub(crate) timeout_ms: Option<u64>,
54}
55
/// Context that accompanies a single request through the plugin chain.
#[derive(Debug, Clone, Default)]
pub struct RequestContext {
    /// String key-value pairs that middlewares use to talk to each other.
    pub values: HashMap<String, String>,

    /// Holding slot for the value produced by `host_context_get`.
    pub last_get_result: Option<String>,

    /// Distributed-tracing trace ID.
    pub trace_id: String,

    /// ID of the request this context belongs to.
    pub request_id: String,
}
71
72impl RequestContext {
73    /// Create a new request context.
74    pub fn new(trace_id: String, request_id: String) -> Self {
75        Self {
76            values: HashMap::new(),
77            last_get_result: None,
78            trace_id,
79            request_id,
80        }
81    }
82}
83
84/// State attached to each WASM store.
85pub struct PluginState {
86    /// Plugin name for logging.
87    pub plugin_name: String,
88
89    /// Output buffer for plugin results.
90    pub output_buffer: Vec<u8>,
91
92    /// Per-request context.
93    pub context: RequestContext,
94
95    /// Maximum memory in bytes.
96    pub max_memory: usize,
97
98    /// HTTP client for outbound requests (shared).
99    pub http_client: Option<Arc<HttpClient>>,
100
101    /// Result buffer for host_http_read_result.
102    pub last_http_result: Option<Vec<u8>>,
103
104    /// Resolved secrets store (shared across instances).
105    pub secrets: crate::secrets::SecretsStore,
106
107    /// Result buffer for host_secret_read_result.
108    pub last_secret_result: Option<Vec<u8>>,
109
110    /// Rate limiter (shared across instances).
111    pub rate_limiter: Option<crate::rate_limiter::RateLimiter>,
112
113    /// Result buffer for host_rate_limit_read_result.
114    pub last_rate_limit_result: Option<Vec<u8>>,
115
116    /// Response cache (shared across instances).
117    pub response_cache: Option<crate::cache::ResponseCache>,
118
119    /// Result buffer for host_cache_read_result.
120    pub last_cache_result: Option<Vec<u8>>,
121
122    /// Metrics registry for plugin telemetry (shared).
123    pub metrics: Option<Arc<barbacane_telemetry::MetricsRegistry>>,
124
125    /// Kafka publisher for host_kafka_publish (shared).
126    pub kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
127
128    /// NATS publisher for host_nats_publish (shared).
129    pub nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
130
131    /// Result buffer for host_kafka_publish / host_nats_publish.
132    pub last_broker_result: Option<Vec<u8>>,
133
134    /// Result buffer for host_uuid_read_result.
135    pub last_uuid_result: Option<Vec<u8>>,
136
137    /// Channel sender for host_http_stream (ADR-0023).
138    ///
139    /// Set by the host before calling a streaming-capable dispatcher. The host
140    /// function sends `StreamEvent::Headers` once, then `StreamEvent::Chunk` for
141    /// each body chunk, then drops the sender to signal end-of-stream.
142    pub stream_sender: Option<Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>>,
143
144    /// Upstream WebSocket upgrade request from `host_ws_upgrade` (ADR-0026).
145    ///
146    /// After a successful `host_ws_upgrade`, the request params are stored here.
147    /// The actual connection is deferred to the async relay task on the main
148    /// runtime, because the TcpStream must be created on the runtime that will
149    /// drive it (a temporary runtime's I/O driver dies when the runtime drops).
150    pub ws_upgrade_request: Option<crate::ws_client::WsUpgradeRequest>,
151
152    // --- Side-channel body buffers ---
153    // Bodies travel as raw bytes via dedicated host functions instead of
154    // base64-encoded inside JSON. This eliminates the ~3.65× memory overhead
155    // and allows 10MB+ bodies within the default 16MB WASM memory limit.
156    /// Request/response body held by the host, set before calling the handler.
157    /// Plugins read it via `host_body_len()` + `host_body_read()`.
158    pub request_body: Option<Vec<u8>>,
159
160    /// Output body set by the plugin via `host_body_set()` or `host_body_clear()`.
161    /// Outer Option: was the function called? Inner Option: body or None.
162    pub output_body: Option<Option<Vec<u8>>>,
163
164    /// HTTP response body from `host_http_call`, held separately from the
165    /// JSON metadata in `last_http_result`. Plugins read it via
166    /// `host_http_response_body_len()` + `host_http_response_body_read()`.
167    pub http_response_body: Option<Vec<u8>>,
168
169    /// Outbound HTTP request body set by the plugin via
170    /// `host_http_request_body_set()`. Consumed by `host_http_call`.
171    pub http_request_body: Option<Vec<u8>>,
172}
173
174#[allow(dead_code)] // Constructors used by different pool configurations
175impl PluginState {
176    /// Create new plugin state.
177    pub fn new(plugin_name: String, limits: &PluginLimits) -> Self {
178        Self {
179            plugin_name,
180            output_buffer: Vec::new(),
181            context: RequestContext::default(),
182            max_memory: limits.max_memory_bytes,
183            http_client: None,
184            last_http_result: None,
185            secrets: crate::secrets::SecretsStore::new(),
186            last_secret_result: None,
187            rate_limiter: None,
188            last_rate_limit_result: None,
189            response_cache: None,
190            last_cache_result: None,
191            metrics: None,
192            kafka_publisher: None,
193            nats_publisher: None,
194            last_broker_result: None,
195            last_uuid_result: None,
196            stream_sender: None,
197            ws_upgrade_request: None,
198            request_body: None,
199            output_body: None,
200            http_response_body: None,
201            http_request_body: None,
202        }
203    }
204
205    /// Create new plugin state with HTTP client.
206    pub fn with_http_client(
207        plugin_name: String,
208        limits: &PluginLimits,
209        http_client: Arc<HttpClient>,
210    ) -> Self {
211        Self {
212            plugin_name,
213            output_buffer: Vec::new(),
214            context: RequestContext::default(),
215            max_memory: limits.max_memory_bytes,
216            http_client: Some(http_client),
217            last_http_result: None,
218            secrets: crate::secrets::SecretsStore::new(),
219            last_secret_result: None,
220            rate_limiter: None,
221            last_rate_limit_result: None,
222            response_cache: None,
223            last_cache_result: None,
224            metrics: None,
225            kafka_publisher: None,
226            nats_publisher: None,
227            last_broker_result: None,
228            last_uuid_result: None,
229            stream_sender: None,
230            ws_upgrade_request: None,
231            request_body: None,
232            output_body: None,
233            http_response_body: None,
234            http_request_body: None,
235        }
236    }
237
238    /// Create new plugin state with HTTP client and secrets.
239    pub fn with_http_client_and_secrets(
240        plugin_name: String,
241        limits: &PluginLimits,
242        http_client: Arc<HttpClient>,
243        secrets: crate::secrets::SecretsStore,
244    ) -> Self {
245        Self {
246            plugin_name,
247            output_buffer: Vec::new(),
248            context: RequestContext::default(),
249            max_memory: limits.max_memory_bytes,
250            http_client: Some(http_client),
251            last_http_result: None,
252            secrets,
253            last_secret_result: None,
254            rate_limiter: None,
255            last_rate_limit_result: None,
256            response_cache: None,
257            last_cache_result: None,
258            metrics: None,
259            kafka_publisher: None,
260            nats_publisher: None,
261            last_broker_result: None,
262            last_uuid_result: None,
263            stream_sender: None,
264            ws_upgrade_request: None,
265            request_body: None,
266            output_body: None,
267            http_response_body: None,
268            http_request_body: None,
269        }
270    }
271
272    /// Create new plugin state with all options.
273    #[allow(clippy::too_many_arguments)]
274    pub fn with_all_options(
275        plugin_name: String,
276        limits: &PluginLimits,
277        http_client: Option<Arc<HttpClient>>,
278        secrets: crate::secrets::SecretsStore,
279        rate_limiter: Option<crate::rate_limiter::RateLimiter>,
280        response_cache: Option<crate::cache::ResponseCache>,
281        nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
282        kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
283    ) -> Self {
284        Self {
285            plugin_name,
286            output_buffer: Vec::new(),
287            context: RequestContext::default(),
288            max_memory: limits.max_memory_bytes,
289            http_client,
290            last_http_result: None,
291            secrets,
292            last_secret_result: None,
293            rate_limiter,
294            last_rate_limit_result: None,
295            response_cache,
296            last_cache_result: None,
297            metrics: None,
298            kafka_publisher,
299            nats_publisher,
300            last_broker_result: None,
301            last_uuid_result: None,
302            stream_sender: None,
303            ws_upgrade_request: None,
304            request_body: None,
305            output_body: None,
306            http_response_body: None,
307            http_request_body: None,
308        }
309    }
310
311    /// Create new plugin state with all options including metrics.
312    #[allow(clippy::too_many_arguments)]
313    pub fn with_all_options_and_metrics(
314        plugin_name: String,
315        limits: &PluginLimits,
316        http_client: Option<Arc<HttpClient>>,
317        secrets: crate::secrets::SecretsStore,
318        rate_limiter: Option<crate::rate_limiter::RateLimiter>,
319        response_cache: Option<crate::cache::ResponseCache>,
320        nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
321        kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
322        metrics: Option<Arc<barbacane_telemetry::MetricsRegistry>>,
323    ) -> Self {
324        Self {
325            plugin_name,
326            output_buffer: Vec::new(),
327            context: RequestContext::default(),
328            max_memory: limits.max_memory_bytes,
329            http_client,
330            last_http_result: None,
331            secrets,
332            last_secret_result: None,
333            rate_limiter,
334            last_rate_limit_result: None,
335            response_cache,
336            last_cache_result: None,
337            metrics,
338            kafka_publisher,
339            nats_publisher,
340            last_broker_result: None,
341            last_uuid_result: None,
342            stream_sender: None,
343            ws_upgrade_request: None,
344            request_body: None,
345            output_body: None,
346            http_response_body: None,
347            http_request_body: None,
348        }
349    }
350
351    /// Get the output buffer contents.
352    pub fn take_output(&mut self) -> Vec<u8> {
353        std::mem::take(&mut self.output_buffer)
354    }
355
356    /// Set the request context for this call.
357    pub fn set_context(&mut self, context: RequestContext) {
358        self.context = context;
359    }
360
361    /// Set the stream sender for host_http_stream (ADR-0023).
362    pub fn set_stream_sender(
363        &mut self,
364        sender: Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>,
365    ) {
366        self.stream_sender = Some(sender);
367    }
368
369    /// Take the upstream WebSocket upgrade request from host_ws_upgrade (ADR-0026).
370    pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
371        self.ws_upgrade_request.take()
372    }
373
374    /// Set the request body for the next handler call (side-channel).
375    pub fn set_request_body(&mut self, body: Option<Vec<u8>>) {
376        self.request_body = body;
377    }
378
379    /// Take the output body set by the plugin via host_body_set/host_body_clear.
380    /// Returns `None` if the plugin didn't call either function (body unchanged).
381    /// Returns `Some(None)` if the plugin called host_body_clear.
382    /// Returns `Some(Some(bytes))` if the plugin called host_body_set.
383    pub fn take_output_body(&mut self) -> Option<Option<Vec<u8>>> {
384        self.output_body.take()
385    }
386}
387
388impl wasmtime::ResourceLimiter for PluginState {
389    fn memory_growing(
390        &mut self,
391        _current: usize,
392        desired: usize,
393        _maximum: Option<usize>,
394    ) -> Result<bool, wasmtime::Error> {
395        Ok(desired <= self.max_memory)
396    }
397
398    fn table_growing(
399        &mut self,
400        _current: usize,
401        desired: usize,
402        _maximum: Option<usize>,
403    ) -> Result<bool, wasmtime::Error> {
404        // Allow reasonable table growth
405        Ok(desired <= 10_000)
406    }
407}
408
409/// A WASM plugin instance ready for execution.
410pub struct PluginInstance {
411    store: Store<PluginState>,
412    _instance: Instance,
413    limits: PluginLimits,
414
415    // Cached function references
416    init_func: Option<TypedFunc<(i32, i32), i32>>,
417    on_request_func: Option<TypedFunc<(i32, i32), i32>>,
418    on_response_func: Option<TypedFunc<(i32, i32), i32>>,
419    dispatch_func: Option<TypedFunc<(i32, i32), i32>>,
420    alloc_func: Option<TypedFunc<i32, i32>>,
421    memory: Memory,
422}
423
424impl PluginInstance {
425    /// Create a new plugin instance from a compiled module.
426    pub fn new(
427        engine: &Engine,
428        module: &CompiledModule,
429        limits: PluginLimits,
430    ) -> Result<Self, WasmError> {
431        Self::new_with_options(engine, module, limits, None, None)
432    }
433
434    /// Create a new plugin instance with an HTTP client for outbound calls.
435    pub fn new_with_http_client(
436        engine: &Engine,
437        module: &CompiledModule,
438        limits: PluginLimits,
439        http_client: Option<Arc<HttpClient>>,
440    ) -> Result<Self, WasmError> {
441        Self::new_with_options(engine, module, limits, http_client, None)
442    }
443
444    /// Create a new plugin instance with HTTP client and secrets.
445    pub fn new_with_options(
446        engine: &Engine,
447        module: &CompiledModule,
448        limits: PluginLimits,
449        http_client: Option<Arc<HttpClient>>,
450        secrets: Option<crate::secrets::SecretsStore>,
451    ) -> Result<Self, WasmError> {
452        Self::new_with_all_options(
453            engine,
454            module,
455            limits,
456            http_client,
457            secrets,
458            None,
459            None,
460            None,
461            None,
462        )
463    }
464
465    /// Create a new plugin instance with all options including rate limiter and cache.
466    #[allow(clippy::too_many_arguments)]
467    pub fn new_with_all_options(
468        engine: &Engine,
469        module: &CompiledModule,
470        limits: PluginLimits,
471        http_client: Option<Arc<HttpClient>>,
472        secrets: Option<crate::secrets::SecretsStore>,
473        rate_limiter: Option<crate::rate_limiter::RateLimiter>,
474        response_cache: Option<crate::cache::ResponseCache>,
475        nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
476        kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
477    ) -> Result<Self, WasmError> {
478        let state = PluginState::with_all_options(
479            module.name.clone(),
480            &limits,
481            http_client,
482            secrets.unwrap_or_default(),
483            rate_limiter,
484            response_cache,
485            nats_publisher,
486            kafka_publisher,
487        );
488        let mut store = Store::new(engine, state);
489
490        // Set fuel for execution limiting
491        store
492            .set_fuel(limits.max_fuel)
493            .map_err(|e| WasmError::Instantiation(format!("failed to set fuel: {}", e)))?;
494
495        // Enable resource limiting
496        store.limiter(|state| state);
497
498        // Create linker and add host functions
499        let mut linker = Linker::new(engine);
500        add_host_functions(&mut linker)?;
501
502        // Instantiate the module
503        let instance = linker
504            .instantiate(&mut store, module.module())
505            .map_err(|e| WasmError::Instantiation(e.to_string()))?;
506
507        // Get memory
508        let memory = instance
509            .get_memory(&mut store, "memory")
510            .ok_or_else(|| WasmError::MissingExport("memory".into()))?;
511
512        // Cache function references
513        let init_func = instance
514            .get_typed_func::<(i32, i32), i32>(&mut store, "init")
515            .ok();
516        let on_request_func = instance
517            .get_typed_func::<(i32, i32), i32>(&mut store, "on_request")
518            .ok();
519        let on_response_func = instance
520            .get_typed_func::<(i32, i32), i32>(&mut store, "on_response")
521            .ok();
522        let dispatch_func = instance
523            .get_typed_func::<(i32, i32), i32>(&mut store, "dispatch")
524            .ok();
525        let alloc_func = instance
526            .get_typed_func::<i32, i32>(&mut store, "alloc")
527            .ok();
528
529        Ok(Self {
530            store,
531            _instance: instance,
532            limits,
533            init_func,
534            on_request_func,
535            on_response_func,
536            dispatch_func,
537            alloc_func,
538            memory,
539        })
540    }
541
542    /// Get the plugin name.
543    pub fn name(&self) -> &str {
544        &self.store.data().plugin_name
545    }
546
547    /// Write data to the plugin's linear memory and return the pointer.
548    ///
549    /// Uses the plugin's exported `alloc` function so that dlmalloc is aware
550    /// of the allocation and will not reuse the region during deserialization.
551    /// Falls back to growing memory directly for legacy plugins that lack the
552    /// `alloc` export (only safe for very small payloads like config JSON).
553    pub fn write_to_memory(&mut self, data: &[u8]) -> Result<i32, WasmError> {
554        if data.is_empty() {
555            return Ok(0);
556        }
557
558        if let Some(alloc_func) = self.alloc_func.clone() {
559            // Allocate via the plugin's own allocator — dlmalloc tracks this
560            // region and will not hand it out again during deserialization.
561            let ptr = alloc_func
562                .call(&mut self.store, data.len() as i32)
563                .map_err(|e| WasmError::Trap(format!("alloc failed: {}", e)))?;
564
565            if ptr == 0 {
566                let current_size = self.memory.data_size(&self.store);
567                return Err(WasmError::MemoryLimitExceeded {
568                    requested: data.len(),
569                    limit: self.limits.max_memory_bytes.saturating_sub(current_size),
570                });
571            }
572
573            self.memory
574                .write(&mut self.store, ptr as usize, data)
575                .map_err(|e| WasmError::Trap(format!("memory write failed: {}", e)))?;
576
577            Ok(ptr)
578        } else {
579            // Legacy fallback: grow memory and write at the new region.
580            // Only safe for small payloads (e.g. config JSON during init).
581            let current_size = self.memory.data_size(&self.store);
582            let needed = current_size + data.len();
583
584            if needed > self.limits.max_memory_bytes {
585                return Err(WasmError::MemoryLimitExceeded {
586                    requested: data.len(),
587                    limit: self.limits.max_memory_bytes.saturating_sub(current_size),
588                });
589            }
590
591            const PAGE_SIZE: usize = 65_536;
592            let pages_needed = data.len().div_ceil(PAGE_SIZE);
593
594            self.memory
595                .grow(&mut self.store, pages_needed as u64)
596                .map_err(|_| WasmError::MemoryLimitExceeded {
597                    requested: data.len(),
598                    limit: self.limits.max_memory_bytes.saturating_sub(current_size),
599                })?;
600
601            let ptr = current_size;
602            self.memory
603                .write(&mut self.store, ptr, data)
604                .map_err(|e| WasmError::Trap(format!("memory write failed: {}", e)))?;
605
606            Ok(ptr as i32)
607        }
608    }
609
610    /// Call the init function with the given config.
611    pub fn init(&mut self, config_json: &[u8]) -> Result<i32, WasmError> {
612        let init_func = self
613            .init_func
614            .clone()
615            .ok_or_else(|| WasmError::MissingExport("init".into()))?;
616
617        // Write config to memory
618        let ptr = self.write_to_memory(config_json)?;
619        let len = config_json.len() as i32;
620
621        // Reset fuel for this call
622        if let Err(e) = self.store.set_fuel(self.limits.max_fuel) {
623            tracing::warn!(error = %e, "failed to reset WASM fuel");
624        }
625
626        // Call init
627        let result = init_func
628            .call(&mut self.store, (ptr, len))
629            .map_err(|e| WasmError::Trap(e.to_string()))?;
630
631        Ok(result)
632    }
633
634    /// Call on_request with the given request data.
635    pub fn on_request(&mut self, request_json: &[u8]) -> Result<i32, WasmError> {
636        let func = self
637            .on_request_func
638            .clone()
639            .ok_or_else(|| WasmError::MissingExport("on_request".into()))?;
640
641        self.call_handler(func, request_json)
642    }
643
644    /// Call on_response with the given response data.
645    pub fn on_response(&mut self, response_json: &[u8]) -> Result<i32, WasmError> {
646        let func = self
647            .on_response_func
648            .clone()
649            .ok_or_else(|| WasmError::MissingExport("on_response".into()))?;
650
651        self.call_handler(func, response_json)
652    }
653
654    /// Call dispatch with the given request data.
655    pub fn dispatch(&mut self, request_json: &[u8]) -> Result<i32, WasmError> {
656        let func = self
657            .dispatch_func
658            .clone()
659            .ok_or_else(|| WasmError::MissingExport("dispatch".into()))?;
660
661        self.call_handler(func, request_json)
662    }
663
664    /// Call a handler function with data.
665    fn call_handler(
666        &mut self,
667        func: TypedFunc<(i32, i32), i32>,
668        data: &[u8],
669    ) -> Result<i32, WasmError> {
670        // Clear output buffer
671        self.store.data_mut().output_buffer.clear();
672
673        // Reset fuel before write_to_memory — the `alloc` call runs plugin
674        // code and needs fuel. Scale fuel with payload size: large bodies
675        // require proportionally more instructions for serde + base64.
676        let fuel = self.limits.max_fuel.max(data.len() as u64 * 100);
677        if let Err(e) = self.store.set_fuel(fuel) {
678            tracing::warn!(error = %e, "failed to reset WASM fuel");
679        }
680
681        // Write data to memory (may call plugin's `alloc` export)
682        let ptr = self.write_to_memory(data)?;
683        let len = data.len() as i32;
684
685        // Call function
686        let result = func
687            .call(&mut self.store, (ptr, len))
688            .map_err(|e| WasmError::Trap(e.to_string()))?;
689
690        Ok(result)
691    }
692
693    /// Get the output buffer contents.
694    pub fn take_output(&mut self) -> Vec<u8> {
695        self.store.data_mut().take_output()
696    }
697
698    /// Set the request context for the next call.
699    pub fn set_context(&mut self, context: RequestContext) {
700        self.store.data_mut().set_context(context);
701    }
702
703    /// Get the current request context (after modifications by host functions).
704    pub fn get_context(&self) -> RequestContext {
705        self.store.data().context.clone()
706    }
707
708    /// Take the last HTTP result buffer (from `host_http_call` or `host_http_stream`).
709    ///
710    /// Returns `None` if no HTTP call was made or the result was already taken.
711    pub fn take_last_http_result(&mut self) -> Option<Vec<u8>> {
712        self.store.data_mut().last_http_result.take()
713    }
714
715    /// Inject a stream sender for `host_http_stream` before calling `dispatch`.
716    ///
717    /// The sender is wrapped in an `Arc` so host functions can clone it for
718    /// use inside `std::thread::scope` without lifetime conflicts.
719    pub fn set_stream_sender(
720        &mut self,
721        sender: Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>,
722    ) {
723        self.store.data_mut().set_stream_sender(sender);
724    }
725
726    /// Take the upstream WebSocket upgrade request from `host_ws_upgrade` (ADR-0026).
727    ///
728    /// Returns `None` if no WebSocket upgrade was requested or the request
729    /// was already taken.
730    pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
731        self.store.data_mut().take_ws_upgrade_request()
732    }
733
734    /// Set the request/response body for the next handler call (side-channel).
735    pub fn set_request_body(&mut self, body: Option<Vec<u8>>) {
736        self.store.data_mut().set_request_body(body);
737    }
738
739    /// Take the output body set by the plugin via host_body_set/host_body_clear.
740    pub fn take_output_body(&mut self) -> Option<Option<Vec<u8>>> {
741        self.store.data_mut().take_output_body()
742    }
743}
744
745/// Register a `host_*_read_result` function that copies data from plugin state to WASM memory.
746///
747/// All read_result host functions follow the same pattern: take a result buffer from state,
748/// get the WASM memory export, copy bytes into the provided buffer, return bytes written.
749fn add_read_result_fn(
750    linker: &mut Linker<PluginState>,
751    name: &str,
752    extract: impl Fn(&mut PluginState) -> Option<Vec<u8>> + Send + Sync + 'static,
753) -> Result<(), WasmError> {
754    linker
755        .func_wrap(
756            "barbacane",
757            name,
758            move |mut caller: Caller<'_, PluginState>, buf_ptr: i32, buf_len: i32| -> i32 {
759                let result = extract(caller.data_mut());
760                if let Some(data) = result {
761                    let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
762                        Some(m) => m,
763                        None => return 0,
764                    };
765                    let copy_len = std::cmp::min(data.len(), buf_len as usize);
766                    if memory
767                        .write(&mut caller, buf_ptr as usize, &data[..copy_len])
768                        .is_ok()
769                    {
770                        return copy_len as i32;
771                    }
772                }
773                0
774            },
775        )
776        .map_err(|e| WasmError::Instantiation(format!("failed to add {}: {}", name, e)))?;
777    Ok(())
778}
779
780/// Add host functions to the linker.
781fn add_host_functions(linker: &mut Linker<PluginState>) -> Result<(), WasmError> {
782    // host_set_output - always available
783    linker
784        .func_wrap(
785            "barbacane",
786            "host_set_output",
787            |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
788                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
789                    Some(m) => m,
790                    None => return,
791                };
792
793                let start = ptr as usize;
794                let end = start + len as usize;
795                let data = memory.data(&caller);
796
797                if end <= data.len() {
798                    let bytes = data[start..end].to_vec();
799                    caller.data_mut().output_buffer = bytes;
800                }
801            },
802        )
803        .map_err(|e| WasmError::Instantiation(format!("failed to add host_set_output: {}", e)))?;
804
805    // --- Side-channel body host functions ---
806    // Bodies travel as raw bytes instead of base64-in-JSON, eliminating
807    // the ~3.65× memory overhead per boundary crossing.
808
809    // host_body_len — returns the length of the held body, or -1 if None.
810    linker
811        .func_wrap(
812            "barbacane",
813            "host_body_len",
814            |caller: Caller<'_, PluginState>| -> i64 {
815                match &caller.data().request_body {
816                    Some(body) => body.len() as i64,
817                    None => -1,
818                }
819            },
820        )
821        .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_len: {}", e)))?;
822
823    // host_body_read — copy the held body into WASM memory at ptr.
824    add_read_result_fn(linker, "host_body_read", |state| state.request_body.take())?;
825
826    // host_body_set — set the output body from raw bytes in WASM memory.
827    linker
828        .func_wrap(
829            "barbacane",
830            "host_body_set",
831            |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
832                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
833                    Some(m) => m,
834                    None => return,
835                };
836
837                let start = ptr as usize;
838                let end = start + len as usize;
839                let data = memory.data(&caller);
840
841                if end <= data.len() {
842                    let bytes = data[start..end].to_vec();
843                    caller.data_mut().output_body = Some(Some(bytes));
844                }
845            },
846        )
847        .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_set: {}", e)))?;
848
849    // host_body_clear — explicitly set the output body to None.
850    linker
851        .func_wrap(
852            "barbacane",
853            "host_body_clear",
854            |mut caller: Caller<'_, PluginState>| {
855                caller.data_mut().output_body = Some(None);
856            },
857        )
858        .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_clear: {}", e)))?;
859
860    // host_http_response_body_len — length of the HTTP response body from host_http_call.
861    linker
862        .func_wrap(
863            "barbacane",
864            "host_http_response_body_len",
865            |caller: Caller<'_, PluginState>| -> i64 {
866                match &caller.data().http_response_body {
867                    Some(body) => body.len() as i64,
868                    None => -1,
869                }
870            },
871        )
872        .map_err(|e| {
873            WasmError::Instantiation(format!("failed to add host_http_response_body_len: {}", e))
874        })?;
875
876    // host_http_response_body_read — copy HTTP response body into WASM memory.
877    add_read_result_fn(linker, "host_http_response_body_read", |state| {
878        state.http_response_body.take()
879    })?;
880
881    // host_http_request_body_set — set the outbound HTTP request body from WASM memory.
882    linker
883        .func_wrap(
884            "barbacane",
885            "host_http_request_body_set",
886            |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
887                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
888                    Some(m) => m,
889                    None => return,
890                };
891
892                let start = ptr as usize;
893                let end = start + len as usize;
894                let data = memory.data(&caller);
895
896                if end <= data.len() {
897                    let bytes = data[start..end].to_vec();
898                    caller.data_mut().http_request_body = Some(bytes);
899                }
900            },
901        )
902        .map_err(|e| {
903            WasmError::Instantiation(format!("failed to add host_http_request_body_set: {}", e))
904        })?;
905
906    // host_log
907    linker
908        .func_wrap(
909            "barbacane",
910            "host_log",
911            |mut caller: Caller<'_, PluginState>, level: i32, msg_ptr: i32, msg_len: i32| {
912                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
913                    Some(m) => m,
914                    None => return,
915                };
916
917                let start = msg_ptr as usize;
918                let end = start + msg_len as usize;
919                let data = memory.data(&caller);
920
921                if end <= data.len() {
922                    if let Ok(message) = std::str::from_utf8(&data[start..end]) {
923                        let plugin_name = caller.data().plugin_name.clone();
924                        match level {
925                            0 => tracing::error!(plugin = %plugin_name, "{}", message),
926                            1 => tracing::warn!(plugin = %plugin_name, "{}", message),
927                            2 => tracing::info!(plugin = %plugin_name, "{}", message),
928                            _ => tracing::debug!(plugin = %plugin_name, "{}", message),
929                        }
930                    }
931                }
932            },
933        )
934        .map_err(|e| WasmError::Instantiation(format!("failed to add host_log: {}", e)))?;
935
936    // host_context_get
937    linker
938        .func_wrap(
939            "barbacane",
940            "host_context_get",
941            |mut caller: Caller<'_, PluginState>, key_ptr: i32, key_len: i32| -> i32 {
942                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
943                    Some(m) => m,
944                    None => return -1,
945                };
946
947                let start = key_ptr as usize;
948                let end = start + key_len as usize;
949                let data = memory.data(&caller);
950
951                if end > data.len() {
952                    return -1;
953                }
954
955                let key = match std::str::from_utf8(&data[start..end]) {
956                    Ok(k) => k.to_string(),
957                    Err(_) => return -1,
958                };
959
960                match caller.data().context.values.get(&key).cloned() {
961                    Some(value) => {
962                        let len = value.len() as i32;
963                        caller.data_mut().context.last_get_result = Some(value);
964                        len
965                    }
966                    None => -1,
967                }
968            },
969        )
970        .map_err(|e| WasmError::Instantiation(format!("failed to add host_context_get: {}", e)))?;
971
972    // host_context_read_result
973    add_read_result_fn(linker, "host_context_read_result", |state| {
974        state.context.last_get_result.take().map(String::into_bytes)
975    })?;
976
977    // host_context_set
978    linker
979        .func_wrap(
980            "barbacane",
981            "host_context_set",
982            |mut caller: Caller<'_, PluginState>,
983             key_ptr: i32,
984             key_len: i32,
985             val_ptr: i32,
986             val_len: i32| {
987                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
988                    Some(m) => m,
989                    None => return,
990                };
991
992                let key_start = key_ptr as usize;
993                let key_end = key_start + key_len as usize;
994                let val_start = val_ptr as usize;
995                let val_end = val_start + val_len as usize;
996
997                // Read data first, then mutate
998                let data = memory.data(&caller);
999                if key_end <= data.len() && val_end <= data.len() {
1000                    let key_result =
1001                        std::str::from_utf8(&data[key_start..key_end]).map(String::from);
1002                    let val_result =
1003                        std::str::from_utf8(&data[val_start..val_end]).map(String::from);
1004
1005                    if let (Ok(key), Ok(value)) = (key_result, val_result) {
1006                        caller.data_mut().context.values.insert(key, value);
1007                    }
1008                }
1009            },
1010        )
1011        .map_err(|e| WasmError::Instantiation(format!("failed to add host_context_set: {}", e)))?;
1012
1013    // host_clock_now
1014    linker
1015        .func_wrap(
1016            "barbacane",
1017            "host_clock_now",
1018            |_caller: Caller<'_, PluginState>| -> i64 {
1019                use std::time::Instant;
1020
1021                static START: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
1022                let start = START.get_or_init(Instant::now);
1023
1024                start.elapsed().as_millis() as i64
1025            },
1026        )
1027        .map_err(|e| WasmError::Instantiation(format!("failed to add host_clock_now: {}", e)))?;
1028
1029    // host_time_now - alias for host_clock_now (deprecated, use host_clock_now)
1030    linker
1031        .func_wrap(
1032            "barbacane",
1033            "host_time_now",
1034            |_caller: Caller<'_, PluginState>| -> i64 {
1035                use std::time::Instant;
1036
1037                static START: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
1038                let start = START.get_or_init(Instant::now);
1039
1040                start.elapsed().as_millis() as i64
1041            },
1042        )
1043        .map_err(|e| WasmError::Instantiation(format!("failed to add host_time_now: {}", e)))?;
1044
1045    // host_get_unix_timestamp - returns current Unix timestamp in seconds
1046    linker
1047        .func_wrap(
1048            "barbacane",
1049            "host_get_unix_timestamp",
1050            |_caller: Caller<'_, PluginState>| -> u64 {
1051                use std::time::{SystemTime, UNIX_EPOCH};
1052
1053                SystemTime::now()
1054                    .duration_since(UNIX_EPOCH)
1055                    .map(|d| d.as_secs())
1056                    .unwrap_or(0)
1057            },
1058        )
1059        .map_err(|e| {
1060            WasmError::Instantiation(format!("failed to add host_get_unix_timestamp: {}", e))
1061        })?;
1062
1063    // host_uuid_generate - generates UUID v7 and returns length
1064    linker
1065        .func_wrap(
1066            "barbacane",
1067            "host_uuid_generate",
1068            |mut caller: Caller<'_, PluginState>| -> i32 {
1069                let uuid = uuid::Uuid::now_v7().to_string();
1070                let len = uuid.len() as i32;
1071                caller.data_mut().last_uuid_result = Some(uuid.into_bytes());
1072                len
1073            },
1074        )
1075        .map_err(|e| {
1076            WasmError::Instantiation(format!("failed to add host_uuid_generate: {}", e))
1077        })?;
1078
1079    // host_uuid_read_result - copies generated UUID to WASM memory
1080    add_read_result_fn(linker, "host_uuid_read_result", |state| {
1081        state.last_uuid_result.take()
1082    })?;
1083
1084    // host_http_call - make outbound HTTP request
1085    linker
1086        .func_wrap(
1087            "barbacane",
1088            "host_http_call",
1089            |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1090                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1091                    Some(m) => m,
1092                    None => return -1,
1093                };
1094
1095                let start = req_ptr as usize;
1096                let end = start + req_len as usize;
1097                let data = memory.data(&caller);
1098
1099                if end > data.len() {
1100                    return -1;
1101                }
1102
1103                // Parse the request JSON from plugin format
1104                let plugin_request: PluginHttpRequest =
1105                    match serde_json::from_slice(&data[start..end]) {
1106                        Ok(r) => r,
1107                        Err(e) => {
1108                            tracing::error!("failed to parse HTTP request: {}", e);
1109                            return -1;
1110                        }
1111                    };
1112
1113                // Body priority: side-channel (host_http_request_body_set) > JSON body.
1114                // Side-channel avoids base64 overhead for large payloads.
1115                let body = caller
1116                    .data_mut()
1117                    .http_request_body
1118                    .take()
1119                    .or(plugin_request.body);
1120
1121                let request = HttpClientRequest {
1122                    method: plugin_request.method,
1123                    url: plugin_request.url,
1124                    headers: plugin_request.headers.into_iter().collect(),
1125                    body,
1126                    timeout: plugin_request
1127                        .timeout_ms
1128                        .map(std::time::Duration::from_millis),
1129                };
1130
1131                // Get the HTTP client
1132                let http_client = match caller.data().http_client.clone() {
1133                    Some(c) => c,
1134                    None => {
1135                        tracing::error!("HTTP client not available");
1136                        return -1;
1137                    }
1138                };
1139
1140                // Use a separate runtime to avoid deadlock with the main runtime.
1141                // The main runtime is blocked waiting for the WASM call to complete,
1142                // so we can't schedule work on it. Create a new runtime just for this call.
1143                // TODO: Optimize by using a thread-local runtime or worker pool instead of
1144                // creating a new runtime per call (performance improvement for high throughput).
1145                let response_result = std::thread::scope(|s| {
1146                    let handle = s.spawn(|| {
1147                        let rt = match tokio::runtime::Builder::new_current_thread()
1148                            .enable_all()
1149                            .build()
1150                        {
1151                            Ok(rt) => rt,
1152                            Err(e) => {
1153                                tracing::error!("failed to create runtime: {}", e);
1154                                return None;
1155                            }
1156                        };
1157
1158                        rt.block_on(async {
1159                            match http_client.call(request).await {
1160                                Ok(mut response) => {
1161                                    // Strip body into side-channel to avoid base64
1162                                    // encoding in the JSON metadata.
1163                                    let body = response.body.take();
1164                                    let json = serde_json::to_vec(&response).ok();
1165                                    Some((json, body))
1166                                }
1167                                Err(e) => {
1168                                    tracing::error!("HTTP call failed: {}", e);
1169                                    // Return error response
1170                                    let error_response = match e {
1171                                        crate::http_client::HttpClientError::Timeout => {
1172                                            HttpClientResponse::error(
1173                                                504,
1174                                                "urn:barbacane:error:upstream-timeout",
1175                                                "Gateway Timeout",
1176                                                "Upstream request timed out",
1177                                            )
1178                                        }
1179                                        crate::http_client::HttpClientError::CircuitOpen(host) => {
1180                                            HttpClientResponse::error(
1181                                                503,
1182                                                "urn:barbacane:error:circuit-open",
1183                                                "Service Unavailable",
1184                                                &format!("Circuit breaker open for {}", host),
1185                                            )
1186                                        }
1187                                        crate::http_client::HttpClientError::ConnectionFailed(
1188                                            _,
1189                                        ) => HttpClientResponse::error(
1190                                            502,
1191                                            "urn:barbacane:error:upstream-unavailable",
1192                                            "Bad Gateway",
1193                                            "Failed to connect to upstream",
1194                                        ),
1195                                        _ => HttpClientResponse::error(
1196                                            502,
1197                                            "urn:barbacane:error:upstream-unavailable",
1198                                            "Bad Gateway",
1199                                            &e.to_string(),
1200                                        ),
1201                                    };
1202                                    let json = serde_json::to_vec(&error_response).ok();
1203                                    Some((json, None))
1204                                }
1205                            }
1206                        })
1207                    });
1208
1209                    match handle.join() {
1210                        Ok(result) => result,
1211                        Err(e) => {
1212                            tracing::error!("worker thread panicked: {:?}", e);
1213                            None
1214                        }
1215                    }
1216                });
1217
1218                match response_result {
1219                    Some((Some(json), body)) => {
1220                        let len = json.len() as i32;
1221                        caller.data_mut().last_http_result = Some(json);
1222                        caller.data_mut().http_response_body = body;
1223                        len
1224                    }
1225                    _ => -1,
1226                }
1227            },
1228        )
1229        .map_err(|e| WasmError::Instantiation(format!("failed to add host_http_call: {}", e)))?;
1230
1231    // host_http_read_result - read HTTP response (works for both host_http_call and host_http_stream)
1232    add_read_result_fn(linker, "host_http_read_result", |state| {
1233        state.last_http_result.take()
1234    })?;
1235
1236    // host_http_stream - streaming HTTP request (ADR-0023)
1237    //
1238    // Same request format as host_http_call. The host immediately begins
1239    // forwarding response chunks to the client via the stream_sender channel
1240    // while buffering the complete body in last_http_result.
1241    // Returns the length of the buffered response, or -1 on error.
1242    linker
1243        .func_wrap(
1244            "barbacane",
1245            "host_http_stream",
1246            |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1247                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1248                    Some(m) => m,
1249                    None => return -1,
1250                };
1251
1252                let start = req_ptr as usize;
1253                let end = start + req_len as usize;
1254                let data = memory.data(&caller);
1255
1256                if end > data.len() {
1257                    return -1;
1258                }
1259
1260                let plugin_request: PluginHttpRequest =
1261                    match serde_json::from_slice(&data[start..end]) {
1262                        Ok(r) => r,
1263                        Err(e) => {
1264                            tracing::error!("host_http_stream: failed to parse request: {}", e);
1265                            return -1;
1266                        }
1267                    };
1268
1269                // Body priority: side-channel (host_http_request_body_set) > JSON body.
1270                let body = caller
1271                    .data_mut()
1272                    .http_request_body
1273                    .take()
1274                    .or(plugin_request.body);
1275
1276                let request = HttpClientRequest {
1277                    method: plugin_request.method,
1278                    url: plugin_request.url,
1279                    headers: plugin_request.headers.into_iter().collect(),
1280                    body,
1281                    timeout: plugin_request
1282                        .timeout_ms
1283                        .map(std::time::Duration::from_millis),
1284                };
1285
1286                let http_client = match caller.data().http_client.clone() {
1287                    Some(c) => c,
1288                    None => {
1289                        tracing::error!("host_http_stream: HTTP client not available");
1290                        return -1;
1291                    }
1292                };
1293
1294                // Clone the stream sender (Arc makes this cheap).
1295                let stream_sender = caller.data().stream_sender.clone();
1296
1297                let response_result = std::thread::scope(|s| {
1298                    let handle = s.spawn(|| {
1299                        let rt = match tokio::runtime::Builder::new_current_thread()
1300                            .enable_all()
1301                            .build()
1302                        {
1303                            Ok(rt) => rt,
1304                            Err(e) => {
1305                                tracing::error!(
1306                                    "host_http_stream: failed to create runtime: {}",
1307                                    e
1308                                );
1309                                return None;
1310                            }
1311                        };
1312
1313                        rt.block_on(async {
1314                            use futures_util::StreamExt;
1315
1316                            match http_client.stream_raw(request).await {
1317                                Ok(upstream) => {
1318                                    let status = upstream.status().as_u16();
1319                                    let upstream_headers: BTreeMap<String, String> = upstream
1320                                        .headers()
1321                                        .iter()
1322                                        .filter_map(|(k, v)| {
1323                                            v.to_str()
1324                                                .ok()
1325                                                .map(|v| (k.as_str().to_lowercase(), v.to_string()))
1326                                        })
1327                                        .collect();
1328
1329                                    // Send headers through the streaming channel.
1330                                    if let Some(tx) = &stream_sender {
1331                                        let _ = tx.send(StreamEvent::Headers {
1332                                            status,
1333                                            headers: upstream_headers.clone(),
1334                                        });
1335                                    }
1336
1337                                    // Stream body chunks, sending each through the channel
1338                                    // while building the complete buffer for last_http_result.
1339                                    let mut buffer: Vec<u8> = Vec::new();
1340                                    let mut byte_stream = upstream.bytes_stream();
1341
1342                                    while let Some(chunk_result) = byte_stream.next().await {
1343                                        match chunk_result {
1344                                            Ok(chunk) => {
1345                                                if let Some(tx) = &stream_sender {
1346                                                    let _ =
1347                                                        tx.send(StreamEvent::Chunk(chunk.clone()));
1348                                                }
1349                                                buffer.extend_from_slice(&chunk);
1350                                            }
1351                                            Err(e) => {
1352                                                tracing::error!(
1353                                                    "host_http_stream: upstream read error: {}",
1354                                                    e
1355                                                );
1356                                                return None;
1357                                            }
1358                                        }
1359                                    }
1360
1361                                    // Strip body into side-channel, serialize
1362                                    // metadata-only JSON for host_http_read_result.
1363                                    let complete = HttpClientResponse {
1364                                        status,
1365                                        headers: upstream_headers.into_iter().collect(),
1366                                        body: None,
1367                                    };
1368                                    let json = serde_json::to_vec(&complete).ok();
1369                                    Some((json, Some(buffer)))
1370                                }
1371                                Err(e) => {
1372                                    tracing::error!("host_http_stream: request failed: {}", e);
1373                                    None
1374                                }
1375                            }
1376                        })
1377                    });
1378
1379                    match handle.join() {
1380                        Ok(result) => result,
1381                        Err(e) => {
1382                            tracing::error!("host_http_stream: worker thread panicked: {:?}", e);
1383                            None
1384                        }
1385                    }
1386                });
1387
1388                match response_result {
1389                    Some((Some(json), body)) => {
1390                        let len = json.len() as i32;
1391                        caller.data_mut().last_http_result = Some(json);
1392                        caller.data_mut().http_response_body = body;
1393                        len
1394                    }
1395                    _ => -1,
1396                }
1397            },
1398        )
1399        .map_err(|e| WasmError::Instantiation(format!("failed to add host_http_stream: {}", e)))?;
1400
1401    // host_ws_upgrade - request an upstream WebSocket connection (ADR-0026)
1402    //
1403    // The plugin sends a JSON payload: { url, connect_timeout_ms, headers }.
1404    // The request is validated and stored in PluginState. The actual TCP
1405    // connection is deferred to the async relay task on the main tokio runtime,
1406    // because a TcpStream must be created on the runtime that will drive it
1407    // (a temporary runtime's I/O driver dies when the runtime drops).
1408    // Returns 0 on success (valid request), -1 on parse failure.
1409    linker
1410        .func_wrap(
1411            "barbacane",
1412            "host_ws_upgrade",
1413            |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1414                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1415                    Some(m) => m,
1416                    None => return -1,
1417                };
1418
1419                let start = req_ptr as usize;
1420                let end = start + req_len as usize;
1421                let data = memory.data(&caller);
1422
1423                if end > data.len() {
1424                    return -1;
1425                }
1426
1427                let ws_request: crate::ws_client::WsUpgradeRequest =
1428                    match serde_json::from_slice(&data[start..end]) {
1429                        Ok(r) => r,
1430                        Err(e) => {
1431                            tracing::error!("host_ws_upgrade: failed to parse request: {}", e);
1432                            let err_msg = format!("invalid upgrade request: {}", e);
1433                            caller.data_mut().last_http_result = Some(err_msg.into_bytes());
1434                            return -1;
1435                        }
1436                    };
1437
1438                let plugin_name = caller.data().plugin_name.clone();
1439                tracing::debug!(
1440                    plugin = %plugin_name,
1441                    url = %ws_request.url,
1442                    "host_ws_upgrade: storing request for deferred connection"
1443                );
1444
1445                // Store the request; the actual connection happens on the main
1446                // runtime inside the relay task (see relay_websocket).
1447                caller.data_mut().ws_upgrade_request = Some(ws_request);
1448                0
1449            },
1450        )
1451        .map_err(|e| WasmError::Instantiation(format!("failed to add host_ws_upgrade: {}", e)))?;
1452
1453    // host_verify_signature - verify a cryptographic signature using a JWK
1454    linker
1455        .func_wrap(
1456            "barbacane",
1457            "host_verify_signature",
1458            |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1459                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1460                    Some(m) => m,
1461                    None => return -1,
1462                };
1463
1464                let start = req_ptr as usize;
1465                let end = start + req_len as usize;
1466                let data = memory.data(&caller);
1467
1468                if end > data.len() {
1469                    return -1;
1470                }
1471
1472                // Parse the verification request
1473                let request: crate::crypto::VerifySignatureRequest =
1474                    match serde_json::from_slice(&data[start..end]) {
1475                        Ok(r) => r,
1476                        Err(e) => {
1477                            tracing::error!(
1478                                plugin = %caller.data_mut().plugin_name,
1479                                "failed to parse verify_signature request: {}", e
1480                            );
1481                            return -1;
1482                        }
1483                    };
1484
1485                // Perform verification
1486                match crate::crypto::verify_signature(&request) {
1487                    Ok(true) => 1,
1488                    Ok(false) => 0,
1489                    Err(e) => {
1490                        tracing::error!(
1491                            plugin = %caller.data_mut().plugin_name,
1492                            "signature verification error: {}", e
1493                        );
1494                        -1
1495                    }
1496                }
1497            },
1498        )
1499        .map_err(|e| {
1500            WasmError::Instantiation(format!("failed to add host_verify_signature: {}", e))
1501        })?;
1502
1503    // host_get_secret - get a secret by reference
1504    linker
1505        .func_wrap(
1506            "barbacane",
1507            "host_get_secret",
1508            |mut caller: Caller<'_, PluginState>, ref_ptr: i32, ref_len: i32| -> i32 {
1509                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1510                    Some(m) => m,
1511                    None => return -1,
1512                };
1513
1514                let start = ref_ptr as usize;
1515                let end = start + ref_len as usize;
1516                let data = memory.data(&caller);
1517
1518                if end > data.len() {
1519                    return -1;
1520                }
1521
1522                // Read the secret reference from plugin memory
1523                let secret_ref = match std::str::from_utf8(&data[start..end]) {
1524                    Ok(r) => r.to_string(),
1525                    Err(_) => return -1,
1526                };
1527
1528                // Look up in secrets store
1529                match caller.data().secrets.get(&secret_ref) {
1530                    Some(value) => {
1531                        let bytes = value.as_bytes().to_vec();
1532                        let len = bytes.len() as i32;
1533                        caller.data_mut().last_secret_result = Some(bytes);
1534                        len
1535                    }
1536                    None => {
1537                        tracing::warn!(
1538                            plugin = %caller.data().plugin_name,
1539                            reference = %secret_ref,
1540                            "secret not found in store"
1541                        );
1542                        -1
1543                    }
1544                }
1545            },
1546        )
1547        .map_err(|e| WasmError::Instantiation(format!("failed to add host_get_secret: {}", e)))?;
1548
1549    // host_secret_read_result - read secret value into plugin memory
1550    add_read_result_fn(linker, "host_secret_read_result", |state| {
1551        state.last_secret_result.take()
1552    })?;
1553
1554    // host_rate_limit_check - check rate limit for a key
1555    linker
1556        .func_wrap(
1557            "barbacane",
1558            "host_rate_limit_check",
1559            |mut caller: Caller<'_, PluginState>,
1560             key_ptr: i32,
1561             key_len: i32,
1562             quota: u32,
1563             window_secs: u32|
1564             -> i32 {
1565                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1566                    Some(m) => m,
1567                    None => return -1,
1568                };
1569
1570                let start = key_ptr as usize;
1571                let end = start + key_len as usize;
1572                let data = memory.data(&caller);
1573
1574                if end > data.len() {
1575                    return -1;
1576                }
1577
1578                // Read the partition key from plugin memory
1579                let key = match std::str::from_utf8(&data[start..end]) {
1580                    Ok(k) => k.to_string(),
1581                    Err(_) => return -1,
1582                };
1583
1584                // Get the rate limiter
1585                let rate_limiter = match &caller.data().rate_limiter {
1586                    Some(rl) => rl.clone(),
1587                    None => {
1588                        tracing::error!("rate limiter not available");
1589                        return -1;
1590                    }
1591                };
1592
1593                // Check the rate limit
1594                let result = rate_limiter.check(&key, quota, window_secs as u64);
1595
1596                // Serialize the result
1597                match serde_json::to_vec(&result) {
1598                    Ok(json) => {
1599                        let len = json.len() as i32;
1600                        caller.data_mut().last_rate_limit_result = Some(json);
1601                        len
1602                    }
1603                    Err(e) => {
1604                        tracing::error!("failed to serialize rate limit result: {}", e);
1605                        -1
1606                    }
1607                }
1608            },
1609        )
1610        .map_err(|e| {
1611            WasmError::Instantiation(format!("failed to add host_rate_limit_check: {}", e))
1612        })?;
1613
1614    // host_rate_limit_read_result - read rate limit result into plugin memory
1615    add_read_result_fn(linker, "host_rate_limit_read_result", |state| {
1616        state.last_rate_limit_result.take()
1617    })?;
1618
1619    // host_cache_get - look up a cached response
1620    linker
1621        .func_wrap(
1622            "barbacane",
1623            "host_cache_get",
1624            |mut caller: Caller<'_, PluginState>, key_ptr: i32, key_len: i32| -> i32 {
1625                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1626                    Some(m) => m,
1627                    None => return -1,
1628                };
1629
1630                let start = key_ptr as usize;
1631                let end = start + key_len as usize;
1632                let data = memory.data(&caller);
1633
1634                if end > data.len() {
1635                    return -1;
1636                }
1637
1638                // Read the cache key from plugin memory
1639                let key = match std::str::from_utf8(&data[start..end]) {
1640                    Ok(k) => k.to_string(),
1641                    Err(_) => return -1,
1642                };
1643
1644                // Get the response cache
1645                let cache = match &caller.data().response_cache {
1646                    Some(c) => c.clone(),
1647                    None => {
1648                        tracing::error!("response cache not available");
1649                        return -1;
1650                    }
1651                };
1652
1653                // Check the cache
1654                let result = cache.get(&key);
1655
1656                // Serialize the result
1657                match serde_json::to_vec(&result) {
1658                    Ok(json) => {
1659                        let len = json.len() as i32;
1660                        caller.data_mut().last_cache_result = Some(json);
1661                        len
1662                    }
1663                    Err(e) => {
1664                        tracing::error!("failed to serialize cache result: {}", e);
1665                        -1
1666                    }
1667                }
1668            },
1669        )
1670        .map_err(|e| WasmError::Instantiation(format!("failed to add host_cache_get: {}", e)))?;
1671
1672    // host_cache_set - store a response in the cache
1673    linker
1674        .func_wrap(
1675            "barbacane",
1676            "host_cache_set",
1677            |mut caller: Caller<'_, PluginState>,
1678             key_ptr: i32,
1679             key_len: i32,
1680             entry_ptr: i32,
1681             entry_len: i32,
1682             ttl_secs: u32|
1683             -> i32 {
1684                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1685                    Some(m) => m,
1686                    None => return -1,
1687                };
1688
1689                let key_start = key_ptr as usize;
1690                let key_end = key_start + key_len as usize;
1691                let entry_start = entry_ptr as usize;
1692                let entry_end = entry_start + entry_len as usize;
1693                let data = memory.data(&caller);
1694
1695                if key_end > data.len() || entry_end > data.len() {
1696                    return -1;
1697                }
1698
1699                // Read the cache key
1700                let key = match std::str::from_utf8(&data[key_start..key_end]) {
1701                    Ok(k) => k.to_string(),
1702                    Err(_) => return -1,
1703                };
1704
1705                // Parse the cache entry JSON
1706                let entry: crate::cache::CacheEntry =
1707                    match serde_json::from_slice(&data[entry_start..entry_end]) {
1708                        Ok(e) => e,
1709                        Err(e) => {
1710                            tracing::error!("failed to parse cache entry: {}", e);
1711                            return -1;
1712                        }
1713                    };
1714
1715                // Get the response cache
1716                let cache = match &caller.data().response_cache {
1717                    Some(c) => c.clone(),
1718                    None => {
1719                        tracing::error!("response cache not available");
1720                        return -1;
1721                    }
1722                };
1723
1724                // Store in cache
1725                cache.set(&key, entry, ttl_secs as u64);
1726                0 // Success
1727            },
1728        )
1729        .map_err(|e| WasmError::Instantiation(format!("failed to add host_cache_set: {}", e)))?;
1730
1731    // host_cache_read_result - read cache lookup result into plugin memory
1732    add_read_result_fn(linker, "host_cache_read_result", |state| {
1733        state.last_cache_result.take()
1734    })?;
1735
1736    // === Telemetry Host Functions ===
1737
1738    // host_metric_counter_inc - increment a plugin counter metric
1739    linker
1740        .func_wrap(
1741            "barbacane",
1742            "host_metric_counter_inc",
1743            |mut caller: Caller<'_, PluginState>,
1744             name_ptr: i32,
1745             name_len: i32,
1746             labels_ptr: i32,
1747             labels_len: i32,
1748             value: f64| {
1749                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1750                    Some(m) => m,
1751                    None => return,
1752                };
1753
1754                let data = memory.data(&caller);
1755                let name_start = name_ptr as usize;
1756                let name_end = name_start + name_len as usize;
1757                let labels_start = labels_ptr as usize;
1758                let labels_end = labels_start + labels_len as usize;
1759
1760                if name_end > data.len() || labels_end > data.len() {
1761                    return;
1762                }
1763
1764                let name = match std::str::from_utf8(&data[name_start..name_end]) {
1765                    Ok(n) => n.to_string(),
1766                    Err(_) => return,
1767                };
1768
1769                let labels_json = match std::str::from_utf8(&data[labels_start..labels_end]) {
1770                    Ok(l) => l.to_string(),
1771                    Err(_) => return,
1772                };
1773
1774                let plugin_name = caller.data().plugin_name.clone();
1775                if let Some(metrics) = &caller.data().metrics {
1776                    metrics.plugin_counter_inc(&plugin_name, &name, &labels_json, value as u64);
1777                }
1778            },
1779        )
1780        .map_err(|e| {
1781            WasmError::Instantiation(format!("failed to add host_metric_counter_inc: {}", e))
1782        })?;
1783
1784    // host_metric_histogram_observe - observe a plugin histogram metric
1785    linker
1786        .func_wrap(
1787            "barbacane",
1788            "host_metric_histogram_observe",
1789            |mut caller: Caller<'_, PluginState>,
1790             name_ptr: i32,
1791             name_len: i32,
1792             labels_ptr: i32,
1793             labels_len: i32,
1794             value: f64| {
1795                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1796                    Some(m) => m,
1797                    None => return,
1798                };
1799
1800                let data = memory.data(&caller);
1801                let name_start = name_ptr as usize;
1802                let name_end = name_start + name_len as usize;
1803                let labels_start = labels_ptr as usize;
1804                let labels_end = labels_start + labels_len as usize;
1805
1806                if name_end > data.len() || labels_end > data.len() {
1807                    return;
1808                }
1809
1810                let name = match std::str::from_utf8(&data[name_start..name_end]) {
1811                    Ok(n) => n.to_string(),
1812                    Err(_) => return,
1813                };
1814
1815                let labels_json = match std::str::from_utf8(&data[labels_start..labels_end]) {
1816                    Ok(l) => l.to_string(),
1817                    Err(_) => return,
1818                };
1819
1820                let plugin_name = caller.data().plugin_name.clone();
1821                if let Some(metrics) = &caller.data().metrics {
1822                    metrics.plugin_histogram_observe(&plugin_name, &name, &labels_json, value);
1823                }
1824            },
1825        )
1826        .map_err(|e| {
1827            WasmError::Instantiation(format!(
1828                "failed to add host_metric_histogram_observe: {}",
1829                e
1830            ))
1831        })?;
1832
1833    // host_span_start - start a child span (stub - returns span ID)
1834    // Full implementation requires passing span context through RequestContext
1835    linker
1836        .func_wrap(
1837            "barbacane",
1838            "host_span_start",
1839            |mut caller: Caller<'_, PluginState>, name_ptr: i32, name_len: i32| -> i32 {
1840                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1841                    Some(m) => m,
1842                    None => return -1,
1843                };
1844
1845                let data = memory.data(&caller);
1846                let start = name_ptr as usize;
1847                let end = start + name_len as usize;
1848
1849                if end > data.len() {
1850                    return -1;
1851                }
1852
1853                let span_name = match std::str::from_utf8(&data[start..end]) {
1854                    Ok(n) => n,
1855                    Err(_) => return -1,
1856                };
1857
1858                // Log the span start for now (full tracing integration in Phase 9)
1859                let plugin_name = &caller.data().plugin_name;
1860                tracing::debug!(plugin = %plugin_name, span = %span_name, "plugin span started");
1861
1862                // Return a placeholder span ID
1863                1
1864            },
1865        )
1866        .map_err(|e| WasmError::Instantiation(format!("failed to add host_span_start: {}", e)))?;
1867
1868    // host_span_end - end the current span
1869    linker
1870        .func_wrap(
1871            "barbacane",
1872            "host_span_end",
1873            |caller: Caller<'_, PluginState>| {
1874                let plugin_name = &caller.data().plugin_name;
1875                tracing::debug!(plugin = %plugin_name, "plugin span ended");
1876            },
1877        )
1878        .map_err(|e| WasmError::Instantiation(format!("failed to add host_span_end: {}", e)))?;
1879
1880    // host_span_set_attribute - set an attribute on the current span
1881    linker
1882        .func_wrap(
1883            "barbacane",
1884            "host_span_set_attribute",
1885            |mut caller: Caller<'_, PluginState>,
1886             key_ptr: i32,
1887             key_len: i32,
1888             val_ptr: i32,
1889             val_len: i32| {
1890                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1891                    Some(m) => m,
1892                    None => return,
1893                };
1894
1895                let data = memory.data(&caller);
1896                let key_start = key_ptr as usize;
1897                let key_end = key_start + key_len as usize;
1898                let val_start = val_ptr as usize;
1899                let val_end = val_start + val_len as usize;
1900
1901                if key_end > data.len() || val_end > data.len() {
1902                    return;
1903                }
1904
1905                let key = match std::str::from_utf8(&data[key_start..key_end]) {
1906                    Ok(k) => k,
1907                    Err(_) => return,
1908                };
1909
1910                let value = match std::str::from_utf8(&data[val_start..val_end]) {
1911                    Ok(v) => v,
1912                    Err(_) => return,
1913                };
1914
1915                let plugin_name = &caller.data().plugin_name;
1916                tracing::debug!(plugin = %plugin_name, %key, %value, "plugin span attribute set");
1917            },
1918        )
1919        .map_err(|e| {
1920            WasmError::Instantiation(format!("failed to add host_span_set_attribute: {}", e))
1921        })?;
1922
1923    // === Broker Host Functions (M10) ===
1924
1925    // host_kafka_publish - publish a message to Kafka
1926    linker
1927        .func_wrap(
1928            "barbacane",
1929            "host_kafka_publish",
1930            |mut caller: Caller<'_, PluginState>, msg_ptr: i32, msg_len: i32| -> i32 {
1931                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1932                    Some(m) => m,
1933                    None => return -1,
1934                };
1935
1936                let start = msg_ptr as usize;
1937                let end = start + msg_len as usize;
1938                let data = memory.data(&caller);
1939
1940                if end > data.len() {
1941                    return -1;
1942                }
1943
1944                // Parse the broker message from plugin memory
1945                let message: BrokerMessage = match serde_json::from_slice(&data[start..end]) {
1946                    Ok(m) => m,
1947                    Err(e) => {
1948                        tracing::error!("failed to parse broker message: {}", e);
1949                        return -1;
1950                    }
1951                };
1952
1953                // Extract URL (broker addresses) from the message
1954                let brokers = match &message.url {
1955                    Some(u) => u.clone(),
1956                    None => {
1957                        tracing::error!("Kafka publish: missing url in broker message");
1958                        return -1;
1959                    }
1960                };
1961
1962                // Get the Kafka publisher
1963                let publisher = match caller.data().kafka_publisher.clone() {
1964                    Some(p) => p,
1965                    None => {
1966                        tracing::error!("Kafka publisher not available");
1967                        return -1;
1968                    }
1969                };
1970
1971                let topic = message.topic.clone();
1972                let key = message.key.clone();
1973                let payload = message.payload.clone();
1974                let headers = message.headers.clone();
1975
1976                // Use thread::scope to escape the main tokio runtime context,
1977                // then call publish_blocking which uses the publisher's own runtime.
1978                let result = std::thread::scope(|s| {
1979                    let handle = s.spawn(|| {
1980                        publisher.publish_blocking(&brokers, &topic, key, &payload, headers)
1981                    });
1982
1983                    match handle.join() {
1984                        Ok(result) => Some(result),
1985                        Err(e) => {
1986                            tracing::error!("Kafka publish thread panicked: {:?}", e);
1987                            None
1988                        }
1989                    }
1990                });
1991
1992                // Serialize the result
1993                let result_json = match result {
1994                    Some(Ok(r)) => serde_json::to_vec(&r),
1995                    Some(Err(e)) => {
1996                        let error_result =
1997                            crate::broker::PublishResult::failure(message.topic, e.to_string());
1998                        serde_json::to_vec(&error_result)
1999                    }
2000                    None => {
2001                        let error_result = crate::broker::PublishResult::failure(
2002                            message.topic,
2003                            "Kafka publish failed".to_string(),
2004                        );
2005                        serde_json::to_vec(&error_result)
2006                    }
2007                };
2008
2009                match result_json {
2010                    Ok(json) => {
2011                        let len = json.len() as i32;
2012                        caller.data_mut().last_broker_result = Some(json);
2013                        len
2014                    }
2015                    Err(e) => {
2016                        tracing::error!("failed to serialize broker result: {}", e);
2017                        -1
2018                    }
2019                }
2020            },
2021        )
2022        .map_err(|e| {
2023            WasmError::Instantiation(format!("failed to add host_kafka_publish: {}", e))
2024        })?;
2025
2026    // host_nats_publish - publish a message to NATS
2027    linker
2028        .func_wrap(
2029            "barbacane",
2030            "host_nats_publish",
2031            |mut caller: Caller<'_, PluginState>, msg_ptr: i32, msg_len: i32| -> i32 {
2032                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2033                    Some(m) => m,
2034                    None => return -1,
2035                };
2036
2037                let start = msg_ptr as usize;
2038                let end = start + msg_len as usize;
2039                let data = memory.data(&caller);
2040
2041                if end > data.len() {
2042                    return -1;
2043                }
2044
2045                // Parse the broker message from plugin memory
2046                let message: BrokerMessage = match serde_json::from_slice(&data[start..end]) {
2047                    Ok(m) => m,
2048                    Err(e) => {
2049                        tracing::error!("failed to parse broker message: {}", e);
2050                        return -1;
2051                    }
2052                };
2053
2054                // Extract URL from the message
2055                let url = match &message.url {
2056                    Some(u) => u.clone(),
2057                    None => {
2058                        tracing::error!("NATS publish: missing url in broker message");
2059                        return -1;
2060                    }
2061                };
2062
2063                // Get the NATS publisher
2064                let publisher = match caller.data().nats_publisher.clone() {
2065                    Some(p) => p,
2066                    None => {
2067                        tracing::error!("NATS publisher not available");
2068                        return -1;
2069                    }
2070                };
2071
2072                let subject = message.topic.clone();
2073                let payload = bytes::Bytes::from(message.payload.clone());
2074                let headers = message.headers.clone();
2075
2076                // Use thread::scope to escape the main tokio runtime context,
2077                // then call publish_blocking which uses the publisher's own runtime.
2078                let result = std::thread::scope(|s| {
2079                    let handle =
2080                        s.spawn(|| publisher.publish_blocking(&url, &subject, payload, headers));
2081
2082                    match handle.join() {
2083                        Ok(result) => Some(result),
2084                        Err(e) => {
2085                            tracing::error!("NATS publish thread panicked: {:?}", e);
2086                            None
2087                        }
2088                    }
2089                });
2090
2091                // Serialize the result
2092                let result_json = match result {
2093                    Some(Ok(r)) => serde_json::to_vec(&r),
2094                    Some(Err(e)) => {
2095                        let error_result =
2096                            crate::broker::PublishResult::failure(message.topic, e.to_string());
2097                        serde_json::to_vec(&error_result)
2098                    }
2099                    None => {
2100                        let error_result = crate::broker::PublishResult::failure(
2101                            message.topic,
2102                            "NATS publish failed".to_string(),
2103                        );
2104                        serde_json::to_vec(&error_result)
2105                    }
2106                };
2107
2108                match result_json {
2109                    Ok(json) => {
2110                        let len = json.len() as i32;
2111                        caller.data_mut().last_broker_result = Some(json);
2112                        len
2113                    }
2114                    Err(e) => {
2115                        tracing::error!("failed to serialize broker result: {}", e);
2116                        -1
2117                    }
2118                }
2119            },
2120        )
2121        .map_err(|e| WasmError::Instantiation(format!("failed to add host_nats_publish: {}", e)))?;
2122
2123    // host_broker_read_result - read broker publish result into plugin memory
2124    add_read_result_fn(linker, "host_broker_read_result", |state| {
2125        state.last_broker_result.take()
2126    })?;
2127
2128    // ── Minimal WASI stubs ──────────────────────────────────────────────
2129    // Some plugins (e.g. cel) compile with wasm32-wasip1 and import WASI
2130    // functions even though Barbacane provides its own host ABI. We add
2131    // lightweight stubs so the linker can resolve these imports.
2132    // Full WASI support (wasmtime-wasi) can replace these if needed.
2133
2134    // random_get — deterministic bytes for HashMap seed initialisation.
2135    // Acceptable in a sandboxed single-request context (no HashDoS risk).
2136    linker
2137        .func_wrap(
2138            "wasi_snapshot_preview1",
2139            "random_get",
2140            |mut caller: Caller<'_, PluginState>, buf_ptr: i32, buf_len: i32| -> i32 {
2141                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2142                    Some(m) => m,
2143                    None => return 1,
2144                };
2145                let start = buf_ptr as usize;
2146                let end = start + buf_len as usize;
2147                let data = memory.data_mut(&mut caller);
2148                if end > data.len() {
2149                    return 1;
2150                }
2151                for (i, byte) in data[start..end].iter_mut().enumerate() {
2152                    *byte = (i.wrapping_mul(0x9E3779B9) >> 24) as u8;
2153                }
2154                0
2155            },
2156        )
2157        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2158
2159    // sched_yield — no-op, always succeeds.
2160    linker
2161        .func_wrap("wasi_snapshot_preview1", "sched_yield", || -> i32 { 0 })
2162        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2163
2164    // clock_time_get — returns current time in nanoseconds.
2165    linker
2166        .func_wrap(
2167            "wasi_snapshot_preview1",
2168            "clock_time_get",
2169            |mut caller: Caller<'_, PluginState>,
2170             _clock_id: i32,
2171             _precision: i64,
2172             time_ptr: i32|
2173             -> i32 {
2174                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2175                    Some(m) => m,
2176                    None => return 1,
2177                };
2178                let nanos = std::time::SystemTime::now()
2179                    .duration_since(std::time::UNIX_EPOCH)
2180                    .map(|d| d.as_nanos() as u64)
2181                    .unwrap_or(0);
2182                let ptr = time_ptr as usize;
2183                let data = memory.data_mut(&mut caller);
2184                if ptr + 8 > data.len() {
2185                    return 1;
2186                }
2187                data[ptr..ptr + 8].copy_from_slice(&nanos.to_le_bytes());
2188                0
2189            },
2190        )
2191        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2192
2193    // fd_write — silently discards output (plugins use host_log instead).
2194    linker
2195        .func_wrap(
2196            "wasi_snapshot_preview1",
2197            "fd_write",
2198            |mut caller: Caller<'_, PluginState>,
2199             _fd: i32,
2200             iovs_ptr: i32,
2201             iovs_len: i32,
2202             nwritten_ptr: i32|
2203             -> i32 {
2204                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2205                    Some(m) => m,
2206                    None => return 1,
2207                };
2208                let data = memory.data_mut(&mut caller);
2209                let mut total: u32 = 0;
2210                for i in 0..iovs_len as usize {
2211                    let base = (iovs_ptr as usize) + i * 8;
2212                    if base + 8 > data.len() {
2213                        return 1;
2214                    }
2215                    let len =
2216                        u32::from_le_bytes(data[base + 4..base + 8].try_into().unwrap_or([0; 4]));
2217                    total = total.saturating_add(len);
2218                }
2219                let ptr = nwritten_ptr as usize;
2220                if ptr + 4 > data.len() {
2221                    return 1;
2222                }
2223                data[ptr..ptr + 4].copy_from_slice(&total.to_le_bytes());
2224                0
2225            },
2226        )
2227        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2228
2229    // environ_get — no environment variables exposed.
2230    linker
2231        .func_wrap(
2232            "wasi_snapshot_preview1",
2233            "environ_get",
2234            |_caller: Caller<'_, PluginState>, _environ: i32, _environ_buf: i32| -> i32 { 0 },
2235        )
2236        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2237
2238    // environ_sizes_get — reports 0 vars, 0 bytes.
2239    linker
2240        .func_wrap(
2241            "wasi_snapshot_preview1",
2242            "environ_sizes_get",
2243            |mut caller: Caller<'_, PluginState>, num_ptr: i32, buf_size_ptr: i32| -> i32 {
2244                let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2245                    Some(m) => m,
2246                    None => return 1,
2247                };
2248                let data = memory.data_mut(&mut caller);
2249                let np = num_ptr as usize;
2250                let bp = buf_size_ptr as usize;
2251                if np + 4 > data.len() || bp + 4 > data.len() {
2252                    return 1;
2253                }
2254                data[np..np + 4].copy_from_slice(&0u32.to_le_bytes());
2255                data[bp..bp + 4].copy_from_slice(&0u32.to_le_bytes());
2256                0
2257            },
2258        )
2259        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2260
2261    // proc_exit — traps the module (should never be reached).
2262    linker
2263        .func_wrap(
2264            "wasi_snapshot_preview1",
2265            "proc_exit",
2266            |_caller: Caller<'_, PluginState>, _code: i32| {
2267                // Intentional trap — WASM execution stops here
2268            },
2269        )
2270        .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2271
2272    Ok(())
2273}
2274
#[cfg(test)]
mod tests {
    //! Unit tests for `RequestContext`, `PluginState` defaults and accessors,
    //! and the `StreamEvent` variants.

    use super::*;

    /// Builds a `PluginState` named `"test"` using default plugin limits.
    fn fresh_state() -> PluginState {
        PluginState::new("test".into(), &PluginLimits::default())
    }

    /// `RequestContext::new` stores both identifiers and starts with no values.
    #[test]
    fn request_context_new() {
        let context = RequestContext::new("trace-123".into(), "req-456".into());

        assert!(context.values.is_empty());
        assert_eq!(context.request_id, "req-456");
        assert_eq!(context.trace_id, "trace-123");
    }

    /// `take_output` hands back the buffered bytes and leaves the buffer empty.
    #[test]
    fn plugin_state_take_output() {
        let mut state = fresh_state();
        state.output_buffer = vec![1, 2, 3];

        let drained = state.take_output();

        assert!(state.output_buffer.is_empty());
        assert_eq!(drained, vec![1, 2, 3]);
    }

    /// A freshly created state has no UUID result recorded.
    #[test]
    fn plugin_state_uuid_result_initialized() {
        assert!(fresh_state().last_uuid_result.is_none());
    }

    /// `Uuid::now_v7` renders as a 36-character string carrying the version digit `7`.
    #[test]
    fn uuid_v7_format() {
        let rendered = uuid::Uuid::now_v7().to_string();

        // Hyphenated layout is 8-4-4-4-12, i.e. 36 characters in total.
        assert_eq!(rendered.len(), 36);
        // Index 14 is the first character of the third group: the version marker.
        assert_eq!(rendered.chars().nth(14), Some('7'));
    }

    /// A freshly created state has no NATS publisher attached.
    #[test]
    fn plugin_state_nats_publisher_default() {
        assert!(fresh_state().nats_publisher.is_none());
    }

    /// A freshly created state has no broker result recorded.
    #[test]
    fn plugin_state_broker_result_default() {
        assert!(fresh_state().last_broker_result.is_none());
    }

    /// A freshly created state has no Kafka publisher attached.
    #[test]
    fn plugin_state_kafka_publisher_default() {
        assert!(fresh_state().kafka_publisher.is_none());
    }

    // ── streaming (ADR-0023) ──────────────────────────────────────────────────

    /// A freshly created state has no stream sender installed.
    #[test]
    fn plugin_state_stream_sender_default_is_none() {
        assert!(fresh_state().stream_sender.is_none());
    }

    /// `set_stream_sender` installs the sender on the state.
    #[test]
    fn plugin_state_set_stream_sender() {
        let mut state = fresh_state();
        // The receiver is bound (not `_`) so it stays alive for the whole test.
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<StreamEvent>();

        state.set_stream_sender(Arc::new(sender));

        assert!(state.stream_sender.is_some());
    }

    /// A freshly created state has no HTTP result recorded.
    #[test]
    fn plugin_state_take_last_http_result_default_is_none() {
        assert!(fresh_state().last_http_result.is_none());
    }

    /// Taking the stored HTTP result yields the bytes and leaves `None` behind.
    #[test]
    fn plugin_state_take_last_http_result_takes_value() {
        let mut state = fresh_state();
        state.last_http_result = Some(vec![1, 2, 3]);

        assert_eq!(state.last_http_result.take(), Some(vec![1, 2, 3]));
        assert!(state.last_http_result.is_none());
    }

    /// The `Headers` variant exposes the status and header map it was built with.
    #[test]
    fn stream_event_headers_fields() {
        let mut upstream_headers = BTreeMap::new();
        upstream_headers.insert("content-type".to_string(), "text/event-stream".to_string());
        let event = StreamEvent::Headers {
            status: 200,
            headers: upstream_headers,
        };

        match event {
            StreamEvent::Headers { status, headers } => {
                assert_eq!(status, 200);
                assert_eq!(
                    headers.get("content-type").map(String::as_str),
                    Some("text/event-stream")
                );
            }
            StreamEvent::Chunk(_) => panic!("expected Headers variant"),
        }
    }

    /// The `Chunk` variant carries exactly the bytes it was built with.
    #[test]
    fn stream_event_chunk_contains_bytes() {
        let payload = bytes::Bytes::from_static(b"data: hello\n\n");

        match StreamEvent::Chunk(payload.clone()) {
            StreamEvent::Chunk(received) => assert_eq!(received, payload),
            StreamEvent::Headers { .. } => panic!("expected Chunk variant"),
        }
    }

    /// `with_all_options` keeps both publishers when they are supplied.
    #[test]
    fn plugin_state_with_all_options_sets_publishers() {
        let limits = PluginLimits::default();
        let nats_publisher = Arc::new(crate::nats_client::NatsPublisher::new());
        let kafka_publisher = Arc::new(crate::kafka_client::KafkaPublisher::new());

        let state = PluginState::with_all_options(
            "test".into(),
            &limits,
            None,
            crate::secrets::SecretsStore::new(),
            None,
            None,
            Some(nats_publisher),
            Some(kafka_publisher),
        );

        assert!(state.nats_publisher.is_some());
        assert!(state.kafka_publisher.is_some());
    }
}