use std::future::Future;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use fidius_core::Value;
use wasmtime::component::{Component, InstancePre, Linker, Val};
use wasmtime::{Engine, Store};
use wasmtime_wasi::p2::add_to_linker_sync;
use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use wasmtime_wasi_http::p2::bindings::http::types::ErrorCode;
use wasmtime_wasi_http::p2::body::HyperOutgoingBody;
use wasmtime_wasi_http::p2::types::{HostFutureIncomingResponse, OutgoingRequestConfig};
use wasmtime_wasi_http::p2::{
add_only_http_to_linker_sync, default_send_request, HttpResult, WasiHttpCtxView, WasiHttpHooks,
WasiHttpView,
};
use wasmtime_wasi_http::WasiHttpCtx;
use crate::error::CallError;
use crate::executor::{PluginExecutor, ValueExecutor};
use crate::types::PluginInfo;
#[derive(Debug, Clone)]
pub struct EgressDenied {
pub reason: String,
}
impl EgressDenied {
pub fn new(reason: impl Into<String>) -> Self {
Self {
reason: reason.into(),
}
}
}
pub trait EgressPolicy: Send + Sync + 'static {
fn authorize(&self, parts: &mut http::request::Parts) -> Result<(), EgressDenied>;
}
struct EgressHooks {
policy: Option<Arc<dyn EgressPolicy>>,
}
impl WasiHttpHooks for EgressHooks {
fn send_request(
&mut self,
request: http::Request<HyperOutgoingBody>,
config: OutgoingRequestConfig,
) -> HttpResult<HostFutureIncomingResponse> {
let Some(policy) = self.policy.as_ref() else {
return Err(ErrorCode::HttpRequestDenied.into());
};
let (mut parts, body) = request.into_parts();
if policy.authorize(&mut parts).is_err() {
return Err(ErrorCode::HttpRequestDenied.into());
}
Ok(default_send_request(
http::Request::from_parts(parts, body),
config,
))
}
}
struct HostState {
ctx: WasiCtx,
table: ResourceTable,
http_ctx: WasiHttpCtx,
hooks: EgressHooks,
}
impl WasiHttpView for HostState {
fn http(&mut self) -> WasiHttpCtxView<'_> {
WasiHttpCtxView {
ctx: &mut self.http_ctx,
table: &mut self.table,
hooks: &mut self.hooks,
}
}
}
const KNOWN_CAPABILITIES: &[&str] = &[
"args", "stdout", "stderr", "stdin", "network", "sockets", "clocks", "random",
"http",
];
fn validate_capabilities(caps: &[String]) -> Result<(), CallError> {
for c in caps {
if c == "env" {
return Err(CallError::Backend {
runtime: "wasm".into(),
message: "wasm capability 'env' grants ALL host environment variables (every \
secret) and is not allowed; grant specific variables with \
'env:VAR_NAME' instead"
.into(),
});
}
if let Some(name) = c.strip_prefix("env:") {
if name.is_empty() {
return Err(CallError::Backend {
runtime: "wasm".into(),
message: "wasm capability 'env:' requires a variable name (e.g. \
'env:STRIPE_API_BASE')"
.into(),
});
}
continue;
}
if !KNOWN_CAPABILITIES.contains(&c.as_str()) {
return Err(CallError::Backend {
runtime: "wasm".into(),
message: format!(
"unknown wasm capability '{c}'; allowed: {}, env:VAR_NAME",
KNOWN_CAPABILITIES.join(", ")
),
});
}
}
Ok(())
}
fn build_wasi_ctx(caps: &[String]) -> WasiCtx {
let mut b = WasiCtxBuilder::new();
for c in caps {
let c = c.as_str();
match c {
"args" => {
b.inherit_args();
}
"stdout" => {
b.inherit_stdout();
}
"stderr" => {
b.inherit_stderr();
}
"stdin" => {
b.inherit_stdin();
}
"network" | "sockets" => {
b.inherit_network();
b.allow_ip_name_lookup(true);
b.socket_addr_check(|addr, _use| {
let ok = !is_blocked_ip(&addr.ip());
Box::pin(async move { ok }) as Pin<Box<dyn Future<Output = bool> + Send + Sync>>
});
}
"clocks" | "random" => {}
"http" => {}
_ if c.starts_with("env:") => {
let name = &c["env:".len()..];
if let Ok(val) = std::env::var(name) {
b.env(name, val);
}
}
_ => {}
}
}
b.build()
}
fn is_blocked_ip(ip: &IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => {
v4.is_loopback()
|| v4.is_link_local()
|| v4.is_private()
|| v4.is_unspecified()
|| v4.is_broadcast()
}
IpAddr::V6(v6) => {
v6.is_loopback()
|| v6.is_unspecified()
|| (v6.segments()[0] & 0xffc0) == 0xfe80 || (v6.segments()[0] & 0xfe00) == 0xfc00 || v6
.to_ipv4_mapped()
.is_some_and(|m| is_blocked_ip(&IpAddr::V4(m)))
}
}
}
impl WasiView for HostState {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.ctx,
table: &mut self.table,
}
}
}
#[derive(Debug, Clone)]
pub struct WasmMethod {
pub name: String,
pub wire_raw: bool,
pub streaming: bool,
}
pub struct WasmComponentExecutor {
engine: Engine,
instance_pre: InstancePre<HostState>,
interface: String,
methods: Vec<WasmMethod>,
capabilities: Vec<String>,
egress: Option<Arc<dyn EgressPolicy>>,
info: PluginInfo,
}
impl WasmComponentExecutor {
pub fn from_component_bytes(
bytes: &[u8],
interface: String,
methods: Vec<WasmMethod>,
capabilities: Vec<String>,
info: PluginInfo,
) -> Result<Self, CallError> {
Self::from_component_bytes_with_egress(bytes, interface, methods, capabilities, None, info)
}
pub fn from_component_bytes_with_egress(
bytes: &[u8],
interface: String,
methods: Vec<WasmMethod>,
capabilities: Vec<String>,
egress: Option<Arc<dyn EgressPolicy>>,
info: PluginInfo,
) -> Result<Self, CallError> {
validate_capabilities(&capabilities)?;
let engine = Engine::default();
let component = Component::new(&engine, bytes).map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
Self::build(
engine,
&component,
interface,
methods,
capabilities,
egress,
info,
)
}
pub unsafe fn from_cwasm(
cwasm: &[u8],
interface: String,
methods: Vec<WasmMethod>,
capabilities: Vec<String>,
info: PluginInfo,
) -> Result<Self, CallError> {
validate_capabilities(&capabilities)?;
let engine = Engine::default();
let component = Component::deserialize(&engine, cwasm).map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
Self::build(
engine,
&component,
interface,
methods,
capabilities,
None,
info,
)
}
fn build(
engine: Engine,
component: &Component,
interface: String,
methods: Vec<WasmMethod>,
capabilities: Vec<String>,
egress: Option<Arc<dyn EgressPolicy>>,
info: PluginInfo,
) -> Result<Self, CallError> {
let mut linker: Linker<HostState> = Linker::new(&engine);
add_to_linker_sync(&mut linker).map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
let http_enabled = capabilities.iter().any(|c| c == "http") && egress.is_some();
if http_enabled {
add_only_http_to_linker_sync(&mut linker).map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
}
let instance_pre = linker
.instantiate_pre(component)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
Ok(Self {
engine,
instance_pre,
interface,
methods,
capabilities,
egress,
info,
})
}
fn instantiate(&self) -> Result<(Store<HostState>, wasmtime::component::Instance), CallError> {
let host = HostState {
ctx: build_wasi_ctx(&self.capabilities),
table: ResourceTable::new(),
http_ctx: WasiHttpCtx::new(),
hooks: EgressHooks {
policy: self.egress.clone(),
},
};
let mut store = Store::new(&self.engine, host);
let instance =
self.instance_pre
.instantiate(&mut store)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
Ok((store, instance))
}
fn func(
&self,
store: &mut Store<HostState>,
instance: &wasmtime::component::Instance,
name: &str,
) -> Result<wasmtime::component::Func, CallError> {
let (_, iface_idx) = instance
.get_export(&mut *store, None, &self.interface)
.ok_or_else(|| CallError::Backend {
runtime: "wasm".into(),
message: format!("component does not export interface '{}'", self.interface),
})?;
let (_, func_idx) = instance
.get_export(&mut *store, Some(&iface_idx), name)
.ok_or_else(|| CallError::Backend {
runtime: "wasm".into(),
message: format!("interface '{}' does not export '{name}'", self.interface),
})?;
instance
.get_func(&mut *store, func_idx)
.ok_or_else(|| CallError::Backend {
runtime: "wasm".into(),
message: format!("export '{name}' is not a function"),
})
}
fn method(&self, index: usize, want_raw: bool) -> Result<&WasmMethod, CallError> {
let m = self
.methods
.get(index)
.ok_or(CallError::InvalidMethodIndex {
index,
count: self.methods.len() as u32,
})?;
if m.wire_raw != want_raw {
return Err(CallError::WireModeMismatch {
method: m.name.clone(),
declared: m.wire_raw,
attempted: want_raw,
});
}
Ok(m)
}
pub fn interface_hash(&self) -> Result<u64, CallError> {
let (mut store, instance) = self.instantiate()?;
let func = self.func(&mut store, &instance, "fidius-interface-hash")?;
let mut out = [Val::U64(0)];
func.call(&mut store, &[], &mut out)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
match &out[0] {
Val::U64(h) => Ok(*h),
other => Err(CallError::Backend {
runtime: "wasm".into(),
message: format!("fidius-interface-hash returned non-u64: {other:?}"),
}),
}
}
}
impl PluginExecutor for WasmComponentExecutor {
fn info(&self) -> &PluginInfo {
&self.info
}
fn method_count(&self) -> u32 {
self.methods.len() as u32
}
fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
let m = self.method(method, true)?.clone();
let (mut store, instance) = self.instantiate()?;
let func = self.func(&mut store, &instance, &m.name)?;
let typed =
func.typed::<(Vec<u8>,), (Vec<u8>,)>(&store)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: format!("raw method '{}' is not list<u8> -> list<u8>: {e}", m.name),
})?;
let (out,) = typed
.call(&mut store, (input.to_vec(),))
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
typed
.post_return(&mut store)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
Ok(out)
}
}
impl ValueExecutor for WasmComponentExecutor {
fn call(&self, method: usize, args: Value) -> Result<Value, CallError> {
let m = self.method(method, false)?.clone();
let (mut store, instance) = self.instantiate()?;
let func = self.func(&mut store, &instance, &m.name)?;
let params: Vec<Val> = match args {
Value::List(items) => items.iter().map(value_to_val).collect::<Result<_, _>>()?,
Value::Unit => Vec::new(),
single => vec![value_to_val(&single)?],
};
let mut out = [Val::Bool(false)];
func.call(&mut store, ¶ms, &mut out)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
if let Val::Result(Err(payload)) = &out[0] {
return Err(plugin_error_from_val(payload.as_deref()));
}
let ret = match &out[0] {
Val::Result(Ok(inner)) => inner.as_deref().map(val_to_value).unwrap_or(Value::Unit),
other => val_to_value(other),
};
Ok(ret)
}
}
#[cfg(feature = "streaming")]
const STREAM_CHANNEL_CAP: usize = 4;
#[cfg(feature = "streaming")]
#[async_trait::async_trait]
impl crate::stream::StreamExecutor for WasmComponentExecutor {
async fn call_streaming(
&self,
method: usize,
args: Value,
) -> Result<crate::stream::ChunkStream, CallError> {
let m = self.method(method, false)?.clone();
if !m.streaming {
return Err(CallError::Backend {
runtime: "wasm".into(),
message: format!("method '{}' is not a server-streaming method", m.name),
});
}
let (mut store, instance) = self.instantiate()?;
let params: Vec<Val> = match args {
Value::List(items) => items.iter().map(value_to_val).collect::<Result<_, _>>()?,
Value::Unit => Vec::new(),
single => vec![value_to_val(&single)?],
};
let start = self.func(&mut store, &instance, &m.name)?;
let mut out = [Val::Bool(false)];
start
.call(&mut store, ¶ms, &mut out)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
})?;
let resource = match out.into_iter().next() {
Some(Val::Resource(r)) => r,
other => {
return Err(CallError::Backend {
runtime: "wasm".into(),
message: format!(
"streaming method '{}' did not return a resource: {other:?}",
m.name
),
})
}
};
let next_name = format!("[method]{}-stream.next", m.name);
let next_func = self.func(&mut store, &instance, &next_name)?;
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);
std::thread::spawn(move || {
loop {
let mut nout = [Val::Bool(false)];
if let Err(e) = next_func.call(&mut store, &[Val::Resource(resource)], &mut nout) {
let _ = tx.blocking_send(Err(CallError::Backend {
runtime: "wasm".into(),
message: e.to_string(),
}));
break;
}
let step: Option<Result<Value, CallError>> = match &nout[0] {
Val::Result(Ok(inner)) => match inner.as_deref() {
Some(Val::Option(Some(v))) => Some(Ok(val_to_value(v))),
Some(Val::Option(None)) | None => None,
Some(other) => Some(Ok(val_to_value(other))),
},
Val::Result(Err(payload)) => {
Some(Err(plugin_error_from_val(payload.as_deref())))
}
other => Some(Ok(val_to_value(other))),
};
match step {
None => break,
Some(item) => {
let is_err = item.is_err();
if tx.blocking_send(item).is_err() {
break;
}
if is_err {
break;
}
}
}
}
let _ = resource.resource_drop(&mut store);
drop(store);
});
let body = futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
});
Ok(crate::stream::ChunkStream::new(body))
}
}
fn plugin_error_from_val(payload: Option<&Val>) -> CallError {
use fidius_core::PluginError;
let mut code = "WASM_ERROR".to_string();
let mut message = String::new();
let mut details: Option<String> = None;
if let Some(Val::Record(fields)) = payload {
for (k, v) in fields {
match (k.as_str(), v) {
("code", Val::String(s)) => code = s.clone(),
("message", Val::String(s)) => message = s.clone(),
("details", Val::Option(Some(b))) => {
if let Val::String(s) = b.as_ref() {
details = Some(s.clone());
}
}
_ => {}
}
}
} else if let Some(other) = payload {
message = format!("{other:?}");
}
let mut err = PluginError::new(code, message);
if let Some(d) = details {
err.details = Some(d);
}
CallError::Plugin(err)
}
fn to_kebab(s: &str) -> String {
let mut out = String::new();
for (i, ch) in s.chars().enumerate() {
if ch == '_' {
out.push('-');
} else if ch.is_uppercase() {
if i != 0 {
out.push('-');
}
out.extend(ch.to_lowercase());
} else {
out.push(ch);
}
}
out
}
fn kebab_to_snake(s: &str) -> String {
s.replace('-', "_")
}
fn kebab_to_pascal(s: &str) -> String {
s.split('-')
.map(|seg| {
let mut c = seg.chars();
match c.next() {
Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
None => String::new(),
}
})
.collect()
}
fn value_to_val(v: &Value) -> Result<Val, CallError> {
Ok(match v {
Value::Bool(b) => Val::Bool(*b),
Value::S8(x) => Val::S8(*x),
Value::S16(x) => Val::S16(*x),
Value::S32(x) => Val::S32(*x),
Value::S64(x) => Val::S64(*x),
Value::U8(x) => Val::U8(*x),
Value::U16(x) => Val::U16(*x),
Value::U32(x) => Val::U32(*x),
Value::U64(x) => Val::U64(*x),
Value::F32(x) => Val::Float32(*x),
Value::F64(x) => Val::Float64(*x),
Value::Char(c) => Val::Char(*c),
Value::String(s) => Val::String(s.clone()),
Value::Bytes(b) => Val::List(b.iter().map(|x| Val::U8(*x)).collect()),
Value::List(items) => Val::List(items.iter().map(value_to_val).collect::<Result<_, _>>()?),
Value::Record(fields) => Val::Record(
fields
.iter()
.map(|(k, v)| Ok::<_, CallError>((to_kebab(k), value_to_val(v)?)))
.collect::<Result<_, _>>()?,
),
Value::Option(None) => Val::Option(None),
Value::Option(Some(inner)) => Val::Option(Some(Box::new(value_to_val(inner)?))),
Value::Variant { name, value } => {
let payload = match value.as_ref() {
Value::Unit => None,
other => Some(Box::new(value_to_val(other)?)),
};
Val::Variant(to_kebab(name), payload)
}
Value::Unit => Val::Tuple(Vec::new()),
Value::Map(_) => {
return Err(CallError::Serialization(
"non-string-keyed maps are not yet supported across the WASM boundary".into(),
))
}
})
}
fn val_to_value(v: &Val) -> Value {
match v {
Val::Bool(b) => Value::Bool(*b),
Val::S8(x) => Value::S8(*x),
Val::S16(x) => Value::S16(*x),
Val::S32(x) => Value::S32(*x),
Val::S64(x) => Value::S64(*x),
Val::U8(x) => Value::U8(*x),
Val::U16(x) => Value::U16(*x),
Val::U32(x) => Value::U32(*x),
Val::U64(x) => Value::U64(*x),
Val::Float32(x) => Value::F32(*x),
Val::Float64(x) => Value::F64(*x),
Val::Char(c) => Value::Char(*c),
Val::String(s) => Value::String(s.clone()),
Val::List(items) => Value::List(items.iter().map(val_to_value).collect()),
Val::Record(fields) => Value::Record(
fields
.iter()
.map(|(k, v)| (kebab_to_snake(k), val_to_value(v)))
.collect(),
),
Val::Tuple(items) => Value::List(items.iter().map(val_to_value).collect()),
Val::Option(None) => Value::Option(None),
Val::Option(Some(inner)) => Value::Option(Some(Box::new(val_to_value(inner)))),
Val::Variant(name, payload) => Value::Variant {
name: kebab_to_pascal(name),
value: Box::new(payload.as_deref().map(val_to_value).unwrap_or(Value::Unit)),
},
Val::Enum(name) => Value::Variant {
name: kebab_to_pascal(name),
value: Box::new(Value::Unit),
},
Val::Result(Ok(inner)) => inner.as_deref().map(val_to_value).unwrap_or(Value::Unit),
Val::Result(Err(inner)) => inner.as_deref().map(val_to_value).unwrap_or(Value::Unit),
other => Value::String(format!("{other:?}")),
}
}
pub fn validate_component(bytes: &[u8]) -> Result<(), CallError> {
let engine = Engine::default();
Component::new(&engine, bytes)
.map(|_| ())
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: format!("not a valid WASM component: {e}"),
})
}
pub fn precompile_component(bytes: &[u8]) -> Result<Vec<u8>, CallError> {
let engine = Engine::default();
engine
.precompile_component(bytes)
.map_err(|e| CallError::Backend {
runtime: "wasm".into(),
message: format!("failed to precompile component: {e}"),
})
}
#[cfg(test)]
mod ssrf_tests {
use super::is_blocked_ip;
use std::net::IpAddr;
fn ip(s: &str) -> IpAddr {
s.parse().unwrap()
}
#[test]
fn blocks_internal_and_metadata_targets() {
for s in [
"169.254.169.254", "127.0.0.1",
"::1",
"10.1.2.3",
"172.16.5.5",
"192.168.1.1",
"0.0.0.0",
"fe80::1", "fc00::1", "::ffff:127.0.0.1", ] {
assert!(is_blocked_ip(&ip(s)), "{s} must be blocked");
}
}
#[test]
fn allows_public_targets() {
for s in [
"1.1.1.1",
"93.184.216.34",
"8.8.8.8",
"2606:4700:4700::1111",
] {
assert!(!is_blocked_ip(&ip(s)), "{s} must be allowed");
}
}
}