#![cfg(all(target_os = "linux", feature = "youki-runtime"))]
#![allow(deprecated)]
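//! End-to-end tests for the youki-backed container runtime and the pieces it
//! integrates with: the service manager, service instances, overlay networking,
//! DNS, and the proxy/load balancer.
//!
//! Most tests require root privileges and network access to pull images from
//! docker.io; they skip themselves (rather than fail) when those prerequisites
//! are missing. Scratch state is written under `E2E_TEST_DIR`. A typical
//! invocation (an assumption, adjust to your environment) is
//! `sudo -E cargo test --features youki-runtime`.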
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use zlayer_agent::{
AgentError, ContainerId, ContainerState, HealthChecker, OverlayManager, ProxyManager,
ProxyManagerConfig, Runtime, ServiceInstance, ServiceManager, YoukiConfig, YoukiRuntime,
};
use zlayer_overlay::DnsServer;
use zlayer_proxy::ServiceRegistry;
use zlayer_spec::{DeploymentSpec, HealthCheck, ServiceSpec};
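/// Wraps a test body in a hard timeout so a hung container or network
/// operation fails the test instead of stalling the whole suite.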
macro_rules! with_timeout {
($timeout_secs:expr, $body:expr) => {{
tokio::time::timeout(std::time::Duration::from_secs($timeout_secs), async move {
$body
})
.await
.expect(concat!(
"Test timed out after ",
stringify!($timeout_secs),
" seconds"
))
}};
}
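// Scratch directory for all runtime state created by these tests, and the
// image most tests pull; fetching it needs network access to docker.io.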
const E2E_TEST_DIR: &str = "/tmp/zlayer-youki-e2e-test";
const ALPINE_IMAGE: &str = "docker.io/library/alpine:latest";
#[allow(unsafe_code)]
fn has_root_privileges() -> bool {
unsafe { libc::geteuid() == 0 }
}
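/// Skips the current test (without failing it) when not running as root;
/// container operations need root privileges.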
macro_rules! skip_without_root {
() => {
if !has_root_privileges() {
eprintln!("Skipping test: root privileges required for container operations");
return;
}
};
}
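/// Builds a collision-resistant name from a prefix, a truncated millisecond
/// timestamp, and a random five-digit suffix so repeated runs do not clash
/// over leftover containers or services.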
#[allow(clippy::cast_possible_truncation)]
fn unique_name(prefix: &str) -> String {
use rand::Rng;
let suffix: u32 = rand::rng().random_range(10000..99999);
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
% 1_000_000;
format!("{prefix}-{timestamp}-{suffix}")
}
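/// Polls the runtime every 100ms until the container reaches `expected` or the
/// timeout elapses. Any `Exited` state satisfies an expected `Exited`
/// regardless of exit code, and `NotFound` also counts as exited because the
/// runtime may already have reaped the container.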
async fn wait_for_state(
runtime: &dyn Runtime,
id: &ContainerId,
expected: ContainerState,
timeout: Duration,
) -> Result<(), String> {
let start = std::time::Instant::now();
let poll_interval = Duration::from_millis(100);
while start.elapsed() < timeout {
match runtime.container_state(id).await {
Ok(state) => {
if state == expected {
return Ok(());
}
if matches!(
(&state, &expected),
(ContainerState::Exited { .. }, ContainerState::Exited { .. })
) {
return Ok(());
}
}
Err(AgentError::NotFound { .. })
if matches!(expected, ContainerState::Exited { .. }) =>
{
return Ok(());
}
Err(e) => {
return Err(format!("Error getting container state: {e}"));
}
}
tokio::time::sleep(poll_interval).await;
}
Err(format!(
"Timeout waiting for container {id:?} to reach state {expected:?}"
))
}
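/// Retries a TCP connect until `addr` accepts connections or the timeout elapses.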
#[allow(dead_code)]
async fn wait_for_port(addr: &str, timeout: Duration) -> Result<(), String> {
let start = std::time::Instant::now();
let poll_interval = Duration::from_millis(100);
while start.elapsed() < timeout {
if tokio::net::TcpStream::connect(addr).await.is_ok() {
return Ok(());
}
tokio::time::sleep(poll_interval).await;
}
Err(format!(
"Timeout waiting for port {addr} to become available"
))
}
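/// Builds a `YoukiRuntime` whose state, rootfs, bundles, image cache, volumes,
/// and logs all live under `E2E_TEST_DIR`, with `use_systemd` disabled.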
async fn create_e2e_runtime() -> Result<YoukiRuntime, AgentError> {
let test_dir = PathBuf::from(E2E_TEST_DIR);
let config = YoukiConfig {
state_dir: test_dir.join("state"),
rootfs_dir: test_dir.join("rootfs"),
bundle_dir: test_dir.join("bundles"),
cache_dir: test_dir.join("cache"),
volume_dir: test_dir.join("volumes"),
use_systemd: false,
cache_type: None,
log_base_dir: Some(test_dir.join("logs")),
deployment_name: Some("e2e-test".to_string()),
};
YoukiRuntime::new(config, None).await
}
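/// Parses a single-service deployment spec for an arbitrary image and port,
/// including a TCP health check on that port.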
#[allow(dead_code)]
fn create_test_spec(image: &str, port: u16) -> ServiceSpec {
let yaml = format!(
r"
version: v1
deployment: e2e-test
services:
test:
rtype: service
image:
name: {image}
endpoints:
- name: http
protocol: http
port: {port}
scale:
mode: fixed
replicas: 1
health:
check:
type: tcp
port: {port}
retries: 3
"
);
serde_yaml::from_str::<DeploymentSpec>(&yaml)
.expect("Failed to parse test spec")
.services
.remove("test")
.expect("Missing test service")
}
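/// Minimal alpine service spec. The `dummy` endpoint is a placeholder; the
/// tests that use this spec never route traffic to the container.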
fn create_alpine_spec() -> ServiceSpec {
let yaml = r"
version: v1
deployment: e2e-test
services:
alpine:
rtype: service
image:
name: docker.io/library/alpine:latest
endpoints:
- name: dummy
protocol: tcp
port: 8080
scale:
mode: fixed
replicas: 1
";
serde_yaml::from_str::<DeploymentSpec>(yaml)
.expect("Failed to parse alpine spec")
.services
.remove("alpine")
.expect("Missing alpine service")
}
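/// nginx service spec with a publicly exposed HTTP endpoint and a TCP health
/// check on port 80, used by the proxy and health-check tests.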
fn create_nginx_spec() -> ServiceSpec {
let yaml = r"
version: v1
deployment: e2e-test
services:
nginx:
rtype: service
image:
name: docker.io/library/nginx:alpine
endpoints:
- name: http
protocol: http
port: 80
expose: public
scale:
mode: fixed
replicas: 1
health:
check:
type: tcp
port: 80
retries: 3
";
serde_yaml::from_str::<DeploymentSpec>(yaml)
.expect("Failed to parse nginx spec")
.services
.remove("nginx")
.expect("Missing nginx service")
}
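/// Best-effort cleanup guard: on drop it spawns a task that stops and removes
/// the container, ignoring errors. It must be dropped inside the Tokio runtime
/// (`tokio::spawn` panics otherwise), and the spawned task may not finish
/// before the test ends, so tests still stop and remove containers explicitly
/// where the outcome matters.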
struct ContainerGuard {
runtime: Arc<dyn Runtime + Send + Sync>,
id: ContainerId,
}
impl ContainerGuard {
fn new(runtime: Arc<dyn Runtime + Send + Sync>, id: ContainerId) -> Self {
Self { runtime, id }
}
}
impl Drop for ContainerGuard {
fn drop(&mut self) {
let runtime = self.runtime.clone();
let id = self.id.clone();
tokio::spawn(async move {
let _ = runtime.stop_container(&id, Duration::from_secs(5)).await;
let _ = runtime.remove_container(&id).await;
});
}
}
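/// Full lifecycle against a real alpine container: pull, create (Pending),
/// start, wait for Running, stop, remove, and confirm the state lookup then fails.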
#[tokio::test]
async fn test_container_lifecycle() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("lifecycle");
let id = ContainerId {
service: service_name.clone(),
replica: 1,
};
let spec = create_alpine_spec();
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
println!("Pulling image: {ALPINE_IMAGE}");
let pull_result = runtime.pull_image(ALPINE_IMAGE).await;
assert!(pull_result.is_ok(), "Failed to pull image: {pull_result:?}");
println!("Creating container: {id}");
let create_result = runtime.create_container(&id, &spec).await;
assert!(
create_result.is_ok(),
"Failed to create container: {create_result:?}"
);
let state = runtime.container_state(&id).await;
assert!(state.is_ok(), "Failed to get container state: {state:?}");
assert_eq!(state.unwrap(), ContainerState::Pending);
println!("Starting container: {id}");
let start_result = runtime.start_container(&id).await;
assert!(
start_result.is_ok(),
"Failed to start container: {start_result:?}"
);
let wait_result = wait_for_state(
runtime.as_ref(),
&id,
ContainerState::Running,
Duration::from_secs(30),
)
.await;
assert!(
wait_result.is_ok(),
"Container did not reach Running state: {}",
wait_result.unwrap_err()
);
println!("Stopping container: {id}");
let stop_result = runtime.stop_container(&id, Duration::from_secs(10)).await;
assert!(
stop_result.is_ok(),
"Failed to stop container: {stop_result:?}"
);
println!("Removing container: {id}");
let remove_result = runtime.remove_container(&id).await;
assert!(
remove_result.is_ok(),
"Failed to remove container: {remove_result:?}"
);
let state = runtime.container_state(&id).await;
assert!(state.is_err(), "Container should not exist after removal");
});
}
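/// Scales a service to 2 replicas and back down to 1 through `ServiceManager`,
/// checking the reported replica count at each step.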
#[tokio::test]
async fn test_service_scaling() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("scale");
let spec = create_alpine_spec();
let manager = ServiceManager::new(runtime.clone());
let upsert_result = Box::pin(manager.upsert_service(service_name.clone(), spec)).await;
assert!(
upsert_result.is_ok(),
"Failed to upsert service: {upsert_result:?}"
);
println!("Scaling {service_name} to 2 replicas");
let scale_result = manager.scale_service(&service_name, 2).await;
assert!(scale_result.is_ok(), "Failed to scale up: {scale_result:?}");
let count = manager.service_replica_count(&service_name).await;
assert!(count.is_ok(), "Failed to get replica count: {count:?}");
assert_eq!(count.unwrap(), 2, "Expected 2 replicas after scale up");
println!("Scaling {service_name} to 1 replica");
let scale_result = manager.scale_service(&service_name, 1).await;
assert!(
scale_result.is_ok(),
"Failed to scale down: {scale_result:?}"
);
let count = manager.service_replica_count(&service_name).await;
assert!(count.is_ok(), "Failed to get replica count: {count:?}");
assert_eq!(count.unwrap(), 1, "Expected 1 replica after scale down");
let _ = manager.scale_service(&service_name, 0).await;
});
}
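/// Smoke test for the TCP `HealthChecker`: no container is started, so the
/// check result is only logged rather than asserted, and the trailing
/// stop/remove calls are best-effort cleanup for a container that was never created.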
#[tokio::test]
async fn test_health_checks_tcp() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("health");
let id = ContainerId {
service: service_name.clone(),
replica: 1,
};
let _spec = create_nginx_spec();
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
let health_check = HealthCheck::Tcp { port: 80 };
let checker = HealthChecker::new(health_check, None);
let check_result = checker.check(&id, Duration::from_secs(5)).await;
println!("Health check result: {check_result:?}");
let _ = runtime.stop_container(&id, Duration::from_secs(10)).await;
let _ = runtime.remove_container(&id).await;
});
}
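/// Exercises proxy bookkeeping only (service registration, backend add/remove,
/// health updates); no containers are started.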
#[tokio::test]
async fn test_proxy_routing() {
with_timeout!(180, {
skip_without_root!();
let service_name = unique_name("proxy");
let spec = create_nginx_spec();
let port: u16 = 30000 + (rand::random::<u16>() % 10000);
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let config = ProxyManagerConfig::new(addr);
let service_registry = Arc::new(ServiceRegistry::new());
let manager = ProxyManager::new(config, service_registry, None);
manager.add_service(&service_name, &spec).await;
assert!(
manager.has_service(&service_name).await,
"Service should be registered"
);
let route_count = manager.route_count().await;
assert!(route_count > 0, "Should have at least one route");
let backend_addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let backend_addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
manager.add_backend(&service_name, backend_addr1).await;
manager.add_backend(&service_name, backend_addr2).await;
let lb = manager.load_balancer();
assert_eq!(lb.backend_count(&service_name), 2, "Should have 2 backends");
manager
.update_backend_health(&service_name, backend_addr1, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
1,
"One backend should be unhealthy"
);
manager.remove_backend(&service_name, backend_addr1).await;
assert_eq!(
lb.backend_count(&service_name),
1,
"Should have 1 backend after removal"
);
manager.remove_service(&service_name).await;
assert!(
!manager.has_service(&service_name).await,
"Service should be removed"
);
});
}
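/// Starts an alpine container and fetches its recent log lines; failures to
/// read logs are tolerated because the container may have already exited.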
#[tokio::test]
async fn test_container_logs() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("logs");
let id = ContainerId {
service: service_name.clone(),
replica: 1,
};
let spec = create_alpine_spec();
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
runtime
.pull_image(ALPINE_IMAGE)
.await
.expect("Failed to pull");
runtime
.create_container(&id, &spec)
.await
.expect("Failed to create");
runtime.start_container(&id).await.expect("Failed to start");
wait_for_state(
runtime.as_ref(),
&id,
ContainerState::Running,
Duration::from_secs(30),
)
.await
.ok();
tokio::time::sleep(Duration::from_secs(1)).await;
let logs_result = runtime.container_logs(&id, 100).await;
println!("Logs result: {logs_result:?}");
match logs_result {
Ok(logs) => {
let logs_text: String = logs
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join("\n");
println!("Container logs:\n{logs_text}");
}
Err(e) => {
println!("Could not get logs (expected if container exited): {e}");
}
}
let _ = runtime.stop_container(&id, Duration::from_secs(5)).await;
let _ = runtime.remove_container(&id).await;
});
}
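/// Removing a container that never existed must succeed (idempotent removal).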
#[tokio::test]
async fn test_remove_nonexistent_is_idempotent() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => r,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let id = ContainerId {
service: unique_name("nonexistent"),
replica: 999,
};
println!("Attempting to remove non-existent container: {id}");
let result = runtime.remove_container(&id).await;
assert!(
result.is_ok(),
"remove_container should be idempotent: {result:?}"
);
println!("remove_container succeeded (idempotent behavior)");
});
}
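/// Looking up the state of an unknown container must fail, ideally with
/// `AgentError::NotFound`.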
#[tokio::test]
async fn test_error_state_nonexistent() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => r,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let id = ContainerId {
service: unique_name("ghost"),
replica: 1,
};
let result = runtime.container_state(&id).await;
assert!(result.is_err(), "Should fail for non-existent container");
match result {
Err(AgentError::NotFound { .. }) => {
println!("Got expected NotFound error for container state");
}
Err(other) => {
println!("Got different error: {other:?}");
}
Ok(state) => panic!("Should not get state for non-existent container, got: {state:?}"),
}
});
}
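/// Creates several containers in parallel from the same spec to check that the
/// runtime tolerates concurrent `create_container` calls.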
#[tokio::test]
async fn test_concurrent_containers() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
runtime
.pull_image(ALPINE_IMAGE)
.await
.expect("Failed to pull image");
let container_count = 3;
let base_name = unique_name("concurrent");
let spec = create_alpine_spec();
let mut handles = Vec::new();
for i in 0..container_count {
let runtime_clone = runtime.clone();
let spec_clone = spec.clone();
let name = base_name.clone();
handles.push(tokio::spawn(async move {
let id = ContainerId {
service: format!("{name}-{i}"),
replica: 1,
};
let create_result = runtime_clone.create_container(&id, &spec_clone).await;
(id, create_result)
}));
}
let mut created_ids = Vec::new();
for handle in handles {
let (id, result) = handle.await.expect("Task panicked");
if result.is_ok() {
created_ids.push(id);
} else {
eprintln!("Failed to create container: {result:?}");
}
}
println!("Created {} containers concurrently", created_ids.len());
for id in created_ids {
let _ = runtime.remove_container(&id).await;
}
});
}
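/// Drives a single `ServiceInstance` directly: scale 0 -> 1 -> 0 and verify
/// replica counts and container IDs along the way.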
#[tokio::test]
async fn test_service_instance_lifecycle() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("instance");
let spec = create_alpine_spec();
let instance = ServiceInstance::new(service_name.clone(), spec, runtime.clone(), None);
assert_eq!(instance.replica_count().await, 0);
println!("Scaling {service_name} to 1 replica via ServiceInstance");
let scale_result = instance.scale_to(1).await;
assert!(
scale_result.is_ok(),
"Failed to scale to 1: {scale_result:?}"
);
assert_eq!(instance.replica_count().await, 1);
let ids = instance.container_ids().await;
assert_eq!(ids.len(), 1, "Should have 1 container ID");
println!("Container IDs: {ids:?}");
println!("Scaling {service_name} to 0 replicas");
let scale_result = instance.scale_to(0).await;
assert!(
scale_result.is_ok(),
"Failed to scale to 0: {scale_result:?}"
);
assert_eq!(instance.replica_count().await, 0);
});
}
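/// Observes (and logs, without asserting) whether the per-container state
/// directory under `E2E_TEST_DIR` disappears after removal.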
#[tokio::test]
async fn test_cleanup_state_directory() {
with_timeout!(180, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let service_name = unique_name("cleanup");
let id = ContainerId {
service: service_name.clone(),
replica: 1,
};
let spec = create_alpine_spec();
runtime
.pull_image(ALPINE_IMAGE)
.await
.expect("Failed to pull");
runtime
.create_container(&id, &spec)
.await
.expect("Failed to create");
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
runtime.start_container(&id).await.expect("Failed to start");
tokio::time::sleep(Duration::from_millis(500)).await;
let state_dir = format!("{E2E_TEST_DIR}/state/{}-{}", id.service, id.replica);
let exists_before = tokio::fs::metadata(&state_dir).await.is_ok();
println!("State directory {state_dir} exists before removal: {exists_before}");
let _ = runtime.stop_container(&id, Duration::from_secs(5)).await;
runtime
.remove_container(&id)
.await
.expect("Failed to remove");
let exists_after = tokio::fs::metadata(&state_dir).await.is_ok();
println!("State directory {state_dir} exists after removal: {exists_after}");
});
}
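/// Creates an `OverlayManager` for the e2e deployment, returning `None` when
/// not running as root or when overlay setup fails, so callers can degrade
/// gracefully.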
async fn create_test_overlay_manager() -> Option<Arc<tokio::sync::RwLock<OverlayManager>>> {
if !has_root_privileges() {
return None;
}
match OverlayManager::new("e2e-test".to_string()).await {
Ok(manager) => Some(Arc::new(tokio::sync::RwLock::new(manager))),
Err(e) => {
eprintln!("Could not create overlay manager: {e}");
None
}
}
}
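/// Creates a DNS server for the `service.local.` zone on a random high
/// loopback port, returning `None` if it cannot be created.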
fn create_test_dns_server() -> Option<(Arc<DnsServer>, SocketAddr)> {
let port: u16 = 50000 + (rand::random::<u16>() % 10000);
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
match DnsServer::new(addr, "service.local.") {
Ok(server) => {
let listen_addr = server.listen_addr();
Some((Arc::new(server), listen_addr))
}
Err(e) => {
eprintln!("Could not create DNS server: {e}");
None
}
}
}
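/// Builds a `ProxyManager` configured for a random high loopback port with a
/// fresh, empty service registry.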
fn create_test_proxy_manager() -> Arc<ProxyManager> {
let port: u16 = 30000 + (rand::random::<u16>() % 10000);
let addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let config = ProxyManagerConfig::new(addr);
let service_registry = Arc::new(ServiceRegistry::new());
Arc::new(ProxyManager::new(config, service_registry, None))
}
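/// End-to-end wiring of `ServiceInstance` with overlay, DNS, and proxy: scale
/// to 1 and register its overlay IP as a proxy backend (when an overlay is
/// available), scale to 2, then back to 0 and remove the service from the proxy.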
#[tokio::test]
async fn test_service_instance_full_integration() {
with_timeout!(300, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let overlay = create_test_overlay_manager().await;
let dns = create_test_dns_server();
let proxy = create_test_proxy_manager();
let service_name = unique_name("integration");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
assert!(
proxy.has_service(&service_name).await,
"Service should be registered with proxy"
);
let mut instance = ServiceInstance::new(
service_name.clone(),
spec.clone(),
runtime.clone(),
overlay.clone(),
);
if let Some((dns_server, dns_addr)) = &dns {
instance.set_dns_server(dns_server.clone());
println!("DNS server configured at {dns_addr}");
}
instance.set_proxy_manager(proxy.clone());
println!("Scaling {service_name} to 1 replica");
let scale_result = instance.scale_to(1).await;
assert!(
scale_result.is_ok(),
"scale_to failed: {:?}",
scale_result.err()
);
assert_eq!(instance.replica_count().await, 1, "Should have 1 replica");
{
let containers = instance.containers().read().await;
for (id, container) in containers.iter() {
println!("Container {id}: overlay_ip = {:?}", container.overlay_ip);
if overlay.is_some() {
if let Some(ip) = container.overlay_ip {
println!("Container {id} has overlay IP: {ip}");
let backend_addr = SocketAddr::new(ip, spec.endpoints[0].target_port());
proxy.add_backend(&service_name, backend_addr).await;
}
}
}
}
let route_count = proxy.route_count().await;
println!("Proxy route count: {route_count}");
assert!(route_count > 0, "Should have at least one route");
println!("Scaling {service_name} to 2 replicas");
let scale_result = instance.scale_to(2).await;
assert!(
scale_result.is_ok(),
"scale_to(2) failed: {:?}",
scale_result.err()
);
assert_eq!(instance.replica_count().await, 2, "Should have 2 replicas");
println!("Scaling {service_name} to 0 replicas");
let scale_result = instance.scale_to(0).await;
assert!(
scale_result.is_ok(),
"scale_to(0) failed: {:?}",
scale_result.err()
);
assert_eq!(instance.replica_count().await, 0, "Should have 0 replicas");
proxy.remove_service(&service_name).await;
assert!(
!proxy.has_service(&service_name).await,
"Service should be removed from proxy"
);
});
}
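/// Marks a single IPv4 backend unhealthy and healthy again through the
/// `ProxyManager` and checks that the load balancer's healthy count follows.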
#[tokio::test]
async fn test_health_callback_updates_proxy() {
with_timeout!(300, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let proxy = create_test_proxy_manager();
let service_name = unique_name("health-cb");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
let mut instance =
ServiceInstance::new(service_name.clone(), spec.clone(), runtime.clone(), None);
instance.set_proxy_manager(proxy.clone());
instance.scale_to(1).await.expect("scale_to failed");
let test_backend: SocketAddr = "10.200.0.10:80".parse().unwrap();
proxy.add_backend(&service_name, test_backend).await;
let lb = proxy.load_balancer();
assert_eq!(lb.backend_count(&service_name), 1, "Should have 1 backend");
let initial_healthy = lb.healthy_count(&service_name);
println!("Initial healthy count: {initial_healthy}");
proxy
.update_backend_health(&service_name, test_backend, false)
.await;
let unhealthy_count = lb.healthy_count(&service_name);
println!("Healthy count after marking unhealthy: {unhealthy_count}");
assert_eq!(unhealthy_count, 0, "Backend should be marked as unhealthy");
proxy
.update_backend_health(&service_name, test_backend, true)
.await;
let healthy_count = lb.healthy_count(&service_name);
println!("Healthy count after marking healthy: {healthy_count}");
assert_eq!(healthy_count, 1, "Backend should be marked as healthy");
instance.scale_to(0).await.expect("scale down failed");
proxy.remove_service(&service_name).await;
});
}
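/// Same as the IPv4 health-callback test, but with an IPv6 backend address.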
#[tokio::test]
async fn test_health_callback_updates_proxy_ipv6() {
with_timeout!(300, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let proxy = create_test_proxy_manager();
let service_name = unique_name("health-cb-v6");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
let mut instance =
ServiceInstance::new(service_name.clone(), spec.clone(), runtime.clone(), None);
instance.set_proxy_manager(proxy.clone());
instance.scale_to(1).await.expect("scale_to failed");
let test_backend: SocketAddr =
SocketAddr::new(IpAddr::V6("fd00:200::10".parse::<Ipv6Addr>().unwrap()), 80);
proxy.add_backend(&service_name, test_backend).await;
let lb = proxy.load_balancer();
assert_eq!(
lb.backend_count(&service_name),
1,
"Should have 1 IPv6 backend"
);
proxy
.update_backend_health(&service_name, test_backend, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
0,
"IPv6 backend should be marked as unhealthy"
);
proxy
.update_backend_health(&service_name, test_backend, true)
.await;
assert_eq!(
lb.healthy_count(&service_name),
1,
"IPv6 backend should be marked as healthy"
);
instance.scale_to(0).await.expect("scale down failed");
proxy.remove_service(&service_name).await;
});
}
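/// Adds and removes AAAA records for a service hostname and a per-replica
/// hostname; does not require root.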
#[tokio::test]
async fn test_dns_record_lifecycle_ipv6() {
with_timeout!(60, {
let Some((dns_server, dns_addr)) = create_test_dns_server() else {
eprintln!("Could not create DNS server, skipping test");
return;
};
println!("DNS server created at {dns_addr}");
let test_ip: IpAddr = "fd00:200::42".parse().unwrap();
let hostname = "myservice-v6.service.local";
let add_result = dns_server.add_record(hostname, test_ip).await;
assert!(
add_result.is_ok(),
"Failed to add IPv6 DNS record: {:?}",
add_result.err()
);
println!("Added AAAA DNS record: {hostname} -> {test_ip}");
let replica_hostname = "1.myservice-v6.service.local";
let replica_ip: IpAddr = "fd00:200::43".parse().unwrap();
let add_result = dns_server.add_record(replica_hostname, replica_ip).await;
assert!(
add_result.is_ok(),
"Failed to add replica IPv6 DNS record: {:?}",
add_result.err()
);
println!("Added replica AAAA DNS record: {replica_hostname} -> {replica_ip}");
let remove_result = dns_server.remove_record(replica_hostname).await;
assert!(
remove_result.is_ok(),
"Failed to remove IPv6 DNS record: {:?}",
remove_result.err()
);
println!("Removed IPv6 DNS record: {replica_hostname}");
let remove_result = dns_server.remove_record(hostname).await;
assert!(
remove_result.is_ok(),
"Failed to remove service IPv6 DNS record: {:?}",
remove_result.err()
);
println!("Removed service IPv6 DNS record: {hostname}");
});
}
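/// IPv4 counterpart of the DNS record lifecycle test: add and remove A records
/// for a service hostname and a per-replica hostname.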
#[tokio::test]
async fn test_dns_record_lifecycle() {
with_timeout!(60, {
let Some((dns_server, dns_addr)) = create_test_dns_server() else {
eprintln!("Could not create DNS server, skipping test");
return;
};
println!("DNS server created at {dns_addr}");
let test_ip = Ipv4Addr::new(10, 200, 0, 42);
let hostname = "myservice.service.local";
let add_result = dns_server.add_record(hostname, IpAddr::V4(test_ip)).await;
assert!(
add_result.is_ok(),
"Failed to add DNS record: {:?}",
add_result.err()
);
println!("Added DNS record: {hostname} -> {test_ip}");
let replica_hostname = "1.myservice.service.local";
let replica_ip = Ipv4Addr::new(10, 200, 0, 43);
let add_result = dns_server
.add_record(replica_hostname, IpAddr::V4(replica_ip))
.await;
assert!(
add_result.is_ok(),
"Failed to add replica DNS record: {:?}",
add_result.err()
);
println!("Added replica DNS record: {replica_hostname} -> {replica_ip}");
let remove_result = dns_server.remove_record(replica_hostname).await;
assert!(
remove_result.is_ok(),
"Failed to remove DNS record: {:?}",
remove_result.err()
);
println!("Removed DNS record: {replica_hostname}");
let remove_result = dns_server.remove_record(hostname).await;
assert!(
remove_result.is_ok(),
"Failed to remove service DNS record: {:?}",
remove_result.err()
);
println!("Removed service DNS record: {hostname}");
});
}
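/// Scales a DNS-backed `ServiceInstance` 2 -> 1 -> 0; only the replica counts
/// are asserted, so DNS record removal is exercised implicitly.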
#[tokio::test]
async fn test_dns_cleanup_on_scale_down() {
with_timeout!(300, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let Some((dns_server, dns_addr)) = create_test_dns_server() else {
eprintln!("Could not create DNS server, skipping test");
return;
};
println!("DNS server created at {dns_addr}");
let service_name = unique_name("dns-cleanup");
let spec = create_alpine_spec();
let mut instance = ServiceInstance::new(service_name.clone(), spec, runtime.clone(), None);
instance.set_dns_server(dns_server.clone());
println!("Scaling {service_name} to 2 replicas");
instance.scale_to(2).await.expect("scale_to(2) failed");
assert_eq!(instance.replica_count().await, 2);
println!("Scaling {service_name} to 1 replica");
instance.scale_to(1).await.expect("scale_to(1) failed");
assert_eq!(instance.replica_count().await, 1);
println!("Scaling {service_name} to 0 replicas");
instance.scale_to(0).await.expect("scale_to(0) failed");
assert_eq!(instance.replica_count().await, 0);
println!("DNS cleanup test completed successfully");
});
}
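/// Backend add/remove and health accounting on the load balancer with three
/// IPv4 backends.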
#[tokio::test]
async fn test_proxy_backend_lifecycle() {
with_timeout!(60, {
let proxy = create_test_proxy_manager();
let service_name = unique_name("proxy-lifecycle");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
assert!(proxy.has_service(&service_name).await);
let backend1: SocketAddr = "10.200.0.10:80".parse().unwrap();
let backend2: SocketAddr = "10.200.0.11:80".parse().unwrap();
let backend3: SocketAddr = "10.200.0.12:80".parse().unwrap();
proxy.add_backend(&service_name, backend1).await;
proxy.add_backend(&service_name, backend2).await;
proxy.add_backend(&service_name, backend3).await;
let lb = proxy.load_balancer();
assert_eq!(lb.backend_count(&service_name), 3, "Should have 3 backends");
proxy
.update_backend_health(&service_name, backend2, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
2,
"Should have 2 healthy backends"
);
proxy.remove_backend(&service_name, backend1).await;
assert_eq!(
lb.backend_count(&service_name),
2,
"Should have 2 backends after removal"
);
proxy.remove_backend(&service_name, backend2).await;
assert_eq!(lb.backend_count(&service_name), 1, "Should have 1 backend");
assert_eq!(
lb.healthy_count(&service_name),
1,
"Should have 1 healthy backend"
);
proxy.remove_service(&service_name).await;
assert!(!proxy.has_service(&service_name).await);
});
}
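/// IPv6 variant of the backend lifecycle test.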
#[tokio::test]
async fn test_proxy_backend_lifecycle_ipv6() {
with_timeout!(60, {
let proxy = create_test_proxy_manager();
let service_name = unique_name("proxy-v6-lifecycle");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
assert!(proxy.has_service(&service_name).await);
let backend1: SocketAddr =
SocketAddr::new(IpAddr::V6("fd00:200::10".parse::<Ipv6Addr>().unwrap()), 80);
let backend2: SocketAddr =
SocketAddr::new(IpAddr::V6("fd00:200::11".parse::<Ipv6Addr>().unwrap()), 80);
let backend3: SocketAddr =
SocketAddr::new(IpAddr::V6("fd00:200::12".parse::<Ipv6Addr>().unwrap()), 80);
proxy.add_backend(&service_name, backend1).await;
proxy.add_backend(&service_name, backend2).await;
proxy.add_backend(&service_name, backend3).await;
let lb = proxy.load_balancer();
assert_eq!(
lb.backend_count(&service_name),
3,
"Should have 3 IPv6 backends"
);
proxy
.update_backend_health(&service_name, backend2, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
2,
"Should have 2 healthy IPv6 backends"
);
proxy.remove_backend(&service_name, backend1).await;
assert_eq!(
lb.backend_count(&service_name),
2,
"Should have 2 IPv6 backends after removal"
);
proxy.remove_backend(&service_name, backend2).await;
assert_eq!(
lb.backend_count(&service_name),
1,
"Should have 1 IPv6 backend"
);
assert_eq!(
lb.healthy_count(&service_name),
1,
"Should have 1 healthy IPv6 backend"
);
proxy.remove_service(&service_name).await;
assert!(!proxy.has_service(&service_name).await);
});
}
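/// Mixed IPv4 and IPv6 backends behind one service: health flips on either
/// address family are tracked per backend.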
#[tokio::test]
async fn test_proxy_backend_lifecycle_dual_stack() {
with_timeout!(60, {
let proxy = create_test_proxy_manager();
let service_name = unique_name("proxy-dualstack");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
let backend_v4: SocketAddr = "10.200.0.10:80".parse().unwrap();
let backend_v6: SocketAddr =
SocketAddr::new(IpAddr::V6("fd00:200::10".parse::<Ipv6Addr>().unwrap()), 80);
proxy.add_backend(&service_name, backend_v4).await;
proxy.add_backend(&service_name, backend_v6).await;
let lb = proxy.load_balancer();
assert_eq!(
lb.backend_count(&service_name),
2,
"Should have 2 backends (1 IPv4 + 1 IPv6)"
);
assert_eq!(
lb.healthy_count(&service_name),
2,
"Both backends should be healthy"
);
proxy
.update_backend_health(&service_name, backend_v6, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
1,
"Should have 1 healthy backend (IPv4)"
);
proxy
.update_backend_health(&service_name, backend_v4, false)
.await;
assert_eq!(
lb.healthy_count(&service_name),
0,
"Should have 0 healthy backends"
);
proxy
.update_backend_health(&service_name, backend_v4, true)
.await;
proxy
.update_backend_health(&service_name, backend_v6, true)
.await;
assert_eq!(
lb.healthy_count(&service_name),
2,
"Both backends should be healthy again"
);
proxy.remove_service(&service_name).await;
});
}
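/// Full `ServiceManager` path with proxy and DNS attached: upsert the service,
/// scale to 2 replicas, then to 0, checking replica counts along the way.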
#[tokio::test]
async fn test_service_manager_full_integration() {
with_timeout!(300, {
skip_without_root!();
let runtime = match create_e2e_runtime().await {
Ok(r) => Arc::new(r) as Arc<dyn Runtime + Send + Sync>,
Err(e) => {
eprintln!("Failed to create runtime: {e}");
return;
}
};
let proxy = create_test_proxy_manager();
let dns = create_test_dns_server();
let mut manager = ServiceManager::new(runtime.clone());
manager.set_proxy_manager(proxy.clone());
if let Some((dns_server, dns_addr)) = dns {
manager.set_dns_server(dns_server);
println!("DNS server configured at {dns_addr}");
}
let service_name = unique_name("mgr-integration");
let spec = create_nginx_spec();
proxy.add_service(&service_name, &spec).await;
Box::pin(manager.upsert_service(service_name.clone(), spec))
.await
.expect("upsert_service failed");
println!("Scaling {service_name} to 2 replicas via ServiceManager");
manager
.scale_service(&service_name, 2)
.await
.expect("scale_service failed");
let count = manager
.service_replica_count(&service_name)
.await
.expect("service_replica_count failed");
assert_eq!(count, 2, "Should have 2 replicas");
assert!(proxy.has_service(&service_name).await);
println!("Scaling {service_name} to 0 replicas");
manager
.scale_service(&service_name, 0)
.await
.expect("scale down failed");
let count = manager
.service_replica_count(&service_name)
.await
.expect("service_replica_count failed");
assert_eq!(count, 0, "Should have 0 replicas");
proxy.remove_service(&service_name).await;
});
}