use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::instrument;
use wasmtime::component::{Component, Linker as ComponentLinker, ResourceTable};
use wasmtime::{Config, Engine, Linker, Module, Store, StoreLimits, StoreLimitsBuilder};
use wasmtime_wasi::p1::{self, WasiP1Ctx};
#[allow(unused_imports)]
use wasmtime_wasi::p2::pipe::{MemoryInputPipe, MemoryOutputPipe};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_registry::{detect_wasm_version_from_binary, WasiVersion};
use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec, StorageSpec, WasmCapabilities};
/// Capacity, in bytes (1 MiB), handed to each in-memory pipe that captures a
/// guest's stdout or stderr.
#[allow(dead_code)]
const STDIO_PIPE_CAPACITY: usize = 1024 * 1024;
/// Outcome of one finished WASM execution: the exit code plus everything the
/// guest wrote to its captured stdout and stderr pipes.
#[derive(Debug, Clone)]
struct ExecutionResult {
// Exit code: 0 when the entry point returned normally, otherwise the value
// carried by the guest's `I32Exit`.
exit_code: i32,
// Raw bytes captured from the guest's stdout.
stdout: Vec<u8>,
// Raw bytes captured from the guest's stderr.
stderr: Vec<u8>,
}
/// Registers the host directories described by `mounts` as WASI preopens on
/// `wasi_builder`.
///
/// * `Bind` mounts preopen `source` at guest path `target`.
/// * `Named` volumes preopen `<system volumes dir>/<name>` at `target`.
/// * `Tmpfs`, `Anonymous` and `S3` mounts are not supported for WASM; a warning
///   is logged and the mount is skipped.
///
/// Read-only mounts are granted read permissions only; all other mounts get
/// full directory and file permissions.
///
/// # Errors
///
/// Returns a human-readable message as soon as one preopen fails; mounts after
/// the failing one are not processed.
fn configure_wasi_mounts(
    wasi_builder: &mut WasiCtxBuilder,
    mounts: &[StorageSpec],
) -> std::result::Result<(), String> {
    // Maps the `readonly` flag to the WASI permission pair for a preopen.
    let permissions = |readonly: bool| {
        if readonly {
            (DirPerms::READ, FilePerms::READ)
        } else {
            (DirPerms::all(), FilePerms::all())
        }
    };

    for mount in mounts {
        match mount {
            StorageSpec::Bind {
                source,
                target,
                readonly,
            } => {
                let (dir_perms, file_perms) = permissions(*readonly);
                if let Err(e) = wasi_builder.preopened_dir(source, target, dir_perms, file_perms)
                {
                    return Err(format!(
                        "failed to preopen bind mount '{source}' -> '{target}': {e}"
                    ));
                }
                tracing::debug!(
                    source = %source,
                    target = %target,
                    readonly = %readonly,
                    "configured bind mount for WASM"
                );
            }
            StorageSpec::Named {
                name,
                target,
                readonly,
                ..
            } => {
                // Named volumes live under the system-wide volumes directory.
                let volumes_root = zlayer_paths::ZLayerDirs::system_default().volumes();
                let volume_path = volumes_root.join(name).to_string_lossy().into_owned();
                let (dir_perms, file_perms) = permissions(*readonly);
                if let Err(e) =
                    wasi_builder.preopened_dir(&volume_path, target, dir_perms, file_perms)
                {
                    return Err(format!(
                        "failed to preopen named volume '{name}' at '{target}': {e}"
                    ));
                }
                tracing::debug!(
                    name = %name,
                    volume_path = %volume_path,
                    target = %target,
                    readonly = %readonly,
                    "configured named volume for WASM"
                );
            }
            StorageSpec::Tmpfs { target, .. } => {
                tracing::warn!(
                    target = %target,
                    "tmpfs storage not supported for WASM, skipping"
                );
            }
            StorageSpec::Anonymous { target, .. } => {
                tracing::warn!(
                    target = %target,
                    "anonymous storage not supported for WASM, skipping"
                );
            }
            StorageSpec::S3 { bucket, target, .. } => {
                tracing::warn!(
                    bucket = %bucket,
                    target = %target,
                    "S3 storage not supported for WASM, skipping"
                );
            }
        }
    }

    Ok(())
}
/// Parses a binary-suffixed memory quantity such as `"64Mi"` or `"1Gi"` into a
/// number of bytes.
///
/// Leading and trailing whitespace is ignored. The input must be an unsigned
/// integer immediately followed by one of the two-character suffixes `Ki`,
/// `Mi`, `Gi` or `Ti` (powers of 1024). Values that would overflow `u64`
/// saturate at `u64::MAX`.
///
/// # Errors
///
/// Returns a descriptive message when the input is shorter than three bytes,
/// when its last two bytes do not form a standalone suffix (for example
/// because they fall inside a multi-byte character), when the numeric part is
/// not a valid `u64`, or when the suffix is not recognised.
pub(super) fn parse_memory_limit(s: &str) -> std::result::Result<u64, String> {
    let s = s.trim();
    if s.len() < 3 {
        return Err(format!("invalid memory format: {s}"));
    }

    // `str::split_at` panics if the index is not on a character boundary, which
    // happens for non-ASCII input such as "12€". Reject it instead of panicking.
    let split = s.len() - 2;
    if !s.is_char_boundary(split) {
        return Err(format!("invalid memory format: {s}"));
    }
    let (num_str, suffix) = s.split_at(split);

    let num: u64 = num_str
        .parse()
        .map_err(|e| format!("invalid number in memory limit '{s}': {e}"))?;

    let multiplier: u64 = match suffix {
        "Ki" => 1 << 10,
        "Mi" => 1 << 20,
        "Gi" => 1 << 30,
        "Ti" => 1 << 40,
        _ => return Err(format!("unknown memory suffix '{suffix}' in '{s}'")),
    };

    Ok(num.saturating_mul(multiplier))
}
/// Builds wasmtime [`StoreLimits`] from the `max_memory` setting of a service's
/// WASM configuration.
///
/// Returns `None` when there is no WASM configuration, when `max_memory` is
/// unset, or when `max_memory` cannot be parsed (a warning is logged in that
/// case and the limit is ignored).
fn build_store_limits(spec_wasm: Option<&zlayer_spec::WasmConfig>) -> Option<StoreLimits> {
    let max_memory = spec_wasm?.max_memory.as_ref()?;

    let max_bytes = match parse_memory_limit(max_memory) {
        Ok(bytes) => bytes,
        Err(e) => {
            tracing::warn!(max_memory = %max_memory, error = %e, "ignoring invalid max_memory");
            return None;
        }
    };

    // Saturate instead of truncating: on targets where `usize` is narrower than
    // 64 bits an `as` cast would wrap (e.g. "4Gi" would become a limit of 0).
    let memory_size = usize::try_from(max_bytes).unwrap_or(usize::MAX);

    let limits = StoreLimitsBuilder::new().memory_size(memory_size).build();
    tracing::info!(max_bytes = max_bytes, "WASM store memory limit configured");
    Some(limits)
}
/// Per-instance resource limits derived from a service's WASM configuration.
#[derive(Debug, Clone)]
struct ResourceLimits {
// Store memory limits; `None` when the spec sets no (valid) `max_memory`.
store_limits: Option<StoreLimits>,
// Fuel budget for one execution; 0 means "not configured", in which case the
// executors give the store `u64::MAX` fuel.
max_fuel: u64,
// Epoch interval copied from the spec.
// NOTE(review): nothing in this file consumes it beyond logging whether it is set — confirm intended use.
epoch_interval: Option<Duration>,
}
impl ResourceLimits {
    /// Derives the limits from an optional WASM configuration.
    ///
    /// Without a configuration the result has no store limits, a fuel budget
    /// of 0 and no epoch interval.
    fn from_spec(spec_wasm: Option<&zlayer_spec::WasmConfig>) -> Self {
        let store_limits = build_store_limits(spec_wasm);
        let (max_fuel, epoch_interval) = match spec_wasm {
            Some(wasm) => (wasm.max_fuel, wasm.epoch_interval),
            None => (0, None),
        };

        Self {
            store_limits,
            max_fuel,
            epoch_interval,
        }
    }
}
/// Store data for executing a core module against WASI preview 1.
struct WasiP1State {
// WASI preview 1 context handed to the linker.
wasi: WasiP1Ctx,
// Resource limiter registered with the store via `Store::limiter`.
limiter: StoreLimits,
}
/// Store data for executing a component against WASI preview 2.
struct WasiP2State {
// WASI context exposed through the `WasiView` implementation.
ctx: WasiCtx,
// Resource table exposed alongside `ctx` through `WasiView`.
table: ResourceTable,
// Resource limiter registered with the store via `Store::limiter`.
limiter: StoreLimits,
}
impl WasiView for WasiP2State {
    /// Hands out mutable access to the WASI context and its resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        let Self { ctx, table, .. } = self;
        WasiCtxView { ctx, table }
    }
}
/// Runtime-wide configuration for the WASM runtime.
#[derive(Debug, Clone)]
pub struct WasmConfig {
/// Directory in which pulled `.wasm` binaries are cached; created on runtime start-up.
pub cache_dir: PathBuf,
/// Whether epoch interruption is enabled on the engine and an epoch deadline is set on each store.
pub enable_epochs: bool,
/// Value passed to `Store::set_epoch_deadline` when `enable_epochs` is true.
pub epoch_deadline: u64,
/// Upper bound on how long `wait_container` polls before returning a timeout error.
pub max_execution_time: Duration,
/// Blob cache to use for the registry puller; when `None` the cache type is taken from the environment.
pub cache_type: Option<zlayer_registry::CacheType>,
}
impl Default for WasmConfig {
    /// Default configuration: cache directory from `ZLAYER_WASM_CACHE_DIR` (or
    /// the system default WASM directory when the variable cannot be read),
    /// epochs enabled with a deadline of 1,000,000, a one-hour maximum
    /// execution time and no explicit blob cache type.
    fn default() -> Self {
        let cache_dir = match std::env::var("ZLAYER_WASM_CACHE_DIR") {
            Ok(dir) => PathBuf::from(dir),
            Err(_) => zlayer_paths::ZLayerDirs::system_default().wasm(),
        };

        Self {
            cache_dir,
            enable_epochs: true,
            epoch_deadline: 1_000_000,
            max_execution_time: Duration::from_secs(3600),
            cache_type: None,
        }
    }
}
/// Lifecycle state of a WASM instance tracked by the runtime.
#[derive(Debug, Clone)]
enum InstanceState {
// Created but not yet started.
Pending,
// Execution has been started.
Running {
// Moment the instance was switched to `Running`.
#[allow(dead_code)]
started_at: Instant,
},
// Execution finished and produced an exit code.
Completed { exit_code: i32 },
// Execution failed, timed out while stopping, or was killed.
Failed { reason: String },
}
/// Everything the runtime keeps for one created WASM instance: its inputs
/// (binary, arguments, environment, mounts, limits), its captured output and
/// the handle of the running execution task, if any.
struct WasmInstance {
// Current lifecycle state.
state: InstanceState,
// Image reference the binary came from.
image: String,
// The raw WASM binary (core module or component).
module_bytes: Vec<u8>,
// WASI version detected from the binary or reported by the registry.
wasi_version: WasiVersion,
// Captured stdout, filled in once the execution result is collected.
stdout: Vec<u8>,
// Captured stderr, filled in once the execution result is collected.
stderr: Vec<u8>,
// Environment variables passed to the guest.
env_vars: Vec<(String, String)>,
// Command-line arguments passed to the guest (argv[0] is the image name).
args: Vec<String>,
// Storage mounts taken from the service spec.
mounts: Vec<StorageSpec>,
// Memory / fuel / epoch limits derived from the service spec.
resource_limits: ResourceLimits,
// Host capabilities granted to the guest; `None` means no capability set was resolved.
capabilities: Option<WasmCapabilities>,
// Join handle of the spawned execution task; `None` before start and after the result is collected.
execution_handle: Option<tokio::task::JoinHandle<std::result::Result<ExecutionResult, String>>>,
}
impl std::fmt::Debug for WasmInstance {
    /// Compact debug view: state, image and WASI version, plus only the
    /// lengths of the captured output buffers. Remaining fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stdout_len = self.stdout.len();
        let stderr_len = self.stderr.len();

        let mut out = f.debug_struct("WasmInstance");
        out.field("state", &self.state);
        out.field("image", &self.image);
        out.field("wasi_version", &self.wasi_version);
        out.field("stdout_len", &stdout_len);
        out.field("stderr_len", &stderr_len);
        out.finish_non_exhaustive()
    }
}
/// Container runtime implementation that executes WASM modules and components
/// with wasmtime.
pub struct WasmRuntime {
// Shared wasmtime engine used for every compilation and store.
engine: Engine,
// Runtime-wide configuration.
config: WasmConfig,
// Registry client used to pull WASM artifacts.
registry: Arc<zlayer_registry::ImagePuller>,
// Resolves registry credentials for an image reference.
auth_resolver: zlayer_core::AuthResolver,
// Known instances keyed by instance id (`wasm-<service>-<replica>`).
instances: RwLock<HashMap<String, WasmInstance>>,
}
impl std::fmt::Debug for WasmRuntime {
    /// Debug view that shows only the configuration; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WasmRuntime");
        out.field("config", &self.config);
        out.finish_non_exhaustive()
    }
}
impl WasmRuntime {
/// Creates a runtime from `config`.
///
/// Ensures the cache directory exists, builds a wasmtime engine (fuel
/// consumption always enabled, epoch interruption when
/// `config.enable_epochs` is set) and opens the blob cache used by the
/// registry puller. When `config.cache_type` is `None` the cache type comes
/// from the environment, and a persistent cache is relocated to
/// `config.cache_dir`.
///
/// The `_auth_ctx` argument is currently unused; the runtime starts with a
/// default auth resolver.
///
/// # Errors
///
/// Returns `AgentError::CreateFailed` if the cache directory, the engine or
/// the blob cache cannot be set up.
pub async fn new(
    config: WasmConfig,
    _auth_ctx: Option<crate::runtime::ContainerAuthContext>,
) -> Result<Self> {
    // Every failure in here is reported against the same synthetic id.
    let create_failed = |reason: String| AgentError::CreateFailed {
        id: "wasm-runtime".to_string(),
        reason,
    };

    tokio::fs::create_dir_all(&config.cache_dir)
        .await
        .map_err(|e| create_failed(format!("failed to create cache directory: {e}")))?;

    let mut engine_config = Config::new();
    if config.enable_epochs {
        engine_config.epoch_interruption(true);
    }
    engine_config.consume_fuel(true);
    let engine = Engine::new(&engine_config)
        .map_err(|e| create_failed(format!("failed to create wasmtime engine: {e}")))?;

    let blob_cache = match &config.cache_type {
        Some(cache_type) => cache_type
            .build()
            .await
            .map_err(|e| create_failed(format!("failed to open blob cache: {e}")))?,
        None => {
            let from_env = zlayer_registry::CacheType::from_env().map_err(|e| {
                create_failed(format!("failed to configure blob cache from env: {e}"))
            })?;
            // A persistent cache selected via the environment is redirected to
            // this runtime's own cache directory.
            #[allow(clippy::match_wildcard_for_single_variants)]
            let cache_type = match from_env {
                zlayer_registry::CacheType::Persistent { .. } => {
                    zlayer_registry::CacheType::persistent_at(config.cache_dir.clone())
                }
                other => other,
            };
            cache_type
                .build()
                .await
                .map_err(|e| create_failed(format!("failed to open blob cache: {e}")))?
        }
    };

    let registry = Arc::new(zlayer_registry::ImagePuller::with_cache(blob_cache));
    let auth_resolver = zlayer_core::AuthResolver::new(zlayer_core::AuthConfig::default());
    tracing::info!("WASM runtime initialized with wasmtime");

    Ok(Self {
        engine,
        config,
        registry,
        auth_resolver,
        instances: RwLock::new(HashMap::new()),
    })
}
/// Creates a runtime from `WasmConfig::default()` with no auth context.
///
/// # Errors
///
/// Propagates any error from [`WasmRuntime::new`].
pub async fn with_defaults() -> Result<Self> {
    let config = WasmConfig::default();
    Self::new(config, None).await
}
pub async fn with_auth(
config: WasmConfig,
auth_config: zlayer_core::AuthConfig,
) -> Result<Self> {
let mut runtime = Self::new(config, None).await?;
runtime.auth_resolver = zlayer_core::AuthResolver::new(auth_config);
Ok(runtime)
}
/// Derives the key under which an instance is tracked:
/// `wasm-<service>-<replica>`.
#[allow(clippy::unused_self)]
fn instance_id(&self, id: &ContainerId) -> String {
    let service = &id.service;
    let replica = &id.replica;
    format!("wasm-{service}-{replica}")
}
/// Builds the guest environment: a fixed `PATH` entry followed by the
/// resolved variables from the service spec.
///
/// Resolution warnings are logged. If resolution fails entirely the failure is
/// logged and only the `PATH` entry is returned. Resolved entries that contain
/// no `=` are dropped.
#[allow(clippy::unused_self)]
fn build_env_vars(&self, spec: &ServiceSpec) -> Vec<(String, String)> {
    let mut env = vec![(
        "PATH".to_string(),
        "/usr/local/bin:/usr/bin:/bin".to_string(),
    )];

    let result = match crate::env::resolve_env_vars_with_warnings(&spec.env) {
        Ok(result) => result,
        Err(e) => {
            tracing::warn!("failed to resolve env vars: {}", e);
            return env;
        }
    };

    for warning in &result.warnings {
        tracing::warn!("env resolution warning: {}", warning);
    }

    // Each resolved entry has the form `KEY=VALUE`; split on the first `=`.
    env.extend(result.vars.into_iter().filter_map(|var| {
        var.split_once('=')
            .map(|(key, value)| (key.to_string(), value.to_string()))
    }));

    env
}
/// Builds the guest argument vector: the image name as `argv[0]`, then the
/// spec's entrypoint (if any), then its args (if any).
#[allow(clippy::unused_self)]
fn build_args(&self, spec: &ServiceSpec) -> Vec<String> {
    let mut argv = vec![spec.image.name.to_string()];

    if let Some(entrypoint) = spec.command.entrypoint.as_ref() {
        argv.extend(entrypoint.iter().cloned());
    }
    if let Some(cmd_args) = spec.command.args.as_ref() {
        argv.extend(cmd_args.iter().cloned());
    }

    argv
}
/// Compiles and runs a core WASM module against WASI preview 1 on a blocking
/// thread, capturing stdout and stderr in memory.
///
/// With `capabilities` present, the WASI context is first configured from them
/// and mounts are only applied when the `filesystem` capability is set. With
/// no capabilities, mounts are always applied and host networking is
/// inherited. Stdin is an empty in-memory pipe.
///
/// The store gets the configured memory limits (or default limits), the epoch
/// deadline when `enable_epochs` is set, and a fuel budget of
/// `resource_limits.max_fuel` (or `u64::MAX` when that is 0).
///
/// The exported `_start` function is called, falling back to `main`. A normal
/// return yields exit code 0; an `I32Exit` yields its code.
///
/// # Errors
///
/// Returns a message if compilation, WASI setup, instantiation or execution
/// fails, if neither entry point exists, or if the blocking task cannot be
/// joined.
#[allow(clippy::too_many_arguments)]
async fn execute_module(
    engine: Engine,
    module_bytes: Vec<u8>,
    env_vars: Vec<(String, String)>,
    args: Vec<String>,
    mounts: Vec<StorageSpec>,
    epoch_deadline: u64,
    enable_epochs: bool,
    resource_limits: ResourceLimits,
    capabilities: Option<WasmCapabilities>,
) -> std::result::Result<ExecutionResult, String> {
    let run = move || -> std::result::Result<ExecutionResult, String> {
        let module = Module::new(&engine, &module_bytes)
            .map_err(|e| format!("failed to compile module: {e}"))?;

        // --- WASI context -------------------------------------------------
        let mut wasi_builder = WasiCtxBuilder::new();
        if let Some(caps) = capabilities.as_ref() {
            super::wasm_host::configure_wasi_ctx_with_capabilities(&mut wasi_builder, caps);
        }
        for (key, value) in &env_vars {
            wasi_builder.env(key, value);
        }
        wasi_builder.args(&args);
        match capabilities.as_ref() {
            Some(caps) if !caps.filesystem => {
                tracing::debug!("filesystem capability disabled, skipping WASI mounts");
            }
            Some(_) => {
                configure_wasi_mounts(&mut wasi_builder, &mounts)?;
            }
            None => {
                configure_wasi_mounts(&mut wasi_builder, &mounts)?;
                wasi_builder.inherit_network();
            }
        }

        let stdout_pipe = MemoryOutputPipe::new(STDIO_PIPE_CAPACITY);
        let stderr_pipe = MemoryOutputPipe::new(STDIO_PIPE_CAPACITY);
        wasi_builder.stdin(MemoryInputPipe::new(Vec::new()));
        wasi_builder.stdout(stdout_pipe.clone());
        wasi_builder.stderr(stderr_pipe.clone());

        // --- Store with limits ---------------------------------------------
        let limiter = resource_limits
            .store_limits
            .clone()
            .unwrap_or_else(|| StoreLimitsBuilder::new().build());
        let mut store = Store::new(
            &engine,
            WasiP1State {
                wasi: wasi_builder.build_p1(),
                limiter,
            },
        );
        store.limiter(|s| &mut s.limiter);
        if enable_epochs {
            store.set_epoch_deadline(epoch_deadline);
        }

        let fuel_budget = resource_limits.max_fuel;
        if fuel_budget > 0 {
            store
                .set_fuel(fuel_budget)
                .map_err(|e| format!("failed to set fuel: {e}"))?;
            tracing::debug!(fuel = fuel_budget, "WASM fuel budget set");
        } else {
            // The engine always consumes fuel, so "unlimited" is the maximum budget.
            store
                .set_fuel(u64::MAX)
                .map_err(|e| format!("failed to set default fuel: {e}"))?;
        }

        // --- Link, instantiate, run ------------------------------------------
        let mut linker: Linker<WasiP1State> = Linker::new(&engine);
        p1::add_to_linker_sync(&mut linker, |state| &mut state.wasi)
            .map_err(|e| format!("failed to add WASI to linker: {e}"))?;
        let instance = linker
            .instantiate(&mut store, &module)
            .map_err(|e| format!("failed to instantiate module: {e}"))?;

        let entry = instance
            .get_func(&mut store, "_start")
            .or_else(|| instance.get_func(&mut store, "main"));
        let func = match entry {
            Some(func) => func,
            None => return Err("no _start or main function found".to_string()),
        };

        let exit_code = match func.call(&mut store, &[], &mut []) {
            Ok(()) => 0,
            Err(e) => match e.downcast_ref::<wasmtime_wasi::I32Exit>() {
                Some(exit) => exit.0,
                None => {
                    let stdout_len = stdout_pipe.contents().len();
                    let stderr_len = stderr_pipe.contents().len();
                    return Err(format!(
                        "execution error: {} (stdout: {} bytes, stderr: {} bytes)",
                        e, stdout_len, stderr_len
                    ));
                }
            },
        };

        Ok(ExecutionResult {
            exit_code,
            stdout: stdout_pipe.contents().to_vec(),
            stderr: stderr_pipe.contents().to_vec(),
        })
    };

    tokio::task::spawn_blocking(run)
        .await
        .map_err(|e| format!("task join error: {e}"))?
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
#[instrument(
skip(engine, component_bytes, env_vars, args, mounts, resource_limits, capabilities),
fields(
component_size = component_bytes.len(),
args_count = args.len(),
mounts_count = mounts.len(),
)
)]
async fn execute_component(
engine: Engine,
component_bytes: Vec<u8>,
env_vars: Vec<(String, String)>,
args: Vec<String>,
mounts: Vec<StorageSpec>,
epoch_deadline: u64,
enable_epochs: bool,
resource_limits: ResourceLimits,
capabilities: Option<WasmCapabilities>,
) -> std::result::Result<ExecutionResult, String> {
tokio::task::spawn_blocking(move || {
let component = Component::from_binary(&engine, &component_bytes)
.map_err(|e| format!("failed to compile component: {e}"))?;
let mut wasi_builder = WasiCtxBuilder::new();
if let Some(ref caps) = capabilities {
super::wasm_host::configure_wasi_ctx_with_capabilities(&mut wasi_builder, caps);
for (key, value) in &env_vars {
wasi_builder.env(key, value);
}
wasi_builder.args(&args);
if caps.filesystem {
configure_wasi_mounts(&mut wasi_builder, &mounts)?;
} else {
tracing::debug!("filesystem capability disabled, skipping WASI mounts");
}
} else {
for (key, value) in &env_vars {
wasi_builder.env(key, value);
}
wasi_builder.args(&args);
configure_wasi_mounts(&mut wasi_builder, &mounts)?;
wasi_builder.inherit_network();
}
let stdout_pipe = MemoryOutputPipe::new(STDIO_PIPE_CAPACITY);
let stderr_pipe = MemoryOutputPipe::new(STDIO_PIPE_CAPACITY);
wasi_builder.stdin(MemoryInputPipe::new(Vec::new())); wasi_builder.stdout(stdout_pipe.clone());
wasi_builder.stderr(stderr_pipe.clone());
let wasi_ctx = wasi_builder.build();
let table = ResourceTable::new();
let limiter = resource_limits
.store_limits
.clone()
.unwrap_or_else(|| StoreLimitsBuilder::new().build());
let state = WasiP2State {
ctx: wasi_ctx,
table,
limiter,
};
let mut store = Store::new(&engine, state);
store.limiter(|s| &mut s.limiter);
if enable_epochs {
store.set_epoch_deadline(epoch_deadline);
}
if resource_limits.max_fuel > 0 {
store
.set_fuel(resource_limits.max_fuel)
.map_err(|e| format!("failed to set fuel: {e}"))?;
tracing::debug!(fuel = resource_limits.max_fuel, "WASM fuel budget set");
} else {
store
.set_fuel(u64::MAX)
.map_err(|e| format!("failed to set default fuel: {e}"))?;
}
let mut linker: ComponentLinker<WasiP2State> = ComponentLinker::new(&engine);
wasmtime_wasi::p2::add_to_linker_sync(&mut linker)
.map_err(|e| format!("failed to add WASIp2 to linker: {e}"))?;
let instance = linker
.instantiate(&mut store, &component)
.map_err(|e| format!("failed to instantiate component: {e}"))?;
let make_result = |exit_code: i32| ExecutionResult {
exit_code,
stdout: stdout_pipe.contents().to_vec(),
stderr: stderr_pipe.contents().to_vec(),
};
if let Some(run_func) = instance.get_func(&mut store, "wasi:cli/run@0.2.0#run") {
match run_func.call(&mut store, &[], &mut []) {
Ok(()) => return Ok(make_result(0)),
Err(e) => {
if let Some(exit) = e.downcast_ref::<wasmtime_wasi::I32Exit>() {
return Ok(make_result(exit.0));
}
return Err(format!("wasi:cli/run execution error: {e}"));
}
}
}
if let Some(start_func) = instance.get_func(&mut store, "_start") {
match start_func.call(&mut store, &[], &mut []) {
Ok(()) => return Ok(make_result(0)),
Err(e) => {
if let Some(exit) = e.downcast_ref::<wasmtime_wasi::I32Exit>() {
return Ok(make_result(exit.0));
}
return Err(format!("_start execution error: {e}"));
}
}
}
if let Some(main_func) = instance.get_func(&mut store, "main") {
match main_func.call(&mut store, &[], &mut []) {
Ok(()) => return Ok(make_result(0)),
Err(e) => {
if let Some(exit) = e.downcast_ref::<wasmtime_wasi::I32Exit>() {
return Ok(make_result(exit.0));
}
return Err(format!("main execution error: {e}"));
}
}
}
Err("no wasi:cli/run, _start, or main function found in component".to_string())
})
.await
.map_err(|e| format!("task join error: {e}"))?
}
}
#[async_trait::async_trait]
impl Runtime for WasmRuntime {
/// Pulls `image` with the `IfNotPresent` policy and no explicit credentials.
#[instrument(
    skip(self),
    fields(
        otel.name = "wasm.pull",
        container.image.name = %image,
    )
)]
async fn pull_image(&self, image: &str) -> Result<()> {
    let default_policy = PullPolicy::IfNotPresent;
    self.pull_image_with_policy(image, default_policy, None)
        .await
}
/// Pulls the WASM artifact for `image` into the on-disk cache according to
/// `policy`.
///
/// * `Never` — nothing is pulled.
/// * `IfNotPresent` — the pull is skipped when the cache file already exists.
/// * any other policy — the artifact is pulled and the cache file is written.
///
/// The cache file is `<cache_dir>/<image with '/', ':' and '@' replaced by '_'>.wasm`.
/// The `_auth` argument is not used; credentials come from the runtime's
/// auth resolver.
///
/// # Errors
///
/// Returns `AgentError::PullFailed` if the registry pull or the cache write
/// fails.
#[instrument(
    skip(self, _auth),
    fields(
        otel.name = "wasm.pull",
        container.image.name = %image,
        pull_policy = ?policy,
    )
)]
async fn pull_image_with_policy(
    &self,
    image: &str,
    policy: PullPolicy,
    _auth: Option<&RegistryAuth>,
) -> Result<()> {
    if matches!(policy, PullPolicy::Never) {
        tracing::debug!(image = %image, "pull policy is Never, skipping pull");
        return Ok(());
    }

    let pull_failed = |reason: String| AgentError::PullFailed {
        image: image.to_string(),
        reason,
    };

    let cache_file = format!("{}.wasm", image.replace(['/', ':', '@'], "_"));
    let cache_path = self.config.cache_dir.join(cache_file);
    if matches!(policy, PullPolicy::IfNotPresent) && cache_path.exists() {
        tracing::debug!(image = %image, "WASM module already cached");
        return Ok(());
    }

    let auth = self.auth_resolver.resolve(image);
    tracing::info!(image = %image, "pulling WASM artifact from registry");
    let (wasm_bytes, wasm_info) = self
        .registry
        .pull_wasm(image, &auth)
        .await
        .map_err(|e| pull_failed(format!("failed to pull WASM artifact: {e}")))?;

    tokio::fs::write(&cache_path, &wasm_bytes)
        .await
        .map_err(|e| pull_failed(format!("failed to cache WASM binary: {e}")))?;

    tracing::info!(
        image = %image,
        wasi_version = %wasm_info.wasi_version,
        size = wasm_bytes.len(),
        "WASM artifact pulled and cached"
    );
    Ok(())
}
/// Creates (but does not start) a WASM instance for `id` from `spec`.
///
/// The binary is read from the on-disk cache when present (its WASI version is
/// then detected from the bytes); otherwise it is pulled from the registry and
/// written to the cache. Environment, arguments, mounts, resource limits and
/// capabilities are derived from the spec, and the instance is stored in the
/// `Pending` state, replacing any existing instance with the same id.
///
/// # Errors
///
/// Returns `AgentError::CreateFailed` if the cached binary cannot be read, or
/// if pulling or caching the binary fails.
#[instrument(
    skip(self, spec),
    fields(
        otel.name = "wasm.create",
        container.id = %self.instance_id(id),
        service.name = %id.service,
        container.image.name = %spec.image.name,
    )
)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
    let instance_id = self.instance_id(id);
    let image = spec.image.name.to_string();
    tracing::info!(
        instance = %instance_id,
        image = %image,
        "creating WASM instance"
    );

    let create_failed = |reason: String| AgentError::CreateFailed {
        id: instance_id.clone(),
        reason,
    };

    // --- Obtain the binary: cache first, registry otherwise ----------------
    let cache_file = format!("{}.wasm", image.replace(['/', ':', '@'], "_"));
    let cache_path = self.config.cache_dir.join(cache_file);

    let (module_bytes, wasi_version) = if cache_path.exists() {
        let bytes = tokio::fs::read(&cache_path)
            .await
            .map_err(|e| create_failed(format!("failed to read cached WASM: {e}")))?;
        let detected_version = detect_wasm_version_from_binary(&bytes);
        tracing::debug!(
            instance = %instance_id,
            wasi_version = %detected_version,
            "detected WASI version from cached binary"
        );
        (bytes, detected_version)
    } else {
        let auth = self.auth_resolver.resolve(&image);
        let (wasm_bytes, wasm_info) = self
            .registry
            .pull_wasm(&image, &auth)
            .await
            .map_err(|e| create_failed(format!("failed to pull WASM: {e}")))?;
        tokio::fs::write(&cache_path, &wasm_bytes)
            .await
            .map_err(|e| create_failed(format!("failed to cache WASM: {e}")))?;
        (wasm_bytes, wasm_info.wasi_version)
    };

    // --- Derive execution inputs from the spec ------------------------------
    let env_vars = self.build_env_vars(spec);
    let args = self.build_args(spec);

    let mounts = spec.storage.clone();
    if !mounts.is_empty() {
        tracing::info!(
            instance = %instance_id,
            mount_count = mounts.len(),
            "WASM instance has filesystem mounts configured"
        );
    }

    let resource_limits = ResourceLimits::from_spec(spec.wasm.as_ref());
    let has_memory_limit = resource_limits.store_limits.is_some();
    if has_memory_limit || resource_limits.max_fuel > 0 {
        tracing::info!(
            instance = %instance_id,
            has_memory_limit = has_memory_limit,
            max_fuel = resource_limits.max_fuel,
            has_epoch_interval = resource_limits.epoch_interval.is_some(),
            "WASM resource limits configured from spec"
        );
    }

    // Explicit capabilities win; otherwise fall back to the service type's defaults.
    let explicit_capabilities = spec.wasm.as_ref().and_then(|w| w.capabilities.clone());
    let capabilities =
        explicit_capabilities.or_else(|| spec.service_type.default_wasm_capabilities());
    if let Some(caps) = capabilities.as_ref() {
        tracing::info!(
            instance = %instance_id,
            config = caps.config,
            keyvalue = caps.keyvalue,
            logging = caps.logging,
            secrets = caps.secrets,
            metrics = caps.metrics,
            http_client = caps.http_client,
            cli = caps.cli,
            filesystem = caps.filesystem,
            sockets = caps.sockets,
            "WASM capabilities configured"
        );
    }

    // --- Register the instance -----------------------------------------------
    let instance = WasmInstance {
        state: InstanceState::Pending,
        image: image.clone(),
        module_bytes,
        wasi_version,
        stdout: Vec::new(),
        stderr: Vec::new(),
        env_vars,
        args,
        mounts,
        resource_limits,
        capabilities,
        execution_handle: None,
    };
    self.instances
        .write()
        .await
        .insert(instance_id.clone(), instance);

    tracing::info!(
        instance = %instance_id,
        "WASM instance created"
    );
    Ok(())
}
/// Starts execution of a previously created instance.
///
/// The instance is marked `Running`, its inputs are cloned out from under the
/// lock, and the execution is spawned as a background task: components (WASI
/// preview 2) go through `execute_component`, core modules through
/// `execute_module`. An `Unknown` WASI version is re-detected from the binary.
/// The task's join handle is stored on the instance if it still exists.
///
/// # Errors
///
/// Returns `AgentError::NotFound` if no instance with this id exists.
#[instrument(
    skip(self),
    fields(
        otel.name = "wasm.start",
        container.id = %self.instance_id(id),
        service.name = %id.service,
    )
)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
    let instance_id = self.instance_id(id);
    tracing::info!(instance = %instance_id, "starting WASM instance");

    // Flip the state and snapshot everything the execution needs, so the
    // lock is not held while the task runs.
    let (wasm_bytes, wasi_version, env_vars, args, mounts, resource_limits, capabilities) = {
        let mut instances = self.instances.write().await;
        let instance = match instances.get_mut(&instance_id) {
            Some(instance) => instance,
            None => {
                return Err(AgentError::NotFound {
                    container: instance_id.clone(),
                    reason: "WASM instance not found".to_string(),
                });
            }
        };
        instance.state = InstanceState::Running {
            started_at: Instant::now(),
        };
        (
            instance.module_bytes.clone(),
            instance.wasi_version.clone(),
            instance.env_vars.clone(),
            instance.args.clone(),
            instance.mounts.clone(),
            instance.resource_limits.clone(),
            instance.capabilities.clone(),
        )
    };

    let is_component = match &wasi_version {
        WasiVersion::Preview2 => true,
        WasiVersion::Preview1 => false,
        WasiVersion::Unknown => detect_wasm_version_from_binary(&wasm_bytes).is_preview2(),
    };
    tracing::info!(
        instance = %instance_id,
        wasi_version = %wasi_version,
        is_component = is_component,
        mount_count = mounts.len(),
        "starting WASM execution"
    );

    let engine = self.engine.clone();
    let epoch_deadline = self.config.epoch_deadline;
    let enable_epochs = self.config.enable_epochs;
    let handle = tokio::spawn(async move {
        if is_component {
            Self::execute_component(
                engine,
                wasm_bytes,
                env_vars,
                args,
                mounts,
                epoch_deadline,
                enable_epochs,
                resource_limits,
                capabilities,
            )
            .await
        } else {
            Self::execute_module(
                engine,
                wasm_bytes,
                env_vars,
                args,
                mounts,
                epoch_deadline,
                enable_epochs,
                resource_limits,
                capabilities,
            )
            .await
        }
    });

    if let Some(instance) = self.instances.write().await.get_mut(&instance_id) {
        instance.execution_handle = Some(handle);
    }

    tracing::info!(
        instance = %instance_id,
        "WASM instance started"
    );
    Ok(())
}
/// Stops a running instance by waiting up to `timeout` for its execution task
/// to finish and recording the outcome.
///
/// * Instances that are not `Running` are left untouched.
/// * If the task finishes in time, its output and exit code (or failure
///   reason) are stored on the instance.
/// * If the wait times out, the task is aborted and the instance is marked
///   `Failed` with reason `"execution timed out"`.
///
/// # Errors
///
/// Returns `AgentError::NotFound` if no instance with this id exists.
#[instrument(
    skip(self),
    fields(
        otel.name = "wasm.stop",
        container.id = %self.instance_id(id),
        service.name = %id.service,
    )
)]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
    let instance_id = self.instance_id(id);
    tracing::info!(
        instance = %instance_id,
        timeout = ?timeout,
        "stopping WASM instance"
    );

    // Take the handle out under the lock so nobody else awaits the same task.
    let handle = {
        let mut instances = self.instances.write().await;
        let instance = instances
            .get_mut(&instance_id)
            .ok_or_else(|| AgentError::NotFound {
                container: instance_id.clone(),
                reason: "WASM instance not found".to_string(),
            })?;
        if !matches!(instance.state, InstanceState::Running { .. }) {
            return Ok(());
        }
        instance.execution_handle.take()
    };

    if let Some(mut handle) = handle {
        // Await by reference so the handle is still available for `abort()`
        // if the timeout fires.
        let (state, output) = match tokio::time::timeout(timeout, &mut handle).await {
            Ok(Ok(Ok(exec_result))) => (
                InstanceState::Completed {
                    exit_code: exec_result.exit_code,
                },
                Some((exec_result.stdout, exec_result.stderr)),
            ),
            Ok(Ok(Err(reason))) => (InstanceState::Failed { reason }, None),
            Ok(Err(join_error)) => (
                InstanceState::Failed {
                    reason: format!("task join error: {join_error}"),
                },
                None,
            ),
            Err(_elapsed) => {
                // Previously the handle was simply dropped here, which detaches
                // the task and leaves it running unobserved. Abort it instead,
                // matching `kill_container`.
                // NOTE(review): aborting presumably cannot interrupt guest code
                // already running on the blocking thread pool — confirm.
                handle.abort();
                (
                    InstanceState::Failed {
                        reason: "execution timed out".to_string(),
                    },
                    None,
                )
            }
        };

        let mut instances = self.instances.write().await;
        if let Some(instance) = instances.get_mut(&instance_id) {
            if let Some((stdout, stderr)) = output {
                instance.stdout = stdout;
                instance.stderr = stderr;
            }
            instance.state = state;
        }
    }

    tracing::info!(instance = %instance_id, "WASM instance stopped");
    Ok(())
}
/// Removes the instance for `id`, aborting its execution task if one is still
/// attached. Removing an unknown instance is not an error.
#[instrument(
    skip(self),
    fields(
        otel.name = "wasm.remove",
        container.id = %self.instance_id(id),
        service.name = %id.service,
    )
)]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
    let instance_id = self.instance_id(id);
    tracing::info!(instance = %instance_id, "removing WASM instance");

    let mut instances = self.instances.write().await;
    let pending_task = instances
        .remove(&instance_id)
        .and_then(|mut removed| removed.execution_handle.take());
    if let Some(task) = pending_task {
        task.abort();
    }

    tracing::info!(instance = %instance_id, "WASM instance removed");
    Ok(())
}
/// Reports the lifecycle state of the instance for `id`, mapped onto the
/// generic `ContainerState`.
///
/// # Errors
///
/// Returns `AgentError::NotFound` if no instance with this id exists.
#[instrument(
    skip(self),
    fields(
        otel.name = "wasm.state",
        container.id = %self.instance_id(id),
    )
)]
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
    let instance_id = self.instance_id(id);
    let instances = self.instances.read().await;

    match instances.get(&instance_id) {
        None => Err(AgentError::NotFound {
            container: instance_id.clone(),
            reason: "WASM instance not found".to_string(),
        }),
        Some(instance) => Ok(match &instance.state {
            InstanceState::Pending => ContainerState::Pending,
            InstanceState::Running { .. } => ContainerState::Running,
            InstanceState::Completed { exit_code } => {
                ContainerState::Exited { code: *exit_code }
            }
            InstanceState::Failed { reason } => ContainerState::Failed {
                reason: reason.clone(),
            },
        }),
    }
}
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
let instance_id = self.instance_id(id);
let instances = self.instances.read().await;
let instance = instances
.get(&instance_id)
.ok_or_else(|| AgentError::NotFound {
container: instance_id.clone(),
reason: "WASM instance not found".to_string(),
})?;
let source = LogSource::Container(instance_id.clone());
let mut entries = Vec::new();
for line in String::from_utf8_lossy(&instance.stdout).lines() {
entries.push(LogEntry {
timestamp: chrono::Utc::now(),
stream: LogStream::Stdout,
message: line.to_string(),
source: source.clone(),
service: Some(id.service.clone()),
deployment: None,
});
}
for line in String::from_utf8_lossy(&instance.stderr).lines() {
entries.push(LogEntry {
timestamp: chrono::Utc::now(),
stream: LogStream::Stderr,
message: line.to_string(),
source: source.clone(),
service: Some(id.service.clone()),
deployment: None,
});
}
if tail > 0 && entries.len() > tail {
entries = entries.split_off(entries.len() - tail);
}
Ok(entries)
}
/// Always fails: running an additional command inside a WASM instance is not
/// supported.
///
/// # Errors
///
/// Always returns `AgentError::Internal` naming the instance.
async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
    let instance_id = self.instance_id(id);
    let message = format!(
        "exec not supported for WASM instance '{instance_id}': WASM modules are single-process and don't support exec"
    );
    Err(AgentError::Internal(message))
}
/// Returns placeholder statistics for a known instance: zero CPU and memory
/// usage, an unlimited (`u64::MAX`) memory limit and the current instant as
/// timestamp.
///
/// # Errors
///
/// Returns `AgentError::NotFound` if no instance with this id exists.
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
    let instance_id = self.instance_id(id);
    let instances = self.instances.read().await;

    if instances.contains_key(&instance_id) {
        Ok(ContainerStats {
            cpu_usage_usec: 0,
            memory_bytes: 0,
            memory_limit: u64::MAX,
            timestamp: Instant::now(),
        })
    } else {
        Err(AgentError::NotFound {
            container: instance_id.clone(),
            reason: "WASM instance not found".to_string(),
        })
    }
}
/// Waits for the instance to reach a terminal state and returns its exit code.
///
/// Polls every 100 ms. On each poll, a finished execution task is harvested:
/// its output and outcome are stored on the instance. `Completed` yields the
/// exit code; `Pending` and `Running` keep polling.
///
/// # Errors
///
/// * `AgentError::Timeout` once polling has exceeded `max_execution_time`.
/// * `AgentError::NotFound` if the instance does not exist (or disappears).
/// * `AgentError::Internal` if the instance is in the `Failed` state.
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
    let instance_id = self.instance_id(id);
    tracing::debug!(instance = %instance_id, "waiting for WASM instance to exit");

    let poll_interval = Duration::from_millis(100);
    let max_wait = self.config.max_execution_time;
    let started = Instant::now();

    loop {
        if started.elapsed() > max_wait {
            return Err(AgentError::Timeout { timeout: max_wait });
        }

        {
            let mut instances = self.instances.write().await;
            let instance = match instances.get_mut(&instance_id) {
                Some(instance) => instance,
                None => {
                    return Err(AgentError::NotFound {
                        container: instance_id.clone(),
                        reason: "WASM instance not found".to_string(),
                    });
                }
            };

            // Only take the handle once the task is done, so awaiting it
            // below completes immediately.
            let task_done =
                matches!(&instance.execution_handle, Some(task) if task.is_finished());
            if task_done {
                if let Some(task) = instance.execution_handle.take() {
                    match task.await {
                        Ok(Ok(exec_result)) => {
                            instance.stdout = exec_result.stdout;
                            instance.stderr = exec_result.stderr;
                            instance.state = InstanceState::Completed {
                                exit_code: exec_result.exit_code,
                            };
                        }
                        Ok(Err(reason)) => {
                            instance.state = InstanceState::Failed { reason };
                        }
                        Err(join_error) => {
                            instance.state = InstanceState::Failed {
                                reason: format!("task join error: {join_error}"),
                            };
                        }
                    }
                }
            }

            match &instance.state {
                InstanceState::Completed { exit_code } => return Ok(*exit_code),
                InstanceState::Failed { reason } => {
                    return Err(AgentError::Internal(format!(
                        "WASM execution failed: {reason}"
                    )));
                }
                InstanceState::Pending | InstanceState::Running { .. } => {}
            }
        }

        tokio::time::sleep(poll_interval).await;
    }
}
/// Returns all captured log entries for the instance; delegates to
/// `container_logs` with a tail of 0 (no truncation).
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
self.container_logs(id, 0).await
}
/// Always returns `Ok(None)`: this runtime reports no PID for an instance.
async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
Ok(None)
}
/// Always reports the IPv4 loopback address, regardless of the instance id.
async fn get_container_ip(&self, _id: &ContainerId) -> Result<Option<std::net::IpAddr>> {
Ok(Some(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)))
}
/// Kills a running instance by aborting its execution task and marking it
/// `Failed` with reason `"killed"`.
///
/// The signal name (default `SIGKILL`) is validated but otherwise has no
/// effect. Instances that are not `Running` are left untouched.
///
/// # Errors
///
/// Propagates the error from signal validation, and returns
/// `AgentError::NotFound` if no instance with this id exists.
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
    let _canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
    let instance_id = self.instance_id(id);
    tracing::info!(instance = %instance_id, "killing WASM instance");

    let mut instances = self.instances.write().await;
    match instances.get_mut(&instance_id) {
        None => Err(AgentError::NotFound {
            container: instance_id.clone(),
            reason: "WASM instance not found".to_string(),
        }),
        Some(instance) => {
            if let InstanceState::Running { .. } = instance.state {
                if let Some(task) = instance.execution_handle.take() {
                    task.abort();
                }
                instance.state = InstanceState::Failed {
                    reason: "killed".to_string(),
                };
            }
            Ok(())
        }
    }
}
/// Tags a cached WASM module under a second image reference by copying its
/// cache file.
///
/// Tagging a reference onto itself is a no-op. Cache file names are the image
/// reference with `/`, `:` and `@` replaced by `_`, plus a `.wasm` extension.
///
/// # Errors
///
/// * `AgentError::InvalidSpec` if either reference is empty or whitespace.
/// * `AgentError::NotFound` if the source module is not in the cache.
/// * `AgentError::Internal` if the copy fails.
async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
    let any_blank = [source, target].iter().any(|r| r.trim().is_empty());
    if any_blank {
        return Err(AgentError::InvalidSpec(
            "source and target must be non-empty image references".to_string(),
        ));
    }
    if source == target {
        return Ok(());
    }

    let (src_path, dst_path) = {
        let cache_dir = &self.config.cache_dir;
        let cache_file = |reference: &str| {
            cache_dir.join(format!("{}.wasm", reference.replace(['/', ':', '@'], "_")))
        };
        (cache_file(source), cache_file(target))
    };

    if !src_path.exists() {
        return Err(AgentError::NotFound {
            container: source.to_string(),
            reason: format!(
                "source WASM module '{source}' not found in cache ({})",
                src_path.display()
            ),
        });
    }

    if let Err(e) = tokio::fs::copy(&src_path, &dst_path).await {
        return Err(AgentError::Internal(format!(
            "failed to tag WASM module '{source}' -> '{target}': {e}"
        )));
    }

    tracing::info!(source = %source, target = %target, "tagged WASM module");
    Ok(())
}
}
// Unit tests for configuration defaults, id / cache-key formatting, memory
// limit parsing and resource-limit derivation. None of them start a runtime.
#[cfg(test)]
mod tests {
use super::*;
// The default config uses the system WASM directory, enables epochs and
// allows one hour of execution.
// NOTE(review): the cache_dir assertion presumably requires
// ZLAYER_WASM_CACHE_DIR to be unset in the test environment — confirm.
#[test]
fn test_wasm_config_default() {
let config = WasmConfig::default();
assert_eq!(
config.cache_dir,
zlayer_paths::ZLayerDirs::system_default().wasm()
);
assert!(config.enable_epochs);
assert_eq!(config.epoch_deadline, 1_000_000);
assert_eq!(config.max_execution_time, Duration::from_secs(3600));
assert!(config.cache_type.is_none());
}
// Pins the `wasm-<service>-<replica>` id format (re-implemented inline, not
// via `WasmRuntime::instance_id`).
#[test]
fn test_instance_id_generation() {
let id = ContainerId {
service: "myservice".to_string(),
replica: 1,
};
let expected = "wasm-myservice-1";
let result = format!("wasm-{}-{}", id.service, id.replica);
assert_eq!(result, expected);
}
// Pins the cache-key sanitisation: '/', ':' and '@' become '_'.
#[test]
fn test_cache_key_sanitization() {
let images = vec![
("ghcr.io/org/module:v1.0", "ghcr.io_org_module_v1.0"),
(
"registry.example.com/wasm@sha256:abc",
"registry.example.com_wasm_sha256_abc",
),
];
for (image, expected) in images {
let sanitized = image.replace(['/', ':', '@'], "_");
assert_eq!(sanitized, expected);
}
}
// Every `InstanceState` variant produces non-empty Debug output.
#[test]
fn test_instance_state_debug() {
let pending = InstanceState::Pending;
let running = InstanceState::Running {
started_at: Instant::now(),
};
let completed = InstanceState::Completed { exit_code: 0 };
let failed = InstanceState::Failed {
reason: "test error".to_string(),
};
assert!(!format!("{pending:?}").is_empty());
assert!(!format!("{running:?}").is_empty());
assert!(!format!("{completed:?}").is_empty());
assert!(!format!("{failed:?}").is_empty());
}
// Cloning a config preserves every field.
#[test]
fn test_wasm_config_clone() {
let config = WasmConfig {
cache_dir: PathBuf::from("/custom/cache"),
enable_epochs: false,
epoch_deadline: 500_000,
max_execution_time: Duration::from_secs(60),
cache_type: Some(zlayer_registry::CacheType::Memory),
};
let cloned = config.clone();
assert_eq!(cloned.cache_dir, config.cache_dir);
assert_eq!(cloned.enable_epochs, config.enable_epochs);
assert_eq!(cloned.epoch_deadline, config.epoch_deadline);
assert_eq!(cloned.max_execution_time, config.max_execution_time);
assert!(cloned.cache_type.is_some());
}
// Smoke test: a WASI context with inherited networking can be built.
#[tokio::test]
async fn test_wasm_network_enabled() {
let mut builder = WasiCtxBuilder::new();
builder.inherit_network();
let _ctx = builder.build();
}
// Each supported suffix is a power of 1024.
#[test]
fn test_parse_memory_limit_valid() {
assert_eq!(parse_memory_limit("64Mi").unwrap(), 64 * 1024 * 1024);
assert_eq!(parse_memory_limit("256Mi").unwrap(), 256 * 1024 * 1024);
assert_eq!(parse_memory_limit("1Gi").unwrap(), 1024 * 1024 * 1024);
assert_eq!(
parse_memory_limit("2Ti").unwrap(),
2 * 1024 * 1024 * 1024 * 1024
);
assert_eq!(parse_memory_limit("512Ki").unwrap(), 512 * 1024);
}
// Surrounding whitespace is trimmed before parsing.
#[test]
fn test_parse_memory_limit_with_whitespace() {
assert_eq!(parse_memory_limit(" 64Mi ").unwrap(), 64 * 1024 * 1024);
}
// Empty input, missing suffix, missing number, unknown suffix and a
// non-numeric value are all rejected.
#[test]
fn test_parse_memory_limit_invalid() {
assert!(parse_memory_limit("").is_err());
assert!(parse_memory_limit("64").is_err());
assert!(parse_memory_limit("Mi").is_err());
assert!(parse_memory_limit("64MB").is_err());
assert!(parse_memory_limit("abcMi").is_err());
}
// A valid `max_memory` yields store limits.
#[test]
fn test_build_store_limits_with_memory() {
let wasm_config = zlayer_spec::WasmConfig {
max_memory: Some("64Mi".to_string()),
..Default::default()
};
let limits = build_store_limits(Some(&wasm_config));
assert!(limits.is_some());
}
// A default spec config is expected to yield no store limits.
#[test]
fn test_build_store_limits_without_memory() {
let wasm_config = zlayer_spec::WasmConfig::default();
let limits = build_store_limits(Some(&wasm_config));
assert!(limits.is_none());
}
// No WASM config at all yields no store limits.
#[test]
fn test_build_store_limits_none_config() {
let limits = build_store_limits(None);
assert!(limits.is_none());
}
// Memory, fuel and epoch interval are all carried over from the spec.
#[test]
fn test_resource_limits_from_spec() {
let wasm_config = zlayer_spec::WasmConfig {
max_memory: Some("128Mi".to_string()),
max_fuel: 1_000_000,
epoch_interval: Some(Duration::from_millis(100)),
..Default::default()
};
let limits = ResourceLimits::from_spec(Some(&wasm_config));
assert!(limits.store_limits.is_some());
assert_eq!(limits.max_fuel, 1_000_000);
assert_eq!(limits.epoch_interval, Some(Duration::from_millis(100)));
}
// Without a spec the limits are empty: no memory limit, zero fuel, no epoch interval.
#[test]
fn test_resource_limits_from_spec_none() {
let limits = ResourceLimits::from_spec(None);
assert!(limits.store_limits.is_none());
assert_eq!(limits.max_fuel, 0);
assert!(limits.epoch_interval.is_none());
}
}