use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use alloy::primitives::{keccak256, FixedBytes};
use async_trait::async_trait;
use tracing::{debug, info, warn};
use wasmtime::{
component::{Component, HasSelf, Linker},
Config, Engine, Store,
};
use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use crate::{
error::EnclaveError,
protocol::{EgressRequest, EgressResponse, EgressWireMessage, WasmPluginError, WasmPluginInput, WasmPluginOutput},
};
#[allow(missing_docs, missing_debug_implementations)]
mod bindings {
wasmtime::component::bindgen!({
path: "wit",
imports: { default: async },
exports: { default: async },
});
}
use bindings::*;
#[async_trait]
pub trait EgressClient: Send + Sync {
async fn fetch(&self, req: EgressRequest) -> Result<EgressResponse, String>;
}
#[derive(Debug, Clone, Copy)]
pub enum EgressMode {
Tcp(u16),
Vsock {
cid: u32,
port: u32,
},
}
#[derive(Debug)]
pub struct TcpEgressClient {
addr: String,
stream: tokio::sync::Mutex<Option<tokio::net::TcpStream>>,
}
impl TcpEgressClient {
pub fn new(port: u16) -> Self {
Self {
addr: format!("127.0.0.1:{port}"),
stream: tokio::sync::Mutex::new(None),
}
}
}
#[async_trait]
impl EgressClient for TcpEgressClient {
async fn fetch(&self, req: EgressRequest) -> Result<EgressResponse, String> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut guard = self.stream.lock().await;
let stream = match guard.as_mut() {
Some(s) => s,
None => {
let s = tokio::net::TcpStream::connect(&self.addr)
.await
.map_err(|e| format!("egress connect failed: {e}"))?;
guard.insert(s)
}
};
let msg = EgressWireMessage::Request(req);
let encoded = bincode::serde::encode_to_vec(&msg, bincode::config::standard())
.map_err(|e| format!("egress encode failed: {e}"))?;
stream
.write_u32(encoded.len() as u32)
.await
.map_err(|e| format!("egress write len failed: {e}"))?;
stream
.write_all(&encoded)
.await
.map_err(|e| format!("egress write failed: {e}"))?;
stream.flush().await.map_err(|e| format!("egress flush failed: {e}"))?;
let resp_len = stream
.read_u32()
.await
.map_err(|e| format!("egress read len failed: {e}"))? as usize;
if resp_len > crate::protocol::MAX_FRAME_LEN {
return Err(format!("egress response frame too large: {resp_len}"));
}
let mut buf = vec![0u8; resp_len];
stream
.read_exact(&mut buf)
.await
.map_err(|e| format!("egress read failed: {e}"))?;
let (resp_msg, _): (EgressWireMessage, _) =
bincode::serde::decode_from_slice(&buf, bincode::config::standard())
.map_err(|e| format!("egress decode failed: {e}"))?;
match resp_msg {
EgressWireMessage::Response(resp) => Ok(resp),
EgressWireMessage::Error(err) => Err(err.message),
EgressWireMessage::Request(_) => Err("unexpected request from egress proxy".to_string()),
}
}
}
#[cfg(target_os = "linux")]
#[derive(Debug)]
pub struct VsockEgressClient {
cid: u32,
port: u32,
stream: tokio::sync::Mutex<Option<tokio_vsock::VsockStream>>,
}
#[cfg(target_os = "linux")]
impl VsockEgressClient {
pub fn new(cid: u32, port: u32) -> Self {
Self {
cid,
port,
stream: tokio::sync::Mutex::new(None),
}
}
}
#[cfg(target_os = "linux")]
#[async_trait]
impl EgressClient for VsockEgressClient {
async fn fetch(&self, req: EgressRequest) -> Result<EgressResponse, String> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut guard = self.stream.lock().await;
let stream = match guard.as_mut() {
Some(s) => s,
None => {
let addr = tokio_vsock::VsockAddr::new(self.cid, self.port);
let s = tokio_vsock::VsockStream::connect(addr)
.await
.map_err(|e| format!("egress vsock connect failed (cid={} port={}): {e}", self.cid, self.port))?;
guard.insert(s)
}
};
let msg = EgressWireMessage::Request(req);
let encoded = bincode::serde::encode_to_vec(&msg, bincode::config::standard())
.map_err(|e| format!("egress encode failed: {e}"))?;
stream
.write_u32(encoded.len() as u32)
.await
.map_err(|e| format!("egress write len failed: {e}"))?;
stream
.write_all(&encoded)
.await
.map_err(|e| format!("egress write failed: {e}"))?;
stream.flush().await.map_err(|e| format!("egress flush failed: {e}"))?;
let resp_len = stream
.read_u32()
.await
.map_err(|e| format!("egress read len failed: {e}"))? as usize;
if resp_len > crate::protocol::MAX_FRAME_LEN {
return Err(format!("egress response frame too large: {resp_len}"));
}
let mut buf = vec![0u8; resp_len];
stream
.read_exact(&mut buf)
.await
.map_err(|e| format!("egress read failed: {e}"))?;
let (resp_msg, _): (EgressWireMessage, _) =
bincode::serde::decode_from_slice(&buf, bincode::config::standard())
.map_err(|e| format!("egress decode failed: {e}"))?;
match resp_msg {
EgressWireMessage::Response(resp) => Ok(resp),
EgressWireMessage::Error(err) => Err(err.message),
EgressWireMessage::Request(_) => Err("unexpected request from egress proxy".to_string()),
}
}
}
struct BatchAwareEgressClient {
inner: Box<dyn EgressClient>,
batch_counter: Arc<AtomicU32>,
}
impl BatchAwareEgressClient {
fn new(inner: Box<dyn EgressClient>, batch_counter: Arc<AtomicU32>) -> Self {
Self { inner, batch_counter }
}
}
#[async_trait]
impl EgressClient for BatchAwareEgressClient {
async fn fetch(&self, req: EgressRequest) -> Result<EgressResponse, String> {
let count = self.batch_counter.fetch_add(1, Ordering::Relaxed);
if count >= MAX_BATCH_EGRESS_CALLS {
self.batch_counter.fetch_sub(1, Ordering::Relaxed);
return Err(format!("batch egress budget exceeded ({MAX_BATCH_EGRESS_CALLS} calls)"));
}
self.inner.fetch(req).await
}
}
fn make_egress_client(mode: &EgressMode) -> Box<dyn EgressClient> {
match mode {
EgressMode::Tcp(port) => Box::new(TcpEgressClient::new(*port)),
#[cfg(target_os = "linux")]
EgressMode::Vsock { cid, port } => Box::new(VsockEgressClient::new(*cid, *port)),
#[cfg(not(target_os = "linux"))]
EgressMode::Vsock { .. } => {
panic!("vsock egress is only supported on Linux (Nitro enclaves)")
}
}
}
pub const MAX_PLUGINS_PER_BATCH: usize = 32;
pub const MAX_WASM_BYTES_PER_PLUGIN: usize = 8 * 1024 * 1024;
const MAX_BATCH_EGRESS_CALLS: u32 = 100;
const PLUGIN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
#[allow(missing_debug_implementations)]
struct ComponentCache {
cache: moka::future::Cache<FixedBytes<32>, Arc<Component>>,
engine: Engine,
}
impl ComponentCache {
fn new(engine: Engine, max_entries: u64) -> Self {
Self {
cache: moka::future::Cache::builder().max_capacity(max_entries).build(),
engine,
}
}
async fn get_or_compile(
&self,
wasm_bytes: &[u8],
expected_hash: &FixedBytes<32>,
) -> Result<Arc<Component>, EnclaveError> {
let actual_hash = keccak256(wasm_bytes);
if actual_hash != *expected_hash {
return Err(EnclaveError::InvalidRequest(format!(
"WASM integrity check failed: keccak256(wasm_bytes)={actual_hash} != expected={expected_hash}"
)));
}
self.cache
.try_get_with(actual_hash, async {
let engine = self.engine.clone();
let wasm_owned = wasm_bytes.to_vec();
let hash_for_log = actual_hash;
info!(wasm_hash = %hash_for_log, wasm_size = wasm_owned.len(), "compiling WASM component");
let start = std::time::Instant::now();
let component = tokio::task::spawn_blocking(move || Component::new(&engine, &wasm_owned))
.await
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("WASM compilation task panicked: {e}")))?
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("WASM compilation failed: {e}")))?;
info!(
wasm_hash = %hash_for_log,
compile_ms = start.elapsed().as_millis() as u64,
"WASM component compiled and cached"
);
Ok(Arc::new(component))
})
.await
.map_err(|e: Arc<EnclaveError>| EnclaveError::PolicyEvalFailed(e.to_string()))
}
}
#[allow(missing_debug_implementations)]
struct HttpProvider {
egress: Arc<dyn EgressClient>,
call_count: AtomicU32,
max_calls: u32,
}
impl newton::provider::http::Host for HttpProvider {
async fn fetch(
&mut self,
request: newton::provider::http::HttpRequest,
) -> Result<newton::provider::http::HttpResponse, String> {
let count = self.call_count.fetch_add(1, Ordering::Relaxed);
if count >= self.max_calls {
return Err(format!("exceeded maximum HTTP calls limit of {}", self.max_calls));
}
let egress_req = EgressRequest {
url: request.url,
method: request.method,
headers: request.headers,
body: request.body,
};
let resp = self.egress.fetch(egress_req).await?;
Ok(newton::provider::http::HttpResponse {
status: resp.status,
headers: resp.headers,
body: resp.body,
})
}
}
#[derive(Debug)]
struct SecretsProvider {
secrets_json: Option<zeroize::Zeroizing<Vec<u8>>>,
}
impl newton::provider::secrets::Host for SecretsProvider {
async fn get(&mut self) -> Result<newton::provider::secrets::SecretResponse, String> {
match &self.secrets_json {
Some(bytes) => Ok(newton::provider::secrets::SecretResponse { value: bytes.to_vec() }),
None => Err("no secrets available for this task".to_string()),
}
}
}
#[derive(Debug)]
struct TlsnProvider;
impl newton::provider::tlsn::Host for TlsnProvider {
async fn verify_from_cid(&mut self, _proof_cid: String) -> Result<newton::provider::tlsn::VerifiedData, String> {
Err("TLSNotary verification not available inside enclave".to_string())
}
async fn verify(&mut self, _presentation_bytes: Vec<u8>) -> Result<newton::provider::tlsn::VerifiedData, String> {
Err("TLSNotary verification not available inside enclave".to_string())
}
}
#[allow(missing_debug_implementations)]
struct WasmHostCtx {
table: ResourceTable,
wasi: WasiCtx,
http: HttpProvider,
secrets: SecretsProvider,
tlsn: TlsnProvider,
}
impl WasiView for WasmHostCtx {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.wasi,
table: &mut self.table,
}
}
}
pub struct WasmExecutor {
engine: Engine,
cache: ComponentCache,
fuel: u64,
}
impl std::fmt::Debug for WasmExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WasmExecutor")
.field("cache_entries", &self.cache.cache.entry_count())
.field("fuel", &self.fuel)
.finish()
}
}
#[derive(Debug, Clone)]
pub struct WasmExecutorConfig {
pub fuel: u64,
pub max_wasm_stack: usize,
pub max_cache_entries: u64,
}
impl Default for WasmExecutorConfig {
fn default() -> Self {
Self {
fuel: 100_000_000,
max_wasm_stack: 64 * 1024 * 1024,
max_cache_entries: 32,
}
}
}
impl WasmExecutor {
pub fn new(config: WasmExecutorConfig) -> Result<Self, EnclaveError> {
let mut wasm_config = Config::new();
wasm_config.wasm_component_model(true);
wasm_config.async_support(true);
wasm_config.consume_fuel(config.fuel > 0);
wasm_config.max_wasm_stack(config.max_wasm_stack);
wasm_config.async_stack_size(config.max_wasm_stack.saturating_mul(2));
let engine = Engine::new(&wasm_config)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to create WASM engine: {e}")))?;
let cache = ComponentCache::new(engine.clone(), config.max_cache_entries);
Ok(Self {
engine,
cache,
fuel: config.fuel,
})
}
pub async fn execute(
&self,
wasm_bytes: &[u8],
wasm_code_hash: &FixedBytes<32>,
input: &str,
egress: Arc<dyn EgressClient>,
secrets_json: Option<zeroize::Zeroizing<Vec<u8>>>,
max_http_calls: u32,
) -> Result<Result<String, String>, EnclaveError> {
let component = self.cache.get_or_compile(wasm_bytes, wasm_code_hash).await?;
let ctx = WasmHostCtx {
table: ResourceTable::new(),
wasi: WasiCtxBuilder::new().build(),
http: HttpProvider {
egress,
call_count: AtomicU32::new(0),
max_calls: max_http_calls,
},
secrets: SecretsProvider { secrets_json },
tlsn: TlsnProvider,
};
let mut store = Store::new(&self.engine, ctx);
if self.fuel > 0 {
store
.set_fuel(self.fuel)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to set fuel: {e}")))?;
}
let mut linker = Linker::new(&self.engine);
linker.allow_shadowing(true);
wasmtime_wasi::p2::add_to_linker_async(&mut linker)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to add WASI to linker: {e}")))?;
newton::provider::http::add_to_linker::<WasmHostCtx, HasSelf<HttpProvider>>(
&mut linker,
|ctx: &mut WasmHostCtx| &mut ctx.http,
)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to add http to linker: {e}")))?;
newton::provider::secrets::add_to_linker::<WasmHostCtx, HasSelf<SecretsProvider>>(
&mut linker,
|ctx: &mut WasmHostCtx| &mut ctx.secrets,
)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to add secrets to linker: {e}")))?;
newton::provider::tlsn::add_to_linker::<WasmHostCtx, HasSelf<TlsnProvider>>(
&mut linker,
|ctx: &mut WasmHostCtx| &mut ctx.tlsn,
)
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to add tlsn to linker: {e}")))?;
let newton_provider = NewtonProvider::instantiate_async(&mut store, &component, &linker)
.await
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("failed to instantiate WASM component: {e}")))?;
let result = newton_provider
.call_run(&mut store, input)
.await
.map_err(|e| EnclaveError::PolicyEvalFailed(format!("WASM execution failed: {e}")))?;
Ok(result)
}
pub async fn execute_batch(
self: &Arc<Self>,
plugins: Vec<WasmPluginInput>,
egress_mode: EgressMode,
hpke_sk: Arc<newton_core::crypto::HpkePrivateKey>,
max_concurrent: usize,
) -> Vec<WasmPluginOutput> {
let plugin_count = plugins.len();
if plugin_count == 0 {
return vec![];
}
let capped_count = plugin_count.min(MAX_PLUGINS_PER_BATCH);
if plugin_count > MAX_PLUGINS_PER_BATCH {
warn!(
requested = plugin_count,
max = MAX_PLUGINS_PER_BATCH,
"plugin count exceeds maximum, truncating"
);
}
let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent.max(1)));
let batch_egress_counter = Arc::new(AtomicU32::new(0));
let mut handles = Vec::with_capacity(capped_count);
for (idx, plugin) in plugins.into_iter().take(capped_count).enumerate() {
if plugin.wasm_bytes.len() > MAX_WASM_BYTES_PER_PLUGIN {
warn!(
idx,
size = plugin.wasm_bytes.len(),
max = MAX_WASM_BYTES_PER_PLUGIN,
"plugin WASM binary exceeds size limit"
);
handles.push(tokio::spawn(async move {
(
idx,
WasmPluginOutput {
result: Err(WasmPluginError::OversizedBinary(format!(
"WASM binary size {} exceeds limit {}",
plugin.wasm_bytes.len(),
MAX_WASM_BYTES_PER_PLUGIN
))),
},
)
}));
continue;
}
let sem = Arc::clone(&semaphore);
let exec = Arc::clone(self);
let hpke_sk = Arc::clone(&hpke_sk);
let batch_counter = Arc::clone(&batch_egress_counter);
handles.push(tokio::spawn(async move {
let _permit = match sem.acquire().await {
Ok(p) => p,
Err(_) => {
return (
idx,
WasmPluginOutput {
result: Err(WasmPluginError::ResourceExhausted("WASM semaphore closed".to_string())),
},
);
}
};
let result = tokio::time::timeout(PLUGIN_TIMEOUT, async {
let secrets_json = match decrypt_secrets(plugin.encrypted_secrets.as_ref(), &hpke_sk) {
Ok(json) => json,
Err(e) => {
return WasmPluginOutput {
result: Err(WasmPluginError::SecretDecryptFailed(format!("{e}"))),
};
}
};
let remaining = MAX_BATCH_EGRESS_CALLS.saturating_sub(batch_counter.load(Ordering::Relaxed));
let effective_max = plugin.max_http_calls.min(remaining);
if effective_max == 0 {
return WasmPluginOutput {
result: Err(WasmPluginError::ResourceExhausted(
"batch egress budget exhausted".to_string(),
)),
};
}
let egress: Arc<dyn EgressClient> = Arc::new(BatchAwareEgressClient::new(
make_egress_client(&egress_mode),
Arc::clone(&batch_counter),
));
match exec
.execute(
&plugin.wasm_bytes,
&plugin.wasm_code_hash,
&plugin.wasm_args,
egress,
secrets_json,
effective_max,
)
.await
{
Ok(wasm_result) => WasmPluginOutput {
result: wasm_result.map_err(WasmPluginError::ExecutionFailed),
},
Err(e) => WasmPluginOutput {
result: Err(WasmPluginError::CompilationFailed(format!("{e}"))),
},
}
})
.await;
let output = match result {
Ok(o) => o,
Err(_) => WasmPluginOutput {
result: Err(WasmPluginError::Timeout(format!(
"plugin timed out after {}s",
PLUGIN_TIMEOUT.as_secs()
))),
},
};
(idx, output)
}));
}
let joined = futures::future::join_all(handles).await;
let mut results: Vec<Option<WasmPluginOutput>> = vec![None; capped_count];
for join_result in joined {
match join_result {
Ok((idx, output)) => {
results[idx] = Some(output);
}
Err(e) => {
warn!(error = %e, "WASM batch task join failed (panic in spawned task)");
}
}
}
results
.into_iter()
.map(|r| {
r.unwrap_or(WasmPluginOutput {
result: Err(WasmPluginError::Cancelled(
"WASM task panicked or was cancelled".to_string(),
)),
})
})
.collect()
}
}
fn decrypt_secrets(
envelope: Option<&newton_core::crypto::SecureEnvelope>,
hpke_sk: &newton_core::crypto::HpkePrivateKey,
) -> Result<Option<zeroize::Zeroizing<Vec<u8>>>, EnclaveError> {
match envelope {
Some(env) => env
.open(hpke_sk)
.map(Some)
.map_err(|e| EnclaveError::DecryptFailed(format!("secrets envelope: {e}"))),
None => Ok(None),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_engine() -> Engine {
let mut c = Config::new();
c.wasm_component_model(true);
c.async_support(true);
Engine::new(&c).unwrap()
}
#[test]
fn integrity_check_rejects_mismatched_hash() {
let rt = tokio::runtime::Runtime::new().unwrap();
let cache = ComponentCache::new(test_engine(), 32);
let wasm_bytes = b"not real wasm";
let wrong_hash = FixedBytes::ZERO;
rt.block_on(async {
match cache.get_or_compile(wasm_bytes, &wrong_hash).await {
Err(e) => assert!(e.to_string().contains("integrity check failed")),
Ok(_) => panic!("should fail with mismatched hash"),
}
});
}
#[test]
fn correct_hash_passes_integrity_but_compilation_fails() {
let rt = tokio::runtime::Runtime::new().unwrap();
let cache = ComponentCache::new(test_engine(), 32);
let wasm_bytes = b"test wasm bytes";
let correct_hash = keccak256(wasm_bytes);
rt.block_on(async {
match cache.get_or_compile(wasm_bytes, &correct_hash).await {
Err(e) => assert!(
!e.to_string().contains("integrity check failed"),
"error should be about compilation, not integrity: {e}"
),
Ok(_) => panic!("should fail compilation on invalid WASM"),
}
});
}
#[test]
fn executor_config_defaults_match_host() {
let config = WasmExecutorConfig::default();
assert_eq!(config.fuel, 100_000_000);
assert_eq!(config.max_wasm_stack, 64 * 1024 * 1024);
}
#[test]
fn secrets_decryption_round_trips() {
let (sk, pk) = newton_core::crypto::generate_keypair();
let secrets_json = br#"{"api_key":"sk-test-123","endpoint":"https://api.example.com"}"#;
let policy_client = "0x0000000000000000000000000000000000000001";
let chain_id = 31337u64;
let envelope =
newton_core::crypto::SecureEnvelope::seal(secrets_json, policy_client, chain_id, &pk, &[0x11; 32])
.unwrap();
let decrypted = decrypt_secrets(Some(&envelope), &sk).unwrap().unwrap();
assert_eq!(&*decrypted, secrets_json);
}
#[tokio::test]
async fn egress_client_connects_and_round_trips() {
use crate::protocol::EgressWireMessage;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let len = stream.read_u32().await.unwrap() as usize;
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).await.unwrap();
let resp = EgressWireMessage::Response(crate::protocol::EgressResponse {
status: 200,
headers: vec![],
body: br#"{"result":"ok"}"#.to_vec(),
});
let encoded = bincode::serde::encode_to_vec(&resp, bincode::config::standard()).unwrap();
stream.write_u32(encoded.len() as u32).await.unwrap();
stream.write_all(&encoded).await.unwrap();
stream.flush().await.unwrap();
});
let client = TcpEgressClient::new(port);
let req = crate::protocol::EgressRequest {
url: "https://api.example.com/v1/data".to_string(),
method: "GET".to_string(),
headers: vec![],
body: None,
};
let resp = client.fetch(req).await.unwrap();
assert_eq!(resp.status, 200);
assert_eq!(resp.body, br#"{"result":"ok"}"#);
}
#[tokio::test]
async fn execute_wasm_plugin_with_secrets() {
let wasm_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../../integration-tests/fixtures/test_wasm_plugin.wasm");
assert!(
wasm_path.exists(),
"WASM fixture not found at {wasm_path:?} — build with: cargo component build --release --manifest-path integration-tests/test-wasm-plugin/Cargo.toml"
);
let wasm_bytes = std::fs::read(&wasm_path).unwrap();
let wasm_code_hash = keccak256(&wasm_bytes);
let executor = WasmExecutor::new(WasmExecutorConfig::default()).unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let egress: Arc<dyn EgressClient> = Arc::new(TcpEgressClient::new(port));
let secrets_json = br#"{"api_key":"sk-test-12345","endpoint":"https://api.example.com/v1"}"#;
let secrets = zeroize::Zeroizing::new(secrets_json.to_vec());
let input = r#"{"query":"test_input"}"#;
let result = executor
.execute(&wasm_bytes, &wasm_code_hash, input, egress, Some(secrets), 10)
.await
.unwrap();
match result {
Ok(json_str) => {
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert!(parsed.get("secrets").is_some(), "output should contain secrets");
let secrets_val = &parsed["secrets"];
assert_eq!(
secrets_val["api_key"].as_str().unwrap(),
"sk-test-12345",
"secrets should contain the decrypted api_key"
);
}
Err(e) => panic!("WASM plugin should succeed, got error: {e}"),
}
}
#[test]
fn batch_execution_with_invalid_wasm_returns_per_plugin_errors() {
let rt = tokio::runtime::Runtime::new().unwrap();
let executor = Arc::new(WasmExecutor::new(WasmExecutorConfig::default()).unwrap());
let (sk, pk) = newton_core::crypto::generate_keypair();
let secrets_json = br#"{"key":"value"}"#;
let envelope = newton_core::crypto::SecureEnvelope::seal(
secrets_json,
"0x0000000000000000000000000000000000000001",
31337,
&pk,
&[0x22; 32],
)
.unwrap();
let plugin = crate::protocol::WasmPluginInput {
wasm_bytes: b"not valid wasm".to_vec(),
wasm_code_hash: keccak256(b"not valid wasm"),
wasm_args: r#"{"input":"test"}"#.to_string(),
encrypted_secrets: Some(envelope),
max_http_calls: 5,
};
rt.block_on(async {
let results = executor
.execute_batch(vec![plugin], EgressMode::Tcp(0), Arc::new(sk), 4)
.await;
assert_eq!(results.len(), 1);
let err = results[0].result.as_ref().unwrap_err();
assert!(
matches!(err, crate::protocol::WasmPluginError::CompilationFailed(_)),
"expected WASM compilation error, got: {err}"
);
});
}
}