#![cfg(target_os = "windows")]
use std::sync::Arc;
use std::time::Duration;
use zlayer_agent::error::Result;
use zlayer_agent::runtime::{ContainerId, Runtime};
use zlayer_agent::runtimes::composite::CompositeRuntime;
use zlayer_agent::runtimes::hcs::{HcsConfig, HcsRuntime};
#[cfg(feature = "wsl")]
use zlayer_agent::runtimes::wsl2_delegate::Wsl2DelegateRuntime;
use zlayer_overlay::ipnet;
use zlayer_spec::{ArchKind, DeploymentSpec, OsKind, ServiceSpec, TargetPlatform};
const TEST_SLICE: &str = "10.220.99.0/28";
const TEST_OWNER_TAG: &str = "zlayer-e2e-test";
const WINDOWS_IMAGE: &str = "mcr.microsoft.com/windows/nanoserver:ltsc2022";
const LINUX_IMAGE: &str = "alpine:3.19";
fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
let yaml = format!(
r"
version: v1
deployment: composite-e2e
services:
test:
rtype: service
image:
name: {image}
endpoints:
- name: http
protocol: http
port: 8080
"
);
let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
.expect("valid deployment yaml")
.services
.remove("test")
.expect("service 'test' present");
spec.platform = platform;
spec
}
fn cid(service: &str, replica: u32) -> ContainerId {
ContainerId {
service: service.to_string(),
replica,
}
}
fn test_slice() -> ipnet::IpNet {
TEST_SLICE
.parse::<ipnet::IpNet>()
.expect("TEST_SLICE must be a valid CIDR")
}
fn test_hcs_config(storage_suffix: &str) -> HcsConfig {
HcsConfig {
storage_root: std::env::temp_dir()
.join("zlayer-composite-e2e")
.join(storage_suffix),
slice_cidr: Some(test_slice()),
..HcsConfig::default()
}
}
async fn try_build_composite(
storage_suffix: &str,
require_wsl: bool,
) -> Option<(CompositeRuntime, bool)> {
let hcs_cfg = test_hcs_config(storage_suffix);
let primary = match HcsRuntime::new(hcs_cfg).await {
Ok(rt) => Arc::new(rt) as Arc<dyn Runtime>,
Err(e) => {
eprintln!(
"SKIP: HcsRuntime::new failed ({e}); HCS may be unavailable \
(non-elevated, Hyper-V feature off, or vmcompute service down)"
);
return None;
}
};
#[cfg(feature = "wsl")]
let (delegate, wsl_available): (Option<Arc<dyn Runtime>>, bool) =
match Wsl2DelegateRuntime::try_new().await {
Ok(Some(d)) => (Some(Arc::new(d) as Arc<dyn Runtime>), true),
Ok(None) => {
if require_wsl {
eprintln!(
"SKIP: Wsl2DelegateRuntime::try_new returned None \
(WSL2 not installed or zlayer distro unavailable)"
);
return None;
}
(None, false)
}
Err(e) => {
if require_wsl {
eprintln!("SKIP: Wsl2DelegateRuntime::try_new failed: {e}");
return None;
}
eprintln!(
"note: Wsl2DelegateRuntime::try_new failed: {e}; continuing without delegate"
);
(None, false)
}
};
#[cfg(not(feature = "wsl"))]
let (delegate, wsl_available): (Option<Arc<dyn Runtime>>, bool) = {
if require_wsl {
eprintln!(
"SKIP: this build does not have the `wsl` feature enabled; \
rebuild with --features wsl to exercise the delegate path"
);
return None;
}
(None, false)
};
Some((CompositeRuntime::new(primary, delegate), wsl_available))
}
async fn best_effort_remove(rt: &CompositeRuntime, id: &ContainerId) {
if let Err(e) = rt.remove_container(id).await {
eprintln!(
"cleanup: remove_container({id}) failed: {e} \
(container may need manual teardown)"
);
}
}
#[cfg(feature = "wsl")]
async fn cleanup_distro_container(container_id_slug: &str) {
if let Err(e) = zlayer_wsl::distro::wsl_exec("youki", &["delete", container_id_slug]).await {
eprintln!("cleanup: youki delete {container_id_slug} failed: {e}");
}
}
#[cfg(not(feature = "wsl"))]
#[allow(clippy::unused_async)]
async fn cleanup_distro_container(_container_id_slug: &str) {}
fn cleanup_hcn_test_owner() {
let owned = match zlayer_hns::attach::list_owned_endpoints(TEST_OWNER_TAG) {
Ok(list) => list,
Err(e) => {
eprintln!("cleanup: list_owned_endpoints({TEST_OWNER_TAG}) failed: {e}");
return;
}
};
for (endpoint_id, name) in owned {
if let Err(e) = zlayer_hns::endpoint::Endpoint::delete(endpoint_id) {
eprintln!("cleanup: HcnDeleteEndpoint({endpoint_id:?}, name={name}) failed: {e}");
}
}
}
async fn assert_container_ip_in_slice(
rt: &CompositeRuntime,
id: &ContainerId,
slice: ipnet::IpNet,
) -> std::result::Result<std::net::IpAddr, String> {
let ip = match rt.get_container_ip(id).await {
Ok(Some(ip)) => ip,
Ok(None) => {
return Err(format!(
"container {id} has no IP — composite returned None from get_container_ip"
));
}
Err(e) => return Err(format!("get_container_ip({id}) failed: {e}")),
};
if !slice.contains(&ip) {
return Err(format!(
"container {id} IP {ip} is outside the expected test slice {slice}"
));
}
Ok(ip)
}
async fn hcs_has_system(hcs_id: &str) -> std::result::Result<bool, String> {
let systems = zlayer_hcs::enumerate::list_by_owner("zlayer")
.await
.map_err(|e| format!("HcsEnumerateComputeSystems failed: {e}"))?;
Ok(systems.iter().any(|s| s.id == hcs_id))
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "creates real HCS compute system + HCN endpoint; requires admin on Windows"]
async fn composite_dispatches_windows_spec_to_hcs() {
let Some((composite, _wsl_available)) = try_build_composite("win-dispatch", false).await else {
return;
};
let id = cid("win-svc", 0);
let spec = make_spec(
WINDOWS_IMAGE,
Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
);
if let Err(e) = composite.pull_image(&spec.image.name.to_string()).await {
eprintln!("SKIP: pull_image({WINDOWS_IMAGE}) failed: {e}");
return;
}
let create_result = composite.create_container(&id, &spec).await;
let assertion_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
create_result.expect("create_container for Windows spec must succeed");
}));
let hcs_check: std::result::Result<(), String> = if assertion_outcome.is_ok() {
let runtime_for_async = &composite;
let id_for_async = &id;
async move {
if !hcs_has_system(&id_for_async.to_string()).await? {
return Err(format!(
"HCS list_by_owner(\"zlayer\") did not include {id_for_async} \
— composite did not dispatch to the HCS primary"
));
}
let ip =
assert_container_ip_in_slice(runtime_for_async, id_for_async, test_slice()).await?;
eprintln!("PASS: Windows container {id_for_async} has HCN endpoint IP {ip}");
Ok(())
}
.await
} else {
Ok(())
};
best_effort_remove(&composite, &id).await;
cleanup_hcn_test_owner();
if let Err(p) = assertion_outcome {
std::panic::resume_unwind(p);
}
if let Err(msg) = hcs_check {
panic!("{msg}");
}
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "creates real WSL2 youki container; requires the zlayer WSL2 distro + youki"]
async fn composite_dispatches_linux_spec_to_wsl2() {
let Some((composite, wsl_available)) = try_build_composite("linux-dispatch", true).await else {
return;
};
if !wsl_available {
eprintln!("SKIP: WSL2 not available on this host");
return;
}
let id = cid("lin-svc", 0);
let spec = make_spec(
LINUX_IMAGE,
Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
);
if let Err(e) = composite.pull_image(&spec.image.name.to_string()).await {
eprintln!("SKIP: pull_image({LINUX_IMAGE}) failed: {e}");
return;
}
let create_result = composite.create_container(&id, &spec).await;
let slug = format!("{}-rep-{}", id.service, id.replica);
let assertion_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
create_result
.as_ref()
.expect("create_container for Linux spec must succeed via the WSL2 delegate (G-2)");
}));
let list_check: std::result::Result<(), String> = if create_result.is_ok() {
#[cfg(feature = "wsl")]
{
match zlayer_wsl::distro::wsl_exec("youki", &["list"]).await {
Ok(out) if out.status.success() => {
let stdout = String::from_utf8_lossy(&out.stdout);
if stdout.contains(&slug) {
eprintln!("PASS: youki list inside the zlayer distro includes {slug}");
Ok(())
} else {
Err(format!(
"youki list stdout does not mention {slug}: {stdout}"
))
}
}
Ok(out) => Err(format!(
"youki list failed (status {:?}): {}",
out.status.code(),
String::from_utf8_lossy(&out.stderr).trim()
)),
Err(e) => Err(format!("wsl.exe -d zlayer -- youki list: {e}")),
}
}
#[cfg(not(feature = "wsl"))]
{
let _ = slug;
Ok(())
}
} else {
Ok(())
};
if create_result.is_ok() {
best_effort_remove(&composite, &id).await;
}
cleanup_distro_container(&slug).await;
if let Err(p) = assertion_outcome {
std::panic::resume_unwind(p);
}
if let Err(msg) = list_check {
panic!("{msg}");
}
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "creates real HCS compute system + HCN endpoint; requires admin on Windows"]
async fn composite_falls_through_to_primary_when_no_platform_specified() {
let Some((composite, _wsl_available)) = try_build_composite("fallthrough", false).await else {
return;
};
let id = cid("fallthrough-svc", 0);
let spec = make_spec(WINDOWS_IMAGE, None);
if let Err(e) = composite.pull_image(&spec.image.name.to_string()).await {
eprintln!("SKIP: pull_image({WINDOWS_IMAGE}) failed: {e}");
return;
}
let create_result = composite.create_container(&id, &spec).await;
let assertion_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
create_result.expect("create_container without platform must succeed on HCS primary");
}));
let hcs_check: std::result::Result<(), String> = if assertion_outcome.is_ok() {
async {
if !hcs_has_system(&id.to_string()).await? {
return Err(format!(
"HCS list_by_owner(\"zlayer\") did not include {id} \
— composite did not dispatch to the primary (Windows cache hit)"
));
}
eprintln!("PASS: no-platform Windows spec landed on HCS primary as expected");
Ok(())
}
.await
} else {
Ok(())
};
best_effort_remove(&composite, &id).await;
cleanup_hcn_test_owner();
if let Err(p) = assertion_outcome {
std::panic::resume_unwind(p);
}
if let Err(msg) = hcs_check {
panic!("{msg}");
}
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "creates real WSL2 youki container after a Linux image pull"]
async fn composite_uses_image_os_cache_after_pull() {
let Some((composite, wsl_available)) = try_build_composite("image-os-cache", true).await else {
return;
};
if !wsl_available {
eprintln!("SKIP: WSL2 not available on this host");
return;
}
let id = cid("cache-svc", 0);
let spec = make_spec(LINUX_IMAGE, None);
if let Err(e) = composite.pull_image(&spec.image.name.to_string()).await {
eprintln!("SKIP: pull_image({LINUX_IMAGE}) failed: {e}");
return;
}
let create_result = composite.create_container(&id, &spec).await;
let slug = format!("{}-rep-{}", id.service, id.replica);
let assertion_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
create_result.as_ref().expect(
"image-OS cache hit for a Linux image must dispatch to the delegate and succeed (G-2)",
);
}));
let list_check: std::result::Result<(), String> = if create_result.is_ok() {
#[cfg(feature = "wsl")]
{
match zlayer_wsl::distro::wsl_exec("youki", &["list"]).await {
Ok(out) if out.status.success() => {
let stdout = String::from_utf8_lossy(&out.stdout);
if stdout.contains(&slug) {
eprintln!(
"PASS: cache-routed Linux spec created container {slug} in the distro"
);
Ok(())
} else {
Err(format!(
"youki list stdout does not mention {slug}: {stdout}"
))
}
}
Ok(out) => Err(format!(
"youki list failed (status {:?}): {}",
out.status.code(),
String::from_utf8_lossy(&out.stderr).trim()
)),
Err(e) => Err(format!("wsl.exe -d zlayer -- youki list: {e}")),
}
}
#[cfg(not(feature = "wsl"))]
{
let _ = slug;
Ok(())
}
} else {
Ok(())
};
if create_result.is_ok() {
best_effort_remove(&composite, &id).await;
}
cleanup_distro_container(&slug).await;
if let Err(p) = assertion_outcome {
std::panic::resume_unwind(p);
}
if let Err(msg) = list_check {
panic!("{msg}");
}
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "creates real HCS compute system; requires admin on Windows"]
async fn composite_create_then_stop_uses_same_runtime() {
let Some((composite, _wsl_available)) = try_build_composite("create-stop", false).await else {
return;
};
let id = cid("lifecycle-svc", 0);
let spec = make_spec(
WINDOWS_IMAGE,
Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
);
if let Err(e) = composite.pull_image(&spec.image.name.to_string()).await {
eprintln!("SKIP: pull_image({WINDOWS_IMAGE}) failed: {e}");
return;
}
if let Err(e) = composite.create_container(&id, &spec).await {
eprintln!("SKIP: create_container failed: {e}");
return;
}
let stop_result = composite.stop_container(&id, Duration::from_secs(5)).await;
let assertion_outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
stop_result.expect("stop_container must succeed on the same runtime that created");
}));
best_effort_remove(&composite, &id).await;
cleanup_hcn_test_owner();
if let Err(p) = assertion_outcome {
std::panic::resume_unwind(p);
}
}
#[allow(dead_code, clippy::extra_unused_type_parameters)]
fn _trait_sanity<R: Runtime + Send + Sync + 'static>() -> fn() -> Result<()> {
|| Ok(())
}