#![cfg(all(target_os = "windows", feature = "wsl"))]
use std::collections::HashMap;
use std::io::Read;
use std::net::IpAddr;
use std::path::PathBuf;
use std::process::{Output, Stdio};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio_stream::wrappers::ReceiverStream;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_registry::{CompressionType, ImageConfig};
use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
use crate::bundle::BundleBuilder;
use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::overlay_manager::make_interface_name;
use crate::runtime::{
validate_signal, ContainerId, ContainerInspectDetails, ContainerState, ExecEvent,
ExecEventStream, ImageInfo, PruneResult, Runtime,
};
/// Default directory for per-container OCI bundles, used by `Wsl2DelegateConfig::default`.
const DEFAULT_BUNDLE_ROOT: &str = "/var/lib/zlayer/bundles";
/// Default directory for per-container youki log files, used by `Wsl2DelegateConfig::default`.
const DEFAULT_LOG_ROOT: &str = "/var/lib/zlayer/logs";
/// Upper bound (24 hours) on how long `wait_container` keeps polling before it returns a timeout.
const WAIT_POLL_CAP: Duration = Duration::from_secs(24 * 60 * 60);
/// Sleep between successive `youki state` polls in `stop_container` and `wait_container`.
const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(500);
/// Caller-supplied configuration for the WSL2 delegate runtime.
#[derive(Clone, Debug)]
pub struct Wsl2DelegateConfig {
/// Name of the WSL2 distro commands are run in (passed as `wsl.exe -d <distro>`).
pub distro: String,
/// Explicit path of the `youki` binary inside the distro; when `None` the
/// constructor resolves it with `which youki`.
pub youki_path: Option<String>,
/// Directory under which one bundle directory per container is created.
pub bundle_root: String,
/// Directory in which one `<slug>.youki.log` file per container is kept.
pub log_root: String,
}
impl Default for Wsl2DelegateConfig {
    /// Stock configuration: the zlayer distro name, no explicit youki path
    /// (resolved later), and the default bundle/log roots.
    fn default() -> Self {
        let distro = zlayer_wsl::distro::DISTRO_NAME.to_string();
        let bundle_root = DEFAULT_BUNDLE_ROOT.to_owned();
        let log_root = DEFAULT_LOG_ROOT.to_owned();
        Self {
            distro,
            youki_path: None,
            bundle_root,
            log_root,
        }
    }
}
/// An image held in the in-memory cache after a successful pull.
#[derive(Clone, Debug)]
struct CachedImage {
/// Layer blobs in pull order, each paired with its media type string
/// (the media type is what `decompress_layer` uses to pick a decoder).
layers: Vec<(Vec<u8>, String)>,
/// Image configuration handed to the bundle builder when a container is created.
config: ImageConfig,
}
/// Outcome of the per-container network-namespace setup attempt.
#[derive(Debug, Clone)]
enum NetnsState {
/// Namespace and veth pair were created successfully.
Configured {
/// Name of the veth end left outside the namespace; deleted on teardown.
host_iface: String,
/// Address assigned inside the namespace (stored but not read back).
#[allow(dead_code)]
ip: IpAddr,
},
/// Setup failed; `get_container_ip` reports no IP for containers in this state.
HostFallback,
}
/// Abstraction over "run a command inside WSL and collect its output".
///
/// The runtime routes its `ip ...` networking commands through this trait so
/// that tests can substitute a recording fake for the real `wsl.exe` call.
#[async_trait]
pub trait WslRunner: Send + Sync + 'static {
/// Runs `cmd` with `args` and returns the captured process output.
async fn run(&self, cmd: &str, args: &[&str]) -> anyhow::Result<Output>;
}
/// Production [`WslRunner`] that forwards to `zlayer_wsl::distro::wsl_exec`.
struct DefaultWslRunner;
#[async_trait]
impl WslRunner for DefaultWslRunner {
// NOTE(review): no distro name is passed here; presumably `wsl_exec` targets
// the zlayer distro on its own — confirm when a non-default distro is configured.
async fn run(&self, cmd: &str, args: &[&str]) -> anyhow::Result<Output> {
zlayer_wsl::distro::wsl_exec(cmd, args).await
}
}
/// Container runtime that drives `youki` inside a WSL2 distro via `wsl.exe`.
pub struct Wsl2DelegateRuntime {
/// Resolved distro, youki path and directory roots.
config: ResolvedConfig,
/// Cache of container init PIDs as reported by `youki state`.
pids: Arc<RwLock<HashMap<ContainerId, u32>>>,
/// IPs recorded through `record_container_ip`.
ips: Arc<RwLock<HashMap<ContainerId, IpAddr>>>,
/// Bundle directory of every successfully created container.
bundle_roots: Arc<RwLock<HashMap<ContainerId, String>>>,
/// Pulled images keyed by the image reference string.
image_cache: Arc<RwLock<HashMap<String, CachedImage>>>,
/// Gateways recorded through `record_container_gateway`.
gateways: Arc<RwLock<HashMap<ContainerId, IpAddr>>>,
/// Result of the network-namespace setup for each started container.
netns: Arc<RwLock<HashMap<ContainerId, NetnsState>>>,
/// Command runner used for the `ip ...` networking commands.
runner: Arc<dyn WslRunner>,
}
/// Configuration after construction-time probing: same as
/// `Wsl2DelegateConfig` except that `youki_path` is always known.
#[derive(Clone, Debug)]
struct ResolvedConfig {
/// WSL2 distro name passed to `wsl.exe -d`.
distro: String,
/// Path of the youki binary inside the distro.
youki_path: String,
/// Parent directory of per-container bundle directories.
bundle_root: String,
/// Parent directory of per-container youki log files.
log_root: String,
}
impl std::fmt::Debug for Wsl2DelegateRuntime {
    /// Prints only the resolved configuration; the remaining fields are
    /// elided (shown as `..` by `finish_non_exhaustive`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Wsl2DelegateRuntime");
        builder.field("config", &self.config);
        builder.finish_non_exhaustive()
    }
}
impl Wsl2DelegateRuntime {
/// Convenience constructor: same as [`Self::try_new_with_config`] with the
/// default configuration.
pub async fn try_new() -> Result<Option<Self>> {
    let defaults = Wsl2DelegateConfig::default();
    Self::try_new_with_config(defaults).await
}
/// Probes the host for WSL2 and builds a runtime from `config`.
///
/// Returns:
/// * `Ok(None)` when WSL2 detection fails, WSL2 is not available, or the
///   zlayer distro (only when `config.distro` is that distro) cannot be
///   bootstrapped — Linux container support is simply disabled.
/// * `Err(AgentError::Configuration)` when youki cannot be located or the
///   explicitly configured `youki_path` is not executable in the distro.
/// * `Ok(Some(runtime))` otherwise.
///
/// Pre-creating the log root is best-effort: failures are logged and do not
/// prevent construction.
pub async fn try_new_with_config(config: Wsl2DelegateConfig) -> Result<Option<Self>> {
    let status = match zlayer_wsl::detect::detect_wsl().await {
        Ok(s) => s,
        Err(e) => {
            tracing::warn!(
                error = %e,
                "WSL2 detection failed; Linux container support disabled"
            );
            return Ok(None);
        }
    };
    if !status.wsl2_available {
        tracing::info!(
            wsl_installed = status.wsl_installed,
            "WSL2 not available; Linux container support disabled on this node"
        );
        return Ok(None);
    }
    // Only the managed zlayer distro is bootstrapped; a custom distro is
    // used as-is.
    if config.distro == zlayer_wsl::distro::DISTRO_NAME {
        if let Err(e) = zlayer_wsl::setup::ensure_wsl_backend_ready().await {
            tracing::warn!(
                error = %e,
                "failed to bootstrap WSL2 zlayer distro; Linux container support disabled"
            );
            return Ok(None);
        }
    }
    let youki_path = match config.youki_path.as_deref() {
        // An explicit path must be executable inside the distro.
        Some(explicit) => {
            let probe = wsl_exec_in(&config.distro, "test", &["-x", explicit]).await;
            match probe {
                Ok(out) if out.status.success() => explicit.to_string(),
                Ok(out) => {
                    return Err(AgentError::Configuration(format!(
                        "configured youki_path '{explicit}' is not executable in WSL2 \
                         distro '{}' (status {:?}): {}",
                        config.distro,
                        out.status.code(),
                        String::from_utf8_lossy(&out.stderr).trim(),
                    )));
                }
                Err(e) => {
                    return Err(AgentError::Configuration(format!(
                        "failed to verify configured youki_path '{explicit}' in WSL2 \
                         distro '{}': {e}",
                        config.distro,
                    )));
                }
            }
        }
        // Otherwise resolve youki from the distro's PATH.
        None => match wsl_exec_in(&config.distro, "which", &["youki"]).await {
            Ok(out) if out.status.success() => {
                let resolved = String::from_utf8_lossy(&out.stdout).trim().to_string();
                if resolved.is_empty() {
                    return Err(AgentError::Configuration(format!(
                        "youki not found in WSL2 distro '{}'; install it or \
                         configure runtime.wsl2.youki_path",
                        config.distro,
                    )));
                }
                resolved
            }
            Ok(out) => {
                let stderr = String::from_utf8_lossy(&out.stderr);
                return Err(AgentError::Configuration(format!(
                    "youki not found in WSL2 distro '{}' (status {:?}: {}); \
                     install it or configure runtime.wsl2.youki_path",
                    config.distro,
                    out.status.code(),
                    stderr.trim(),
                )));
            }
            Err(e) => {
                return Err(AgentError::Configuration(format!(
                    "probing for youki in WSL2 distro '{}' failed: {e}; \
                     install it or configure runtime.wsl2.youki_path",
                    config.distro,
                )));
            }
        },
    };
    // Best-effort pre-creation of the log root. Both a failure to launch
    // wsl.exe and a non-zero exit of `mkdir` itself are reported; neither
    // aborts construction.
    match wsl_exec_in(&config.distro, "mkdir", &["-p", &config.log_root]).await {
        Ok(out) if out.status.success() => {}
        Ok(out) => {
            let exit_code = out.status.code();
            let stderr = String::from_utf8_lossy(&out.stderr);
            let stderr = stderr.trim();
            tracing::warn!(
                distro = %config.distro,
                log_root = %config.log_root,
                status = ?exit_code,
                stderr = %stderr,
                "failed to pre-create youki log root; container_logs may be empty until youki creates it"
            );
        }
        Err(e) => {
            tracing::warn!(
                distro = %config.distro,
                log_root = %config.log_root,
                error = %e,
                "failed to pre-create youki log root; container_logs may be empty until youki creates it"
            );
        }
    }
    Ok(Some(Self {
        config: ResolvedConfig {
            distro: config.distro,
            youki_path,
            bundle_root: config.bundle_root,
            log_root: config.log_root,
        },
        pids: Arc::new(RwLock::new(HashMap::new())),
        ips: Arc::new(RwLock::new(HashMap::new())),
        bundle_roots: Arc::new(RwLock::new(HashMap::new())),
        image_cache: Arc::new(RwLock::new(HashMap::new())),
        gateways: Arc::new(RwLock::new(HashMap::new())),
        netns: Arc::new(RwLock::new(HashMap::new())),
        runner: Arc::new(DefaultWslRunner),
    }))
}
/// Remembers the IP assigned to `id`, replacing any earlier value.
pub async fn record_container_ip(&self, id: &ContainerId, ip: IpAddr) {
    let mut ips = self.ips.write().await;
    ips.insert(id.clone(), ip);
}
/// Remembers the gateway to use as default route for `id`, replacing any
/// earlier value.
pub async fn record_container_gateway(&self, id: &ContainerId, gateway: IpAddr) {
    let mut gateways = self.gateways.write().await;
    gateways.insert(id.clone(), gateway);
}
/// Bundle directory for `id`: `<bundle_root>/<slug>`.
fn bundle_dir(&self, id: &ContainerId) -> String {
    let slug = id_slug(id);
    format!("{}/{slug}", self.config.bundle_root)
}
/// youki log file for `id`: `<log_root>/<slug>.youki.log`.
fn log_path(&self, id: &ContainerId) -> String {
    let slug = id_slug(id);
    format!("{}/{slug}.youki.log", self.config.log_root)
}
async fn youki(&self, args: &[&str]) -> Result<Output> {
wsl_exec_in(&self.config.distro, &self.config.youki_path, args).await
}
/// Runs an arbitrary command inside the configured distro via `wsl.exe`.
async fn wsl(&self, cmd: &str, args: &[&str]) -> Result<Output> {
    let distro = self.config.distro.as_str();
    wsl_exec_in(distro, cmd, args).await
}
/// Runs a command through the injected [`WslRunner`], converting a runner
/// failure into `AgentError::Network`.
async fn wsl_run(&self, cmd: &str, args: &[&str]) -> Result<Output> {
    match self.runner.run(cmd, args).await {
        Ok(output) => Ok(output),
        Err(e) => Err(AgentError::Network(format!(
            "wsl.exe -d {} -- {cmd}: {e}",
            self.config.distro
        ))),
    }
}
/// Creates the network namespace and veth pair for `id`, if an IP was recorded.
///
/// Does nothing when no IP is known for the container. On success the
/// container is marked `NetnsState::Configured`; on any failure the partial
/// setup is removed (best-effort) and the container is marked
/// `NetnsState::HostFallback`. This method never fails.
async fn setup_container_netns(&self, id: &ContainerId) {
    let recorded_ip = self.ips.read().await.get(id).copied();
    let Some(ip) = recorded_ip else {
        return;
    };
    let slug = id_slug(id);
    let gateway = self.gateways.read().await.get(id).copied();
    let host_iface = make_interface_name(&[&slug], "h");
    let cont_iface = make_interface_name(&[&slug], "c");
    // Name the container-side interface gets once inside the namespace.
    let final_iface = "eth0";

    let outcome = self
        .setup_container_netns_inner(&slug, ip, gateway, &host_iface, &cont_iface, final_iface)
        .await;

    let Err(e) = outcome else {
        self.netns.write().await.insert(
            id.clone(),
            NetnsState::Configured {
                host_iface: host_iface.clone(),
                ip,
            },
        );
        tracing::info!(
            container = %id,
            %ip,
            host_iface = %host_iface,
            "WSL2 container netns configured"
        );
        return;
    };

    tracing::warn!(
        container = %id,
        %ip,
        error = %e,
        "WSL2 container netns setup failed; falling back to host networking \
         (container will boot but without overlay IP)"
    );
    // Undo whatever was created before the failing step; errors are ignored.
    let _ = self.wsl_run("ip", &["link", "delete", &host_iface]).await;
    let _ = self.wsl_run("ip", &["netns", "delete", &slug]).await;
    self.netns
        .write()
        .await
        .insert(id.clone(), NetnsState::HostFallback);
}
#[allow(clippy::too_many_lines)]
async fn setup_container_netns_inner(
&self,
slug: &str,
ip: IpAddr,
gateway: Option<IpAddr>,
host_iface: &str,
cont_iface: &str,
final_iface: &str,
) -> Result<()> {
let mk_ns = self.wsl_run("ip", &["netns", "add", slug]).await?;
if !mk_ns.status.success() {
let stderr = String::from_utf8_lossy(&mk_ns.stderr);
if !stderr.to_ascii_lowercase().contains("file exists") {
return Err(AgentError::Network(format!(
"ip netns add {slug} failed (status {:?}): {}",
mk_ns.status.code(),
stderr.trim()
)));
}
}
let _ = self.wsl_run("ip", &["link", "delete", host_iface]).await;
let mk_veth = self
.wsl_run(
"ip",
&[
"link", "add", host_iface, "type", "veth", "peer", "name", cont_iface,
],
)
.await?;
if !mk_veth.status.success() {
let stderr = String::from_utf8_lossy(&mk_veth.stderr);
return Err(AgentError::Network(format!(
"ip link add veth {host_iface}/{cont_iface} failed (status {:?}): {}",
mk_veth.status.code(),
stderr.trim()
)));
}
let move_end = self
.wsl_run("ip", &["link", "set", cont_iface, "netns", slug])
.await?;
if !move_end.status.success() {
let stderr = String::from_utf8_lossy(&move_end.stderr);
return Err(AgentError::Network(format!(
"ip link set {cont_iface} netns {slug} failed (status {:?}): {}",
move_end.status.code(),
stderr.trim()
)));
}
let rename = self
.wsl_run(
"ip",
&[
"netns",
"exec",
slug,
"ip",
"link",
"set",
cont_iface,
"name",
final_iface,
],
)
.await?;
if !rename.status.success() {
let stderr = String::from_utf8_lossy(&rename.stderr);
return Err(AgentError::Network(format!(
"ip link rename {cont_iface} -> {final_iface} (in {slug}) failed (status {:?}): {}",
rename.status.code(),
stderr.trim()
)));
}
let cidr = match ip {
IpAddr::V4(_) => format!("{ip}/24"),
IpAddr::V6(_) => format!("{ip}/64"),
};
let addr = self
.wsl_run(
"ip",
&[
"netns",
"exec",
slug,
"ip",
"addr",
"add",
&cidr,
"dev",
final_iface,
],
)
.await?;
if !addr.status.success() {
let stderr = String::from_utf8_lossy(&addr.stderr);
return Err(AgentError::Network(format!(
"ip addr add {cidr} dev {final_iface} (in {slug}) failed (status {:?}): {}",
addr.status.code(),
stderr.trim()
)));
}
let up = self
.wsl_run(
"ip",
&[
"netns",
"exec",
slug,
"ip",
"link",
"set",
final_iface,
"up",
],
)
.await?;
if !up.status.success() {
let stderr = String::from_utf8_lossy(&up.stderr);
return Err(AgentError::Network(format!(
"ip link set {final_iface} up (in {slug}) failed (status {:?}): {}",
up.status.code(),
stderr.trim()
)));
}
let _ = self
.wsl_run(
"ip",
&["netns", "exec", slug, "ip", "link", "set", "lo", "up"],
)
.await;
let host_up = self
.wsl_run("ip", &["link", "set", host_iface, "up"])
.await?;
if !host_up.status.success() {
let stderr = String::from_utf8_lossy(&host_up.stderr);
return Err(AgentError::Network(format!(
"ip link set {host_iface} up failed (status {:?}): {}",
host_up.status.code(),
stderr.trim()
)));
}
if let Some(gw) = gateway {
let route = self
.wsl_run(
"ip",
&[
"netns",
"exec",
slug,
"ip",
"route",
"add",
"default",
"via",
&gw.to_string(),
],
)
.await?;
if !route.status.success() {
let stderr = String::from_utf8_lossy(&route.stderr);
return Err(AgentError::Network(format!(
"ip route add default via {gw} (in {slug}) failed (status {:?}): {}",
route.status.code(),
stderr.trim()
)));
}
} else {
tracing::warn!(
container = %slug,
"no overlay gateway recorded; skipping default-route setup. \
Container will have overlay-local connectivity only."
);
}
Ok(())
}
/// Forgets the netns state of `id` and, if a namespace had been configured,
/// deletes the host-side interface and the namespace (errors ignored).
/// Containers in host-fallback state, or without any state, need no cleanup.
async fn teardown_container_netns(&self, id: &ContainerId) {
    let removed = self.netns.write().await.remove(id);
    if let Some(NetnsState::Configured { host_iface, .. }) = removed {
        let slug = id_slug(id);
        let _ = self.wsl_run("ip", &["link", "delete", &host_iface]).await;
        let _ = self.wsl_run("ip", &["netns", "delete", &slug]).await;
    }
}
/// Runs `youki state <slug>` and parses the JSON it prints.
///
/// # Errors
/// * `AgentError::NotFound` when youki's stderr says the container
///   "does not exist" or is "not found" (case-insensitive).
/// * The generic youki error for any other non-zero exit.
/// * Whatever `parse_youki_state` returns for unparsable output.
async fn query_state(&self, id: &ContainerId) -> Result<YoukiState> {
    let slug = id_slug(id);
    let output = self.youki(&["state", &slug]).await?;
    if output.status.success() {
        return parse_youki_state(&output);
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let lowered = stderr.to_ascii_lowercase();
    let unknown_container = ["does not exist", "not found"]
        .iter()
        .any(|needle| lowered.contains(*needle));
    if unknown_container {
        return Err(AgentError::NotFound {
            container: id.to_string(),
            reason: format!("youki state reports container unknown: {stderr}"),
        });
    }
    Err(youki_error("state", &output))
}
}
#[async_trait]
impl Runtime for Wsl2DelegateRuntime {
/// Pulls `image` anonymously unless it is already cached
/// (`PullPolicy::IfNotPresent`).
async fn pull_image(&self, image: &str) -> Result<()> {
    let policy = PullPolicy::IfNotPresent;
    self.pull_image_with_policy(image, policy, None).await
}
/// Pulls `image` (layers and config) into the in-memory image cache.
///
/// * `IfNotPresent` / `Never`: a cache hit returns immediately.
/// * `Never` with a cache miss fails with `AgentError::PullFailed`.
/// * Any other policy pulls from the registry; `Always` additionally asks
///   the puller to force a refresh.
///
/// `auth`, when given, is sent as basic credentials; otherwise the pull is
/// anonymous.
///
/// # Errors
/// `AgentError::PullFailed` when the blob cache cannot be created or the
/// layer/config fetch fails.
async fn pull_image_with_policy(
    &self,
    image: &str,
    policy: PullPolicy,
    auth: Option<&RegistryAuth>,
) -> Result<()> {
    // The cache is consulted only for the policies that may be satisfied by it.
    let cache_allowed = matches!(policy, PullPolicy::IfNotPresent | PullPolicy::Never);
    if cache_allowed && self.image_cache.read().await.contains_key(image) {
        if !matches!(policy, PullPolicy::Never) {
            tracing::debug!(image, "image cache hit; skipping re-pull");
        }
        return Ok(());
    }
    if matches!(policy, PullPolicy::Never) {
        return Err(AgentError::PullFailed {
            image: image.to_string(),
            reason: "PullPolicy::Never but image is not in the WSL2 delegate cache".to_string(),
        });
    }
    let registry_auth = match auth {
        Some(a) => zlayer_registry::RegistryAuth::Basic(a.username.clone(), a.password.clone()),
        None => zlayer_registry::RegistryAuth::Anonymous,
    };
    let cache = zlayer_registry::BlobCache::new().map_err(|e| AgentError::PullFailed {
        image: image.to_string(),
        reason: format!("failed to create blob cache: {e}"),
    })?;
    let cache_arc: Arc<Box<dyn zlayer_registry::BlobCacheBackend>> = Arc::new(Box::new(cache));
    // Images are always resolved for linux/amd64.
    // NOTE(review): presumably needs revisiting for ARM64 Windows hosts — confirm.
    let puller = zlayer_registry::ImagePuller::with_platform(
        cache_arc,
        zlayer_spec::TargetPlatform::new(
            zlayer_spec::OsKind::Linux,
            zlayer_spec::ArchKind::Amd64,
        ),
    );
    let force_refresh = matches!(policy, PullPolicy::Always);
    let layers = puller
        .pull_image_with_policy(image, &registry_auth, force_refresh)
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("registry pull failed: {e}"),
        })?;
    let config = puller
        .pull_image_config(image, &registry_auth)
        .await
        .map_err(|e| AgentError::PullFailed {
            image: image.to_string(),
            reason: format!("image config fetch failed: {e}"),
        })?;
    self.image_cache
        .write()
        .await
        .insert(image.to_string(), CachedImage { layers, config });
    Ok(())
}
/// Materialises an OCI bundle for `id` inside the distro and runs `youki create`.
///
/// Sequence: create `<bundle>/rootfs`, obtain the image from the cache
/// (pulling it anonymously on a miss), stream every decompressed layer into
/// the rootfs through `tar`, build the OCI spec on the host and write it as
/// `config.json`, then invoke `youki create`. The bundle directory is
/// recorded in `bundle_roots` only after youki succeeds.
///
/// # Errors
/// `AgentError::CreateFailed` for any failing step; errors from launching
/// youki or from the on-demand pull are propagated unchanged.
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
let bundle_dir = self.bundle_dir(id);
let rootfs_dir = format!("{bundle_dir}/rootfs");
let slug = id_slug(id);
let mkdir = self.wsl("mkdir", &["-p", &rootfs_dir]).await?;
if !mkdir.status.success() {
return Err(AgentError::CreateFailed {
id: id.to_string(),
reason: format!(
"mkdir -p {rootfs_dir} failed (status {:?}): {}",
mkdir.status.code(),
String::from_utf8_lossy(&mkdir.stderr).trim()
),
});
}
// Cache lookup first; on a miss pull with the spec's policy (no credentials)
// and read the cache again.
let cached = match self.image_cache.read().await.get(&spec.image.name).cloned() {
Some(c) => c,
None => {
self.pull_image_with_policy(&spec.image.name, spec.image.pull_policy, None)
.await?;
self.image_cache
.read()
.await
.get(&spec.image.name)
.cloned()
.ok_or_else(|| AgentError::CreateFailed {
id: id.to_string(),
reason: format!(
"image {} missing from WSL2 delegate cache after pull",
spec.image.name
),
})?
}
};
// NOTE(review): `rootfs_dir` is interpolated into a shell command unquoted;
// the slug part is sanitised by `id_slug`, but a `bundle_root` containing
// spaces or shell metacharacters would break this — confirm it is trusted.
let tar_sh_cmd = format!("cd {rootfs_dir} && tar -xf - --no-same-owner");
// Layers are extracted in cache order, each on top of the previous one.
for (i, (data, media_type)) in cached.layers.iter().enumerate() {
let tar_bytes =
decompress_layer(data, media_type).map_err(|e| AgentError::CreateFailed {
id: id.to_string(),
reason: format!(
"failed to decompress layer {i} ({media_type}) of {}: {e}",
spec.image.name
),
})?;
wsl_stdin_pipe(
&["-d", &self.config.distro, "--", "sh", "-c", &tar_sh_cmd],
&tar_bytes,
)
.await
.map_err(|e| AgentError::CreateFailed {
id: id.to_string(),
reason: format!(
"streaming layer {i} ({media_type}) of {} into WSL2 rootfs failed: {e}",
spec.image.name
),
})?;
}
// The spec is generated on the Windows host and only then shipped into WSL.
let builder = BundleBuilder::new(PathBuf::from(&bundle_dir))
.with_image_config(cached.config.clone())
.with_hostname(slug.clone());
let oci_spec = builder
.build_spec_only(id, spec, &HashMap::new())
.await
.map_err(|e| AgentError::CreateFailed {
id: id.to_string(),
reason: format!("failed to build OCI spec on Windows host: {e}"),
})?;
let config_json =
serde_json::to_string_pretty(&oci_spec).map_err(|e| AgentError::CreateFailed {
id: id.to_string(),
reason: format!("failed to serialize OCI spec to JSON: {e}"),
})?;
let config_path = format!("{bundle_dir}/config.json");
// `tee` writes stdin to the file; its echo to stdout is discarded.
let tee_sh_cmd = format!("tee {config_path} > /dev/null");
wsl_stdin_pipe(
&["-d", &self.config.distro, "--", "sh", "-c", &tee_sh_cmd],
config_json.as_bytes(),
)
.await
.map_err(|e| AgentError::CreateFailed {
id: id.to_string(),
reason: format!("failed to write config.json into WSL2 bundle: {e}"),
})?;
let log_path = self.log_path(id);
let create = self
.youki(&["--log", &log_path, "create", "--bundle", &bundle_dir, &slug])
.await?;
if !create.status.success() {
let stderr = String::from_utf8_lossy(&create.stderr).trim().to_string();
// NOTE(review): the bundle is removed only on this path; the earlier
// failure returns above leave a partially populated bundle directory behind.
let _ = self.wsl("rm", &["-rf", &bundle_dir]).await;
return Err(AgentError::CreateFailed {
id: id.to_string(),
reason: format!(
"youki create failed (status {:?}): {stderr}",
create.status.code(),
),
});
}
self.bundle_roots
.write()
.await
.insert(id.clone(), bundle_dir);
Ok(())
}
/// Sets up networking for `id`, then runs `youki start`.
///
/// If youki exits non-zero the network namespace is torn down again and
/// `AgentError::StartFailed` is returned. After a successful start the
/// container's PID is cached when `youki state` reports one; a failing
/// state query is ignored.
async fn start_container(&self, id: &ContainerId) -> Result<()> {
    self.setup_container_netns(id).await;

    let slug = id_slug(id);
    let started = self.youki(&["start", &slug]).await?;
    if !started.status.success() {
        self.teardown_container_netns(id).await;
        let stderr = String::from_utf8_lossy(&started.stderr);
        return Err(AgentError::StartFailed {
            id: id.to_string(),
            reason: format!(
                "youki start failed (status {:?}): {}",
                started.status.code(),
                stderr.trim()
            ),
        });
    }

    let reported_pid = self.query_state(id).await.ok().and_then(|state| state.pid);
    if let Some(pid) = reported_pid {
        self.pids.write().await.insert(id.clone(), pid);
    }
    Ok(())
}
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
let slug = id_slug(id);
let term = self.youki(&["kill", "--all", &slug, "SIGTERM"]).await?;
if !term.status.success() {
let stderr = String::from_utf8_lossy(&term.stderr).to_ascii_lowercase();
if !(stderr.contains("stopped") || stderr.contains("not running")) {
return Err(youki_error("kill", &term));
}
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
match self.query_state(id).await {
Ok(state) if state.is_stopped() => return Ok(()),
Ok(_) => {}
Err(AgentError::NotFound { .. }) => return Ok(()),
Err(e) => return Err(e),
}
if tokio::time::Instant::now() >= deadline {
break;
}
tokio::time::sleep(WAIT_POLL_INTERVAL).await;
}
let kill = self.youki(&["kill", "--all", &slug, "SIGKILL"]).await?;
if !kill.status.success() {
let stderr = String::from_utf8_lossy(&kill.stderr).to_ascii_lowercase();
if !(stderr.contains("stopped") || stderr.contains("not running")) {
return Err(youki_error("kill", &kill));
}
}
Ok(())
}
/// Runs `youki delete` and drops everything the runtime tracks for `id`.
///
/// Local cleanup (netns, cached PID/IP/gateway, bundle directory, log file)
/// happens regardless of youki's exit status; failures while removing the
/// bundle directory or the log file are only logged. The result reflects
/// the delete itself: success, or a "does not exist"/"not found" stderr,
/// yields `Ok(())`; anything else is returned as a youki error.
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
    let slug = id_slug(id);
    let deleted = self.youki(&["delete", &slug]).await?;

    self.teardown_container_netns(id).await;
    self.pids.write().await.remove(id);
    self.ips.write().await.remove(id);
    self.gateways.write().await.remove(id);

    let recorded_bundle = self.bundle_roots.write().await.remove(id);
    if let Some(bundle_dir) = recorded_bundle {
        if let Err(e) = self.wsl("rm", &["-rf", &bundle_dir]).await {
            tracing::debug!(
                container = %id,
                bundle_dir = %bundle_dir,
                error = %e,
                "failed to remove WSL2 bundle dir; leaving for GC"
            );
        }
    }

    let log_path = self.log_path(id);
    if let Err(e) = self.wsl("rm", &["-f", &log_path]).await {
        tracing::debug!(
            container = %id,
            log_path = %log_path,
            error = %e,
            "failed to remove youki log file; leaving for GC"
        );
    }

    if deleted.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&deleted.stderr).to_ascii_lowercase();
    let already_gone = stderr.contains("does not exist") || stderr.contains("not found");
    if already_gone {
        Ok(())
    } else {
        Err(youki_error("delete", &deleted))
    }
}
/// Current state of `id`, translated from youki's status string.
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
    self.query_state(id)
        .await
        .map(|state| state.as_container_state())
}
/// Returns the last `tail` non-empty lines of the container's youki log file.
///
/// A missing or unopenable log file yields an empty list. Every entry is
/// stamped with the time of this call and reported as stdout, since the
/// file carries no per-line metadata that is parsed here.
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let log_path = self.log_path(id);
    let line_count = tail.to_string();
    let output = self.wsl("tail", &["-n", &line_count, &log_path]).await?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr).to_ascii_lowercase();
        let missing = stderr.contains("no such file") || stderr.contains("cannot open");
        if missing {
            return Ok(Vec::new());
        }
        return Err(youki_error("tail", &output));
    }

    let text = String::from_utf8_lossy(&output.stdout);
    let now = chrono::Utc::now();
    let source = LogSource::Container(id.to_string());
    let service = Some(id.service.clone());

    let mut entries = Vec::new();
    for line in text.lines() {
        if line.is_empty() {
            continue;
        }
        entries.push(LogEntry {
            timestamp: now,
            stream: LogStream::Stdout,
            message: line.to_string(),
            source: source.clone(),
            service: service.clone(),
            deployment: None,
        });
    }
    Ok(entries)
}
/// Runs `cmd` inside the container via `youki exec` and waits for it.
///
/// Returns `(exit_code, stdout, stderr)`; output is decoded lossily as
/// UTF-8 and the exit code is `-1` when the process reported none.
///
/// # Errors
/// `AgentError::InvalidSpec` for an empty `cmd`.
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
    if cmd.is_empty() {
        return Err(AgentError::InvalidSpec(
            "exec command must not be empty".to_string(),
        ));
    }
    let slug = id_slug(id);
    let args: Vec<&str> = ["exec", slug.as_str(), "--"]
        .into_iter()
        .chain(cmd.iter().map(String::as_str))
        .collect();
    let output = self.youki(&args).await?;

    let exit = output.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
    Ok((exit, stdout, stderr))
}
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
let slug = id_slug(id);
let output = self.youki(&["events", "--stats", &slug]).await;
let now = std::time::Instant::now();
match output {
Ok(out) if out.status.success() => Ok(parse_youki_stats(&out, now)),
_ => {
Ok(ContainerStats {
cpu_usage_usec: 0,
memory_bytes: 0,
memory_limit: u64::MAX,
timestamp: now,
})
}
}
}
/// Polls `youki state` until the container is stopped and returns its exit
/// code (`0` when youki reports none).
///
/// # Errors
/// Any state-query error is returned immediately; `AgentError::Timeout`
/// once `WAIT_POLL_CAP` has elapsed without the container stopping.
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
    let began = tokio::time::Instant::now();
    loop {
        let state = self.query_state(id).await?;
        if state.is_stopped() {
            return Ok(state.exit_code.unwrap_or(0));
        }
        if began.elapsed() >= WAIT_POLL_CAP {
            return Err(AgentError::Timeout {
                timeout: WAIT_POLL_CAP,
            });
        }
        tokio::time::sleep(WAIT_POLL_INTERVAL).await;
    }
}
/// Full log of the container: `container_logs` with the largest possible
/// tail length.
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
    let everything = usize::MAX;
    self.container_logs(id, everything).await
}
/// PID of the container's init process, if known.
///
/// Served from the PID cache when possible; otherwise `youki state` is
/// queried and a reported PID is cached. A container unknown to youki, or
/// a state without a PID, yields `Ok(None)`.
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
    let cached = self.pids.read().await.get(id).copied();
    if cached.is_some() {
        return Ok(cached);
    }

    let state = match self.query_state(id).await {
        Ok(state) => state,
        Err(AgentError::NotFound { .. }) => return Ok(None),
        Err(other) => return Err(other),
    };
    let Some(pid) = state.pid else {
        return Ok(None);
    };
    self.pids.write().await.insert(id.clone(), pid);
    Ok(Some(pid))
}
/// Recorded IP of the container, or `None` when no IP was recorded or the
/// container fell back to host networking.
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
    let fell_back = matches!(
        self.netns.read().await.get(id),
        Some(NetnsState::HostFallback)
    );
    if fell_back {
        return Ok(None);
    }
    let ip = self.ips.read().await.get(id).copied();
    Ok(ip)
}
/// Not available for this runtime; always returns `AgentError::Unsupported`.
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
    let reason = "list_images is not supported by the WSL2 delegate runtime \
                  (youki has no image registry; images are managed on the host)"
        .to_string();
    Err(AgentError::Unsupported(reason))
}
/// Not available for this runtime; always returns `AgentError::Unsupported`.
async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
    let reason = "remove_image is not supported by the WSL2 delegate runtime".to_string();
    Err(AgentError::Unsupported(reason))
}
/// Not available for this runtime; always returns `AgentError::Unsupported`.
async fn prune_images(&self) -> Result<PruneResult> {
    let reason = "prune_images is not supported by the WSL2 delegate runtime".to_string();
    Err(AgentError::Unsupported(reason))
}
/// Sends `signal` (default `SIGKILL`) to the container via `youki kill`.
///
/// The signal name is validated first; a non-zero youki exit is returned
/// as a youki error.
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
    let requested = signal.unwrap_or("SIGKILL");
    let canonical = validate_signal(requested)?;
    let slug = id_slug(id);
    let output = self.youki(&["kill", &slug, &canonical]).await?;
    if !output.status.success() {
        return Err(youki_error("kill", &output));
    }
    Ok(())
}
/// Not available for this runtime; always returns `AgentError::Unsupported`.
async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
    let reason = "tag_image is not supported by the WSL2 delegate runtime".to_string();
    Err(AgentError::Unsupported(reason))
}
/// Returns default (empty) inspect details; no container is consulted.
async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
    let details = ContainerInspectDetails::default();
    Ok(details)
}
/// Starts `cmd` inside the container via `youki exec` and streams its
/// output as events instead of buffering it.
///
/// The child gets no stdin; stdout and stderr are piped into the returned
/// stream, which ends with an exit event.
///
/// # Errors
/// `AgentError::InvalidSpec` for an empty `cmd`, `AgentError::Network` if
/// `wsl.exe` cannot be spawned, `AgentError::Internal` if a pipe handle is
/// missing.
async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
    if cmd.is_empty() {
        return Err(AgentError::InvalidSpec(
            "exec command must not be empty".to_string(),
        ));
    }

    // wsl.exe -d <distro> -- <youki> exec <slug> -- <cmd...>
    let argv: Vec<String> = [
        "-d".to_string(),
        self.config.distro.clone(),
        "--".to_string(),
        self.config.youki_path.clone(),
        "exec".to_string(),
        id_slug(id),
        "--".to_string(),
    ]
    .into_iter()
    .chain(cmd.iter().cloned())
    .collect();

    let mut command = tokio::process::Command::new("wsl.exe");
    command
        .args(&argv)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child = command
        .spawn()
        .map_err(|e| AgentError::Network(format!("wsl.exe spawn for exec_stream: {e}")))?;

    let Some(stdout) = child.stdout.take() else {
        return Err(AgentError::Internal(
            "wsl.exe child did not expose a stdout handle".to_string(),
        ));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(AgentError::Internal(
            "wsl.exe child did not expose a stderr handle".to_string(),
        ));
    };
    Ok(spawn_exec_event_stream(child, stdout, stderr))
}
}
/// Runs `wsl.exe -d <distro> -- <cmd> <args...>` and captures its output.
///
/// Only a failure to launch/await `wsl.exe` is an error
/// (`AgentError::Network`); a non-zero exit status is returned to the
/// caller inside the `Output`.
async fn wsl_exec_in(distro: &str, cmd: &str, args: &[&str]) -> Result<Output> {
    let mut command = tokio::process::Command::new("wsl.exe");
    command.args(["-d", distro, "--", cmd]).args(args);
    match command.output().await {
        Ok(output) => Ok(output),
        Err(e) => Err(AgentError::Network(format!(
            "wsl.exe -d {distro} -- {cmd}: {e}"
        ))),
    }
}
/// Turns a running child process into a stream of exec events.
///
/// Two tasks forward stdout and stderr line by line; a third waits for
/// both to finish, then for the child itself, and finally emits the exit
/// event (`-1` when no exit code is available or waiting fails). A
/// forwarding task stops on end-of-stream, on a read error, or when the
/// receiving side has been dropped.
fn spawn_exec_event_stream(
    mut child: tokio::process::Child,
    stdout: tokio::process::ChildStdout,
    stderr: tokio::process::ChildStderr,
) -> ExecEventStream {
    let (tx, rx) = mpsc::channel::<ExecEvent>(128);

    let out_tx = tx.clone();
    let stdout_task = tokio::spawn(async move {
        let mut lines = BufReader::new(stdout).lines();
        loop {
            let line = match lines.next_line().await {
                Ok(Some(line)) => line,
                Ok(None) => break,
                Err(e) => {
                    tracing::warn!(error = %e, "exec_stream: stdout read error");
                    break;
                }
            };
            if out_tx.send(ExecEvent::Stdout(line)).await.is_err() {
                break;
            }
        }
    });

    let err_tx = tx.clone();
    let stderr_task = tokio::spawn(async move {
        let mut lines = BufReader::new(stderr).lines();
        loop {
            let line = match lines.next_line().await {
                Ok(Some(line)) => line,
                Ok(None) => break,
                Err(e) => {
                    tracing::warn!(error = %e, "exec_stream: stderr read error");
                    break;
                }
            };
            if err_tx.send(ExecEvent::Stderr(line)).await.is_err() {
                break;
            }
        }
    });

    tokio::spawn(async move {
        // Drain both pipes completely before reporting the exit code so the
        // exit event is always the last one on the stream.
        let _ = stdout_task.await;
        let _ = stderr_task.await;
        let exit_code = child
            .wait()
            .await
            .map(|status| status.code().unwrap_or(-1))
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "wsl.exe exec_stream child wait failed; reporting exit -1"
                );
                -1
            });
        let _ = tx.send(ExecEvent::Exit(exit_code)).await;
    });

    Box::pin(ReceiverStream::new(rx))
}
async fn wsl_stdin_pipe(args: &[&str], stdin_bytes: &[u8]) -> std::io::Result<()> {
let mut child = tokio::process::Command::new("wsl.exe")
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
if let Some(mut stdin) = child.stdin.take() {
stdin.write_all(stdin_bytes).await?;
stdin.shutdown().await?;
} else {
return Err(std::io::Error::other(
"wsl.exe child did not expose a stdin handle",
));
}
let output = child.wait_with_output().await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(std::io::Error::other(format!(
"wsl.exe exited with {:?}: {}",
output.status.code(),
stderr.trim()
)));
}
Ok(())
}
/// Returns the plain tar bytes of an image layer.
///
/// The compression is whatever `CompressionType::detect` reports for the
/// media type and data: uncompressed layers are copied as-is, gzip and zstd
/// layers are fully decoded into memory.
fn decompress_layer(data: &[u8], media_type: &str) -> std::io::Result<Vec<u8>> {
    let mut decoder: Box<dyn Read + '_> = match CompressionType::detect(media_type, data) {
        CompressionType::None => return Ok(data.to_vec()),
        CompressionType::Gzip => Box::new(flate2::read::GzDecoder::new(data)),
        CompressionType::Zstd => Box::new(zstd::stream::Decoder::new(data)?),
    };
    // The compressed size is only a lower-bound hint for the output buffer.
    let mut tar_bytes = Vec::with_capacity(data.len());
    decoder.read_to_end(&mut tar_bytes)?;
    Ok(tar_bytes)
}
/// Builds the `AgentError::Network` used for a youki (or helper) command
/// that exited non-zero: subcommand, exit code and trimmed stderr.
fn youki_error(subcommand: &str, output: &Output) -> AgentError {
    let stderr = String::from_utf8_lossy(&output.stderr);
    let message = format!(
        "youki {subcommand} failed (status {:?}): {}",
        output.status.code(),
        stderr.trim()
    );
    AgentError::Network(message)
}
/// Display form of `id` with every character other than ASCII letters,
/// digits, `-` and `_` replaced by `-`.
fn id_slug(id: &ContainerId) -> String {
    id.to_string()
        .chars()
        .map(|c| match c {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => c,
            _ => '-',
        })
        .collect()
}
/// Subset of the JSON printed by `youki state` that this runtime reads.
/// Field names are matched in camelCase; unknown fields are ignored.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct YoukiState {
/// Lifecycle status string; compared case-insensitively by the helpers below.
status: String,
/// PID of the container process, when present in the JSON.
#[serde(default)]
pid: Option<u32>,
/// Exit code read from an `exitCode` key, when present in the JSON.
#[serde(default)]
exit_code: Option<i32>,
}
impl YoukiState {
    /// True when the status is "stopped" (ASCII case-insensitive).
    fn is_stopped(&self) -> bool {
        self.status.eq_ignore_ascii_case("stopped")
    }

    /// Maps the status string (ASCII case-insensitive) to a `ContainerState`:
    /// creating → Pending, created → Initializing, running → Running,
    /// stopped → Exited (code 0 when none was reported); anything else is
    /// reported as Failed with the lowercased status in the reason.
    fn as_container_state(&self) -> ContainerState {
        let is = |name: &str| self.status.eq_ignore_ascii_case(name);
        if is("creating") {
            ContainerState::Pending
        } else if is("created") {
            ContainerState::Initializing
        } else if is("running") {
            ContainerState::Running
        } else if is("stopped") {
            ContainerState::Exited {
                code: self.exit_code.unwrap_or(0),
            }
        } else {
            let other = self.status.to_ascii_lowercase();
            ContainerState::Failed {
                reason: format!("unknown youki state: {other}"),
            }
        }
    }
}
/// Parses the stdout of `youki state` as JSON.
///
/// # Errors
/// `AgentError::Internal` when stdout is not UTF-8 or not valid JSON for
/// `YoukiState`; the JSON error includes the first 256 characters of output.
fn parse_youki_state(output: &Output) -> Result<YoukiState> {
    let text = match std::str::from_utf8(&output.stdout) {
        Ok(text) => text,
        Err(e) => {
            return Err(AgentError::Internal(format!(
                "youki state: stdout not utf-8: {e}"
            )));
        }
    };
    match serde_json::from_str::<YoukiState>(text.trim()) {
        Ok(state) => Ok(state),
        Err(e) => {
            let preview: String = text.chars().take(256).collect();
            Err(AgentError::Internal(format!(
                "youki state: failed to parse JSON: {e} (raw: {:?})",
                preview
            )))
        }
    }
}
/// Extracts CPU and memory figures from the JSON printed by
/// `youki events --stats`.
///
/// Reads `/cpu/usage/total` (divided by 1000 to give microseconds),
/// `/memory/usage/usage` and `/memory/usage/limit`. Missing or non-integer
/// values default to `0` (`u64::MAX` for the limit); unparsable JSON yields
/// an all-default sample. `timestamp` is copied into the result.
fn parse_youki_stats(output: &Output, timestamp: std::time::Instant) -> ContainerStats {
    let raw = String::from_utf8_lossy(&output.stdout);
    let Ok(doc) = serde_json::from_str::<serde_json::Value>(raw.trim()) else {
        return ContainerStats {
            cpu_usage_usec: 0,
            memory_bytes: 0,
            memory_limit: u64::MAX,
            timestamp,
        };
    };

    let read_u64 = |path: &str| doc.pointer(path).and_then(serde_json::Value::as_u64);

    let cpu_ns = read_u64("/cpu/usage/total").unwrap_or(0);
    ContainerStats {
        cpu_usage_usec: cpu_ns / 1_000,
        memory_bytes: read_u64("/memory/usage/usage").unwrap_or(0),
        memory_limit: read_u64("/memory/usage/limit").unwrap_or(u64::MAX),
        timestamp,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
#[cfg(not(target_os = "windows"))]
use std::os::unix::process::ExitStatusExt;
#[cfg(target_os = "windows")]
use std::os::windows::process::ExitStatusExt;
use std::process::ExitStatus;
use std::sync::Mutex as StdMutex;
/// Test helper: builds a `ContainerId` from a service name and replica index.
fn cid(service: &str, replica: u32) -> ContainerId {
    let service = service.to_owned();
    ContainerId { service, replica }
}
/// Test helper (Windows): builds an `ExitStatus` whose raw value is the bit
/// pattern of `code` reinterpreted as `u32`.
#[cfg(target_os = "windows")]
fn make_exit_status(code: i32) -> ExitStatus {
    // Same bits as a sign-discarding cast, without tripping the cast lint.
    let raw = u32::from_ne_bytes(code.to_ne_bytes());
    ExitStatus::from_raw(raw)
}
/// Test helper (non-Windows): builds an `ExitStatus` from a raw wait status
/// with `code` shifted into the exit-code byte.
#[cfg(not(target_os = "windows"))]
fn make_exit_status(code: i32) -> ExitStatus {
    let wait_status = code << 8;
    ExitStatus::from_raw(wait_status)
}
/// Test helper: an `Output` with the given stdout text and exit code and an
/// empty stderr.
fn fake_output(stdout: &str, code: i32) -> Output {
    let status = make_exit_status(code);
    Output {
        status,
        stdout: stdout.as_bytes().to_owned(),
        stderr: Vec::new(),
    }
}
/// Test helper: an `Output` with the given stderr text and exit code and an
/// empty stdout.
fn fake_output_err(stderr: &str, code: i32) -> Output {
    let status = make_exit_status(code);
    Output {
        status,
        stdout: Vec::new(),
        stderr: stderr.as_bytes().to_owned(),
    }
}
/// Shared, ordered log of `(command, arguments)` pairs.
type CallLog = Arc<StdMutex<Vec<(String, Vec<String>)>>>;

/// Fake runner that records every invocation and can serve scripted outputs.
struct RecordingRunner {
    /// Every call made so far, in order.
    calls: CallLog,
    /// Scripted outputs keyed by the space-joined command line.
    responses: StdMutex<HashMap<String, Output>>,
}

impl RecordingRunner {
    /// Creates a runner with no recorded calls and no scripted responses.
    fn new() -> Self {
        let calls: CallLog = Arc::new(StdMutex::new(Vec::new()));
        let responses = StdMutex::new(HashMap::new());
        Self { calls, responses }
    }

    /// Returns another handle to the shared call log.
    fn calls_handle(&self) -> CallLog {
        Arc::clone(&self.calls)
    }

    /// Lookup key for a command line: `cmd` and all `args` joined by single spaces.
    fn key(cmd: &str, args: &[&str]) -> String {
        std::iter::once(cmd)
            .chain(args.iter().copied())
            .collect::<Vec<_>>()
            .join(" ")
    }

    /// Scripts `output` as the answer for exactly this command line,
    /// replacing any earlier script for it.
    fn set_response(&self, cmd: &str, args: &[&str], output: Output) {
        let key = Self::key(cmd, args);
        let mut responses = self.responses.lock().expect("responses mutex poisoned");
        responses.insert(key, output);
    }
}
#[async_trait]
impl WslRunner for RecordingRunner {
    /// Records the call, then returns the scripted output for this exact
    /// command line (consuming it) or an empty successful output.
    async fn run(&self, cmd: &str, args: &[&str]) -> anyhow::Result<Output> {
        let recorded_args: Vec<String> = args.iter().map(|s| (*s).to_string()).collect();
        self.calls
            .lock()
            .expect("calls mutex poisoned")
            .push((cmd.to_string(), recorded_args));

        let key = Self::key(cmd, args);
        let scripted = self
            .responses
            .lock()
            .expect("responses mutex poisoned")
            .remove(&key);
        Ok(scripted.unwrap_or_else(|| fake_output("", 0)))
    }
}
/// Returns the [`ResolvedConfig`] most tests start from: the stock distro name, a bare
/// `youki` binary name, and the default bundle and log roots.
fn default_resolved_config() -> ResolvedConfig {
    let distro = zlayer_wsl::distro::DISTRO_NAME.to_string();
    let youki_path = String::from("youki");
    let bundle_root = String::from(DEFAULT_BUNDLE_ROOT);
    let log_root = String::from(DEFAULT_LOG_ROOT);
    ResolvedConfig {
        distro,
        youki_path,
        bundle_root,
        log_root,
    }
}
fn test_runtime_with_runner(
config: ResolvedConfig,
runner: Arc<dyn WslRunner>,
) -> Wsl2DelegateRuntime {
Wsl2DelegateRuntime {
config,
pids: Arc::new(RwLock::new(HashMap::new())),
ips: Arc::new(RwLock::new(HashMap::new())),
bundle_roots: Arc::new(RwLock::new(HashMap::new())),
image_cache: Arc::new(RwLock::new(HashMap::new())),
gateways: Arc::new(RwLock::new(HashMap::new())),
netns: Arc::new(RwLock::new(HashMap::new())),
runner,
}
}
fn test_runtime(config: ResolvedConfig) -> Wsl2DelegateRuntime {
test_runtime_with_runner(config, Arc::new(DefaultWslRunner))
}
/// Builds a runtime that uses the default resolved config and the supplied `runner`.
fn make_runtime(runner: Arc<dyn WslRunner>) -> Wsl2DelegateRuntime {
    let config = default_resolved_config();
    test_runtime_with_runner(config, runner)
}
/// `id_slug` keeps simple names as-is and yields only alphanumerics, dashes and
/// underscores for names containing other characters, while keeping the replica suffix.
#[test]
fn id_slug_sanitizes_special_chars() {
    let plain = cid("web", 0);
    assert_eq!(id_slug(&plain), "web-rep-0");

    let slug = id_slug(&cid("my svc/with:quotes", 3));
    let is_allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '-' | '_');
    assert!(
        slug.chars().all(is_allowed),
        "slug must be alnum/dash/underscore only: {slug}"
    );
    assert!(
        slug.contains("my-svc"),
        "slug should preserve alnum: {slug}"
    );
    assert!(slug.ends_with("-rep-3"), "slug should keep replica: {slug}");
}
/// A `running` state document exposes its status and pid, has no exit code, and maps to
/// [`ContainerState::Running`].
#[test]
fn parse_youki_state_running() {
    let json = r#"{"ociVersion":"1.0.2","id":"web-rep-0","status":"running","pid":12345,"bundle":"/var/lib/zlayer/bundles/web-rep-0"}"#;
    let output = fake_output(json, 0);
    let parsed = parse_youki_state(&output).expect("valid state JSON");

    assert_eq!(parsed.status, "running");
    assert_eq!(parsed.pid, Some(12345));
    assert_eq!(parsed.exit_code, None);
    assert!(!parsed.is_stopped());
    assert_eq!(parsed.as_container_state(), ContainerState::Running);
}
/// A `stopped` state document carries its exit code through to
/// [`ContainerState::Exited`].
#[test]
fn parse_youki_state_stopped() {
    let json = r#"{"ociVersion":"1.0.2","id":"job-rep-0","status":"stopped","exitCode":42}"#;
    let output = fake_output(json, 0);
    let parsed = parse_youki_state(&output).expect("valid state JSON");

    assert_eq!(parsed.status, "stopped");
    assert_eq!(parsed.exit_code, Some(42));
    assert!(parsed.is_stopped());

    let expected = ContainerState::Exited { code: 42 };
    assert_eq!(parsed.as_container_state(), expected);
}
/// The `creating` status maps to [`ContainerState::Pending`].
#[test]
fn parse_youki_state_creating_maps_to_pending() {
    let json = r#"{"ociVersion":"1.0.2","id":"x","status":"creating"}"#;
    let output = fake_output(json, 0);
    let parsed = parse_youki_state(&output).expect("valid state JSON");

    let mapped = parsed.as_container_state();
    assert_eq!(mapped, ContainerState::Pending);
}
/// A status the mapping does not handle (`paused` here) ends up as
/// [`ContainerState::Failed`].
#[test]
fn parse_youki_state_unknown_maps_to_failed() {
    let json = r#"{"ociVersion":"1.0.2","id":"x","status":"paused"}"#;
    let output = fake_output(json, 0);
    let parsed = parse_youki_state(&output).expect("valid state JSON");

    let mapped = parsed.as_container_state();
    assert!(matches!(mapped, ContainerState::Failed { .. }));
}
/// A complete stats payload yields CPU time converted from nanoseconds to microseconds
/// plus the reported memory usage and limit.
#[test]
fn parse_youki_stats_handles_full_payload() {
    let json = r#"{"cpu":{"usage":{"total":2500000000}},"memory":{"usage":{"usage":104857600,"limit":268435456}}}"#;
    let output = fake_output(json, 0);
    let now = std::time::Instant::now();

    let ContainerStats {
        cpu_usage_usec,
        memory_bytes,
        memory_limit,
        ..
    } = parse_youki_stats(&output, now);

    assert_eq!(cpu_usage_usec, 2_500_000);
    assert_eq!(memory_bytes, 104_857_600);
    assert_eq!(memory_limit, 268_435_456);
}
/// Unparseable stats output falls back to zero usage and an unlimited (`u64::MAX`)
/// memory limit.
#[test]
fn parse_youki_stats_defaults_on_malformed() {
    let output = fake_output("not json", 0);
    let now = std::time::Instant::now();

    let ContainerStats {
        cpu_usage_usec,
        memory_bytes,
        memory_limit,
        ..
    } = parse_youki_stats(&output, now);

    assert_eq!(cpu_usage_usec, 0);
    assert_eq!(memory_bytes, 0);
    assert_eq!(memory_limit, u64::MAX);
}
#[tokio::test]
async fn record_container_ip_then_get_returns_it() {
let runtime = make_runtime(Arc::new(RecordingRunner::new()));
let id = cid("web", 2);
let ip = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 42));
assert_eq!(runtime.get_container_ip(&id).await.unwrap(), None);
runtime.record_container_ip(&id, ip).await;
assert_eq!(runtime.get_container_ip(&id).await.unwrap(), Some(ip));
}
/// With the plain-tar media type, `decompress_layer` returns the input bytes unchanged.
#[test]
fn decompress_layer_plain_tar_is_noop() {
    let payload = Vec::from(*b"ustar-tar-payload");
    let media_type = zlayer_registry::unpack::media_types::TAR;

    let out = decompress_layer(&payload, media_type).expect("plain tar is a no-op");

    assert_eq!(out, payload);
}
/// Bytes gzip-compressed in the test come back unchanged when `decompress_layer` is
/// given the gzip tar media type.
#[test]
fn decompress_layer_gzip_roundtrip() {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write as _;

    let original: &[u8] = b"hello-wsl2-linux-layer-bytes";
    let compressed = {
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(original).unwrap();
        gz.finish().unwrap()
    };

    let media_type = zlayer_registry::unpack::media_types::TAR_GZIP;
    let out = decompress_layer(&compressed, media_type).expect("gzip decompression must succeed");

    assert_eq!(out, original);
}
/// Gzip data is still decompressed when the media type
/// (`application/octet-stream`) does not say it is gzip.
#[test]
fn decompress_layer_honors_magic_bytes_when_media_type_unknown() {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write as _;

    let original: &[u8] = b"magic-byte-fallback";
    let compressed = {
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(original).unwrap();
        gz.finish().unwrap()
    };

    let out = decompress_layer(&compressed, "application/octet-stream")
        .expect("fallback-to-magic path must still decompress");

    assert_eq!(out, original);
}
/// Regression test: `create_container` must not fail with [`AgentError::Unsupported`].
///
/// The runtime is built with `test_runtime`, i.e. on top of the real `DefaultWslRunner`,
/// so the outcome of the call depends on the host. Any error other than `Unsupported`
/// is accepted, and so is success: the only thing being pinned is that the
/// "unsupported" short-circuit is gone.
#[tokio::test]
async fn create_container_no_longer_returns_unsupported() {
    use zlayer_spec::DeploymentSpec;
    let runtime = test_runtime(default_resolved_config());
    let id = cid("svc", 0);
    // Indentation is significant in YAML: `svc` has to be nested under `services`, and
    // the image/endpoint fields under `svc`, for the service lookup below to find it.
    let yaml = r"
version: v1
deployment: wsl2-g2-test
services:
  svc:
    rtype: service
    image:
      name: docker.io/library/alpine:3.19
    endpoints:
      - name: http
        protocol: http
        port: 8080
";
    let spec = serde_yaml::from_str::<DeploymentSpec>(yaml)
        .expect("valid deployment yaml")
        .services
        .remove("svc")
        .expect("service 'svc' present");
    // `unwrap_err()` would panic on success; only inspect the error if there is one.
    if let Err(err) = runtime.create_container(&id, &spec).await {
        assert!(
            !matches!(err, AgentError::Unsupported(_)),
            "create_container must not return Unsupported after G-2 (got {err:?})",
        );
    }
}
/// Feeds `spawn_exec_event_stream` a real `cmd.exe` child that echoes two lines and exits
/// with code 7, then checks the stdout events and the terminal exit event.
#[tokio::test]
async fn exec_stream_pump_yields_line_events_then_exit() {
    use futures_util::stream::StreamExt as _;

    let mut child = tokio::process::Command::new("cmd.exe")
        .args(["/c", "echo a & echo b & exit 7"])
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("cmd.exe must be available on Windows test hosts");
    let child_stdout = child.stdout.take().expect("piped stdout");
    let child_stderr = child.stderr.take().expect("piped stderr");
    let mut events = spawn_exec_event_stream(child, child_stdout, child_stderr);

    let mut stdout_lines: Vec<String> = Vec::new();
    let mut exit_code: Option<i32> = None;
    loop {
        match events.next().await {
            // Surrounding whitespace is stripped before the lines are compared.
            Some(ExecEvent::Stdout(line)) => stdout_lines.push(line.trim().to_owned()),
            Some(ExecEvent::Stderr(_)) => {}
            // Stop at the first exit event, or when the stream simply ends.
            Some(ExecEvent::Exit(code)) => {
                exit_code = Some(code);
                break;
            }
            None => break,
        }
    }

    assert_eq!(
        stdout_lines,
        ["a", "b"],
        "stdout should have been split into one event per line",
    );
    assert_eq!(
        exit_code,
        Some(7),
        "terminal Exit event must carry the child's exit code",
    );
}
/// `log_path` is built from the configured `log_root`, not the default one.
#[test]
fn log_path_uses_configured_log_root() {
    let config = ResolvedConfig {
        distro: String::from("zlayer"),
        youki_path: String::from("youki"),
        bundle_root: String::from("/custom/bundles"),
        log_root: String::from("/custom/logs"),
    };
    let runtime = test_runtime(config);
    let id = cid("web", 7);

    assert_eq!(
        runtime.log_path(&id),
        "/custom/logs/web-rep-7.youki.log",
        "log_path must be <log_root>/<slug>.youki.log",
    );
}
/// With the default resolved config, `log_path` starts with `DEFAULT_LOG_ROOT`.
#[test]
fn log_path_uses_default_log_root_by_default() {
    let config = default_resolved_config();
    let runtime = test_runtime(config);
    let id = cid("svc", 0);

    let under_default_root = runtime.log_path(&id).starts_with(DEFAULT_LOG_ROOT);
    assert!(
        under_default_root,
        "default log_path should live under DEFAULT_LOG_ROOT ({DEFAULT_LOG_ROOT}), got {}",
        runtime.log_path(&id),
    );
}
/// `Wsl2DelegateConfig::default()` uses the stock distro name, no explicit youki path,
/// and the default bundle and log roots.
#[test]
fn default_config_matches_previous_hardcoded_values() {
    let Wsl2DelegateConfig {
        distro,
        youki_path,
        bundle_root,
        log_root,
    } = Wsl2DelegateConfig::default();

    assert_eq!(distro, zlayer_wsl::distro::DISTRO_NAME);
    assert_eq!(youki_path, None);
    assert_eq!(bundle_root, DEFAULT_BUNDLE_ROOT);
    assert_eq!(log_root, DEFAULT_LOG_ROOT);
}
/// Custom config values are stored on the runtime and drive both `bundle_dir` and
/// `log_path`.
#[test]
fn custom_config_propagates_into_runtime_fields() {
    let config = ResolvedConfig {
        distro: String::from("ubuntu-lts"),
        youki_path: String::from("/opt/youki/bin/youki"),
        bundle_root: String::from("/srv/zlayer/bundles"),
        log_root: String::from("/srv/zlayer/logs"),
    };
    let runtime = test_runtime(config);
    let id = cid("api", 0);

    // The values are kept verbatim on the runtime's config.
    assert_eq!(runtime.config.distro, "ubuntu-lts");
    assert_eq!(runtime.config.youki_path, "/opt/youki/bin/youki");

    // Derived paths are rooted at the configured directories.
    assert_eq!(
        runtime.bundle_dir(&id),
        "/srv/zlayer/bundles/api-rep-0",
        "bundle_dir should use the configured bundle_root",
    );
    assert_eq!(
        runtime.log_path(&id),
        "/srv/zlayer/logs/api-rep-0.youki.log",
        "log_path should use the configured log_root",
    );
}
/// With an IP and gateway recorded, `setup_container_netns` issues the expected `ip`
/// commands and leaves the container in the `Configured` state.
#[tokio::test]
async fn setup_container_netns_emits_full_sequence() {
    let runner = Arc::new(RecordingRunner::new());
    let calls = runner.calls_handle();
    let runtime = make_runtime(runner);
    let id = cid("web", 0);
    let ip = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 42));
    let gw = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));

    runtime.record_container_ip(&id, ip).await;
    runtime.record_container_gateway(&id, gw).await;
    runtime.setup_container_netns(&id).await;

    // Flatten each recorded call into a single "cmd arg arg ..." line for matching.
    let joined: Vec<String> = calls
        .lock()
        .unwrap()
        .iter()
        .map(|(cmd, args)| format!("{} {}", cmd, args.join(" ")))
        .collect();

    for needle in [
        "ip netns add web-rep-0",
        "type veth peer name",
        "netns web-rep-0",
        "ip addr add 10.200.0.42/24 dev eth0",
        "ip route add default via 10.200.0.1",
    ] {
        let found = joined.iter().any(|line| line.contains(needle));
        assert!(
            found,
            "expected a command matching {needle:?} in {joined:?}"
        );
    }

    let netns = runtime.netns.read().await;
    match netns.get(&id) {
        Some(NetnsState::Configured {
            host_iface,
            ip: got_ip,
        }) => {
            assert_eq!(*got_ip, ip);
            let name_ok = host_iface.starts_with("zl-") && host_iface.len() <= 15;
            assert!(
                name_ok,
                "host_iface must be a zl- prefixed IFNAMSIZ-safe name, got {host_iface:?}"
            );
        }
        other => panic!("expected Configured, got {other:?}"),
    }
}
/// Without a recorded IP, `setup_container_netns` runs no commands and stores no netns
/// state for the container.
#[tokio::test]
async fn setup_container_netns_no_ip_is_noop() {
    let runner = Arc::new(RecordingRunner::new());
    let calls = runner.calls_handle();
    let runtime = make_runtime(runner);
    let id = cid("web", 1);

    runtime.setup_container_netns(&id).await;

    let no_calls = calls.lock().unwrap().is_empty();
    assert!(
        no_calls,
        "no IP => no wsl.exe calls"
    );
    let has_state = runtime.netns.read().await.contains_key(&id);
    assert!(!has_state);
}
/// When creating the veth pair fails, the container is marked `HostFallback` and no
/// longer reports a container IP.
#[tokio::test]
async fn setup_container_netns_failure_falls_back_to_host_network() {
    let runner = Arc::new(RecordingRunner::new());
    let runtime = make_runtime(runner.clone());
    let id = cid("web", 7);
    let ip = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 99));
    runtime.record_container_ip(&id, ip).await;

    // Prime the exact `ip link add ... type veth peer name ...` call with a failure.
    let slug = "web-rep-7";
    let host_iface = make_interface_name(&[slug], "h");
    let cont_iface = make_interface_name(&[slug], "c");
    let veth_add: [&str; 8] = [
        "link",
        "add",
        &host_iface,
        "type",
        "veth",
        "peer",
        "name",
        &cont_iface,
    ];
    let failure = fake_output_err("RTNETLINK answers: Operation not permitted", 2);
    runner.set_response("ip", &veth_add, failure);

    runtime.setup_container_netns(&id).await;

    let state = runtime.netns.read().await.get(&id).cloned();
    assert!(matches!(state, Some(NetnsState::HostFallback)));
    let reported_ip = runtime.get_container_ip(&id).await.unwrap();
    assert_eq!(reported_ip, None);
}
/// After a successful setup, `teardown_container_netns` deletes the host veth and the
/// namespace, and drops the container's netns state.
#[tokio::test]
async fn teardown_container_netns_deletes_veth_and_ns() {
    let runner = Arc::new(RecordingRunner::new());
    let calls = runner.calls_handle();
    let runtime = make_runtime(runner);
    let id = cid("web", 0);
    let ip = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 42));
    let gw = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));

    runtime.record_container_ip(&id, ip).await;
    runtime.record_container_gateway(&id, gw).await;
    runtime.setup_container_netns(&id).await;

    // Discard the setup commands so only teardown commands are inspected below.
    calls.lock().unwrap().clear();
    runtime.teardown_container_netns(&id).await;

    let joined: Vec<String> = calls
        .lock()
        .unwrap()
        .iter()
        .map(|(cmd, args)| format!("{} {}", cmd, args.join(" ")))
        .collect();

    let deleted_veth = joined.iter().any(|line| line.starts_with("ip link delete"));
    assert!(
        deleted_veth,
        "teardown must delete the host veth, got {joined:?}"
    );
    let deleted_ns = joined.iter().any(|line| line == "ip netns delete web-rep-0");
    assert!(
        deleted_ns,
        "teardown must delete the netns, got {joined:?}"
    );
    let has_state = runtime.netns.read().await.contains_key(&id);
    assert!(!has_state);
}
/// A "File exists" failure from `ip netns add` does not stop setup: the container still
/// ends up `Configured`.
#[tokio::test]
async fn setup_container_netns_idempotent_on_existing_ns() {
    let runner = Arc::new(RecordingRunner::new());
    let runtime = make_runtime(runner.clone());
    let id = cid("web", 3);
    let ip = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 42));
    runtime.record_container_ip(&id, ip).await;

    // Make the namespace-creation call report that the namespace is already there.
    let already_exists = fake_output_err(
        "Cannot create namespace file \"/run/netns/web-rep-3\": File exists",
        1,
    );
    runner.set_response("ip", &["netns", "add", "web-rep-3"], already_exists);

    runtime.setup_container_netns(&id).await;

    let state = runtime.netns.read().await.get(&id).cloned();
    assert!(matches!(state, Some(NetnsState::Configured { .. })));
}
}