#![cfg(feature = "docker")]
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use zlayer_agent::runtimes::DockerRuntime;
use zlayer_agent::{ContainerId, ContainerState, Runtime};
use zlayer_spec::{
CommandSpec, ErrorsSpec, HealthCheck, HealthSpec, ImageSpec, InitSpec, NodeMode, PullPolicy,
ResourceType, ResourcesSpec, ScaleSpec, ServiceNetworkSpec, ServiceSpec, ServiceType,
};
/// Image that the container tests pull and run.
const TEST_IMAGE: &str = "alpine:latest";
/// Generous upper bound for slow operations; the tests apply it to image pulls.
const LONG_TIMEOUT: Duration = Duration::from_secs(120);
/// Upper bound for quicker operations such as state polling and container waits.
const SHORT_TIMEOUT: Duration = Duration::from_secs(30);
/// Attempts to construct a [`DockerRuntime`], passing `None` as its argument.
///
/// Returns `Some(runtime)` on success and `None` on failure so that callers can
/// skip their test; the construction error itself is discarded.
async fn skip_if_no_docker() -> Option<DockerRuntime> {
    let connection = DockerRuntime::new(None).await;
    connection.ok()
}
/// Produces a name of the form `test-{prefix}-{timestamp}-{suffix}`.
///
/// `timestamp` is the current Unix time in milliseconds reduced modulo
/// 1,000,000 and `suffix` is a random number drawn from `10000..99999`.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unique_container_name(prefix: &str) -> String {
    use rand::Rng;
    use std::time::{SystemTime, UNIX_EPOCH};

    let millis_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis();
    // Only the low six decimal digits are kept, so the narrowing cast is harmless.
    #[allow(clippy::cast_possible_truncation)]
    let timestamp = millis_since_epoch as u64 % 1_000_000;
    let suffix: u32 = rand::rng().random_range(10000..99999);
    format!("test-{prefix}-{timestamp}-{suffix}")
}
/// Builds a [`ContainerId`] whose service name comes from
/// [`unique_container_name`] and whose replica index is always `1`.
fn unique_container_id(prefix: &str) -> ContainerId {
    let service = unique_container_name(prefix);
    ContainerId { service, replica: 1 }
}
/// Builds the baseline [`ServiceSpec`] that the tests customise.
///
/// The spec references `image` with the `IfNotPresent` pull policy, uses a TCP
/// health check on port 0 with 3 retries, and leaves every other field at its
/// default, empty, `None` or `false` value.
///
/// # Panics
///
/// Panics if `image` cannot be parsed as an image reference.
fn create_test_spec(image: &str) -> ServiceSpec {
    let image = ImageSpec {
        name: image.parse().expect("valid image reference"),
        pull_policy: PullPolicy::IfNotPresent,
    };
    let health = HealthSpec {
        start_grace: None,
        interval: None,
        timeout: None,
        retries: 3,
        check: HealthCheck::Tcp { port: 0 },
    };

    ServiceSpec {
        // Identity and image.
        rtype: ResourceType::Service,
        service_type: ServiceType::default(),
        schedule: None,
        image,
        platform: None,
        // Process configuration.
        command: CommandSpec::default(),
        env: HashMap::new(),
        init: InitSpec::default(),
        errors: ErrorsSpec::default(),
        restart_policy: None,
        logs: None,
        wasm: None,
        // Resources and privileges.
        resources: ResourcesSpec::default(),
        devices: Vec::new(),
        storage: Vec::new(),
        capabilities: Vec::new(),
        privileged: false,
        // Networking.
        network: ServiceNetworkSpec::default(),
        endpoints: Vec::new(),
        port_mappings: Vec::new(),
        host_network: false,
        hostname: None,
        dns: vec![],
        extra_hosts: vec![],
        // Scheduling and health.
        scale: ScaleSpec::default(),
        depends: Vec::new(),
        health,
        node_mode: NodeMode::default(),
        node_selector: None,
    }
}
/// Builds a spec for a [`TEST_IMAGE`] container that runs
/// `sh -c "echo '<message>'"`.
///
/// `message` is placed inside a single-quoted shell string. Any single quote
/// it contains is escaped so that the shell prints the message verbatim
/// instead of ending the quoted string early.
fn create_echo_spec(message: &str) -> ServiceSpec {
    // Inside single quotes the only special character is `'` itself. It is
    // escaped by closing the string, emitting `\'`, and reopening it.
    let quoted = message.replace('\'', r"'\''");
    let mut spec = create_test_spec(TEST_IMAGE);
    spec.command = CommandSpec {
        entrypoint: None,
        args: Some(vec![
            "sh".to_string(),
            "-c".to_string(),
            format!("echo '{quoted}'"),
        ]),
        workdir: None,
    };
    spec
}
fn create_sleep_spec(seconds: u32) -> ServiceSpec {
let mut spec = create_test_spec(TEST_IMAGE);
spec.command = CommandSpec {
entrypoint: None,
args: Some(vec!["sleep".to_string(), seconds.to_string()]),
workdir: None,
};
spec
}
/// Drop guard that asks the runtime to stop and remove a container when it
/// goes out of scope.
///
/// Bind it to a named variable (for example `_guard`) so that it lives until
/// the end of the test. A guard that is not bound is dropped at once and
/// triggers cleanup immediately.
#[must_use = "the container is cleaned up as soon as the guard is dropped"]
struct ContainerGuard {
    /// Runtime used to issue the stop/remove requests.
    runtime: Arc<DockerRuntime>,
    /// Identifier of the container to clean up.
    id: ContainerId,
}
impl ContainerGuard {
    /// Creates a guard for the container identified by `id`.
    ///
    /// The container does not have to exist yet; nothing is contacted until
    /// the guard is dropped.
    fn new(runtime: Arc<DockerRuntime>, id: ContainerId) -> Self {
        Self { runtime, id }
    }
}
impl Drop for ContainerGuard {
fn drop(&mut self) {
let runtime = self.runtime.clone();
let id = self.id.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _ = runtime.stop_container(&id, Duration::from_secs(5)).await;
let _ = runtime.remove_container(&id).await;
});
}
}
}
/// Polls the state of container `id` every 100 ms until it matches `expected`
/// or `timeout` elapses.
///
/// When `expected` is an `Exited` state, any `Exited` state matches regardless
/// of its exit code; every other state is compared for equality.
///
/// # Errors
///
/// Returns a message when the state query fails more than 5 seconds after
/// polling began (earlier failures are retried), or when `timeout` elapses
/// before a matching state is seen.
async fn wait_for_state(
    runtime: &DockerRuntime,
    id: &ContainerId,
    expected: ContainerState,
    timeout: Duration,
) -> Result<ContainerState, String> {
    // Query failures within this window are tolerated and retried.
    let error_grace = Duration::from_secs(5);
    let poll_interval = Duration::from_millis(100);
    let began = std::time::Instant::now();

    while began.elapsed() < timeout {
        match runtime.container_state(id).await {
            Ok(current) => {
                let both_exited = matches!(current, ContainerState::Exited { .. })
                    && matches!(expected, ContainerState::Exited { .. });
                if both_exited || current == expected {
                    return Ok(current);
                }
            }
            Err(e) if began.elapsed() > error_grace => {
                return Err(format!("Error getting container state: {e}"));
            }
            Err(_) => {}
        }
        tokio::time::sleep(poll_interval).await;
    }

    Err(format!(
        "Timeout waiting for container {id:?} to reach state {expected:?}"
    ))
}
/// Smoke test: passes when a runtime can be constructed and is skipped otherwise.
#[tokio::test]
async fn test_docker_connection() {
    match skip_if_no_docker().await {
        Some(_runtime) => println!("Successfully connected to Docker daemon"),
        None => eprintln!("Skipping test: Docker not available"),
    }
}
/// Pulls [`TEST_IMAGE`] and fails if the pull errors or exceeds [`LONG_TIMEOUT`].
#[tokio::test]
async fn test_pull_image() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    println!("Pulling image: {TEST_IMAGE}");
    let outcome = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;

    // The outer layer is the timeout, the inner layer is the pull itself.
    let pulled = outcome.unwrap_or_else(|e| panic!("Timeout pulling image: {e}"));
    if let Err(e) = pulled {
        panic!("Failed to pull image: {e}");
    }
    println!("Image pulled successfully");
}
/// Checks that pulling with `PullPolicy::IfNotPresent` succeeds and returns in
/// under 5 seconds after a best-effort pull of the same image.
#[tokio::test]
async fn test_pull_image_if_not_present() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    // Best-effort warm-up pull; its result is deliberately ignored.
    println!("Ensuring image exists: {TEST_IMAGE}");
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;

    println!("Testing IfNotPresent policy (should skip pull)");
    let began = std::time::Instant::now();
    let policy_pull = runtime.pull_image_with_policy(TEST_IMAGE, PullPolicy::IfNotPresent, None);
    let outcome = tokio::time::timeout(SHORT_TIMEOUT, policy_pull).await;
    let elapsed = began.elapsed();
    println!("IfNotPresent pull completed in {elapsed:?}");

    let pulled = outcome.unwrap_or_else(|e| panic!("Timeout with IfNotPresent policy: {e}"));
    if let Err(e) = pulled {
        panic!("Failed with IfNotPresent policy: {e}");
    }
    assert!(
        elapsed < Duration::from_secs(5),
        "IfNotPresent should be fast for existing images"
    );
    println!("IfNotPresent correctly skipped pull");
}
/// Walks one container through create → start → stop → remove and checks the
/// state reported after each step.
#[tokio::test]
async fn test_container_lifecycle() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };
    let runtime = Arc::new(runtime);
    // Best-effort pull; a failure here surfaces when the container is created.
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;

    let id = unique_container_id("lifecycle");
    let spec = create_sleep_spec(300);
    let _guard = ContainerGuard::new(Arc::clone(&runtime), id.clone());

    // Create: the container must exist and be Pending.
    println!("Creating container: {}", id.service);
    let created = runtime.create_container(&id, &spec).await;
    assert!(created.is_ok(), "Failed to create container: {created:?}");
    let after_create = runtime.container_state(&id).await;
    assert!(
        after_create.is_ok(),
        "Failed to get container state: {after_create:?}"
    );
    assert_eq!(
        after_create.unwrap(),
        ContainerState::Pending,
        "Container should be Pending after create"
    );

    // Start: the container must reach Running within SHORT_TIMEOUT.
    println!("Starting container: {}", id.service);
    let started = runtime.start_container(&id).await;
    assert!(started.is_ok(), "Failed to start container: {started:?}");
    if let Err(reason) =
        wait_for_state(&runtime, &id, ContainerState::Running, SHORT_TIMEOUT).await
    {
        panic!("Container did not reach Running state: {reason}");
    }
    println!("Container is running");

    // Stop: the container must report an Exited state (any exit code).
    println!("Stopping container: {}", id.service);
    let stopped = runtime.stop_container(&id, Duration::from_secs(10)).await;
    assert!(stopped.is_ok(), "Failed to stop container: {stopped:?}");
    let after_stop = runtime.container_state(&id).await;
    assert!(
        after_stop.is_ok(),
        "Failed to get container state after stop"
    );
    match after_stop.unwrap() {
        ContainerState::Exited { code } => println!("Container exited with code: {code}"),
        other => panic!("Expected Exited state, got: {other:?}"),
    }

    // Remove: querying the state afterwards must fail.
    println!("Removing container: {}", id.service);
    let removed = runtime.remove_container(&id).await;
    assert!(removed.is_ok(), "Failed to remove container: {removed:?}");
    let after_remove = runtime.container_state(&id).await;
    assert!(
        after_remove.is_err(),
        "Container should not exist after removal"
    );
    println!("Container successfully removed");
}
/// Runs a container that echoes a fixed message and checks that the message
/// is returned by both `container_logs` and `get_logs`.
#[tokio::test]
async fn test_container_logs() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };
    let runtime = Arc::new(runtime);
    // Best-effort pull; a failure here surfaces when the container is created.
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
    let id = unique_container_id("logs");
    let test_message = "Hello from ZLayer Docker test!";
    let spec = create_echo_spec(test_message);
    let _guard = ContainerGuard::new(runtime.clone(), id.clone());
    println!("Creating container that outputs: {test_message}");
    runtime
        .create_container(&id, &spec)
        .await
        .expect("Failed to create container");
    runtime
        .start_container(&id)
        .await
        .expect("Failed to start container");
    // Waiting for the exit stays best-effort, but a failed wait is reported on
    // stderr so that a later log-assertion failure can be diagnosed.
    if let Err(reason) = wait_for_state(
        &runtime,
        &id,
        ContainerState::Exited { code: 0 },
        SHORT_TIMEOUT,
    )
    .await
    {
        eprintln!("Warning: container did not report an exit before reading logs: {reason}");
    }
    println!("Retrieving container logs");
    let logs = runtime
        .container_logs(&id, 100)
        .await
        .expect("Failed to get logs");
    let logs_text: String = logs
        .iter()
        .map(ToString::to_string)
        .collect::<Vec<_>>()
        .join("\n");
    println!("Logs: {logs_text}");
    assert!(
        logs.iter().any(|e| e.message.contains(test_message)),
        "Logs should contain the test message"
    );
    let log_lines = runtime
        .get_logs(&id)
        .await
        .expect("Failed to get log lines");
    println!("Log lines: {log_lines:?}");
    assert!(
        log_lines
            .iter()
            .any(|entry| entry.message.contains(test_message)),
        "Log lines should contain the test message"
    );
}
/// Executes three commands inside a running container and checks the captured
/// stdout, stderr and exit code.
#[tokio::test]
async fn test_container_exec() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };
    let runtime = Arc::new(runtime);
    // Best-effort pull; a failure here surfaces when the container is created.
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;

    let id = unique_container_id("exec");
    let spec = create_sleep_spec(300);
    let _guard = ContainerGuard::new(Arc::clone(&runtime), id.clone());

    println!("Creating long-running container for exec test");
    runtime
        .create_container(&id, &spec)
        .await
        .expect("Failed to create container");
    runtime
        .start_container(&id)
        .await
        .expect("Failed to start container");
    wait_for_state(&runtime, &id, ContainerState::Running, SHORT_TIMEOUT)
        .await
        .expect("Container did not start");

    // Case 1: a plain echo lands on stdout.
    println!("Executing command inside container");
    let echo_cmd = ["echo", "exec test output"].map(String::from).to_vec();
    let (exit_code, stdout, stderr) = runtime
        .exec(&id, &echo_cmd)
        .await
        .expect("Failed to exec");
    println!("Exec result - exit_code: {exit_code}, stdout: {stdout}, stderr: {stderr}");
    assert_eq!(exit_code, 0, "Exec should succeed with exit code 0");
    assert!(
        stdout.contains("exec test output"),
        "Stdout should contain the echoed message"
    );

    // Case 2: output redirected to stderr is captured separately.
    println!("Testing command that writes to stderr");
    let stderr_cmd = ["sh", "-c", "echo 'stderr output' >&2"]
        .map(String::from)
        .to_vec();
    let (exit_code, stdout, stderr) = runtime
        .exec(&id, &stderr_cmd)
        .await
        .expect("Failed to exec");
    println!("Stderr exec - exit_code: {exit_code}, stdout: '{stdout}', stderr: '{stderr}'");
    assert_eq!(exit_code, 0, "Exec should succeed");
    assert!(
        stderr.contains("stderr output"),
        "Stderr should contain the error output"
    );

    // Case 3: a non-zero exit status is reported as-is.
    println!("Testing command that fails");
    let failing_cmd = ["sh", "-c", "exit 42"].map(String::from).to_vec();
    let (exit_code, _, _) = runtime
        .exec(&id, &failing_cmd)
        .await
        .expect("Failed to exec");
    assert_eq!(exit_code, 42, "Exec should return the expected exit code");
}
/// Starts a long-running container and checks that its reported memory usage
/// and memory limit are both non-zero.
#[tokio::test]
async fn test_container_stats() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };
    let runtime = Arc::new(runtime);
    // Best-effort pull; a failure here surfaces when the container is created.
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;

    let id = unique_container_id("stats");
    let spec = create_sleep_spec(300);
    let _guard = ContainerGuard::new(Arc::clone(&runtime), id.clone());

    println!("Creating container for stats test");
    runtime
        .create_container(&id, &spec)
        .await
        .expect("Failed to create container");
    runtime
        .start_container(&id)
        .await
        .expect("Failed to start container");
    wait_for_state(&runtime, &id, ContainerState::Running, SHORT_TIMEOUT)
        .await
        .expect("Container did not start");

    // Pause for one second before sampling the statistics.
    tokio::time::sleep(Duration::from_secs(1)).await;

    println!("Getting container stats");
    let stats = runtime
        .get_container_stats(&id)
        .await
        .expect("Failed to get stats");
    let (cpu_usec, mem_bytes, mem_limit) =
        (stats.cpu_usage_usec, stats.memory_bytes, stats.memory_limit);
    println!(
        "Container stats - CPU usage: {} usec, Memory: {} bytes, Memory limit: {} bytes",
        cpu_usec, mem_bytes, mem_limit
    );
    assert!(mem_bytes > 0, "Memory usage should be greater than 0");
    assert!(mem_limit > 0, "Memory limit should be greater than 0");
}
#[tokio::test]
async fn test_wait_container_success() {
let Some(runtime) = skip_if_no_docker().await else {
eprintln!("Skipping test: Docker not available");
return;
};
let runtime = Arc::new(runtime);
let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
let id = unique_container_id("wait-success");
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
let mut spec = create_test_spec(TEST_IMAGE);
spec.command = CommandSpec {
entrypoint: None,
args: Some(vec![
"sh".to_string(),
"-c".to_string(),
"sleep 1 && exit 0".to_string(),
]),
workdir: None,
};
println!("Testing wait_container with exit code 0");
runtime
.create_container(&id, &spec)
.await
.expect("Failed to create container");
runtime
.start_container(&id)
.await
.expect("Failed to start container");
let exit_code = tokio::time::timeout(SHORT_TIMEOUT, runtime.wait_container(&id))
.await
.expect("Timeout waiting for container")
.expect("Failed to wait for container");
println!("Container exited with code: {exit_code}");
assert_eq!(exit_code, 0, "Expected exit code 0");
}
#[tokio::test]
async fn test_wait_container_failure() {
let Some(runtime) = skip_if_no_docker().await else {
eprintln!("Skipping test: Docker not available");
return;
};
let runtime = Arc::new(runtime);
let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
let id = unique_container_id("wait-nonzero");
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
let mut spec = create_test_spec(TEST_IMAGE);
spec.command = CommandSpec {
entrypoint: None,
args: Some(vec![
"sh".to_string(),
"-c".to_string(),
"sleep 2 && exit 42".to_string(),
]),
workdir: None,
};
println!("Creating and starting container for exit code 42 test");
println!("Container ID: {id:?}");
runtime
.create_container(&id, &spec)
.await
.expect("Failed to create container");
runtime
.start_container(&id)
.await
.expect("Failed to start container");
let state = runtime
.container_state(&id)
.await
.expect("Failed to get state after start");
println!("State after start: {state:?}");
println!("Waiting for container to exit...");
let exit_code = loop {
let state = runtime
.container_state(&id)
.await
.expect("Failed to get state");
match state {
ContainerState::Exited { code } => {
break code;
}
ContainerState::Running => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
other => {
panic!("Unexpected state: {other:?}");
}
}
};
println!("Container exited with code: {exit_code}");
assert_eq!(exit_code, 42, "Expected exit code 42");
}
#[tokio::test]
async fn test_wait_container_timing() {
let Some(runtime) = skip_if_no_docker().await else {
eprintln!("Skipping test: Docker not available");
return;
};
let runtime = Arc::new(runtime);
let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
let id = unique_container_id("wait-brief");
let _guard = ContainerGuard::new(runtime.clone(), id.clone());
let mut spec = create_test_spec(TEST_IMAGE);
spec.command = CommandSpec {
entrypoint: None,
args: Some(vec!["sleep".to_string(), "1".to_string()]),
workdir: None,
};
println!("Testing wait_container with brief sleep");
runtime
.create_container(&id, &spec)
.await
.expect("Failed to create container");
runtime
.start_container(&id)
.await
.expect("Failed to start container");
let start = std::time::Instant::now();
let exit_code = tokio::time::timeout(SHORT_TIMEOUT, runtime.wait_container(&id))
.await
.expect("Timeout waiting for container")
.expect("Failed to wait for container");
let elapsed = start.elapsed();
println!("Container exited with code {exit_code} after {elapsed:?}");
assert_eq!(exit_code, 0, "Expected exit code 0");
assert!(
elapsed >= Duration::from_millis(900),
"Should have waited at least ~1 second"
);
}
#[tokio::test]
async fn test_concurrent_containers() {
let Some(runtime) = skip_if_no_docker().await else {
eprintln!("Skipping test: Docker not available");
return;
};
let runtime = Arc::new(runtime);
let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
let container_count = 3;
let mut guards = Vec::new();
let mut ids = Vec::new();
println!("Creating {container_count} containers concurrently");
let mut create_handles = Vec::new();
for i in 0..container_count {
let runtime_clone = runtime.clone();
let id = unique_container_id(&format!("concurrent-{i}"));
let spec = create_sleep_spec(300);
let id_clone = id.clone();
ids.push(id.clone());
guards.push(ContainerGuard::new(runtime.clone(), id));
create_handles.push(tokio::spawn(async move {
runtime_clone.create_container(&id_clone, &spec).await
}));
}
for handle in create_handles {
let result = handle.await.expect("Task panicked");
assert!(result.is_ok(), "Failed to create container: {result:?}");
}
println!("Starting {container_count} containers concurrently");
let mut start_handles = Vec::new();
for id in &ids {
let runtime_clone = runtime.clone();
let id_clone = id.clone();
start_handles.push(tokio::spawn(async move {
runtime_clone.start_container(&id_clone).await
}));
}
for handle in start_handles {
let result = handle.await.expect("Task panicked");
assert!(result.is_ok(), "Failed to start container: {result:?}");
}
for id in &ids {
let state = runtime
.container_state(id)
.await
.expect("Failed to get state");
assert_eq!(
state,
ContainerState::Running,
"Container {} should be running",
id.service
);
}
println!("All {container_count} containers are running");
println!("Stopping {container_count} containers concurrently");
let mut stop_handles = Vec::new();
for id in &ids {
let runtime_clone = runtime.clone();
let id_clone = id.clone();
stop_handles.push(tokio::spawn(async move {
runtime_clone
.stop_container(&id_clone, Duration::from_secs(10))
.await
}));
}
for handle in stop_handles {
let result = handle.await.expect("Task panicked");
assert!(result.is_ok(), "Failed to stop container: {result:?}");
}
println!("All containers stopped successfully");
}
/// Calls `remove_container` for an id that was never created. Both success and
/// an error are accepted; the outcome is only printed.
#[tokio::test]
async fn test_remove_nonexistent_container() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    let id = unique_container_id("nonexistent");
    println!(
        "Attempting to remove non-existent container: {}",
        id.service
    );

    if let Err(e) = runtime.remove_container(&id).await {
        println!("Remove returned error (also acceptable): {e}");
    } else {
        println!("Remove succeeded (idempotent behavior)");
    }
}
/// Checks that querying the state of a container that was never created
/// returns an error.
#[tokio::test]
async fn test_state_nonexistent_container() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    let id = unique_container_id("ghost");
    println!("Getting state of non-existent container: {}", id.service);

    match runtime.container_state(&id).await {
        Err(err) => println!("Got expected error: {err:?}"),
        Ok(_) => panic!("Should fail for non-existent container"),
    }
}
/// Checks that `PullPolicy::Never` returns `Ok` even for an image name that is
/// not expected to exist.
#[tokio::test]
async fn test_pull_never_policy() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    let nonexistent_image = "this-image-definitely-does-not-exist:never";
    println!("Testing Never pull policy with non-existent image");

    let outcome = runtime
        .pull_image_with_policy(nonexistent_image, PullPolicy::Never, None)
        .await;
    assert!(
        outcome.is_ok(),
        "Never policy should succeed without pulling: {outcome:?}"
    );
    println!("Never policy correctly skipped pull attempt");
}
/// Pulls [`TEST_IMAGE`] with `PullPolicy::Always` and fails if the pull errors
/// or exceeds [`LONG_TIMEOUT`].
#[tokio::test]
async fn test_pull_always_policy() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };

    println!("Testing Always pull policy");
    let forced_pull = runtime.pull_image_with_policy(TEST_IMAGE, PullPolicy::Always, None);
    let outcome = tokio::time::timeout(LONG_TIMEOUT, forced_pull).await;

    // The outer layer is the timeout, the inner layer is the pull itself.
    let pulled = outcome.unwrap_or_else(|e| panic!("Timeout with Always policy: {e}"));
    if let Err(e) = pulled {
        panic!("Failed with Always policy: {e}");
    }
    println!("Always policy pulled successfully");
}
/// Runs `sh -c env` in a container configured with two environment variables
/// and checks that both `NAME=value` pairs appear in its logs.
#[tokio::test]
async fn test_container_with_env() {
    let Some(runtime) = skip_if_no_docker().await else {
        eprintln!("Skipping test: Docker not available");
        return;
    };
    let runtime = Arc::new(runtime);
    // Best-effort pull; a failure here surfaces when the container is created.
    let _ = tokio::time::timeout(LONG_TIMEOUT, runtime.pull_image(TEST_IMAGE)).await;
    let id = unique_container_id("env");
    let mut spec = create_test_spec(TEST_IMAGE);
    spec.env
        .insert("TEST_VAR".to_string(), "test_value".to_string());
    spec.env
        .insert("ANOTHER_VAR".to_string(), "another_value".to_string());
    spec.command = CommandSpec {
        entrypoint: None,
        args: Some(vec!["sh".to_string(), "-c".to_string(), "env".to_string()]),
        workdir: None,
    };
    let _guard = ContainerGuard::new(runtime.clone(), id.clone());
    println!("Creating container with environment variables");
    runtime
        .create_container(&id, &spec)
        .await
        .expect("Failed to create container");
    runtime
        .start_container(&id)
        .await
        .expect("Failed to start container");
    // Waiting for the exit stays best-effort, but a failed wait is reported on
    // stderr so that a later log-assertion failure can be diagnosed.
    if let Err(reason) = wait_for_state(
        &runtime,
        &id,
        ContainerState::Exited { code: 0 },
        SHORT_TIMEOUT,
    )
    .await
    {
        eprintln!("Warning: container did not report an exit before reading logs: {reason}");
    }
    let logs = runtime
        .container_logs(&id, 100)
        .await
        .expect("Failed to get logs");
    let logs_text: String = logs
        .iter()
        .map(ToString::to_string)
        .collect::<Vec<_>>()
        .join("\n");
    println!("Container output:\n{logs_text}");
    assert!(
        logs.iter()
            .any(|e| e.message.contains("TEST_VAR=test_value")),
        "Logs should contain TEST_VAR"
    );
    assert!(
        logs.iter()
            .any(|e| e.message.contains("ANOTHER_VAR=another_value")),
        "Logs should contain ANOTHER_VAR"
    );
}