#![cfg(target_os = "windows")]
use std::time::Duration;
use reqwest::StatusCode;
use serde_json::Value as JsonValue;
/// Name of the deployment submitted by these tests; also used in the REST paths
/// and to pick out leftover endpoints during cleanup.
const DEPLOYMENT_NAME: &str = "k4-mixed-workload";
/// Key of the Linux service inside the deployment spec.
const LINUX_SVC: &str = "svc-linux";
/// Key of the Windows service inside the deployment spec.
const WIN_SVC: &str = "svc-win";
/// Suffix appended to a service name (`<service>.<suffix>`) to form the host
/// name the tests look up and ping.
const OVERLAY_DNS_SUFFIX: &str = "overlay.local";
/// Daemon base URL used when `ZLAYER_DAEMON_ADDR` is unset or empty.
const DEFAULT_DAEMON_ADDR: &str = "http://127.0.0.1:3669";
/// Image name written into the Linux service of the spec.
const LINUX_IMAGE: &str = "alpine:3.19";
/// Image name written into the Windows service of the spec.
const WINDOWS_IMAGE: &str = "mcr.microsoft.com/windows/nanoserver:ltsc2022";
/// Upper bound on how long `wait_for_running` keeps polling.
const RUNNING_TIMEOUT: Duration = Duration::from_secs(600);
/// Pause between two consecutive status polls in `wait_for_running`.
const POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Per-request timeout configured on the shared HTTP client.
const HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30);
/// Renders the deployment spec that every test in this file submits.
///
/// The template declares two services, `LINUX_SVC` (os `linux`) and `WIN_SVC`
/// (os `windows`), each with a fixed scale of one replica and an entrypoint
/// that keeps the container alive. The deployment name, service names and
/// image names are interpolated from the constants of the same name.
///
/// Everything between the quotes of the raw string is emitted verbatim, so the
/// leading whitespace of each line is part of the YAML structure — keep it
/// intact when editing.
fn deployment_yaml() -> String {
format!(
r"
version: v1
deployment: {DEPLOYMENT_NAME}
services:
{LINUX_SVC}:
rtype: service
image:
name: {LINUX_IMAGE}
platform:
os: linux
arch: amd64
scale:
mode: fixed
replicas: 1
command:
entrypoint:
- sh
- -c
- 'while true; do sleep 3600; done'
{WIN_SVC}:
rtype: service
image:
name: {WINDOWS_IMAGE}
platform:
os: windows
arch: amd64
scale:
mode: fixed
replicas: 1
command:
entrypoint:
- cmd.exe
- /c
- 'ping -n 999999 127.0.0.1 >nul'
",
)
}
/// Reads the Linux peer address from the `ZLAYER_LINUX_PEER` environment variable.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or when it is set to the empty string.
fn linux_peer_addr() -> Option<String> {
    match std::env::var("ZLAYER_LINUX_PEER") {
        Ok(peer) if !peer.is_empty() => Some(peer),
        _ => None,
    }
}
/// Resolves the base URL of the local daemon.
///
/// Uses `ZLAYER_DAEMON_ADDR` when it is set to a non-empty value and falls
/// back to `DEFAULT_DAEMON_ADDR` otherwise.
fn local_daemon_addr() -> String {
    match std::env::var("ZLAYER_DAEMON_ADDR") {
        Ok(addr) if !addr.is_empty() => addr,
        _ => DEFAULT_DAEMON_ADDR.to_string(),
    }
}
/// Reads the bearer token from the `ZLAYER_API_TOKEN` environment variable.
///
/// Returns `None` when the variable cannot be read or is empty, in which case
/// requests are sent without an `Authorization` header.
fn api_token() -> Option<String> {
    let token = std::env::var("ZLAYER_API_TOKEN").ok()?;
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}
/// Builds the HTTP client shared by a test, with `HTTP_REQ_TIMEOUT` applied to
/// every request.
///
/// # Panics
///
/// Panics if the client builder reports an error.
fn http_client() -> reqwest::Client {
    let builder = reqwest::Client::builder().timeout(HTTP_REQ_TIMEOUT);
    let built = builder.build();
    built.expect("reqwest client should build with default TLS backend")
}
/// Sends a prepared request and returns its status code together with the
/// response body as text.
///
/// When `api_token()` yields a token it is attached as a bearer credential
/// before sending. `context` prefixes any error message so the caller can tell
/// which request failed.
///
/// # Errors
///
/// Returns a formatted message when sending the request or reading the body
/// fails. Non-success HTTP statuses are NOT errors here; they are returned to
/// the caller as part of the `Ok` tuple.
async fn send_json(
    client: &reqwest::Client,
    req: reqwest::RequestBuilder,
    context: &str,
) -> Result<(StatusCode, String), String> {
    // `client` is not needed to send `req`; this binding only marks the
    // parameter as deliberately ignored.
    let _ = client;

    let prepared = match api_token() {
        Some(token) => req.bearer_auth(token),
        None => req,
    };

    let response = match prepared.send().await {
        Ok(response) => response,
        Err(e) => return Err(format!("{context}: request failed: {e}")),
    };

    // Capture the status first: reading the body consumes the response.
    let code = response.status();
    match response.text().await {
        Ok(payload) => Ok((code, payload)),
        Err(e) => Err(format!("{context}: reading body failed: {e}")),
    }
}
/// Posts the spec produced by `deployment_yaml()` to `{base}/api/v1/deployments`.
///
/// The YAML is wrapped in a JSON object under the `spec` key.
///
/// # Errors
///
/// Returns a message when the request fails, the daemon answers with a
/// non-success status, or the response body is not valid JSON.
async fn submit_deployment(client: &reqwest::Client, base: &str) -> Result<JsonValue, String> {
    let endpoint = format!("{base}/api/v1/deployments");
    let payload = serde_json::json!({ "spec": deployment_yaml() });
    let request = client.post(&endpoint).json(&payload);

    let (code, raw) = send_json(client, request, "POST /api/v1/deployments").await?;

    if code.is_success() {
        match serde_json::from_str::<JsonValue>(&raw) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(format!(
                "POST /api/v1/deployments body was not JSON: {e}; body={raw}"
            )),
        }
    } else {
        Err(format!("POST /api/v1/deployments returned {code}: {raw}"))
    }
}
/// Fetches the deployment called `name` from `{base}/api/v1/deployments/{name}`.
///
/// Returns `Ok(None)` when the daemon answers 404 and `Ok(Some(json))` with the
/// parsed body for any success status.
///
/// # Errors
///
/// Returns a message when the request fails, the daemon answers with any other
/// non-success status, or the response body is not valid JSON.
async fn get_deployment(
    client: &reqwest::Client,
    base: &str,
    name: &str,
) -> Result<Option<JsonValue>, String> {
    let url = format!("{base}/api/v1/deployments/{name}");
    // Build the label with the real deployment name. A plain string literal
    // containing `{name}` is never interpolated, so transport errors used to be
    // reported against the literal text "{name}".
    let context = format!("GET /api/v1/deployments/{name}");

    let (status, text) = send_json(client, client.get(&url), &context).await?;

    if status == StatusCode::NOT_FOUND {
        return Ok(None);
    }
    if !status.is_success() {
        return Err(format!("{context} returned {status}: {text}"));
    }

    serde_json::from_str::<JsonValue>(&text)
        .map(Some)
        .map_err(|e| format!("{context} body was not JSON: {e}; body={text}"))
}
async fn wait_for_running(client: &reqwest::Client, base: &str, name: &str) -> Result<(), String> {
let deadline = std::time::Instant::now() + RUNNING_TIMEOUT;
let mut last_status = String::from("<no response>");
while std::time::Instant::now() < deadline {
match get_deployment(client, base, name).await {
Ok(Some(dep)) => {
last_status = dep
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("<missing>")
.to_string();
if last_status.eq_ignore_ascii_case("running") {
return Ok(());
}
if last_status.eq_ignore_ascii_case("failed") {
return Err(format!(
"deployment {name} reached terminal Failed status: {dep}"
));
}
}
Ok(None) => {
return Err(format!(
"deployment {name} disappeared (404) while waiting for Running"
));
}
Err(e) => {
eprintln!("wait_for_running: transient error: {e}");
}
}
tokio::time::sleep(POLL_INTERVAL).await;
}
Err(format!(
"deployment {name} did not reach Running within {RUNNING_TIMEOUT:?}; last status: {last_status}"
))
}
/// Runs `cmd` inside `service` of `deployment` through the daemon's exec
/// endpoint and returns the parsed JSON response.
///
/// The command is sent as a JSON array under the `command` key.
///
/// # Errors
///
/// Returns a message when the request fails, the daemon answers with a
/// non-success status, or the response body is not valid JSON.
async fn exec_in_service(
    client: &reqwest::Client,
    base: &str,
    deployment: &str,
    service: &str,
    cmd: &[&str],
) -> Result<JsonValue, String> {
    let endpoint = format!("{base}/api/v1/deployments/{deployment}/services/{service}/exec");
    let payload = serde_json::json!({ "command": cmd });
    let request = client.post(&endpoint).json(&payload);

    let (code, raw) = send_json(client, request, "POST .../exec").await?;

    if code.is_success() {
        serde_json::from_str::<JsonValue>(&raw)
            .map_err(|e| format!("exec body was not JSON: {e}; body={raw}"))
    } else {
        Err(format!(
            "exec on {deployment}/{service} returned {code}: {raw}"
        ))
    }
}
/// Best-effort teardown: deletes the deployment over the REST API and then
/// removes leftover endpoints via `cleanup_hcn_leftovers()`.
///
/// Nothing is returned and nothing panics: every outcome of the DELETE call is
/// only logged to stderr, so a failing cleanup never masks the test verdict.
/// A 404 is logged like a success.
async fn cleanup_deployment(client: &reqwest::Client, base: &str, name: &str) {
    let url = format!("{base}/api/v1/deployments/{name}");
    // Build the label with the real deployment name. A plain string literal
    // containing `{name}` is never interpolated, so transport errors used to be
    // reported against the literal text "{name}".
    let context = format!("DELETE /api/v1/deployments/{name}");

    match send_json(client, client.delete(&url), &context).await {
        Ok((status, _)) if status.is_success() || status == StatusCode::NOT_FOUND => {
            eprintln!("cleanup: DELETE {name} -> {status}");
        }
        Ok((status, text)) => {
            eprintln!("cleanup: DELETE {name} returned {status}: {text}");
        }
        Err(e) => eprintln!("cleanup: DELETE {name} failed: {e}"),
    }

    // Runs regardless of how the DELETE call went.
    cleanup_hcn_leftovers();
}
/// Deletes endpoints left behind by this test's deployment.
///
/// Lists the endpoints that `zlayer_hns` reports as owned by `"zlayer"` and
/// deletes each one whose name contains `DEPLOYMENT_NAME`. Failures, both of
/// the listing and of individual deletions, are logged to stderr and otherwise
/// ignored.
fn cleanup_hcn_leftovers() {
    let listing = zlayer_hns::attach::list_owned_endpoints("zlayer");
    let owned = match listing {
        Err(e) => {
            eprintln!("cleanup: list_owned_endpoints(\"zlayer\") failed: {e}");
            return;
        }
        Ok(list) => list,
    };

    for (endpoint_id, name) in owned {
        // Only touch endpoints that belong to this test's deployment.
        if name.contains(DEPLOYMENT_NAME) {
            let outcome = zlayer_hns::endpoint::Endpoint::delete(endpoint_id);
            if let Err(e) = outcome {
                eprintln!("cleanup: HcnDeleteEndpoint({endpoint_id:?}, name={name}) failed: {e}");
            }
        }
    }
}
fn exec_output(resp: &JsonValue) -> String {
let stdout = resp
.get("stdout")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if stdout.is_empty() {
resp.get("stderr")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string()
} else {
stdout
}
}
/// Checks the test preconditions and returns the HTTP client plus the daemon
/// base URL when they hold.
///
/// Returns `None`, after printing a `SKIP:` line to stderr, when
/// `ZLAYER_LINUX_PEER` is not set or when the local daemon's `/health`
/// endpoint is unreachable or answers with a non-success status.
async fn setup_or_skip() -> Option<(reqwest::Client, String)> {
    match linux_peer_addr() {
        Some(peer) => eprintln!("using Linux peer: {peer}"),
        None => {
            eprintln!(
                "SKIP: ZLAYER_LINUX_PEER env var not set; this test needs a real \
                 Linux peer reachable from the Windows host"
            );
            return None;
        }
    }

    let base = local_daemon_addr();
    let client = http_client();
    let health_url = format!("{base}/health");

    let resp = match client.get(&health_url).send().await {
        Ok(resp) => resp,
        Err(e) => {
            eprintln!(
                "SKIP: could not reach local daemon at {base}: {e}; start the daemon before running this test"
            );
            return None;
        }
    };

    let status = resp.status();
    if !status.is_success() {
        eprintln!(
            "SKIP: local daemon {base} /health returned {status}; start the daemon before running this test"
        );
        return None;
    }

    eprintln!("local daemon {base} /health -> {status}");
    Some((client, base))
}
/// Submits the mixed Linux/Windows deployment and requires it to report
/// `Running`.
///
/// Cleanup always runs before the verdict is raised: a failed submission is
/// captured as a panic payload and re-raised only after `cleanup_deployment`
/// has finished, and a failed wait is reported after that.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires real Windows host with Wintun + zlayer WSL2 distro + a reachable Linux peer via ZLAYER_LINUX_PEER env var"]
async fn cluster_mixed_workload_reach_running() {
    let Some((client, base)) = setup_or_skip().await else {
        return;
    };

    let submitted = submit_deployment(&client, &base).await;

    // Turn a failed submission into a stored panic so cleanup can still run.
    let deploy_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        submitted
            .as_ref()
            .expect("POST /api/v1/deployments must succeed for a mixed-OS spec");
    }))
    .err();

    // Only wait for Running when there is a deployment to wait for.
    let running_check: Result<(), String> = if submitted.is_err() {
        Ok(())
    } else {
        let waited = wait_for_running(&client, &base, DEPLOYMENT_NAME).await;
        if waited.is_ok() {
            eprintln!(
                "PASS: deployment {DEPLOYMENT_NAME} reports Running (both Linux + Windows services)"
            );
        }
        waited
    };

    cleanup_deployment(&client, &base, DEPLOYMENT_NAME).await;

    if let Some(payload) = deploy_panic {
        std::panic::resume_unwind(payload);
    }
    if let Err(msg) = running_check {
        panic!("{msg}");
    }
}
/// Deploys the mixed workload and requires each container to resolve the
/// other service's `<service>.<OVERLAY_DNS_SUFFIX>` name with `nslookup`.
///
/// The deployment is cleaned up before the verdict is raised, whether the
/// checks passed or not.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires real Windows host with Wintun + zlayer WSL2 distro + a reachable Linux peer via ZLAYER_LINUX_PEER env var"]
async fn cluster_mixed_workload_overlay_dns_resolves() {
    let Some((client, base)) = setup_or_skip().await else {
        return;
    };

    let body: Result<(), String> = async {
        submit_deployment(&client, &base)
            .await
            .map_err(|e| format!("deploy failed: {e}"))?;
        wait_for_running(&client, &base, DEPLOYMENT_NAME).await?;

        // (label, service that runs nslookup, service whose name is looked up)
        let directions = [
            ("Linux", LINUX_SVC, WIN_SVC),
            ("Windows", WIN_SVC, LINUX_SVC),
        ];
        for (side, service, peer) in directions {
            let target = format!("{peer}.{OVERLAY_DNS_SUFFIX}");
            let response = exec_in_service(
                &client,
                &base,
                DEPLOYMENT_NAME,
                service,
                &["nslookup", &target],
            )
            .await?;
            let text = exec_output(&response);
            if !nslookup_has_answer(&text, &target) {
                return Err(format!(
                    "nslookup from {side} did not resolve {target}: {text}"
                ));
            }
            eprintln!("PASS: {side} container resolved {target}");
        }
        Ok(())
    }
    .await;

    // Always tear down before reporting, so a failure does not leak the deployment.
    cleanup_deployment(&client, &base, DEPLOYMENT_NAME).await;

    if let Err(msg) = body {
        panic!("overlay DNS resolution must succeed on both sides: {msg:?}");
    }
}

/// Returns `true` when nslookup `output` contains an answer for `target`.
///
/// An answer is a line starting with `Name:` that mentions `target`, followed
/// later by a line starting with `Address`. Matching is ASCII case-insensitive
/// and ignores leading/trailing whitespace on each line.
///
/// Looking for `Address` or the target anywhere in the output is not enough:
/// the resolver banner printed before the answer has its own `Address` line,
/// and failure messages echo the queried name, so a failed lookup would pass.
// NOTE(review): assumes the usual BusyBox / Windows nslookup layout
// (`Name:` line, then `Address…` line) — confirm against real exec output.
fn nslookup_has_answer(output: &str, target: &str) -> bool {
    let wanted = target.to_ascii_lowercase();
    let mut lines = output.lines().map(|line| line.trim().to_ascii_lowercase());

    // `any` stops right after the matching `Name:` line, so the second scan
    // only sees lines that come after it.
    lines.any(|line| line.starts_with("name:") && line.contains(&wanted))
        && lines.any(|line| line.starts_with("address"))
}
/// Deploys the mixed workload and requires a single ping to succeed in both
/// directions, addressing the peer by its `<service>.<OVERLAY_DNS_SUFFIX>` name.
///
/// A direction passes when the exec response carries `exit_code` 0; a missing
/// or non-integer `exit_code` is reported as `-1` and counts as a failure.
/// The deployment is cleaned up before the verdict is raised.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "requires real Windows host with Wintun + zlayer WSL2 distro + a reachable Linux peer via ZLAYER_LINUX_PEER env var"]
async fn cluster_mixed_workload_overlay_ping() {
    let Some((client, base)) = setup_or_skip().await else {
        return;
    };

    let body: Result<(), String> = async {
        submit_deployment(&client, &base)
            .await
            .map_err(|e| format!("deploy failed: {e}"))?;
        wait_for_running(&client, &base, DEPLOYMENT_NAME).await?;

        // (label, service that pings, service being pinged, ping flags).
        // The flags differ because the two containers ship different `ping`s.
        let directions = [
            ("Linux", LINUX_SVC, WIN_SVC, ["-c", "1", "-W", "5"]),
            ("Windows", WIN_SVC, LINUX_SVC, ["-n", "1", "-w", "5000"]),
        ];
        for (side, service, peer, flags) in directions {
            let target = format!("{peer}.{OVERLAY_DNS_SUFFIX}");

            let mut cmd: Vec<&str> = Vec::with_capacity(flags.len() + 2);
            cmd.push("ping");
            cmd.extend(flags);
            cmd.push(&target);

            let response =
                exec_in_service(&client, &base, DEPLOYMENT_NAME, service, &cmd).await?;
            let exit_code = response
                .get("exit_code")
                .and_then(JsonValue::as_i64)
                .unwrap_or(-1);
            if exit_code != 0 {
                return Err(format!(
                    "{side} ping -> {target} exited {exit_code}: {}",
                    exec_output(&response)
                ));
            }
            eprintln!("PASS: {side} pinged {target}");
        }
        Ok(())
    }
    .await;

    // Always tear down before reporting, so a failure does not leak the deployment.
    cleanup_deployment(&client, &base, DEPLOYMENT_NAME).await;

    if let Err(msg) = body {
        panic!("overlay ping must succeed in both directions: {msg:?}");
    }
}