1use std::collections::HashMap;
7use std::sync::Arc;
8
9use wasmtime::{Caller, Engine, Instance, Linker, Memory, Store, TypedFunc};
10
11use barbacane_plugin_sdk::types::base64_body;
12use serde::Deserialize;
13use std::collections::BTreeMap;
14
15use crate::broker::BrokerMessage;
16use crate::engine::CompiledModule;
17use crate::error::WasmError;
18use crate::http_client::{
19 HttpClient, HttpRequest as HttpClientRequest, HttpResponse as HttpClientResponse,
20};
21use crate::limits::PluginLimits;
22
23#[derive(Debug)]
29pub enum StreamEvent {
30 Headers {
32 status: u16,
33 headers: BTreeMap<String, String>,
34 },
35 Chunk(bytes::Bytes),
37}
38
39#[derive(Debug, Deserialize)]
45pub(crate) struct PluginHttpRequest {
46 pub(crate) method: String,
47 pub(crate) url: String,
48 #[serde(default)]
49 pub(crate) headers: BTreeMap<String, String>,
50 #[serde(default, with = "base64_body")]
51 pub(crate) body: Option<Vec<u8>>,
52 #[serde(default)]
53 pub(crate) timeout_ms: Option<u64>,
54}
55
/// Per-request state shared between the host and a plugin.
#[derive(Debug, Clone, Default)]
pub struct RequestContext {
    /// Key/value pairs the guest reads and writes through
    /// `host_context_get` / `host_context_set`.
    pub values: HashMap<String, String>,

    /// Value found by the most recent successful `host_context_get`, held
    /// until the guest fetches it with `host_context_read_result`.
    pub last_get_result: Option<String>,

    /// Trace identifier associated with the request.
    pub trace_id: String,

    /// Request identifier associated with the request.
    pub request_id: String,
}

impl RequestContext {
    /// Builds a context carrying the given identifiers, with no stored values
    /// and no pending lookup result.
    pub fn new(trace_id: String, request_id: String) -> Self {
        Self {
            trace_id,
            request_id,
            ..Self::default()
        }
    }
}
83
84pub struct PluginState {
86 pub plugin_name: String,
88
89 pub output_buffer: Vec<u8>,
91
92 pub context: RequestContext,
94
95 pub max_memory: usize,
97
98 pub http_client: Option<Arc<HttpClient>>,
100
101 pub last_http_result: Option<Vec<u8>>,
103
104 pub secrets: crate::secrets::SecretsStore,
106
107 pub last_secret_result: Option<Vec<u8>>,
109
110 pub rate_limiter: Option<crate::rate_limiter::RateLimiter>,
112
113 pub last_rate_limit_result: Option<Vec<u8>>,
115
116 pub response_cache: Option<crate::cache::ResponseCache>,
118
119 pub last_cache_result: Option<Vec<u8>>,
121
122 pub metrics: Option<Arc<barbacane_telemetry::MetricsRegistry>>,
124
125 pub kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
127
128 pub nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
130
131 pub last_broker_result: Option<Vec<u8>>,
133
134 pub last_uuid_result: Option<Vec<u8>>,
136
137 pub stream_sender: Option<Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>>,
143
144 pub ws_upgrade_request: Option<crate::ws_client::WsUpgradeRequest>,
151
152 pub request_body: Option<Vec<u8>>,
159
160 pub output_body: Option<Option<Vec<u8>>>,
163
164 pub http_response_body: Option<Vec<u8>>,
168
169 pub http_request_body: Option<Vec<u8>>,
172}
173
174#[allow(dead_code)] impl PluginState {
176 pub fn new(plugin_name: String, limits: &PluginLimits) -> Self {
178 Self {
179 plugin_name,
180 output_buffer: Vec::new(),
181 context: RequestContext::default(),
182 max_memory: limits.max_memory_bytes,
183 http_client: None,
184 last_http_result: None,
185 secrets: crate::secrets::SecretsStore::new(),
186 last_secret_result: None,
187 rate_limiter: None,
188 last_rate_limit_result: None,
189 response_cache: None,
190 last_cache_result: None,
191 metrics: None,
192 kafka_publisher: None,
193 nats_publisher: None,
194 last_broker_result: None,
195 last_uuid_result: None,
196 stream_sender: None,
197 ws_upgrade_request: None,
198 request_body: None,
199 output_body: None,
200 http_response_body: None,
201 http_request_body: None,
202 }
203 }
204
205 pub fn with_http_client(
207 plugin_name: String,
208 limits: &PluginLimits,
209 http_client: Arc<HttpClient>,
210 ) -> Self {
211 Self {
212 plugin_name,
213 output_buffer: Vec::new(),
214 context: RequestContext::default(),
215 max_memory: limits.max_memory_bytes,
216 http_client: Some(http_client),
217 last_http_result: None,
218 secrets: crate::secrets::SecretsStore::new(),
219 last_secret_result: None,
220 rate_limiter: None,
221 last_rate_limit_result: None,
222 response_cache: None,
223 last_cache_result: None,
224 metrics: None,
225 kafka_publisher: None,
226 nats_publisher: None,
227 last_broker_result: None,
228 last_uuid_result: None,
229 stream_sender: None,
230 ws_upgrade_request: None,
231 request_body: None,
232 output_body: None,
233 http_response_body: None,
234 http_request_body: None,
235 }
236 }
237
238 pub fn with_http_client_and_secrets(
240 plugin_name: String,
241 limits: &PluginLimits,
242 http_client: Arc<HttpClient>,
243 secrets: crate::secrets::SecretsStore,
244 ) -> Self {
245 Self {
246 plugin_name,
247 output_buffer: Vec::new(),
248 context: RequestContext::default(),
249 max_memory: limits.max_memory_bytes,
250 http_client: Some(http_client),
251 last_http_result: None,
252 secrets,
253 last_secret_result: None,
254 rate_limiter: None,
255 last_rate_limit_result: None,
256 response_cache: None,
257 last_cache_result: None,
258 metrics: None,
259 kafka_publisher: None,
260 nats_publisher: None,
261 last_broker_result: None,
262 last_uuid_result: None,
263 stream_sender: None,
264 ws_upgrade_request: None,
265 request_body: None,
266 output_body: None,
267 http_response_body: None,
268 http_request_body: None,
269 }
270 }
271
272 #[allow(clippy::too_many_arguments)]
274 pub fn with_all_options(
275 plugin_name: String,
276 limits: &PluginLimits,
277 http_client: Option<Arc<HttpClient>>,
278 secrets: crate::secrets::SecretsStore,
279 rate_limiter: Option<crate::rate_limiter::RateLimiter>,
280 response_cache: Option<crate::cache::ResponseCache>,
281 nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
282 kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
283 ) -> Self {
284 Self {
285 plugin_name,
286 output_buffer: Vec::new(),
287 context: RequestContext::default(),
288 max_memory: limits.max_memory_bytes,
289 http_client,
290 last_http_result: None,
291 secrets,
292 last_secret_result: None,
293 rate_limiter,
294 last_rate_limit_result: None,
295 response_cache,
296 last_cache_result: None,
297 metrics: None,
298 kafka_publisher,
299 nats_publisher,
300 last_broker_result: None,
301 last_uuid_result: None,
302 stream_sender: None,
303 ws_upgrade_request: None,
304 request_body: None,
305 output_body: None,
306 http_response_body: None,
307 http_request_body: None,
308 }
309 }
310
311 #[allow(clippy::too_many_arguments)]
313 pub fn with_all_options_and_metrics(
314 plugin_name: String,
315 limits: &PluginLimits,
316 http_client: Option<Arc<HttpClient>>,
317 secrets: crate::secrets::SecretsStore,
318 rate_limiter: Option<crate::rate_limiter::RateLimiter>,
319 response_cache: Option<crate::cache::ResponseCache>,
320 nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
321 kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
322 metrics: Option<Arc<barbacane_telemetry::MetricsRegistry>>,
323 ) -> Self {
324 Self {
325 plugin_name,
326 output_buffer: Vec::new(),
327 context: RequestContext::default(),
328 max_memory: limits.max_memory_bytes,
329 http_client,
330 last_http_result: None,
331 secrets,
332 last_secret_result: None,
333 rate_limiter,
334 last_rate_limit_result: None,
335 response_cache,
336 last_cache_result: None,
337 metrics,
338 kafka_publisher,
339 nats_publisher,
340 last_broker_result: None,
341 last_uuid_result: None,
342 stream_sender: None,
343 ws_upgrade_request: None,
344 request_body: None,
345 output_body: None,
346 http_response_body: None,
347 http_request_body: None,
348 }
349 }
350
351 pub fn take_output(&mut self) -> Vec<u8> {
353 std::mem::take(&mut self.output_buffer)
354 }
355
356 pub fn set_context(&mut self, context: RequestContext) {
358 self.context = context;
359 }
360
361 pub fn set_stream_sender(
363 &mut self,
364 sender: Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>,
365 ) {
366 self.stream_sender = Some(sender);
367 }
368
369 pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
371 self.ws_upgrade_request.take()
372 }
373
374 pub fn set_request_body(&mut self, body: Option<Vec<u8>>) {
376 self.request_body = body;
377 }
378
379 pub fn take_output_body(&mut self) -> Option<Option<Vec<u8>>> {
384 self.output_body.take()
385 }
386}
387
388impl wasmtime::ResourceLimiter for PluginState {
389 fn memory_growing(
390 &mut self,
391 _current: usize,
392 desired: usize,
393 _maximum: Option<usize>,
394 ) -> Result<bool, wasmtime::Error> {
395 Ok(desired <= self.max_memory)
396 }
397
398 fn table_growing(
399 &mut self,
400 _current: usize,
401 desired: usize,
402 _maximum: Option<usize>,
403 ) -> Result<bool, wasmtime::Error> {
404 Ok(desired <= 10_000)
406 }
407}
408
409pub struct PluginInstance {
411 store: Store<PluginState>,
412 _instance: Instance,
413 limits: PluginLimits,
414
415 init_func: Option<TypedFunc<(i32, i32), i32>>,
417 on_request_func: Option<TypedFunc<(i32, i32), i32>>,
418 on_response_func: Option<TypedFunc<(i32, i32), i32>>,
419 dispatch_func: Option<TypedFunc<(i32, i32), i32>>,
420 alloc_func: Option<TypedFunc<i32, i32>>,
421 memory: Memory,
422}
423
424impl PluginInstance {
425 pub fn new(
427 engine: &Engine,
428 module: &CompiledModule,
429 limits: PluginLimits,
430 ) -> Result<Self, WasmError> {
431 Self::new_with_options(engine, module, limits, None, None)
432 }
433
434 pub fn new_with_http_client(
436 engine: &Engine,
437 module: &CompiledModule,
438 limits: PluginLimits,
439 http_client: Option<Arc<HttpClient>>,
440 ) -> Result<Self, WasmError> {
441 Self::new_with_options(engine, module, limits, http_client, None)
442 }
443
444 pub fn new_with_options(
446 engine: &Engine,
447 module: &CompiledModule,
448 limits: PluginLimits,
449 http_client: Option<Arc<HttpClient>>,
450 secrets: Option<crate::secrets::SecretsStore>,
451 ) -> Result<Self, WasmError> {
452 Self::new_with_all_options(
453 engine,
454 module,
455 limits,
456 http_client,
457 secrets,
458 None,
459 None,
460 None,
461 None,
462 )
463 }
464
465 #[allow(clippy::too_many_arguments)]
467 pub fn new_with_all_options(
468 engine: &Engine,
469 module: &CompiledModule,
470 limits: PluginLimits,
471 http_client: Option<Arc<HttpClient>>,
472 secrets: Option<crate::secrets::SecretsStore>,
473 rate_limiter: Option<crate::rate_limiter::RateLimiter>,
474 response_cache: Option<crate::cache::ResponseCache>,
475 nats_publisher: Option<Arc<crate::nats_client::NatsPublisher>>,
476 kafka_publisher: Option<Arc<crate::kafka_client::KafkaPublisher>>,
477 ) -> Result<Self, WasmError> {
478 let state = PluginState::with_all_options(
479 module.name.clone(),
480 &limits,
481 http_client,
482 secrets.unwrap_or_default(),
483 rate_limiter,
484 response_cache,
485 nats_publisher,
486 kafka_publisher,
487 );
488 let mut store = Store::new(engine, state);
489
490 store
492 .set_fuel(limits.max_fuel)
493 .map_err(|e| WasmError::Instantiation(format!("failed to set fuel: {}", e)))?;
494
495 store.limiter(|state| state);
497
498 let mut linker = Linker::new(engine);
500 add_host_functions(&mut linker)?;
501
502 let instance = linker
504 .instantiate(&mut store, module.module())
505 .map_err(|e| WasmError::Instantiation(e.to_string()))?;
506
507 let memory = instance
509 .get_memory(&mut store, "memory")
510 .ok_or_else(|| WasmError::MissingExport("memory".into()))?;
511
512 let init_func = instance
514 .get_typed_func::<(i32, i32), i32>(&mut store, "init")
515 .ok();
516 let on_request_func = instance
517 .get_typed_func::<(i32, i32), i32>(&mut store, "on_request")
518 .ok();
519 let on_response_func = instance
520 .get_typed_func::<(i32, i32), i32>(&mut store, "on_response")
521 .ok();
522 let dispatch_func = instance
523 .get_typed_func::<(i32, i32), i32>(&mut store, "dispatch")
524 .ok();
525 let alloc_func = instance
526 .get_typed_func::<i32, i32>(&mut store, "alloc")
527 .ok();
528
529 Ok(Self {
530 store,
531 _instance: instance,
532 limits,
533 init_func,
534 on_request_func,
535 on_response_func,
536 dispatch_func,
537 alloc_func,
538 memory,
539 })
540 }
541
542 pub fn name(&self) -> &str {
544 &self.store.data().plugin_name
545 }
546
547 pub fn write_to_memory(&mut self, data: &[u8]) -> Result<i32, WasmError> {
554 if data.is_empty() {
555 return Ok(0);
556 }
557
558 if let Some(alloc_func) = self.alloc_func.clone() {
559 let ptr = alloc_func
562 .call(&mut self.store, data.len() as i32)
563 .map_err(|e| WasmError::Trap(format!("alloc failed: {}", e)))?;
564
565 if ptr == 0 {
566 let current_size = self.memory.data_size(&self.store);
567 return Err(WasmError::MemoryLimitExceeded {
568 requested: data.len(),
569 limit: self.limits.max_memory_bytes.saturating_sub(current_size),
570 });
571 }
572
573 self.memory
574 .write(&mut self.store, ptr as usize, data)
575 .map_err(|e| WasmError::Trap(format!("memory write failed: {}", e)))?;
576
577 Ok(ptr)
578 } else {
579 let current_size = self.memory.data_size(&self.store);
582 let needed = current_size + data.len();
583
584 if needed > self.limits.max_memory_bytes {
585 return Err(WasmError::MemoryLimitExceeded {
586 requested: data.len(),
587 limit: self.limits.max_memory_bytes.saturating_sub(current_size),
588 });
589 }
590
591 const PAGE_SIZE: usize = 65_536;
592 let pages_needed = data.len().div_ceil(PAGE_SIZE);
593
594 self.memory
595 .grow(&mut self.store, pages_needed as u64)
596 .map_err(|_| WasmError::MemoryLimitExceeded {
597 requested: data.len(),
598 limit: self.limits.max_memory_bytes.saturating_sub(current_size),
599 })?;
600
601 let ptr = current_size;
602 self.memory
603 .write(&mut self.store, ptr, data)
604 .map_err(|e| WasmError::Trap(format!("memory write failed: {}", e)))?;
605
606 Ok(ptr as i32)
607 }
608 }
609
610 pub fn init(&mut self, config_json: &[u8]) -> Result<i32, WasmError> {
612 let init_func = self
613 .init_func
614 .clone()
615 .ok_or_else(|| WasmError::MissingExport("init".into()))?;
616
617 let ptr = self.write_to_memory(config_json)?;
619 let len = config_json.len() as i32;
620
621 if let Err(e) = self.store.set_fuel(self.limits.max_fuel) {
623 tracing::warn!(error = %e, "failed to reset WASM fuel");
624 }
625
626 let result = init_func
628 .call(&mut self.store, (ptr, len))
629 .map_err(|e| WasmError::Trap(e.to_string()))?;
630
631 Ok(result)
632 }
633
634 pub fn on_request(&mut self, request_json: &[u8]) -> Result<i32, WasmError> {
636 let func = self
637 .on_request_func
638 .clone()
639 .ok_or_else(|| WasmError::MissingExport("on_request".into()))?;
640
641 self.call_handler(func, request_json)
642 }
643
644 pub fn on_response(&mut self, response_json: &[u8]) -> Result<i32, WasmError> {
646 let func = self
647 .on_response_func
648 .clone()
649 .ok_or_else(|| WasmError::MissingExport("on_response".into()))?;
650
651 self.call_handler(func, response_json)
652 }
653
654 pub fn dispatch(&mut self, request_json: &[u8]) -> Result<i32, WasmError> {
656 let func = self
657 .dispatch_func
658 .clone()
659 .ok_or_else(|| WasmError::MissingExport("dispatch".into()))?;
660
661 self.call_handler(func, request_json)
662 }
663
664 fn call_handler(
666 &mut self,
667 func: TypedFunc<(i32, i32), i32>,
668 data: &[u8],
669 ) -> Result<i32, WasmError> {
670 self.store.data_mut().output_buffer.clear();
672
673 let fuel = self.limits.max_fuel.max(data.len() as u64 * 100);
677 if let Err(e) = self.store.set_fuel(fuel) {
678 tracing::warn!(error = %e, "failed to reset WASM fuel");
679 }
680
681 let ptr = self.write_to_memory(data)?;
683 let len = data.len() as i32;
684
685 let result = func
687 .call(&mut self.store, (ptr, len))
688 .map_err(|e| WasmError::Trap(e.to_string()))?;
689
690 Ok(result)
691 }
692
693 pub fn take_output(&mut self) -> Vec<u8> {
695 self.store.data_mut().take_output()
696 }
697
698 pub fn set_context(&mut self, context: RequestContext) {
700 self.store.data_mut().set_context(context);
701 }
702
703 pub fn get_context(&self) -> RequestContext {
705 self.store.data().context.clone()
706 }
707
708 pub fn take_last_http_result(&mut self) -> Option<Vec<u8>> {
712 self.store.data_mut().last_http_result.take()
713 }
714
715 pub fn set_stream_sender(
720 &mut self,
721 sender: Arc<tokio::sync::mpsc::UnboundedSender<StreamEvent>>,
722 ) {
723 self.store.data_mut().set_stream_sender(sender);
724 }
725
726 pub fn take_ws_upgrade_request(&mut self) -> Option<crate::ws_client::WsUpgradeRequest> {
731 self.store.data_mut().take_ws_upgrade_request()
732 }
733
734 pub fn set_request_body(&mut self, body: Option<Vec<u8>>) {
736 self.store.data_mut().set_request_body(body);
737 }
738
739 pub fn take_output_body(&mut self) -> Option<Option<Vec<u8>>> {
741 self.store.data_mut().take_output_body()
742 }
743}
744
745fn add_read_result_fn(
750 linker: &mut Linker<PluginState>,
751 name: &str,
752 extract: impl Fn(&mut PluginState) -> Option<Vec<u8>> + Send + Sync + 'static,
753) -> Result<(), WasmError> {
754 linker
755 .func_wrap(
756 "barbacane",
757 name,
758 move |mut caller: Caller<'_, PluginState>, buf_ptr: i32, buf_len: i32| -> i32 {
759 let result = extract(caller.data_mut());
760 if let Some(data) = result {
761 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
762 Some(m) => m,
763 None => return 0,
764 };
765 let copy_len = std::cmp::min(data.len(), buf_len as usize);
766 if memory
767 .write(&mut caller, buf_ptr as usize, &data[..copy_len])
768 .is_ok()
769 {
770 return copy_len as i32;
771 }
772 }
773 0
774 },
775 )
776 .map_err(|e| WasmError::Instantiation(format!("failed to add {}: {}", name, e)))?;
777 Ok(())
778}
779
780fn add_host_functions(linker: &mut Linker<PluginState>) -> Result<(), WasmError> {
782 linker
784 .func_wrap(
785 "barbacane",
786 "host_set_output",
787 |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
788 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
789 Some(m) => m,
790 None => return,
791 };
792
793 let start = ptr as usize;
794 let end = start + len as usize;
795 let data = memory.data(&caller);
796
797 if end <= data.len() {
798 let bytes = data[start..end].to_vec();
799 caller.data_mut().output_buffer = bytes;
800 }
801 },
802 )
803 .map_err(|e| WasmError::Instantiation(format!("failed to add host_set_output: {}", e)))?;
804
805 linker
811 .func_wrap(
812 "barbacane",
813 "host_body_len",
814 |caller: Caller<'_, PluginState>| -> i64 {
815 match &caller.data().request_body {
816 Some(body) => body.len() as i64,
817 None => -1,
818 }
819 },
820 )
821 .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_len: {}", e)))?;
822
823 add_read_result_fn(linker, "host_body_read", |state| state.request_body.take())?;
825
826 linker
828 .func_wrap(
829 "barbacane",
830 "host_body_set",
831 |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
832 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
833 Some(m) => m,
834 None => return,
835 };
836
837 let start = ptr as usize;
838 let end = start + len as usize;
839 let data = memory.data(&caller);
840
841 if end <= data.len() {
842 let bytes = data[start..end].to_vec();
843 caller.data_mut().output_body = Some(Some(bytes));
844 }
845 },
846 )
847 .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_set: {}", e)))?;
848
849 linker
851 .func_wrap(
852 "barbacane",
853 "host_body_clear",
854 |mut caller: Caller<'_, PluginState>| {
855 caller.data_mut().output_body = Some(None);
856 },
857 )
858 .map_err(|e| WasmError::Instantiation(format!("failed to add host_body_clear: {}", e)))?;
859
860 linker
862 .func_wrap(
863 "barbacane",
864 "host_http_response_body_len",
865 |caller: Caller<'_, PluginState>| -> i64 {
866 match &caller.data().http_response_body {
867 Some(body) => body.len() as i64,
868 None => -1,
869 }
870 },
871 )
872 .map_err(|e| {
873 WasmError::Instantiation(format!("failed to add host_http_response_body_len: {}", e))
874 })?;
875
876 add_read_result_fn(linker, "host_http_response_body_read", |state| {
878 state.http_response_body.take()
879 })?;
880
881 linker
883 .func_wrap(
884 "barbacane",
885 "host_http_request_body_set",
886 |mut caller: Caller<'_, PluginState>, ptr: i32, len: i32| {
887 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
888 Some(m) => m,
889 None => return,
890 };
891
892 let start = ptr as usize;
893 let end = start + len as usize;
894 let data = memory.data(&caller);
895
896 if end <= data.len() {
897 let bytes = data[start..end].to_vec();
898 caller.data_mut().http_request_body = Some(bytes);
899 }
900 },
901 )
902 .map_err(|e| {
903 WasmError::Instantiation(format!("failed to add host_http_request_body_set: {}", e))
904 })?;
905
906 linker
908 .func_wrap(
909 "barbacane",
910 "host_log",
911 |mut caller: Caller<'_, PluginState>, level: i32, msg_ptr: i32, msg_len: i32| {
912 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
913 Some(m) => m,
914 None => return,
915 };
916
917 let start = msg_ptr as usize;
918 let end = start + msg_len as usize;
919 let data = memory.data(&caller);
920
921 if end <= data.len() {
922 if let Ok(message) = std::str::from_utf8(&data[start..end]) {
923 let plugin_name = caller.data().plugin_name.clone();
924 match level {
925 0 => tracing::error!(plugin = %plugin_name, "{}", message),
926 1 => tracing::warn!(plugin = %plugin_name, "{}", message),
927 2 => tracing::info!(plugin = %plugin_name, "{}", message),
928 _ => tracing::debug!(plugin = %plugin_name, "{}", message),
929 }
930 }
931 }
932 },
933 )
934 .map_err(|e| WasmError::Instantiation(format!("failed to add host_log: {}", e)))?;
935
936 linker
938 .func_wrap(
939 "barbacane",
940 "host_context_get",
941 |mut caller: Caller<'_, PluginState>, key_ptr: i32, key_len: i32| -> i32 {
942 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
943 Some(m) => m,
944 None => return -1,
945 };
946
947 let start = key_ptr as usize;
948 let end = start + key_len as usize;
949 let data = memory.data(&caller);
950
951 if end > data.len() {
952 return -1;
953 }
954
955 let key = match std::str::from_utf8(&data[start..end]) {
956 Ok(k) => k.to_string(),
957 Err(_) => return -1,
958 };
959
960 match caller.data().context.values.get(&key).cloned() {
961 Some(value) => {
962 let len = value.len() as i32;
963 caller.data_mut().context.last_get_result = Some(value);
964 len
965 }
966 None => -1,
967 }
968 },
969 )
970 .map_err(|e| WasmError::Instantiation(format!("failed to add host_context_get: {}", e)))?;
971
972 add_read_result_fn(linker, "host_context_read_result", |state| {
974 state.context.last_get_result.take().map(String::into_bytes)
975 })?;
976
977 linker
979 .func_wrap(
980 "barbacane",
981 "host_context_set",
982 |mut caller: Caller<'_, PluginState>,
983 key_ptr: i32,
984 key_len: i32,
985 val_ptr: i32,
986 val_len: i32| {
987 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
988 Some(m) => m,
989 None => return,
990 };
991
992 let key_start = key_ptr as usize;
993 let key_end = key_start + key_len as usize;
994 let val_start = val_ptr as usize;
995 let val_end = val_start + val_len as usize;
996
997 let data = memory.data(&caller);
999 if key_end <= data.len() && val_end <= data.len() {
1000 let key_result =
1001 std::str::from_utf8(&data[key_start..key_end]).map(String::from);
1002 let val_result =
1003 std::str::from_utf8(&data[val_start..val_end]).map(String::from);
1004
1005 if let (Ok(key), Ok(value)) = (key_result, val_result) {
1006 caller.data_mut().context.values.insert(key, value);
1007 }
1008 }
1009 },
1010 )
1011 .map_err(|e| WasmError::Instantiation(format!("failed to add host_context_set: {}", e)))?;
1012
1013 linker
1015 .func_wrap(
1016 "barbacane",
1017 "host_clock_now",
1018 |_caller: Caller<'_, PluginState>| -> i64 {
1019 use std::time::Instant;
1020
1021 static START: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
1022 let start = START.get_or_init(Instant::now);
1023
1024 start.elapsed().as_millis() as i64
1025 },
1026 )
1027 .map_err(|e| WasmError::Instantiation(format!("failed to add host_clock_now: {}", e)))?;
1028
1029 linker
1031 .func_wrap(
1032 "barbacane",
1033 "host_time_now",
1034 |_caller: Caller<'_, PluginState>| -> i64 {
1035 use std::time::Instant;
1036
1037 static START: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
1038 let start = START.get_or_init(Instant::now);
1039
1040 start.elapsed().as_millis() as i64
1041 },
1042 )
1043 .map_err(|e| WasmError::Instantiation(format!("failed to add host_time_now: {}", e)))?;
1044
1045 linker
1047 .func_wrap(
1048 "barbacane",
1049 "host_get_unix_timestamp",
1050 |_caller: Caller<'_, PluginState>| -> u64 {
1051 use std::time::{SystemTime, UNIX_EPOCH};
1052
1053 SystemTime::now()
1054 .duration_since(UNIX_EPOCH)
1055 .map(|d| d.as_secs())
1056 .unwrap_or(0)
1057 },
1058 )
1059 .map_err(|e| {
1060 WasmError::Instantiation(format!("failed to add host_get_unix_timestamp: {}", e))
1061 })?;
1062
1063 linker
1065 .func_wrap(
1066 "barbacane",
1067 "host_uuid_generate",
1068 |mut caller: Caller<'_, PluginState>| -> i32 {
1069 let uuid = uuid::Uuid::now_v7().to_string();
1070 let len = uuid.len() as i32;
1071 caller.data_mut().last_uuid_result = Some(uuid.into_bytes());
1072 len
1073 },
1074 )
1075 .map_err(|e| {
1076 WasmError::Instantiation(format!("failed to add host_uuid_generate: {}", e))
1077 })?;
1078
1079 add_read_result_fn(linker, "host_uuid_read_result", |state| {
1081 state.last_uuid_result.take()
1082 })?;
1083
1084 linker
1086 .func_wrap(
1087 "barbacane",
1088 "host_http_call",
1089 |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1090 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1091 Some(m) => m,
1092 None => return -1,
1093 };
1094
1095 let start = req_ptr as usize;
1096 let end = start + req_len as usize;
1097 let data = memory.data(&caller);
1098
1099 if end > data.len() {
1100 return -1;
1101 }
1102
1103 let plugin_request: PluginHttpRequest =
1105 match serde_json::from_slice(&data[start..end]) {
1106 Ok(r) => r,
1107 Err(e) => {
1108 tracing::error!("failed to parse HTTP request: {}", e);
1109 return -1;
1110 }
1111 };
1112
1113 let body = caller
1116 .data_mut()
1117 .http_request_body
1118 .take()
1119 .or(plugin_request.body);
1120
1121 let request = HttpClientRequest {
1122 method: plugin_request.method,
1123 url: plugin_request.url,
1124 headers: plugin_request.headers.into_iter().collect(),
1125 body,
1126 timeout: plugin_request
1127 .timeout_ms
1128 .map(std::time::Duration::from_millis),
1129 };
1130
1131 let http_client = match caller.data().http_client.clone() {
1133 Some(c) => c,
1134 None => {
1135 tracing::error!("HTTP client not available");
1136 return -1;
1137 }
1138 };
1139
1140 let response_result = std::thread::scope(|s| {
1146 let handle = s.spawn(|| {
1147 let rt = match tokio::runtime::Builder::new_current_thread()
1148 .enable_all()
1149 .build()
1150 {
1151 Ok(rt) => rt,
1152 Err(e) => {
1153 tracing::error!("failed to create runtime: {}", e);
1154 return None;
1155 }
1156 };
1157
1158 rt.block_on(async {
1159 match http_client.call(request).await {
1160 Ok(mut response) => {
1161 let body = response.body.take();
1164 let json = serde_json::to_vec(&response).ok();
1165 Some((json, body))
1166 }
1167 Err(e) => {
1168 tracing::error!("HTTP call failed: {}", e);
1169 let error_response = match e {
1171 crate::http_client::HttpClientError::Timeout => {
1172 HttpClientResponse::error(
1173 504,
1174 "urn:barbacane:error:upstream-timeout",
1175 "Gateway Timeout",
1176 "Upstream request timed out",
1177 )
1178 }
1179 crate::http_client::HttpClientError::CircuitOpen(host) => {
1180 HttpClientResponse::error(
1181 503,
1182 "urn:barbacane:error:circuit-open",
1183 "Service Unavailable",
1184 &format!("Circuit breaker open for {}", host),
1185 )
1186 }
1187 crate::http_client::HttpClientError::ConnectionFailed(
1188 _,
1189 ) => HttpClientResponse::error(
1190 502,
1191 "urn:barbacane:error:upstream-unavailable",
1192 "Bad Gateway",
1193 "Failed to connect to upstream",
1194 ),
1195 _ => HttpClientResponse::error(
1196 502,
1197 "urn:barbacane:error:upstream-unavailable",
1198 "Bad Gateway",
1199 &e.to_string(),
1200 ),
1201 };
1202 let json = serde_json::to_vec(&error_response).ok();
1203 Some((json, None))
1204 }
1205 }
1206 })
1207 });
1208
1209 match handle.join() {
1210 Ok(result) => result,
1211 Err(e) => {
1212 tracing::error!("worker thread panicked: {:?}", e);
1213 None
1214 }
1215 }
1216 });
1217
1218 match response_result {
1219 Some((Some(json), body)) => {
1220 let len = json.len() as i32;
1221 caller.data_mut().last_http_result = Some(json);
1222 caller.data_mut().http_response_body = body;
1223 len
1224 }
1225 _ => -1,
1226 }
1227 },
1228 )
1229 .map_err(|e| WasmError::Instantiation(format!("failed to add host_http_call: {}", e)))?;
1230
1231 add_read_result_fn(linker, "host_http_read_result", |state| {
1233 state.last_http_result.take()
1234 })?;
1235
1236 linker
1243 .func_wrap(
1244 "barbacane",
1245 "host_http_stream",
1246 |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1247 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1248 Some(m) => m,
1249 None => return -1,
1250 };
1251
1252 let start = req_ptr as usize;
1253 let end = start + req_len as usize;
1254 let data = memory.data(&caller);
1255
1256 if end > data.len() {
1257 return -1;
1258 }
1259
1260 let plugin_request: PluginHttpRequest =
1261 match serde_json::from_slice(&data[start..end]) {
1262 Ok(r) => r,
1263 Err(e) => {
1264 tracing::error!("host_http_stream: failed to parse request: {}", e);
1265 return -1;
1266 }
1267 };
1268
1269 let body = caller
1271 .data_mut()
1272 .http_request_body
1273 .take()
1274 .or(plugin_request.body);
1275
1276 let request = HttpClientRequest {
1277 method: plugin_request.method,
1278 url: plugin_request.url,
1279 headers: plugin_request.headers.into_iter().collect(),
1280 body,
1281 timeout: plugin_request
1282 .timeout_ms
1283 .map(std::time::Duration::from_millis),
1284 };
1285
1286 let http_client = match caller.data().http_client.clone() {
1287 Some(c) => c,
1288 None => {
1289 tracing::error!("host_http_stream: HTTP client not available");
1290 return -1;
1291 }
1292 };
1293
1294 let stream_sender = caller.data().stream_sender.clone();
1296
1297 let response_result = std::thread::scope(|s| {
1298 let handle = s.spawn(|| {
1299 let rt = match tokio::runtime::Builder::new_current_thread()
1300 .enable_all()
1301 .build()
1302 {
1303 Ok(rt) => rt,
1304 Err(e) => {
1305 tracing::error!(
1306 "host_http_stream: failed to create runtime: {}",
1307 e
1308 );
1309 return None;
1310 }
1311 };
1312
1313 rt.block_on(async {
1314 use futures_util::StreamExt;
1315
1316 match http_client.stream_raw(request).await {
1317 Ok(upstream) => {
1318 let status = upstream.status().as_u16();
1319 let upstream_headers: BTreeMap<String, String> = upstream
1320 .headers()
1321 .iter()
1322 .filter_map(|(k, v)| {
1323 v.to_str()
1324 .ok()
1325 .map(|v| (k.as_str().to_lowercase(), v.to_string()))
1326 })
1327 .collect();
1328
1329 if let Some(tx) = &stream_sender {
1331 let _ = tx.send(StreamEvent::Headers {
1332 status,
1333 headers: upstream_headers.clone(),
1334 });
1335 }
1336
1337 let mut buffer: Vec<u8> = Vec::new();
1340 let mut byte_stream = upstream.bytes_stream();
1341
1342 while let Some(chunk_result) = byte_stream.next().await {
1343 match chunk_result {
1344 Ok(chunk) => {
1345 if let Some(tx) = &stream_sender {
1346 let _ =
1347 tx.send(StreamEvent::Chunk(chunk.clone()));
1348 }
1349 buffer.extend_from_slice(&chunk);
1350 }
1351 Err(e) => {
1352 tracing::error!(
1353 "host_http_stream: upstream read error: {}",
1354 e
1355 );
1356 return None;
1357 }
1358 }
1359 }
1360
1361 let complete = HttpClientResponse {
1364 status,
1365 headers: upstream_headers.into_iter().collect(),
1366 body: None,
1367 };
1368 let json = serde_json::to_vec(&complete).ok();
1369 Some((json, Some(buffer)))
1370 }
1371 Err(e) => {
1372 tracing::error!("host_http_stream: request failed: {}", e);
1373 None
1374 }
1375 }
1376 })
1377 });
1378
1379 match handle.join() {
1380 Ok(result) => result,
1381 Err(e) => {
1382 tracing::error!("host_http_stream: worker thread panicked: {:?}", e);
1383 None
1384 }
1385 }
1386 });
1387
1388 match response_result {
1389 Some((Some(json), body)) => {
1390 let len = json.len() as i32;
1391 caller.data_mut().last_http_result = Some(json);
1392 caller.data_mut().http_response_body = body;
1393 len
1394 }
1395 _ => -1,
1396 }
1397 },
1398 )
1399 .map_err(|e| WasmError::Instantiation(format!("failed to add host_http_stream: {}", e)))?;
1400
1401 linker
1410 .func_wrap(
1411 "barbacane",
1412 "host_ws_upgrade",
1413 |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1414 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1415 Some(m) => m,
1416 None => return -1,
1417 };
1418
1419 let start = req_ptr as usize;
1420 let end = start + req_len as usize;
1421 let data = memory.data(&caller);
1422
1423 if end > data.len() {
1424 return -1;
1425 }
1426
1427 let ws_request: crate::ws_client::WsUpgradeRequest =
1428 match serde_json::from_slice(&data[start..end]) {
1429 Ok(r) => r,
1430 Err(e) => {
1431 tracing::error!("host_ws_upgrade: failed to parse request: {}", e);
1432 let err_msg = format!("invalid upgrade request: {}", e);
1433 caller.data_mut().last_http_result = Some(err_msg.into_bytes());
1434 return -1;
1435 }
1436 };
1437
1438 let plugin_name = caller.data().plugin_name.clone();
1439 tracing::debug!(
1440 plugin = %plugin_name,
1441 url = %ws_request.url,
1442 "host_ws_upgrade: storing request for deferred connection"
1443 );
1444
1445 caller.data_mut().ws_upgrade_request = Some(ws_request);
1448 0
1449 },
1450 )
1451 .map_err(|e| WasmError::Instantiation(format!("failed to add host_ws_upgrade: {}", e)))?;
1452
1453 linker
1455 .func_wrap(
1456 "barbacane",
1457 "host_verify_signature",
1458 |mut caller: Caller<'_, PluginState>, req_ptr: i32, req_len: i32| -> i32 {
1459 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1460 Some(m) => m,
1461 None => return -1,
1462 };
1463
1464 let start = req_ptr as usize;
1465 let end = start + req_len as usize;
1466 let data = memory.data(&caller);
1467
1468 if end > data.len() {
1469 return -1;
1470 }
1471
1472 let request: crate::crypto::VerifySignatureRequest =
1474 match serde_json::from_slice(&data[start..end]) {
1475 Ok(r) => r,
1476 Err(e) => {
1477 tracing::error!(
1478 plugin = %caller.data_mut().plugin_name,
1479 "failed to parse verify_signature request: {}", e
1480 );
1481 return -1;
1482 }
1483 };
1484
1485 match crate::crypto::verify_signature(&request) {
1487 Ok(true) => 1,
1488 Ok(false) => 0,
1489 Err(e) => {
1490 tracing::error!(
1491 plugin = %caller.data_mut().plugin_name,
1492 "signature verification error: {}", e
1493 );
1494 -1
1495 }
1496 }
1497 },
1498 )
1499 .map_err(|e| {
1500 WasmError::Instantiation(format!("failed to add host_verify_signature: {}", e))
1501 })?;
1502
1503 linker
1505 .func_wrap(
1506 "barbacane",
1507 "host_get_secret",
1508 |mut caller: Caller<'_, PluginState>, ref_ptr: i32, ref_len: i32| -> i32 {
1509 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1510 Some(m) => m,
1511 None => return -1,
1512 };
1513
1514 let start = ref_ptr as usize;
1515 let end = start + ref_len as usize;
1516 let data = memory.data(&caller);
1517
1518 if end > data.len() {
1519 return -1;
1520 }
1521
1522 let secret_ref = match std::str::from_utf8(&data[start..end]) {
1524 Ok(r) => r.to_string(),
1525 Err(_) => return -1,
1526 };
1527
1528 match caller.data().secrets.get(&secret_ref) {
1530 Some(value) => {
1531 let bytes = value.as_bytes().to_vec();
1532 let len = bytes.len() as i32;
1533 caller.data_mut().last_secret_result = Some(bytes);
1534 len
1535 }
1536 None => {
1537 tracing::warn!(
1538 plugin = %caller.data().plugin_name,
1539 reference = %secret_ref,
1540 "secret not found in store"
1541 );
1542 -1
1543 }
1544 }
1545 },
1546 )
1547 .map_err(|e| WasmError::Instantiation(format!("failed to add host_get_secret: {}", e)))?;
1548
1549 add_read_result_fn(linker, "host_secret_read_result", |state| {
1551 state.last_secret_result.take()
1552 })?;
1553
1554 linker
1556 .func_wrap(
1557 "barbacane",
1558 "host_rate_limit_check",
1559 |mut caller: Caller<'_, PluginState>,
1560 key_ptr: i32,
1561 key_len: i32,
1562 quota: u32,
1563 window_secs: u32|
1564 -> i32 {
1565 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1566 Some(m) => m,
1567 None => return -1,
1568 };
1569
1570 let start = key_ptr as usize;
1571 let end = start + key_len as usize;
1572 let data = memory.data(&caller);
1573
1574 if end > data.len() {
1575 return -1;
1576 }
1577
1578 let key = match std::str::from_utf8(&data[start..end]) {
1580 Ok(k) => k.to_string(),
1581 Err(_) => return -1,
1582 };
1583
1584 let rate_limiter = match &caller.data().rate_limiter {
1586 Some(rl) => rl.clone(),
1587 None => {
1588 tracing::error!("rate limiter not available");
1589 return -1;
1590 }
1591 };
1592
1593 let result = rate_limiter.check(&key, quota, window_secs as u64);
1595
1596 match serde_json::to_vec(&result) {
1598 Ok(json) => {
1599 let len = json.len() as i32;
1600 caller.data_mut().last_rate_limit_result = Some(json);
1601 len
1602 }
1603 Err(e) => {
1604 tracing::error!("failed to serialize rate limit result: {}", e);
1605 -1
1606 }
1607 }
1608 },
1609 )
1610 .map_err(|e| {
1611 WasmError::Instantiation(format!("failed to add host_rate_limit_check: {}", e))
1612 })?;
1613
1614 add_read_result_fn(linker, "host_rate_limit_read_result", |state| {
1616 state.last_rate_limit_result.take()
1617 })?;
1618
1619 linker
1621 .func_wrap(
1622 "barbacane",
1623 "host_cache_get",
1624 |mut caller: Caller<'_, PluginState>, key_ptr: i32, key_len: i32| -> i32 {
1625 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1626 Some(m) => m,
1627 None => return -1,
1628 };
1629
1630 let start = key_ptr as usize;
1631 let end = start + key_len as usize;
1632 let data = memory.data(&caller);
1633
1634 if end > data.len() {
1635 return -1;
1636 }
1637
1638 let key = match std::str::from_utf8(&data[start..end]) {
1640 Ok(k) => k.to_string(),
1641 Err(_) => return -1,
1642 };
1643
1644 let cache = match &caller.data().response_cache {
1646 Some(c) => c.clone(),
1647 None => {
1648 tracing::error!("response cache not available");
1649 return -1;
1650 }
1651 };
1652
1653 let result = cache.get(&key);
1655
1656 match serde_json::to_vec(&result) {
1658 Ok(json) => {
1659 let len = json.len() as i32;
1660 caller.data_mut().last_cache_result = Some(json);
1661 len
1662 }
1663 Err(e) => {
1664 tracing::error!("failed to serialize cache result: {}", e);
1665 -1
1666 }
1667 }
1668 },
1669 )
1670 .map_err(|e| WasmError::Instantiation(format!("failed to add host_cache_get: {}", e)))?;
1671
1672 linker
1674 .func_wrap(
1675 "barbacane",
1676 "host_cache_set",
1677 |mut caller: Caller<'_, PluginState>,
1678 key_ptr: i32,
1679 key_len: i32,
1680 entry_ptr: i32,
1681 entry_len: i32,
1682 ttl_secs: u32|
1683 -> i32 {
1684 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1685 Some(m) => m,
1686 None => return -1,
1687 };
1688
1689 let key_start = key_ptr as usize;
1690 let key_end = key_start + key_len as usize;
1691 let entry_start = entry_ptr as usize;
1692 let entry_end = entry_start + entry_len as usize;
1693 let data = memory.data(&caller);
1694
1695 if key_end > data.len() || entry_end > data.len() {
1696 return -1;
1697 }
1698
1699 let key = match std::str::from_utf8(&data[key_start..key_end]) {
1701 Ok(k) => k.to_string(),
1702 Err(_) => return -1,
1703 };
1704
1705 let entry: crate::cache::CacheEntry =
1707 match serde_json::from_slice(&data[entry_start..entry_end]) {
1708 Ok(e) => e,
1709 Err(e) => {
1710 tracing::error!("failed to parse cache entry: {}", e);
1711 return -1;
1712 }
1713 };
1714
1715 let cache = match &caller.data().response_cache {
1717 Some(c) => c.clone(),
1718 None => {
1719 tracing::error!("response cache not available");
1720 return -1;
1721 }
1722 };
1723
1724 cache.set(&key, entry, ttl_secs as u64);
1726 0 },
1728 )
1729 .map_err(|e| WasmError::Instantiation(format!("failed to add host_cache_set: {}", e)))?;
1730
1731 add_read_result_fn(linker, "host_cache_read_result", |state| {
1733 state.last_cache_result.take()
1734 })?;
1735
1736 linker
1740 .func_wrap(
1741 "barbacane",
1742 "host_metric_counter_inc",
1743 |mut caller: Caller<'_, PluginState>,
1744 name_ptr: i32,
1745 name_len: i32,
1746 labels_ptr: i32,
1747 labels_len: i32,
1748 value: f64| {
1749 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1750 Some(m) => m,
1751 None => return,
1752 };
1753
1754 let data = memory.data(&caller);
1755 let name_start = name_ptr as usize;
1756 let name_end = name_start + name_len as usize;
1757 let labels_start = labels_ptr as usize;
1758 let labels_end = labels_start + labels_len as usize;
1759
1760 if name_end > data.len() || labels_end > data.len() {
1761 return;
1762 }
1763
1764 let name = match std::str::from_utf8(&data[name_start..name_end]) {
1765 Ok(n) => n.to_string(),
1766 Err(_) => return,
1767 };
1768
1769 let labels_json = match std::str::from_utf8(&data[labels_start..labels_end]) {
1770 Ok(l) => l.to_string(),
1771 Err(_) => return,
1772 };
1773
1774 let plugin_name = caller.data().plugin_name.clone();
1775 if let Some(metrics) = &caller.data().metrics {
1776 metrics.plugin_counter_inc(&plugin_name, &name, &labels_json, value as u64);
1777 }
1778 },
1779 )
1780 .map_err(|e| {
1781 WasmError::Instantiation(format!("failed to add host_metric_counter_inc: {}", e))
1782 })?;
1783
1784 linker
1786 .func_wrap(
1787 "barbacane",
1788 "host_metric_histogram_observe",
1789 |mut caller: Caller<'_, PluginState>,
1790 name_ptr: i32,
1791 name_len: i32,
1792 labels_ptr: i32,
1793 labels_len: i32,
1794 value: f64| {
1795 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1796 Some(m) => m,
1797 None => return,
1798 };
1799
1800 let data = memory.data(&caller);
1801 let name_start = name_ptr as usize;
1802 let name_end = name_start + name_len as usize;
1803 let labels_start = labels_ptr as usize;
1804 let labels_end = labels_start + labels_len as usize;
1805
1806 if name_end > data.len() || labels_end > data.len() {
1807 return;
1808 }
1809
1810 let name = match std::str::from_utf8(&data[name_start..name_end]) {
1811 Ok(n) => n.to_string(),
1812 Err(_) => return,
1813 };
1814
1815 let labels_json = match std::str::from_utf8(&data[labels_start..labels_end]) {
1816 Ok(l) => l.to_string(),
1817 Err(_) => return,
1818 };
1819
1820 let plugin_name = caller.data().plugin_name.clone();
1821 if let Some(metrics) = &caller.data().metrics {
1822 metrics.plugin_histogram_observe(&plugin_name, &name, &labels_json, value);
1823 }
1824 },
1825 )
1826 .map_err(|e| {
1827 WasmError::Instantiation(format!(
1828 "failed to add host_metric_histogram_observe: {}",
1829 e
1830 ))
1831 })?;
1832
1833 linker
1836 .func_wrap(
1837 "barbacane",
1838 "host_span_start",
1839 |mut caller: Caller<'_, PluginState>, name_ptr: i32, name_len: i32| -> i32 {
1840 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1841 Some(m) => m,
1842 None => return -1,
1843 };
1844
1845 let data = memory.data(&caller);
1846 let start = name_ptr as usize;
1847 let end = start + name_len as usize;
1848
1849 if end > data.len() {
1850 return -1;
1851 }
1852
1853 let span_name = match std::str::from_utf8(&data[start..end]) {
1854 Ok(n) => n,
1855 Err(_) => return -1,
1856 };
1857
1858 let plugin_name = &caller.data().plugin_name;
1860 tracing::debug!(plugin = %plugin_name, span = %span_name, "plugin span started");
1861
1862 1
1864 },
1865 )
1866 .map_err(|e| WasmError::Instantiation(format!("failed to add host_span_start: {}", e)))?;
1867
1868 linker
1870 .func_wrap(
1871 "barbacane",
1872 "host_span_end",
1873 |caller: Caller<'_, PluginState>| {
1874 let plugin_name = &caller.data().plugin_name;
1875 tracing::debug!(plugin = %plugin_name, "plugin span ended");
1876 },
1877 )
1878 .map_err(|e| WasmError::Instantiation(format!("failed to add host_span_end: {}", e)))?;
1879
1880 linker
1882 .func_wrap(
1883 "barbacane",
1884 "host_span_set_attribute",
1885 |mut caller: Caller<'_, PluginState>,
1886 key_ptr: i32,
1887 key_len: i32,
1888 val_ptr: i32,
1889 val_len: i32| {
1890 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1891 Some(m) => m,
1892 None => return,
1893 };
1894
1895 let data = memory.data(&caller);
1896 let key_start = key_ptr as usize;
1897 let key_end = key_start + key_len as usize;
1898 let val_start = val_ptr as usize;
1899 let val_end = val_start + val_len as usize;
1900
1901 if key_end > data.len() || val_end > data.len() {
1902 return;
1903 }
1904
1905 let key = match std::str::from_utf8(&data[key_start..key_end]) {
1906 Ok(k) => k,
1907 Err(_) => return,
1908 };
1909
1910 let value = match std::str::from_utf8(&data[val_start..val_end]) {
1911 Ok(v) => v,
1912 Err(_) => return,
1913 };
1914
1915 let plugin_name = &caller.data().plugin_name;
1916 tracing::debug!(plugin = %plugin_name, %key, %value, "plugin span attribute set");
1917 },
1918 )
1919 .map_err(|e| {
1920 WasmError::Instantiation(format!("failed to add host_span_set_attribute: {}", e))
1921 })?;
1922
1923 linker
1927 .func_wrap(
1928 "barbacane",
1929 "host_kafka_publish",
1930 |mut caller: Caller<'_, PluginState>, msg_ptr: i32, msg_len: i32| -> i32 {
1931 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
1932 Some(m) => m,
1933 None => return -1,
1934 };
1935
1936 let start = msg_ptr as usize;
1937 let end = start + msg_len as usize;
1938 let data = memory.data(&caller);
1939
1940 if end > data.len() {
1941 return -1;
1942 }
1943
1944 let message: BrokerMessage = match serde_json::from_slice(&data[start..end]) {
1946 Ok(m) => m,
1947 Err(e) => {
1948 tracing::error!("failed to parse broker message: {}", e);
1949 return -1;
1950 }
1951 };
1952
1953 let brokers = match &message.url {
1955 Some(u) => u.clone(),
1956 None => {
1957 tracing::error!("Kafka publish: missing url in broker message");
1958 return -1;
1959 }
1960 };
1961
1962 let publisher = match caller.data().kafka_publisher.clone() {
1964 Some(p) => p,
1965 None => {
1966 tracing::error!("Kafka publisher not available");
1967 return -1;
1968 }
1969 };
1970
1971 let topic = message.topic.clone();
1972 let key = message.key.clone();
1973 let payload = message.payload.clone();
1974 let headers = message.headers.clone();
1975
1976 let result = std::thread::scope(|s| {
1979 let handle = s.spawn(|| {
1980 publisher.publish_blocking(&brokers, &topic, key, &payload, headers)
1981 });
1982
1983 match handle.join() {
1984 Ok(result) => Some(result),
1985 Err(e) => {
1986 tracing::error!("Kafka publish thread panicked: {:?}", e);
1987 None
1988 }
1989 }
1990 });
1991
1992 let result_json = match result {
1994 Some(Ok(r)) => serde_json::to_vec(&r),
1995 Some(Err(e)) => {
1996 let error_result =
1997 crate::broker::PublishResult::failure(message.topic, e.to_string());
1998 serde_json::to_vec(&error_result)
1999 }
2000 None => {
2001 let error_result = crate::broker::PublishResult::failure(
2002 message.topic,
2003 "Kafka publish failed".to_string(),
2004 );
2005 serde_json::to_vec(&error_result)
2006 }
2007 };
2008
2009 match result_json {
2010 Ok(json) => {
2011 let len = json.len() as i32;
2012 caller.data_mut().last_broker_result = Some(json);
2013 len
2014 }
2015 Err(e) => {
2016 tracing::error!("failed to serialize broker result: {}", e);
2017 -1
2018 }
2019 }
2020 },
2021 )
2022 .map_err(|e| {
2023 WasmError::Instantiation(format!("failed to add host_kafka_publish: {}", e))
2024 })?;
2025
2026 linker
2028 .func_wrap(
2029 "barbacane",
2030 "host_nats_publish",
2031 |mut caller: Caller<'_, PluginState>, msg_ptr: i32, msg_len: i32| -> i32 {
2032 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2033 Some(m) => m,
2034 None => return -1,
2035 };
2036
2037 let start = msg_ptr as usize;
2038 let end = start + msg_len as usize;
2039 let data = memory.data(&caller);
2040
2041 if end > data.len() {
2042 return -1;
2043 }
2044
2045 let message: BrokerMessage = match serde_json::from_slice(&data[start..end]) {
2047 Ok(m) => m,
2048 Err(e) => {
2049 tracing::error!("failed to parse broker message: {}", e);
2050 return -1;
2051 }
2052 };
2053
2054 let url = match &message.url {
2056 Some(u) => u.clone(),
2057 None => {
2058 tracing::error!("NATS publish: missing url in broker message");
2059 return -1;
2060 }
2061 };
2062
2063 let publisher = match caller.data().nats_publisher.clone() {
2065 Some(p) => p,
2066 None => {
2067 tracing::error!("NATS publisher not available");
2068 return -1;
2069 }
2070 };
2071
2072 let subject = message.topic.clone();
2073 let payload = bytes::Bytes::from(message.payload.clone());
2074 let headers = message.headers.clone();
2075
2076 let result = std::thread::scope(|s| {
2079 let handle =
2080 s.spawn(|| publisher.publish_blocking(&url, &subject, payload, headers));
2081
2082 match handle.join() {
2083 Ok(result) => Some(result),
2084 Err(e) => {
2085 tracing::error!("NATS publish thread panicked: {:?}", e);
2086 None
2087 }
2088 }
2089 });
2090
2091 let result_json = match result {
2093 Some(Ok(r)) => serde_json::to_vec(&r),
2094 Some(Err(e)) => {
2095 let error_result =
2096 crate::broker::PublishResult::failure(message.topic, e.to_string());
2097 serde_json::to_vec(&error_result)
2098 }
2099 None => {
2100 let error_result = crate::broker::PublishResult::failure(
2101 message.topic,
2102 "NATS publish failed".to_string(),
2103 );
2104 serde_json::to_vec(&error_result)
2105 }
2106 };
2107
2108 match result_json {
2109 Ok(json) => {
2110 let len = json.len() as i32;
2111 caller.data_mut().last_broker_result = Some(json);
2112 len
2113 }
2114 Err(e) => {
2115 tracing::error!("failed to serialize broker result: {}", e);
2116 -1
2117 }
2118 }
2119 },
2120 )
2121 .map_err(|e| WasmError::Instantiation(format!("failed to add host_nats_publish: {}", e)))?;
2122
2123 add_read_result_fn(linker, "host_broker_read_result", |state| {
2125 state.last_broker_result.take()
2126 })?;
2127
2128 linker
2137 .func_wrap(
2138 "wasi_snapshot_preview1",
2139 "random_get",
2140 |mut caller: Caller<'_, PluginState>, buf_ptr: i32, buf_len: i32| -> i32 {
2141 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2142 Some(m) => m,
2143 None => return 1,
2144 };
2145 let start = buf_ptr as usize;
2146 let end = start + buf_len as usize;
2147 let data = memory.data_mut(&mut caller);
2148 if end > data.len() {
2149 return 1;
2150 }
2151 for (i, byte) in data[start..end].iter_mut().enumerate() {
2152 *byte = (i.wrapping_mul(0x9E3779B9) >> 24) as u8;
2153 }
2154 0
2155 },
2156 )
2157 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2158
2159 linker
2161 .func_wrap("wasi_snapshot_preview1", "sched_yield", || -> i32 { 0 })
2162 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2163
2164 linker
2166 .func_wrap(
2167 "wasi_snapshot_preview1",
2168 "clock_time_get",
2169 |mut caller: Caller<'_, PluginState>,
2170 _clock_id: i32,
2171 _precision: i64,
2172 time_ptr: i32|
2173 -> i32 {
2174 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2175 Some(m) => m,
2176 None => return 1,
2177 };
2178 let nanos = std::time::SystemTime::now()
2179 .duration_since(std::time::UNIX_EPOCH)
2180 .map(|d| d.as_nanos() as u64)
2181 .unwrap_or(0);
2182 let ptr = time_ptr as usize;
2183 let data = memory.data_mut(&mut caller);
2184 if ptr + 8 > data.len() {
2185 return 1;
2186 }
2187 data[ptr..ptr + 8].copy_from_slice(&nanos.to_le_bytes());
2188 0
2189 },
2190 )
2191 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2192
2193 linker
2195 .func_wrap(
2196 "wasi_snapshot_preview1",
2197 "fd_write",
2198 |mut caller: Caller<'_, PluginState>,
2199 _fd: i32,
2200 iovs_ptr: i32,
2201 iovs_len: i32,
2202 nwritten_ptr: i32|
2203 -> i32 {
2204 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2205 Some(m) => m,
2206 None => return 1,
2207 };
2208 let data = memory.data_mut(&mut caller);
2209 let mut total: u32 = 0;
2210 for i in 0..iovs_len as usize {
2211 let base = (iovs_ptr as usize) + i * 8;
2212 if base + 8 > data.len() {
2213 return 1;
2214 }
2215 let len =
2216 u32::from_le_bytes(data[base + 4..base + 8].try_into().unwrap_or([0; 4]));
2217 total = total.saturating_add(len);
2218 }
2219 let ptr = nwritten_ptr as usize;
2220 if ptr + 4 > data.len() {
2221 return 1;
2222 }
2223 data[ptr..ptr + 4].copy_from_slice(&total.to_le_bytes());
2224 0
2225 },
2226 )
2227 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2228
2229 linker
2231 .func_wrap(
2232 "wasi_snapshot_preview1",
2233 "environ_get",
2234 |_caller: Caller<'_, PluginState>, _environ: i32, _environ_buf: i32| -> i32 { 0 },
2235 )
2236 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2237
2238 linker
2240 .func_wrap(
2241 "wasi_snapshot_preview1",
2242 "environ_sizes_get",
2243 |mut caller: Caller<'_, PluginState>, num_ptr: i32, buf_size_ptr: i32| -> i32 {
2244 let memory = match caller.get_export("memory").and_then(|e| e.into_memory()) {
2245 Some(m) => m,
2246 None => return 1,
2247 };
2248 let data = memory.data_mut(&mut caller);
2249 let np = num_ptr as usize;
2250 let bp = buf_size_ptr as usize;
2251 if np + 4 > data.len() || bp + 4 > data.len() {
2252 return 1;
2253 }
2254 data[np..np + 4].copy_from_slice(&0u32.to_le_bytes());
2255 data[bp..bp + 4].copy_from_slice(&0u32.to_le_bytes());
2256 0
2257 },
2258 )
2259 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2260
2261 linker
2263 .func_wrap(
2264 "wasi_snapshot_preview1",
2265 "proc_exit",
2266 |_caller: Caller<'_, PluginState>, _code: i32| {
2267 },
2269 )
2270 .map_err(|e| WasmError::Instantiation(format!("wasi stub: {e}")))?;
2271
2272 Ok(())
2273}
2274
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PluginState` named "test" with default limits.
    fn default_state() -> PluginState {
        PluginState::new("test".into(), &PluginLimits::default())
    }

    /// `RequestContext::new` stores both ids and starts with no values.
    #[test]
    fn request_context_new() {
        let ctx = RequestContext::new("trace-123".into(), "req-456".into());

        assert!(ctx.values.is_empty());
        assert_eq!(ctx.trace_id, "trace-123");
        assert_eq!(ctx.request_id, "req-456");
    }

    /// `take_output` returns the buffered bytes and leaves the buffer empty.
    #[test]
    fn plugin_state_take_output() {
        let mut state = default_state();
        state.output_buffer = vec![1, 2, 3];

        assert_eq!(state.take_output(), vec![1, 2, 3]);
        assert!(state.output_buffer.is_empty());
    }

    /// A fresh state has no pending UUID result.
    #[test]
    fn plugin_state_uuid_result_initialized() {
        assert!(default_state().last_uuid_result.is_none());
    }

    /// A v7 UUID renders as 36 characters with '7' at index 14.
    #[test]
    fn uuid_v7_format() {
        let rendered = uuid::Uuid::now_v7().to_string();

        assert_eq!(rendered.len(), 36);
        assert_eq!(rendered.chars().nth(14), Some('7'));
    }

    /// A fresh state has no NATS publisher.
    #[test]
    fn plugin_state_nats_publisher_default() {
        assert!(default_state().nats_publisher.is_none());
    }

    /// A fresh state has no pending broker result.
    #[test]
    fn plugin_state_broker_result_default() {
        assert!(default_state().last_broker_result.is_none());
    }

    /// A fresh state has no Kafka publisher.
    #[test]
    fn plugin_state_kafka_publisher_default() {
        assert!(default_state().kafka_publisher.is_none());
    }

    /// A fresh state has no stream sender.
    #[test]
    fn plugin_state_stream_sender_default_is_none() {
        assert!(default_state().stream_sender.is_none());
    }

    /// `set_stream_sender` installs the sender.
    #[test]
    fn plugin_state_set_stream_sender() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<crate::instance::StreamEvent>();
        let mut state = default_state();

        state.set_stream_sender(Arc::new(tx));

        assert!(state.stream_sender.is_some());
    }

    /// A fresh state has no pending HTTP result.
    #[test]
    fn plugin_state_take_last_http_result_default_is_none() {
        assert!(default_state().last_http_result.is_none());
    }

    /// Taking the HTTP result yields the stored bytes and clears the slot.
    #[test]
    fn plugin_state_take_last_http_result_takes_value() {
        let mut state = default_state();
        state.last_http_result = Some(vec![1, 2, 3]);

        assert_eq!(state.last_http_result.take(), Some(vec![1, 2, 3]));
        assert!(state.last_http_result.is_none());
    }

    /// The `Headers` variant carries the status and header map unchanged.
    #[test]
    fn stream_event_headers_fields() {
        let mut header_map = std::collections::BTreeMap::new();
        header_map.insert("content-type".to_string(), "text/event-stream".to_string());
        let event = StreamEvent::Headers {
            status: 200,
            headers: header_map,
        };

        match event {
            StreamEvent::Headers { status, headers } => {
                assert_eq!(status, 200);
                assert_eq!(
                    headers.get("content-type").map(String::as_str),
                    Some("text/event-stream")
                );
            }
            _ => panic!("expected Headers variant"),
        }
    }

    /// The `Chunk` variant carries its bytes unchanged.
    #[test]
    fn stream_event_chunk_contains_bytes() {
        let data = bytes::Bytes::from_static(b"data: hello\n\n");

        match StreamEvent::Chunk(data.clone()) {
            StreamEvent::Chunk(b) => assert_eq!(b, data),
            _ => panic!("expected Chunk variant"),
        }
    }

    /// `with_all_options` keeps both publishers that are passed in.
    #[test]
    fn plugin_state_with_all_options_sets_publishers() {
        let limits = PluginLimits::default();
        let nats = Arc::new(crate::nats_client::NatsPublisher::new());
        let kafka = Arc::new(crate::kafka_client::KafkaPublisher::new());

        let state = PluginState::with_all_options(
            "test".into(),
            &limits,
            None,
            crate::secrets::SecretsStore::new(),
            None,
            None,
            Some(nats),
            Some(kafka),
        );

        assert!(state.nats_publisher.is_some());
        assert!(state.kafka_publisher.is_some());
    }
}