use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{
signal_name_from_exit_code, ContainerId, ContainerInspectDetails, ContainerState, ExecEvent,
ExecEventStream, HealthDetail, ImageInfo, NetworkAttachmentDetail, PruneResult, Runtime,
WaitOutcome, WaitReason,
};
use bollard::auth::DockerCredentials;
use bollard::errors::Error as BollardError;
use bollard::exec::{CreateExecOptions, StartExecResults};
use bollard::models::{
ContainerCreateBody, DeviceMapping, DeviceRequest, HostConfig, ImageInspect, PortBinding,
RestartPolicy, RestartPolicyNameEnum,
};
use bollard::query_parameters::{
CreateContainerOptions, CreateImageOptions, LogsOptions, RemoveContainerOptions,
StartContainerOptions, StatsOptions, StopContainerOptions, WaitContainerOptions,
};
use bollard::Docker;
use futures_util::StreamExt;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::instrument;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{PullPolicy, RegistryAuth, RegistryAuthType, ServiceSpec};
/// Docker-backed implementation of the agent [`Runtime`] trait, talking to
/// the local Docker daemon through a `bollard` client.
pub struct DockerRuntime {
    // Handle to the Docker daemon; cloned into spawned tasks where needed.
    docker: Docker,
    // Optional auth wiring (API URL, agent socket path, JWT secret) that is
    // injected into created containers; `None` disables auth env/bind setup.
    auth_context: Option<crate::runtime::ContainerAuthContext>,
}
// Manual Debug impl: the Docker handle has no useful Debug output and the
// auth context carries secret material, so only the type name is printed.
impl std::fmt::Debug for DockerRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DockerRuntime").finish_non_exhaustive()
    }
}
impl DockerRuntime {
    /// Connect to the local Docker daemon and verify it is reachable.
    ///
    /// # Errors
    /// Returns `AgentError::Internal` if the connection cannot be
    /// established or the daemon does not answer a ping.
    pub async fn new(auth_context: Option<crate::runtime::ContainerAuthContext>) -> Result<Self> {
        let docker = Docker::connect_with_local_defaults()
            .map_err(|e| AgentError::Internal(format!("Failed to connect to Docker: {e}")))?;
        // Fail fast here rather than on the first container operation.
        docker
            .ping()
            .await
            .map_err(|e| AgentError::Internal(format!("Docker ping failed: {e}")))?;
        tracing::info!("Connected to Docker daemon");
        Ok(Self {
            docker,
            auth_context,
        })
    }

    /// Wrap an existing Docker client (no ping is performed); auth-context
    /// wiring is disabled.
    #[must_use]
    pub fn with_client(docker: Docker) -> Self {
        Self {
            docker,
            auth_context: None,
        }
    }

    /// Unconditionally pull `image`, streaming pull progress to debug logs.
    /// `auth` optionally supplies inline registry credentials.
    ///
    /// # Errors
    /// Returns `AgentError::PullFailed` if any item of the pull stream
    /// reports an error.
    async fn do_pull(&self, image: &str, auth: Option<&RegistryAuth>) -> Result<()> {
        let (name, tag) = parse_image_ref(image);
        tracing::info!(
            image = %image,
            name = %name,
            tag = %tag,
            inline_auth = auth.is_some(),
            "pulling image"
        );
        let options = CreateImageOptions {
            from_image: Some(name.to_string()),
            // An empty tag means the reference is digest-pinned; omit the tag.
            tag: if tag.is_empty() {
                None
            } else {
                Some(tag.to_string())
            },
            ..Default::default()
        };
        let credentials = auth.map(docker_credentials_from_registry_auth);
        let mut stream = self.docker.create_image(Some(options), None, credentials);
        // Drain the progress stream; any error aborts the pull.
        while let Some(result) = stream.next().await {
            match result {
                Ok(info) => {
                    if let Some(status) = info.status {
                        tracing::debug!(status = %status, "pull progress");
                    }
                }
                Err(e) => {
                    return Err(AgentError::PullFailed {
                        image: image.to_string(),
                        reason: e.to_string(),
                    });
                }
            }
        }
        tracing::info!(image = %image, "image pulled successfully");
        Ok(())
    }
}
/// Deterministic Docker container name for a replica: `zlayer-<service>-<replica>`.
fn container_name(id: &ContainerId) -> String {
    format!(
        "zlayer-{service}-{replica}",
        service = id.service,
        replica = id.replica
    )
}
/// Split `chunk` (prefixed with any carried-over `partial` text) into
/// complete lines and send each one as an [`ExecEvent`] on `tx`. Text after
/// the final newline is stashed back into `partial` for the next call.
/// Returns `Err(())` once the receiver side of `tx` has been dropped.
async fn emit_lines(
    tx: &tokio::sync::mpsc::Sender<ExecEvent>,
    partial: &mut String,
    chunk: &str,
    is_stdout: bool,
) -> std::result::Result<(), ()> {
    // Merge the leftover fragment from the previous chunk with the new data.
    let mut buffer = std::mem::take(partial);
    buffer.push_str(chunk);
    // No newline at all: everything stays buffered until more data arrives.
    let Some(last_newline) = buffer.rfind('\n') else {
        *partial = buffer;
        return Ok(());
    };
    // Carry the trailing fragment (possibly empty) over to the next call.
    partial.push_str(&buffer[last_newline + 1..]);
    for line in buffer[..last_newline].split('\n') {
        let event = if is_stdout {
            ExecEvent::Stdout(line.to_string())
        } else {
            ExecEvent::Stderr(line.to_string())
        };
        if tx.send(event).await.is_err() {
            return Err(());
        }
    }
    Ok(())
}
/// Convert a spec-level [`RegistryAuth`] into bollard [`DockerCredentials`].
/// Both auth variants are currently sent as username/password.
fn docker_credentials_from_registry_auth(auth: &RegistryAuth) -> DockerCredentials {
    // Exhaustiveness guard only: adding a new RegistryAuthType variant forces
    // a compile error here so this mapping gets revisited. Intentionally no-op.
    match auth.auth_type {
        RegistryAuthType::Basic | RegistryAuthType::Token => {}
    }
    DockerCredentials {
        username: Some(auth.username.clone()),
        password: Some(auth.password.clone()),
        ..Default::default()
    }
}
/// Split an image reference into `(name, tag)`.
///
/// Digest-pinned references (`name@sha256:...`) are returned whole with an
/// empty tag. A trailing `:suffix` counts as a tag only when the suffix has
/// no `/` (otherwise the colon belongs to a registry port, e.g.
/// `registry:5000/app`). Untagged references default to `latest`.
fn parse_image_ref(image: &str) -> (&str, &str) {
    if image.contains('@') {
        return (image, "");
    }
    match image.rsplit_once(':') {
        Some((name, tag)) if !tag.contains('/') => (name, tag),
        _ => (image, "latest"),
    }
}
/// Find the digest recorded for `repo_name` in the image's `RepoDigests`
/// (entries look like `repo@sha256:...`). Returns `None` when the inspect
/// payload has no repo digests or none of them match.
fn extract_local_digest(inspect: &ImageInspect, repo_name: &str) -> Option<String> {
    for entry in inspect.repo_digests.as_ref()? {
        // Entries without an '@' separator are ignored.
        if let Some((repo, digest)) = entry.split_once('@') {
            if repo == repo_name {
                return Some(digest.to_string());
            }
        }
    }
    None
}
/// Collect the container ports to expose, as Docker `port/proto` keys.
/// Endpoint target ports (always TCP) come first, then explicit port
/// mappings; duplicates are dropped while preserving first-seen order.
fn build_exposed_ports(spec: &ServiceSpec) -> Vec<String> {
    let endpoint_keys = spec
        .endpoints
        .iter()
        .map(|endpoint| format!("{}/tcp", endpoint.target_port()));
    let mapping_keys = spec
        .port_mappings
        .iter()
        .map(|mapping| format!("{}/{}", mapping.container_port, mapping.protocol.as_str()));
    let mut seen = std::collections::HashSet::new();
    let mut ports = Vec::new();
    for key in endpoint_keys.chain(mapping_keys) {
        if seen.insert(key.clone()) {
            ports.push(key);
        }
    }
    ports
}
/// Translate a [`ServiceSpec`] into a Docker [`HostConfig`]: port bindings,
/// memory/CPU limits, device and GPU passthrough, capabilities, the optional
/// agent auth-socket bind mount, DNS settings, and the restart policy.
///
/// `auth_socket_path` is a host path mounted read-only at the default agent
/// socket path inside the container. `gpu_indices` optionally overrides
/// which GPU indices are exposed (defaults to `0..gpu.count`).
#[allow(clippy::too_many_lines)]
fn build_host_config(
    spec: &ServiceSpec,
    auth_socket_path: Option<&str>,
    gpu_indices: Option<&[u32]>,
) -> HostConfig {
    // Endpoints bind TCP on all interfaces to their declared host port.
    let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
    for endpoint in &spec.endpoints {
        let key = format!("{}/tcp", endpoint.target_port());
        let binding = PortBinding {
            host_ip: Some("0.0.0.0".to_string()),
            host_port: Some(endpoint.port.to_string()),
        };
        port_bindings.insert(key, Some(vec![binding]));
    }
    // Explicit mappings can add bindings (possibly to the same container
    // port) with custom host IPs; host port 0/None lets Docker pick one.
    for mapping in &spec.port_mappings {
        let key = format!("{}/{}", mapping.container_port, mapping.protocol.as_str());
        let host_ip = if mapping.host_ip.is_empty() {
            "0.0.0.0".to_string()
        } else {
            mapping.host_ip.clone()
        };
        let host_port = match mapping.host_port {
            Some(0) | None => None,
            Some(p) => Some(p.to_string()),
        };
        let binding = PortBinding {
            host_ip: Some(host_ip),
            host_port,
        };
        port_bindings
            .entry(key)
            .or_insert_with(|| Some(Vec::new()))
            .as_mut()
            .expect("entry initialised to Some")
            .push(binding);
    }
    let memory = spec.resources.memory.as_ref().and_then(|m| parse_memory(m));
    // Fractional CPUs map onto Docker's nano-CPU unit (1 CPU = 1e9).
    #[allow(clippy::cast_possible_truncation)]
    let nano_cpus = spec.resources.cpu.map(|c| (c * 1_000_000_000.0) as i64);
    // Plain device passthrough: build cgroup permission string from flags,
    // defaulting to "rw" when none are set.
    let mut devices: Vec<DeviceMapping> = spec
        .devices
        .iter()
        .map(|d| {
            let mut permissions = String::new();
            if d.read {
                permissions.push('r');
            }
            if d.write {
                permissions.push('w');
            }
            if d.mknod {
                permissions.push('m');
            }
            if permissions.is_empty() {
                permissions = "rw".to_string();
            }
            DeviceMapping {
                path_on_host: Some(d.path.clone()),
                path_in_container: Some(d.path.clone()),
                cgroup_permissions: Some(permissions),
            }
        })
        .collect();
    let mut device_requests: Option<Vec<DeviceRequest>> = None;
    if let Some(ref gpu) = spec.resources.gpu {
        let indices: Vec<u32> =
            gpu_indices.map_or_else(|| (0..gpu.count).collect(), <[u32]>::to_vec);
        match gpu.vendor.as_str() {
            // NVIDIA: request devices through the nvidia driver (DeviceRequest).
            "nvidia" => {
                device_requests = Some(vec![DeviceRequest {
                    driver: Some("nvidia".into()),
                    device_ids: Some(indices.iter().map(ToString::to_string).collect()),
                    count: None,
                    capabilities: Some(vec![vec!["gpu".into()]]),
                    ..Default::default()
                }]);
            }
            // AMD: mount the KFD compute interface plus per-GPU DRI nodes.
            // Render node minors conventionally start at renderD128.
            "amd" => {
                devices.push(DeviceMapping {
                    path_on_host: Some("/dev/kfd".into()),
                    path_in_container: Some("/dev/kfd".into()),
                    cgroup_permissions: Some("rwm".into()),
                });
                for i in &indices {
                    let render_path = format!("/dev/dri/renderD{}", 128 + i);
                    devices.push(DeviceMapping {
                        path_on_host: Some(render_path.clone()),
                        path_in_container: Some(render_path),
                        cgroup_permissions: Some("rwm".into()),
                    });
                    let card_path = format!("/dev/dri/card{i}");
                    devices.push(DeviceMapping {
                        path_on_host: Some(card_path.clone()),
                        path_in_container: Some(card_path),
                        cgroup_permissions: Some("rwm".into()),
                    });
                }
            }
            // Intel: DRI render + card nodes only.
            "intel" => {
                for i in &indices {
                    let render_path = format!("/dev/dri/renderD{}", 128 + i);
                    devices.push(DeviceMapping {
                        path_on_host: Some(render_path.clone()),
                        path_in_container: Some(render_path),
                        cgroup_permissions: Some("rwm".into()),
                    });
                    let card_path = format!("/dev/dri/card{i}");
                    devices.push(DeviceMapping {
                        path_on_host: Some(card_path.clone()),
                        path_in_container: Some(card_path),
                        cgroup_permissions: Some("rwm".into()),
                    });
                }
            }
            // Unknown vendor: best-effort render-node passthrough only.
            other => {
                tracing::warn!(
                    vendor = %other,
                    "Unknown GPU vendor for Docker, attempting DRI device passthrough"
                );
                for i in &indices {
                    let render_path = format!("/dev/dri/renderD{}", 128 + i);
                    devices.push(DeviceMapping {
                        path_on_host: Some(render_path.clone()),
                        path_in_container: Some(render_path),
                        cgroup_permissions: Some("rwm".into()),
                    });
                }
            }
        }
    }
    let cap_add = if spec.capabilities.is_empty() {
        None
    } else {
        Some(spec.capabilities.clone())
    };
    // Mount the agent auth socket read-only at its default in-container path.
    let mut binds: Vec<String> = Vec::new();
    if let Some(socket) = auth_socket_path {
        binds.push(format!(
            "{socket}:{}:ro",
            zlayer_paths::ZLayerDirs::default_socket_path()
        ));
    }
    // Custom DNS and extra hosts are only applied for non-host networking.
    let (dns, extra_hosts) = if spec.host_network {
        (None, None)
    } else {
        let dns = if spec.dns.is_empty() {
            None
        } else {
            Some(spec.dns.clone())
        };
        let extra_hosts = if spec.extra_hosts.is_empty() {
            None
        } else {
            Some(spec.extra_hosts.clone())
        };
        (dns, extra_hosts)
    };
    let restart_policy = spec.restart_policy.as_ref().map(translate_restart_policy);
    HostConfig {
        port_bindings: Some(port_bindings),
        privileged: Some(spec.privileged),
        memory,
        nano_cpus,
        devices: if devices.is_empty() {
            None
        } else {
            Some(devices)
        },
        device_requests,
        cap_add,
        binds: if binds.is_empty() { None } else { Some(binds) },
        dns,
        extra_hosts,
        restart_policy,
        ..Default::default()
    }
}
/// Map a spec-level restart policy onto Docker's [`RestartPolicy`].
/// A max retry count is only meaningful (and only forwarded) for
/// `OnFailure`; the `delay` field has no Docker equivalent and is logged
/// as unsupported.
fn translate_restart_policy(policy: &zlayer_spec::ContainerRestartPolicy) -> RestartPolicy {
    use zlayer_spec::ContainerRestartKind;
    if policy.delay.is_some() {
        tracing::warn!(
            kind = ?policy.kind,
            delay = ?policy.delay,
            "ContainerRestartPolicy.delay is not supported by the Docker backend; \
             Docker applies its own exponential backoff starting at 100ms"
        );
    }
    let (name, maximum_retry_count) = match policy.kind {
        ContainerRestartKind::No => (RestartPolicyNameEnum::NO, None),
        ContainerRestartKind::Always => (RestartPolicyNameEnum::ALWAYS, None),
        ContainerRestartKind::UnlessStopped => (RestartPolicyNameEnum::UNLESS_STOPPED, None),
        ContainerRestartKind::OnFailure => (
            RestartPolicyNameEnum::ON_FAILURE,
            policy.max_attempts.map(i64::from),
        ),
    };
    RestartPolicy {
        name: Some(name),
        maximum_retry_count,
    }
}
/// Parse a human-readable memory size (e.g. `"512M"`, `"1.5GiB"`, `"1024"`)
/// into a byte count. The numeric prefix may be fractional; units are
/// case-insensitive, use binary multiples (1K = 1024 B), and may be
/// separated from the number by whitespace ("512 MB"). Returns `None` for
/// malformed input or an unrecognised unit.
fn parse_memory(memory: &str) -> Option<i64> {
    let memory = memory.trim();
    // First character that is not part of the numeric prefix. Defaulting to 0
    // covers both "all digits" (no unit) and "starts with a non-digit",
    // preserving the original sentinel behaviour for each.
    let split_idx = memory
        .find(|c: char| !c.is_ascii_digit() && c != '.')
        .unwrap_or(0);
    if split_idx == 0 {
        // No unit suffix: interpret the whole string as a plain byte count.
        return memory.parse::<i64>().ok();
    }
    let (num_str, unit) = memory.split_at(split_idx);
    let num: f64 = num_str.parse().ok()?;
    // Trim tolerates whitespace between the number and its unit.
    let multiplier: i64 = match unit.trim().to_uppercase().as_str() {
        "B" | "" => 1,
        "K" | "KB" | "KI" | "KIB" => 1024,
        "M" | "MB" | "MI" | "MIB" => 1024 * 1024,
        "G" | "GB" | "GI" | "GIB" => 1024 * 1024 * 1024,
        "T" | "TB" | "TI" | "TIB" => 1024 * 1024 * 1024 * 1024,
        _ => return None,
    };
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    Some((num * multiplier as f64) as i64)
}
#[async_trait::async_trait]
impl Runtime for DockerRuntime {
/// Pull `image` using the default `IfNotPresent` policy and no inline
/// registry credentials. Thin wrapper over [`Self::pull_image_with_policy`].
#[instrument(
    skip(self),
    fields(
        otel.name = "image.pull",
        container.image.name = %image,
    )
)]
async fn pull_image(&self, image: &str) -> Result<()> {
    self.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
        .await
}
/// Pull `image` honouring `policy`:
/// - `Never`: skip entirely.
/// - `IfNotPresent`: pull only when the image is missing locally, or when
///   the registry digest differs from the local one. Digest-pinned images
///   that are present are never re-pulled. Registry lookup failures fall
///   back to the local image rather than erroring.
/// - any other policy: unconditional pull.
///
/// # Errors
/// `AgentError::Internal` for unexpected inspect failures,
/// `AgentError::PullFailed` when the pull itself fails.
#[instrument(
    skip(self, auth),
    fields(
        otel.name = "image.pull",
        container.image.name = %image,
        pull_policy = ?policy,
        inline_auth = auth.is_some(),
    )
)]
async fn pull_image_with_policy(
    &self,
    image: &str,
    policy: PullPolicy,
    auth: Option<&RegistryAuth>,
) -> Result<()> {
    if matches!(policy, PullPolicy::Never) {
        tracing::debug!(image = %image, "pull policy is Never, skipping pull");
        return Ok(());
    }
    if matches!(policy, PullPolicy::IfNotPresent) {
        // 404 from inspect means the image is absent locally.
        let local_inspect = match self.docker.inspect_image(image).await {
            Ok(inspect) => inspect,
            Err(BollardError::DockerResponseServerError {
                status_code: 404, ..
            }) => {
                tracing::debug!(image = %image, "image not present locally, pulling");
                return self.do_pull(image, auth).await;
            }
            Err(e) => {
                return Err(AgentError::Internal(format!(
                    "failed to inspect image '{image}': {e}"
                )));
            }
        };
        let (name, tag) = parse_image_ref(image);
        // Empty tag => digest-pinned reference; the content cannot change.
        if tag.is_empty() {
            tracing::debug!(
                image = %image,
                "image pinned by digest and present, skipping pull"
            );
            return Ok(());
        }
        // Compare local vs registry digest to decide whether to re-pull.
        let local_digest = extract_local_digest(&local_inspect, name);
        match self.docker.inspect_registry_image(image, None).await {
            Ok(dist) => {
                let remote_digest = dist.descriptor.digest;
                match (&local_digest, &remote_digest) {
                    (Some(local), Some(remote)) if local == remote => {
                        tracing::debug!(
                            image = %image,
                            digest = %local,
                            "image up-to-date, skipping pull"
                        );
                        return Ok(());
                    }
                    (Some(local), Some(remote)) => {
                        tracing::info!(
                            image = %image,
                            local = %local,
                            remote = %remote,
                            "image digests differ, re-pulling"
                        );
                        return self.do_pull(image, auth).await;
                    }
                    // Either digest missing: re-pull rather than risk staleness.
                    _ => {
                        tracing::info!(
                            image = %image,
                            "local or remote digest missing, re-pulling to be safe"
                        );
                        return self.do_pull(image, auth).await;
                    }
                }
            }
            // Registry unreachable: prefer availability over freshness.
            Err(e) => {
                tracing::warn!(
                    image = %image,
                    error = %e,
                    "failed to query registry digest, using local image"
                );
                return Ok(());
            }
        }
    }
    // Remaining policy (Always): unconditional pull.
    self.do_pull(image, auth).await
}
/// Create (but do not start) the container for `id` from `spec`.
///
/// Any stale container with the same name is stopped and force-removed
/// first, so re-creation is idempotent. When an auth context is configured,
/// a per-replica token and agent connection env vars are injected. GPU
/// specs add vendor-specific visibility env vars, and distributed GPU specs
/// add single-node rendezvous defaults.
///
/// # Errors
/// `AgentError::Internal` when a stale container cannot be removed;
/// `AgentError::CreateFailed` when token minting or the Docker create call
/// fails.
#[instrument(
    skip(self, spec),
    fields(
        otel.name = "container.create",
        container.id = %container_name(id),
        service.name = %id.service,
        service.replica = %id.replica,
        container.image.name = %spec.image.name,
    )
)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
    let name = container_name(id);
    // A container with this name may linger from a previous agent run.
    if self.docker.inspect_container(&name, None).await.is_ok() {
        tracing::warn!(
            container = %name,
            "stale container already exists, removing before re-create"
        );
        // Best-effort stop with a 5s grace period; removal below is forced
        // regardless, so a stop failure is deliberately ignored.
        let _ = self
            .docker
            .stop_container(
                &name,
                Some(StopContainerOptions {
                    t: Some(5),
                    ..Default::default()
                }),
            )
            .await;
        self.docker
            .remove_container(
                &name,
                Some(RemoveContainerOptions {
                    force: true,
                    ..Default::default()
                }),
            )
            .await
            .map_err(|e| {
                AgentError::Internal(format!("failed to remove stale container {name}: {e}"))
            })?;
        tracing::info!(container = %name, "stale container removed");
    }
    let mut env: Vec<String> = spec.env.iter().map(|(k, v)| format!("{k}={v}")).collect();
    if let Some(ref auth_ctx) = self.auth_context {
        // Mint a long-lived (1 year) per-replica token for the agent API.
        let token = crate::auth::mint_container_token(
            &auth_ctx.jwt_secret,
            &id.service,
            &format!("{}-{}", id.service, id.replica),
            std::time::Duration::from_secs(86400 * 365),
        )
        .map_err(|e| crate::error::AgentError::CreateFailed {
            id: id.to_string(),
            reason: format!("Failed to mint container token: {e}"),
        })?;
        env.push(format!("ZLAYER_API_URL={}", auth_ctx.api_url));
        env.push(format!("ZLAYER_TOKEN={token}"));
        env.push(format!(
            "ZLAYER_SOCKET={}",
            zlayer_paths::ZLayerDirs::default_socket_path()
        ));
    }
    if let Some(ref gpu) = spec.resources.gpu {
        // Vendor-specific device-visibility env vars for indices 0..count.
        let indices: Vec<String> = (0..gpu.count).map(|i| i.to_string()).collect();
        let device_list = indices.join(",");
        match gpu.vendor.as_str() {
            "nvidia" => {
                env.push(format!("NVIDIA_VISIBLE_DEVICES={device_list}"));
                env.push(format!("CUDA_VISIBLE_DEVICES={device_list}"));
            }
            "amd" => {
                env.push(format!("ROCR_VISIBLE_DEVICES={device_list}"));
                env.push(format!("HIP_VISIBLE_DEVICES={device_list}"));
            }
            "intel" => {
                env.push(format!("ZE_AFFINITY_MASK={device_list}"));
            }
            _ => {}
        }
    }
    if let Some(ref gpu) = spec.resources.gpu {
        // Distributed-training rendezvous env. Single-node defaults are
        // hard-coded (WORLD_SIZE=1, RANK=0) — NOTE(review): multi-node rank
        // assignment is presumably handled elsewhere; confirm.
        if let Some(ref dist) = gpu.distributed {
            env.push(format!("MASTER_PORT={}", dist.master_port));
            env.push(format!("MASTER_ADDR={}", id.service));
            env.push("WORLD_SIZE=1".to_string());
            env.push("RANK=0".to_string());
            env.push("LOCAL_RANK=0".to_string());
            match dist.backend.as_str() {
                "nccl" => env.push("NCCL_SOCKET_IFNAME=eth0".to_string()),
                "gloo" => env.push("GLOO_SOCKET_IFNAME=eth0".to_string()),
                _ => {}
            }
        }
    }
    let exposed_ports = build_exposed_ports(spec);
    let auth_socket = self
        .auth_context
        .as_ref()
        .map(|ctx| ctx.socket_path.as_str());
    let host_config = build_host_config(spec, auth_socket, None);
    let cmd = spec.command.args.clone();
    let entrypoint = spec.command.entrypoint.clone();
    let working_dir = spec.command.workdir.clone();
    // No custom hostname when using host networking.
    let hostname = if spec.host_network {
        None
    } else {
        spec.hostname.clone()
    };
    let config = ContainerCreateBody {
        image: Some(spec.image.name.clone()),
        hostname,
        env: if env.is_empty() { None } else { Some(env) },
        cmd,
        entrypoint,
        working_dir,
        exposed_ports: if exposed_ports.is_empty() {
            None
        } else {
            Some(exposed_ports)
        },
        host_config: Some(host_config),
        ..Default::default()
    };
    let options = CreateContainerOptions {
        name: Some(name.clone()),
        platform: String::new(),
    };
    tracing::info!(container = %name, image = %spec.image.name, "creating container");
    self.docker
        .create_container(Some(options), config)
        .await
        .map_err(|e| AgentError::CreateFailed {
            id: name.clone(),
            reason: e.to_string(),
        })?;
    tracing::info!(container = %name, "container created successfully");
    Ok(())
}
/// Start the previously created container for `id`.
///
/// # Errors
/// Returns `AgentError::StartFailed` when the Docker start call fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.start",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, "starting container");
    if let Err(e) = self
        .docker
        .start_container(&name, None::<StartContainerOptions>)
        .await
    {
        return Err(AgentError::StartFailed {
            id: name,
            reason: e.to_string(),
        });
    }
    tracing::info!(container = %name, "container started successfully");
    Ok(())
}
/// Stop a running container, giving Docker `timeout` (whole seconds) to
/// terminate it gracefully before it is killed.
///
/// # Errors
/// Returns `AgentError::NotFound` when the stop call fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.stop",
        container.id = %container_name(id),
        service.name = %id.service,
        timeout_ms = %timeout.as_millis(),
    )
)]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, timeout = ?timeout, "stopping container");
    // Sub-second precision is dropped; Docker takes whole seconds.
    #[allow(clippy::cast_possible_truncation)]
    let grace_secs = timeout.as_secs() as i32;
    self.docker
        .stop_container(
            &name,
            Some(StopContainerOptions {
                t: Some(grace_secs),
                signal: None,
            }),
        )
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to stop container: {e}"),
        })?;
    tracing::info!(container = %name, "container stopped successfully");
    Ok(())
}
/// Force-remove the container for `id` (running containers are killed).
///
/// # Errors
/// Returns `AgentError::NotFound` when the remove call fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.remove",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, "removing container");
    let removal = self
        .docker
        .remove_container(
            &name,
            Some(RemoveContainerOptions {
                force: true,
                ..Default::default()
            }),
        )
        .await;
    match removal {
        Ok(()) => {
            tracing::info!(container = %name, "container removed successfully");
            Ok(())
        }
        Err(e) => Err(AgentError::NotFound {
            container: name,
            reason: format!("failed to remove container: {e}"),
        }),
    }
}
/// Inspect the container and map Docker's status onto the agent's
/// [`ContainerState`].
///
/// # Errors
/// `AgentError::NotFound` when inspect fails; `AgentError::Internal` when
/// the inspect payload carries no state.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.state",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
    use bollard::models::ContainerStateStatusEnum as Status;
    let name = container_name(id);
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        })?;
    let Some(state) = inspect.state else {
        return Err(AgentError::Internal(format!(
            "Container {name} has no state information"
        )));
    };
    let container_state = match state.status {
        // Created, or no usable status at all, reads as pending.
        None | Some(Status::EMPTY | Status::CREATED) => ContainerState::Pending,
        // Paused containers are still reported as running.
        Some(Status::RUNNING | Status::PAUSED) => ContainerState::Running,
        Some(Status::RESTARTING) => ContainerState::Initializing,
        Some(Status::REMOVING) => ContainerState::Stopping,
        Some(Status::EXITED) => {
            #[allow(clippy::cast_possible_truncation)]
            let code = state.exit_code.unwrap_or(0) as i32;
            ContainerState::Exited { code }
        }
        Some(Status::DEAD) => ContainerState::Failed {
            reason: state
                .error
                .unwrap_or_else(|| "container is dead".to_string()),
        },
    };
    tracing::debug!(container = %name, state = ?container_state, "got container state");
    Ok(container_state)
}
/// Fetch up to `tail` most recent log lines (stdout and stderr combined).
///
/// Docker-side timestamps are not requested; each entry is stamped with
/// the time it was read here, not when the container emitted it.
///
/// # Errors
/// Returns `AgentError::NotFound` when the log stream yields an error.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.logs",
        container.id = %container_name(id),
        service.name = %id.service,
        tail = %tail,
    )
)]
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let name = container_name(id);
    let options = LogsOptions {
        stdout: true,
        stderr: true,
        tail: tail.to_string(),
        timestamps: false,
        ..Default::default()
    };
    let mut stream = self.docker.logs(&name, Some(options));
    let mut entries = Vec::new();
    let source = LogSource::Container(name.clone());
    while let Some(result) = stream.next().await {
        match result {
            Ok(log_output) => {
                // Frames other than stdout/stderr are attributed to stdout.
                let (stream_type, text) = match &log_output {
                    bollard::container::LogOutput::StdOut { message } => {
                        (LogStream::Stdout, String::from_utf8_lossy(message))
                    }
                    bollard::container::LogOutput::StdErr { message } => {
                        (LogStream::Stderr, String::from_utf8_lossy(message))
                    }
                    _ => (LogStream::Stdout, log_output.to_string().into()),
                };
                // Multi-line frames are split into one LogEntry per line.
                for line in text.lines() {
                    entries.push(LogEntry {
                        timestamp: chrono::Utc::now(),
                        stream: stream_type,
                        message: line.to_string(),
                        source: source.clone(),
                        service: Some(id.service.clone()),
                        deployment: None,
                    });
                }
            }
            Err(e) => {
                return Err(AgentError::NotFound {
                    container: name.clone(),
                    reason: format!("failed to get logs: {e}"),
                });
            }
        }
    }
    tracing::debug!(container = %name, entry_count = entries.len(), "got container logs");
    Ok(entries)
}
/// Run `cmd` inside the container and collect its full output, returning
/// `(exit_code, stdout, stderr)` once the command has finished.
///
/// Output read errors are logged and skipped rather than aborting, so the
/// returned output may be incomplete on a flaky stream.
///
/// # Errors
/// `AgentError::NotFound` when exec creation fails (container missing);
/// `AgentError::Internal` when starting or inspecting the exec fails.
#[instrument(
    skip(self, cmd),
    fields(
        otel.name = "container.exec",
        container.id = %container_name(id),
        service.name = %id.service,
        cmd = ?cmd,
    )
)]
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
    let name = container_name(id);
    let exec_options = CreateExecOptions {
        cmd: Some(cmd.to_vec()),
        attach_stdout: Some(true),
        attach_stderr: Some(true),
        ..Default::default()
    };
    let exec_created = self
        .docker
        .create_exec(&name, exec_options)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to create exec: {e}"),
        })?;
    let start_result = self
        .docker
        .start_exec(&exec_created.id, None)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to start exec: {e}")))?;
    let mut stdout = String::new();
    let mut stderr = String::new();
    match start_result {
        // Drain the attached output stream to completion, accumulating
        // stdout/stderr separately; bytes are decoded lossily.
        StartExecResults::Attached { mut output, .. } => {
            while let Some(result) = output.next().await {
                match result {
                    Ok(bollard::container::LogOutput::StdOut { message }) => {
                        stdout.push_str(&String::from_utf8_lossy(&message));
                    }
                    Ok(bollard::container::LogOutput::StdErr { message }) => {
                        stderr.push_str(&String::from_utf8_lossy(&message));
                    }
                    // Other frame kinds (e.g. stdin echo) are ignored.
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e, "error reading exec output");
                    }
                }
            }
        }
        StartExecResults::Detached => {
            tracing::warn!("exec started in detached mode unexpectedly");
        }
    }
    // The exit code is only available via a post-completion inspect.
    let exec_inspect = self
        .docker
        .inspect_exec(&exec_created.id)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to inspect exec: {e}")))?;
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = exec_inspect.exit_code.unwrap_or(0) as i32;
    tracing::debug!(
        container = %name,
        exit_code = exit_code,
        stdout_len = stdout.len(),
        stderr_len = stderr.len(),
        "exec completed"
    );
    Ok((exit_code, stdout, stderr))
}
/// Run `cmd` inside the container and stream its output as line-oriented
/// [`ExecEvent`]s (one event per complete line, partial trailing lines
/// flushed at end, terminated by `ExecEvent::Exit`).
///
/// The output pump runs in a spawned task; dropping the returned stream
/// (closing the channel receiver) stops it.
///
/// # Errors
/// `AgentError::NotFound` when exec creation fails; `AgentError::Internal`
/// when the exec cannot be started.
#[instrument(
    skip(self, cmd),
    fields(
        otel.name = "container.exec_stream",
        container.id = %container_name(id),
        service.name = %id.service,
        cmd = ?cmd,
    )
)]
async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
    let name = container_name(id);
    let exec_options = CreateExecOptions {
        cmd: Some(cmd.to_vec()),
        attach_stdout: Some(true),
        attach_stderr: Some(true),
        ..Default::default()
    };
    let exec_created = self
        .docker
        .create_exec(&name, exec_options)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to create exec: {e}"),
        })?;
    let start_result = self
        .docker
        .start_exec(&exec_created.id, None)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to start exec: {e}")))?;
    // Bounded channel: a slow consumer back-pressures the pump task.
    let (tx, rx) = tokio::sync::mpsc::channel::<ExecEvent>(256);
    let docker = self.docker.clone();
    let exec_id = exec_created.id.clone();
    let container_name_for_task = name.clone();
    tokio::spawn(async move {
        // Per-stream buffers carrying incomplete lines between chunks.
        let mut stdout_partial = String::new();
        let mut stderr_partial = String::new();
        if let StartExecResults::Attached { mut output, .. } = start_result {
            while let Some(result) = output.next().await {
                match result {
                    Ok(bollard::container::LogOutput::StdOut { message }) => {
                        let text = String::from_utf8_lossy(&message).into_owned();
                        // Err means the receiver is gone; stop pumping.
                        if emit_lines(&tx, &mut stdout_partial, &text, true)
                            .await
                            .is_err()
                        {
                            return;
                        }
                    }
                    Ok(bollard::container::LogOutput::StdErr { message }) => {
                        let text = String::from_utf8_lossy(&message).into_owned();
                        if emit_lines(&tx, &mut stderr_partial, &text, false)
                            .await
                            .is_err()
                        {
                            return;
                        }
                    }
                    // Other frame kinds are ignored.
                    Ok(_) => {}
                    // Read errors are logged but do not abort the stream.
                    Err(e) => {
                        tracing::warn!(
                            container = %container_name_for_task,
                            error = %e,
                            "error reading exec stream output"
                        );
                    }
                }
            }
        } else {
            tracing::warn!(
                container = %container_name_for_task,
                "exec started in detached mode unexpectedly"
            );
        }
        // Flush any trailing text that never got a terminating newline.
        if !stdout_partial.is_empty()
            && tx
                .send(ExecEvent::Stdout(std::mem::take(&mut stdout_partial)))
                .await
                .is_err()
        {
            return;
        }
        if !stderr_partial.is_empty()
            && tx
                .send(ExecEvent::Stderr(std::mem::take(&mut stderr_partial)))
                .await
                .is_err()
        {
            return;
        }
        // Resolve the exit code via inspect; default to 0 when unavailable.
        let exit_code = match docker.inspect_exec(&exec_id).await {
            Ok(inspect) => {
                #[allow(clippy::cast_possible_truncation)]
                let code = inspect.exit_code.unwrap_or(0) as i32;
                code
            }
            Err(e) => {
                tracing::warn!(
                    container = %container_name_for_task,
                    error = %e,
                    "failed to inspect exec for exit code"
                );
                0
            }
        };
        let _ = tx.send(ExecEvent::Exit(exit_code)).await;
    });
    Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
/// Take a one-shot resource-usage snapshot of the container.
///
/// CPU usage (Docker reports cumulative nanoseconds) is converted to
/// microseconds here; a missing memory limit is reported as `u64::MAX`,
/// i.e. effectively unlimited.
///
/// # Errors
/// Returns `AgentError::NotFound` when no stats are available or the
/// stats call fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.stats",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
    let name = container_name(id);
    // one_shot + stream=false: ask Docker for a single sample.
    let options = StatsOptions {
        stream: false,
        one_shot: true,
    };
    let mut stream = self.docker.stats(&name, Some(options));
    let stats = stream
        .next()
        .await
        .ok_or_else(|| AgentError::NotFound {
            container: name.clone(),
            reason: "no stats available".to_string(),
        })?
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to get stats: {e}"),
        })?;
    // total_usage is in nanoseconds; divide to get microseconds.
    let cpu_usage_usec = stats
        .cpu_stats
        .and_then(|s| s.cpu_usage)
        .and_then(|u| u.total_usage)
        .unwrap_or(0)
        / 1000;
    let memory_bytes = stats
        .memory_stats
        .as_ref()
        .and_then(|s| s.usage)
        .unwrap_or(0);
    let memory_limit = stats.memory_stats.and_then(|s| s.limit).unwrap_or(u64::MAX);
    let container_stats = ContainerStats {
        cpu_usage_usec,
        memory_bytes,
        memory_limit,
        timestamp: Instant::now(),
    };
    tracing::debug!(
        container = %name,
        cpu_usec = cpu_usage_usec,
        memory_bytes = memory_bytes,
        memory_limit = memory_limit,
        "got container stats"
    );
    Ok(container_stats)
}
/// Block until the container leaves the running state, returning its exit
/// code as reported by Docker's wait endpoint.
///
/// # Errors
/// Returns `AgentError::NotFound` when the wait stream closes without a
/// response or yields an error.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.wait",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
    let name = container_name(id);
    tracing::debug!(container = %name, "waiting for container to exit");
    let options = WaitContainerOptions {
        condition: "not-running".to_string(),
    };
    let mut stream = self.docker.wait_container(&name, Some(options));
    let wait_response = match stream.next().await {
        Some(Ok(response)) => response,
        Some(Err(e)) => {
            return Err(AgentError::NotFound {
                container: name,
                reason: format!("failed to wait for container: {e}"),
            });
        }
        None => {
            return Err(AgentError::NotFound {
                container: name,
                reason: "wait stream closed unexpectedly".to_string(),
            });
        }
    };
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = wait_response.status_code as i32;
    tracing::info!(container = %name, exit_code = exit_code, "container exited");
    Ok(exit_code)
}
/// Wait for the container to exit and classify the result into a
/// [`WaitOutcome`] (exit code, reason, signal name, finish time).
///
/// Reason precedence: OOM kill, then signal-style exit codes, then a
/// runtime error (daemon error string present alongside exit code 0),
/// then a plain exit.
///
/// # Errors
/// Propagates `wait_container` errors and `AgentError::NotFound` when the
/// post-wait inspect fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.wait_outcome",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
    let name = container_name(id);
    let exit_code_from_wait = self.wait_container(id).await?;
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container after wait: {e}"),
        })?;
    let state = inspect.state.unwrap_or_default();
    // Prefer the inspect exit code; fall back to the wait response.
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = state.exit_code.map_or(exit_code_from_wait, |c| c as i32);
    let oom = state.oom_killed.unwrap_or(false);
    let error = state.error.unwrap_or_default();
    let error_trimmed = error.trim();
    let (reason, signal) = if oom {
        (WaitReason::OomKilled, None)
    } else if let Some(sig) = signal_name_from_exit_code(exit_code) {
        (WaitReason::Signal, Some(sig))
    } else if !error_trimmed.is_empty() && exit_code == 0 {
        // NOTE(review): an error string with a non-zero exit code falls
        // through to Exited — confirm that is intentional.
        (WaitReason::RuntimeError, None)
    } else {
        (WaitReason::Exited, None)
    };
    // Go's zero time serialises with year 0001; treat it as "no timestamp".
    let finished_at = state.finished_at.as_deref().and_then(|s| {
        if s.starts_with("0001-") || s.is_empty() {
            None
        } else {
            chrono::DateTime::parse_from_rfc3339(s)
                .ok()
                .map(|dt| dt.with_timezone(&chrono::Utc))
        }
    });
    tracing::info!(
        container = %name,
        exit_code,
        reason = ?reason,
        signal = signal.as_deref().unwrap_or(""),
        oom_killed = oom,
        "container wait_outcome resolved",
    );
    Ok(WaitOutcome {
        exit_code,
        reason,
        signal,
        finished_at,
    })
}
/// Fetch the container's complete log history (stdout and stderr).
///
/// NOTE(review): this duplicates `container_logs` except for `tail: "all"`;
/// a shared helper would remove the duplication. Entries are stamped with
/// read time, not container emit time.
///
/// # Errors
/// Returns `AgentError::NotFound` when the log stream yields an error.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_logs",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
    let name = container_name(id);
    let options = LogsOptions {
        stdout: true,
        stderr: true,
        tail: "all".to_string(),
        timestamps: false,
        ..Default::default()
    };
    let mut stream = self.docker.logs(&name, Some(options));
    let mut entries = Vec::new();
    let source = LogSource::Container(name.clone());
    while let Some(result) = stream.next().await {
        match result {
            Ok(log_output) => {
                // Frames other than stdout/stderr are attributed to stdout.
                let (stream_type, text) = match &log_output {
                    bollard::container::LogOutput::StdOut { message } => {
                        (LogStream::Stdout, String::from_utf8_lossy(message))
                    }
                    bollard::container::LogOutput::StdErr { message } => {
                        (LogStream::Stderr, String::from_utf8_lossy(message))
                    }
                    _ => (LogStream::Stdout, log_output.to_string().into()),
                };
                // Multi-line frames are split into one LogEntry per line.
                for line in text.lines() {
                    entries.push(LogEntry {
                        timestamp: chrono::Utc::now(),
                        stream: stream_type,
                        message: line.to_string(),
                        source: source.clone(),
                        service: Some(id.service.clone()),
                        deployment: None,
                    });
                }
            }
            Err(e) => {
                return Err(AgentError::NotFound {
                    container: name.clone(),
                    reason: format!("failed to get logs: {e}"),
                });
            }
        }
    }
    tracing::debug!(container = %name, entry_count = entries.len(), "got container logs");
    Ok(entries)
}
/// Look up the host PID of the container's main process. Non-positive PIDs
/// are reported as `None` (presumably Docker's way of saying no process is
/// running — confirm against daemon behaviour).
///
/// # Errors
/// Returns `AgentError::NotFound` when inspect fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_pid",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
    let name = container_name(id);
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        })?;
    let raw_pid = inspect.state.and_then(|s| s.pid);
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let pid = match raw_pid {
        Some(p) if p > 0 => Some(p as u32),
        _ => None,
    };
    tracing::debug!(container = %name, pid = ?pid, "got container PID");
    Ok(pid)
}
/// Resolve the container's IP address on the default `bridge` network.
/// Returns `None` when the container is not on the bridge network or its
/// address is empty/unparseable.
///
/// # Errors
/// Returns `AgentError::NotFound` when inspect fails.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_ip",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<std::net::IpAddr>> {
    let name = container_name(id);
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        })?;
    let networks = inspect.network_settings.and_then(|ns| ns.networks);
    let ip = networks
        .as_ref()
        .and_then(|nets| nets.get("bridge"))
        .and_then(|bridge| bridge.ip_address.as_deref())
        .filter(|ip| !ip.is_empty())
        .and_then(|ip| ip.parse::<std::net::IpAddr>().ok());
    tracing::debug!(container = %name, ip = ?ip, "got container IP from Docker inspect");
    Ok(ip)
}
#[instrument(skip(self), fields(otel.name = "image.list"))]
/// List all images known to the Docker daemon as lightweight `ImageInfo`s.
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
    use bollard::query_parameters::ListImagesOptionsBuilder;
    let options = ListImagesOptionsBuilder::default().all(true).build();
    let summaries = self
        .docker
        .list_images(Some(options))
        .await
        .map_err(|e| AgentError::Internal(format!("failed to list images: {e}")))?;
    let infos = summaries
        .into_iter()
        .map(|summary| {
            // Prefer a real repo:tag; dangling images fall back to the raw ID.
            let reference = summary
                .repo_tags
                .iter()
                .find(|tag| !tag.is_empty() && tag.as_str() != "<none>:<none>")
                .cloned()
                .unwrap_or_else(|| summary.id.clone());
            // Digest is the portion after '@' in the first well-formed repo digest.
            let digest = summary
                .repo_digests
                .iter()
                .find_map(|entry| entry.split_once('@').map(|(_, d)| d.to_string()));
            ImageInfo {
                reference,
                digest,
                // A negative size (shouldn't happen) is reported as unknown.
                size_bytes: u64::try_from(summary.size).ok(),
            }
        })
        .collect();
    Ok(infos)
}
#[instrument(skip(self), fields(otel.name = "image.remove", container.image.name = %image, force))]
async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
use bollard::query_parameters::RemoveImageOptionsBuilder;
let options = RemoveImageOptionsBuilder::default().force(force).build();
self.docker
.remove_image(image, Some(options), None)
.await
.map_err(|e| AgentError::Internal(format!("failed to remove image '{image}': {e}")))?;
Ok(())
}
#[instrument(skip(self), fields(otel.name = "image.prune"))]
/// Prune dangling images and report what was removed plus bytes reclaimed.
async fn prune_images(&self) -> Result<PruneResult> {
    let response = self
        .docker
        .prune_images(None::<bollard::query_parameters::PruneImagesOptions>)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to prune images: {e}")))?;
    // Each pruned entry carries either a fully deleted layer or an untagged
    // reference; keep whichever is present.
    let mut deleted = Vec::new();
    if let Some(items) = response.images_deleted {
        for item in items {
            if let Some(entry) = item.deleted.or(item.untagged) {
                deleted.push(entry);
            }
        }
    }
    // Missing or negative reclaimed-space values are reported as zero.
    let space_reclaimed = match response.space_reclaimed {
        Some(v) => u64::try_from(v).unwrap_or(0),
        None => 0,
    };
    Ok(PruneResult {
        deleted,
        space_reclaimed,
    })
}
#[instrument(
    skip(self),
    fields(
        otel.name = "container.kill",
        container.id = %container_name(id),
        service.name = %id.service,
        signal = ?signal,
    )
)]
/// Send a signal (default `SIGKILL`) to a running container.
///
/// # Errors
/// `NotFound` when the daemon reports 404; `Internal` for any other failure;
/// an invalid signal name is rejected by `validate_signal` before any I/O.
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
    use bollard::query_parameters::KillContainerOptionsBuilder;
    // Normalize/validate the signal before talking to Docker.
    let canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
    let name = container_name(id);
    tracing::info!(container = %name, signal = %canonical, "killing container");
    let options = KillContainerOptionsBuilder::default()
        .signal(&canonical)
        .build();
    if let Err(e) = self.docker.kill_container(&name, Some(options)).await {
        // A 404 means the container is already gone; anything else is unexpected.
        return Err(match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => AgentError::Internal(format!(
                "failed to send {canonical} to container '{name}': {other}"
            )),
        });
    }
    Ok(())
}
#[instrument(
    skip(self),
    fields(
        otel.name = "image.tag",
        source = %source,
        target = %target,
    )
)]
/// Tag `source` with a new `target` reference (`repo[:tag]`, tag defaults to
/// `latest`).
///
/// # Errors
/// `InvalidSpec` for blank references, `NotFound` when the source image does
/// not exist, `Internal` for any other daemon error.
async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
    use bollard::query_parameters::TagImageOptionsBuilder;
    // Reject blank references up front rather than surfacing a cryptic
    // daemon error later.
    if source.trim().is_empty() || target.trim().is_empty() {
        return Err(AgentError::InvalidSpec(
            "source and target must be non-empty image references".to_string(),
        ));
    }
    // Split "repo:tag". A ':' inside a registry host (e.g. "host:5000/img")
    // is followed by a '/', so such targets keep the whole string as the repo
    // and default the tag to "latest".
    let (repo, tag) = if let Some((r, t)) = target.rsplit_once(':') {
        if r.is_empty() || t.is_empty() || t.contains('/') {
            (target.to_string(), "latest".to_string())
        } else {
            (r.to_string(), t.to_string())
        }
    } else {
        (target.to_string(), "latest".to_string())
    };
    let options = TagImageOptionsBuilder::default()
        .repo(&repo)
        .tag(&tag)
        .build();
    self.docker
        .tag_image(source, Some(options))
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: source.to_string(),
                reason: format!("source image '{source}' not found"),
            },
            other => AgentError::Internal(format!(
                "failed to tag image '{source}' -> '{target}': {other}"
            )),
        })?;
    tracing::info!(source = %source, target = %target, "tagged image");
    Ok(())
}
#[instrument(
    skip(self),
    fields(
        otel.name = "container.inspect_detailed",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
/// Inspect a container and translate the raw Docker response into the
/// runtime's `ContainerInspectDetails` model.
async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
    let name = container_name(id);
    match self.docker.inspect_container(&name, None).await {
        Ok(inspect) => Ok(translate_inspect_details(&inspect)),
        Err(e) => Err(AgentError::NotFound {
            container: name,
            reason: format!("failed to inspect container: {e}"),
        }),
    }
}
}
/// Parse a Docker port-map key of the form `"<port>/<tcp|udp>"`.
///
/// Returns `None` for malformed keys or unsupported protocols (e.g. sctp).
fn parse_port_key(key: &str) -> Option<(u16, zlayer_spec::PortProtocol)> {
    let (port_part, proto_part) = key.split_once('/')?;
    let port = port_part.parse::<u16>().ok()?;
    match proto_part {
        "tcp" => Some((port, zlayer_spec::PortProtocol::Tcp)),
        "udp" => Some((port, zlayer_spec::PortProtocol::Udp)),
        _ => None,
    }
}
/// Translate Docker's raw `ContainerInspectResponse` into the runtime's
/// `ContainerInspectDetails`: port mappings, network attachments, a preferred
/// IPv4 address, a health summary, and the exit code (terminal states only).
pub(crate) fn translate_inspect_details(
    inspect: &bollard::models::ContainerInspectResponse,
) -> ContainerInspectDetails {
    // Port mappings: one entry per host binding, or a single unbound entry
    // for ports that are exposed but not published.
    let mut ports = Vec::new();
    if let Some(port_map) = inspect
        .network_settings
        .as_ref()
        .and_then(|ns| ns.ports.as_ref())
    {
        for (key, maybe_bindings) in port_map {
            // Skip keys that are not "<port>/<tcp|udp>".
            let Some((container_port, protocol)) = parse_port_key(key) else {
                continue;
            };
            match maybe_bindings {
                None => ports.push(zlayer_spec::PortMapping {
                    host_port: None,
                    container_port,
                    protocol,
                    host_ip: String::new(),
                }),
                Some(bindings) => {
                    for binding in bindings {
                        ports.push(zlayer_spec::PortMapping {
                            host_port: binding.host_port.as_deref().and_then(|s| s.parse().ok()),
                            container_port,
                            protocol,
                            host_ip: binding.host_ip.clone().unwrap_or_default(),
                        });
                    }
                }
            }
        }
    }
    // Network attachments; an empty ip_address string means "no address".
    let mut networks = Vec::new();
    if let Some(nets) = inspect
        .network_settings
        .as_ref()
        .and_then(|ns| ns.networks.as_ref())
    {
        for (name, endpoint) in nets {
            networks.push(NetworkAttachmentDetail {
                network: name.clone(),
                aliases: endpoint.aliases.clone().unwrap_or_default(),
                ipv4: endpoint
                    .ip_address
                    .as_ref()
                    .filter(|s| !s.is_empty())
                    .cloned(),
            });
        }
    }
    // Prefer the default bridge network's address; otherwise take any
    // attachment that has one.
    let ipv4 = networks
        .iter()
        .find(|n| n.network == "bridge" && n.ipv4.is_some())
        .and_then(|n| n.ipv4.clone())
        .or_else(|| networks.iter().find_map(|n| n.ipv4.clone()));
    // Health summary, when the container defines a healthcheck at all.
    let health = inspect
        .state
        .as_ref()
        .and_then(|s| s.health.as_ref())
        .map(|h| {
            use bollard::models::HealthStatusEnum;
            // Collapse Docker's enum (including NONE/EMPTY/absent) into a
            // plain lowercase status string.
            let status = match h.status {
                Some(HealthStatusEnum::STARTING) => "starting",
                Some(HealthStatusEnum::HEALTHY) => "healthy",
                Some(HealthStatusEnum::UNHEALTHY) => "unhealthy",
                Some(HealthStatusEnum::NONE | HealthStatusEnum::EMPTY) | None => "none",
            }
            .to_string();
            HealthDetail {
                status,
                failing_streak: h.failing_streak.and_then(|n| u32::try_from(n).ok()),
                // Most recent probe output, if the last probe produced any.
                last_output: h
                    .log
                    .as_ref()
                    .and_then(|entries| entries.last())
                    .and_then(|entry| entry.output.clone())
                    .filter(|s| !s.is_empty()),
            }
        });
    // An exit code is only meaningful once the container reached a terminal
    // state (EXITED or DEAD); a stale code on a running container is ignored.
    let exit_code = inspect.state.as_ref().and_then(|s| {
        use bollard::models::ContainerStateStatusEnum;
        if matches!(
            s.status,
            Some(ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD)
        ) {
            #[allow(clippy::cast_possible_truncation)]
            let code = s.exit_code.map(|c| c as i32);
            code
        } else {
            None
        }
    });
    ContainerInspectDetails {
        ports,
        networks,
        ipv4,
        health,
        exit_code,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- container_name: deterministic "zlayer-<service>-<replica>" naming ---

    #[test]
    fn test_container_name() {
        let id = ContainerId {
            service: "myservice".to_string(),
            replica: 1,
        };
        assert_eq!(container_name(&id), "zlayer-myservice-1");
    }

    #[test]
    fn test_container_name_with_different_replicas() {
        let id1 = ContainerId {
            service: "api".to_string(),
            replica: 0,
        };
        let id2 = ContainerId {
            service: "api".to_string(),
            replica: 42,
        };
        assert_eq!(container_name(&id1), "zlayer-api-0");
        assert_eq!(container_name(&id2), "zlayer-api-42");
    }

    // --- parse_image_ref: splitting "name[:tag]" image references ---

    #[test]
    fn test_parse_image_ref_with_tag() {
        let (name, tag) = parse_image_ref("nginx:1.25");
        assert_eq!(name, "nginx");
        assert_eq!(tag, "1.25");
    }

    #[test]
    fn test_parse_image_ref_without_tag() {
        // No explicit tag defaults to "latest".
        let (name, tag) = parse_image_ref("nginx");
        assert_eq!(name, "nginx");
        assert_eq!(tag, "latest");
    }

    #[test]
    fn test_parse_image_ref_with_registry_and_tag() {
        let (name, tag) = parse_image_ref("ghcr.io/org/image:v1.0.0");
        assert_eq!(name, "ghcr.io/org/image");
        assert_eq!(tag, "v1.0.0");
    }

    #[test]
    fn test_parse_image_ref_with_registry_port_and_tag() {
        // The ':' in the registry port must not be confused with a tag separator.
        let (name, tag) = parse_image_ref("localhost:5000/myimage:latest");
        assert_eq!(name, "localhost:5000/myimage");
        assert_eq!(tag, "latest");
    }

    #[test]
    fn test_parse_image_ref_with_digest() {
        // Digest references are kept whole, with an empty tag.
        let image = "nginx@sha256:abc123def456";
        let (name, tag) = parse_image_ref(image);
        assert_eq!(name, image);
        assert_eq!(tag, "");
    }

    // --- extract_local_digest: matching repo digests from ImageInspect ---

    #[test]
    fn extract_local_digest_matches_repo() {
        let inspect = ImageInspect {
            repo_digests: Some(vec!["zachhandley/zlayer-manager@sha256:abc123".to_string()]),
            ..Default::default()
        };
        assert_eq!(
            extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
            Some("sha256:abc123".to_string())
        );
    }

    #[test]
    fn extract_local_digest_returns_none_for_non_matching_repo() {
        let inspect = ImageInspect {
            repo_digests: Some(vec!["otherrepo/image@sha256:def456".to_string()]),
            ..Default::default()
        };
        assert_eq!(
            extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
            None
        );
    }

    #[test]
    fn extract_local_digest_returns_none_when_repo_digests_missing() {
        let inspect = ImageInspect {
            repo_digests: None,
            ..Default::default()
        };
        assert_eq!(
            extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
            None
        );
    }

    #[test]
    fn extract_local_digest_skips_malformed_entries() {
        // Entries without an '@' separator must be ignored, not cause failure.
        let inspect = ImageInspect {
            repo_digests: Some(vec![
                "malformed-no-at-sign".to_string(),
                "zachhandley/zlayer-manager@sha256:abc123".to_string(),
            ]),
            ..Default::default()
        };
        assert_eq!(
            extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
            Some("sha256:abc123".to_string())
        );
    }

    #[test]
    fn extract_local_digest_picks_matching_when_multiple() {
        let inspect = ImageInspect {
            repo_digests: Some(vec![
                "otherrepo/image@sha256:111".to_string(),
                "zachhandley/zlayer-manager@sha256:abc123".to_string(),
                "thirdparty/image@sha256:222".to_string(),
            ]),
            ..Default::default()
        };
        assert_eq!(
            extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
            Some("sha256:abc123".to_string())
        );
    }

    // --- parse_memory: human-readable memory sizes to bytes ---

    #[test]
    fn test_parse_memory_bytes() {
        assert_eq!(parse_memory("1024"), Some(1024));
    }

    #[test]
    fn test_parse_memory_kilobytes() {
        // Ki, K, and KB suffixes are all treated as 1024 bytes.
        assert_eq!(parse_memory("1Ki"), Some(1024));
        assert_eq!(parse_memory("1K"), Some(1024));
        assert_eq!(parse_memory("1KB"), Some(1024));
    }

    #[test]
    fn test_parse_memory_megabytes() {
        assert_eq!(parse_memory("512Mi"), Some(512 * 1024 * 1024));
        assert_eq!(parse_memory("512M"), Some(512 * 1024 * 1024));
        assert_eq!(parse_memory("512MB"), Some(512 * 1024 * 1024));
    }

    #[test]
    fn test_parse_memory_gigabytes() {
        assert_eq!(parse_memory("1Gi"), Some(1024 * 1024 * 1024));
        assert_eq!(parse_memory("1G"), Some(1024 * 1024 * 1024));
        assert_eq!(parse_memory("2GB"), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn test_parse_memory_with_decimals() {
        assert_eq!(parse_memory("0.5Gi"), Some(512 * 1024 * 1024));
        assert_eq!(parse_memory("1.5G"), Some(1536 * 1024 * 1024));
    }

    #[test]
    fn test_parse_memory_invalid() {
        assert_eq!(parse_memory("invalid"), None);
        assert_eq!(parse_memory("XYZ"), None);
    }

    // --- build_exposed_ports / build_host_config: container port wiring ---

    #[test]
    fn test_build_exposed_ports_empty() {
        let spec = create_test_spec(vec![]);
        let ports = build_exposed_ports(&spec);
        assert!(ports.is_empty());
    }

    #[test]
    fn test_build_exposed_ports_single() {
        let spec = create_test_spec(vec![8080]);
        let ports = build_exposed_ports(&spec);
        assert!(ports.contains(&"8080/tcp".to_string()));
        assert_eq!(ports.len(), 1);
    }

    #[test]
    fn test_build_exposed_ports_multiple() {
        let spec = create_test_spec(vec![8080, 9090, 3000]);
        let ports = build_exposed_ports(&spec);
        assert!(ports.contains(&"8080/tcp".to_string()));
        assert!(ports.contains(&"9090/tcp".to_string()));
        assert!(ports.contains(&"3000/tcp".to_string()));
        assert_eq!(ports.len(), 3);
    }

    #[test]
    fn test_build_host_config_ports() {
        // Endpoint ports bind 1:1 (same host port) on all interfaces.
        let spec = create_test_spec(vec![8080]);
        let host_config = build_host_config(&spec, None, None);
        let port_bindings = host_config.port_bindings.unwrap();
        let bindings = port_bindings.get("8080/tcp").unwrap().as_ref().unwrap();
        assert_eq!(bindings.len(), 1);
        assert_eq!(bindings[0].host_port.as_ref().unwrap(), "8080");
        assert_eq!(bindings[0].host_ip.as_ref().unwrap(), "0.0.0.0");
    }

    #[test]
    fn test_build_host_config_privileged() {
        let mut spec = create_test_spec(vec![]);
        spec.privileged = true;
        let host_config = build_host_config(&spec, None, None);
        assert_eq!(host_config.privileged, Some(true));
    }

    // Minimal ServiceSpec fixture: one HTTP endpoint per requested port,
    // everything else defaulted/empty.
    fn create_test_spec(ports: Vec<u16>) -> ServiceSpec {
        use zlayer_spec::*;
        let endpoints: Vec<EndpointSpec> = ports
            .into_iter()
            .enumerate()
            .map(|(i, port)| EndpointSpec {
                name: format!("endpoint{i}"),
                protocol: Protocol::Http,
                port,
                target_port: None,
                path: None,
                host: None,
                expose: ExposeType::Internal,
                stream: None,
                tunnel: None,
            })
            .collect();
        ServiceSpec {
            rtype: ResourceType::Service,
            schedule: None,
            image: ImageSpec {
                name: "test:latest".to_string(),
                pull_policy: PullPolicy::IfNotPresent,
            },
            resources: ResourcesSpec::default(),
            env: HashMap::new(),
            command: CommandSpec::default(),
            network: ServiceNetworkSpec::default(),
            endpoints,
            scale: ScaleSpec::default(),
            depends: vec![],
            health: HealthSpec {
                start_grace: None,
                interval: None,
                timeout: None,
                retries: 3,
                check: HealthCheck::Tcp { port: 0 },
            },
            init: InitSpec::default(),
            errors: ErrorsSpec::default(),
            devices: vec![],
            storage: vec![],
            port_mappings: vec![],
            capabilities: vec![],
            privileged: false,
            node_mode: NodeMode::default(),
            node_selector: None,
            service_type: ServiceType::default(),
            wasm: None,
            logs: None,
            host_network: false,
            hostname: None,
            dns: Vec::new(),
            extra_hosts: Vec::new(),
            restart_policy: None,
            platform: None,
        }
    }

    #[test]
    fn test_build_host_config_port_mappings_static_and_ephemeral() {
        use zlayer_spec::{PortMapping, PortProtocol};
        let mut spec = create_test_spec(vec![]);
        // Three cases: explicit host port, no host port (ephemeral), and
        // host port 0 (which must also be treated as ephemeral).
        spec.port_mappings = vec![
            PortMapping {
                host_port: Some(8080),
                container_port: 80,
                protocol: PortProtocol::Tcp,
                host_ip: "0.0.0.0".to_string(),
            },
            PortMapping {
                host_port: None,
                container_port: 53,
                protocol: PortProtocol::Udp,
                host_ip: "127.0.0.1".to_string(),
            },
            PortMapping {
                host_port: Some(0),
                container_port: 443,
                protocol: PortProtocol::Tcp,
                host_ip: String::new(), },
        ];
        let host_config = build_host_config(&spec, None, None);
        let port_bindings = host_config.port_bindings.expect("port_bindings set");
        let tcp80 = port_bindings
            .get("80/tcp")
            .expect("80/tcp key")
            .as_ref()
            .expect("80/tcp bindings");
        assert_eq!(tcp80.len(), 1);
        assert_eq!(tcp80[0].host_port.as_deref(), Some("8080"));
        assert_eq!(tcp80[0].host_ip.as_deref(), Some("0.0.0.0"));
        let udp53 = port_bindings
            .get("53/udp")
            .expect("53/udp key")
            .as_ref()
            .expect("53/udp bindings");
        assert_eq!(udp53.len(), 1);
        assert!(udp53[0].host_port.is_none(), "ephemeral host_port is None");
        assert_eq!(udp53[0].host_ip.as_deref(), Some("127.0.0.1"));
        let tcp443 = port_bindings
            .get("443/tcp")
            .expect("443/tcp key")
            .as_ref()
            .expect("443/tcp bindings");
        assert_eq!(tcp443.len(), 1);
        assert!(
            tcp443[0].host_port.is_none(),
            "Some(0) should become None (ephemeral)"
        );
        assert_eq!(tcp443[0].host_ip.as_deref(), Some("0.0.0.0"));
        let exposed = build_exposed_ports(&spec);
        assert!(exposed.contains(&"80/tcp".to_string()));
        assert!(exposed.contains(&"53/udp".to_string()));
        assert!(exposed.contains(&"443/tcp".to_string()));
    }

    // --- translate_restart_policy: spec restart policy -> Docker RestartPolicy ---

    #[test]
    fn translate_restart_policy_covers_all_kinds() {
        use zlayer_spec::{ContainerRestartKind, ContainerRestartPolicy};
        // (kind, max_attempts) -> expected (Docker name, maximum_retry_count)
        let cases: &[(
            ContainerRestartKind,
            Option<u32>,
            RestartPolicyNameEnum,
            Option<i64>,
        )] = &[
            (
                ContainerRestartKind::No,
                None,
                RestartPolicyNameEnum::NO,
                None,
            ),
            (
                ContainerRestartKind::Always,
                None,
                RestartPolicyNameEnum::ALWAYS,
                None,
            ),
            (
                ContainerRestartKind::UnlessStopped,
                None,
                RestartPolicyNameEnum::UNLESS_STOPPED,
                None,
            ),
            (
                ContainerRestartKind::OnFailure,
                Some(7),
                RestartPolicyNameEnum::ON_FAILURE,
                Some(7),
            ),
            (
                ContainerRestartKind::OnFailure,
                None,
                RestartPolicyNameEnum::ON_FAILURE,
                None,
            ),
        ];
        for (kind, max_attempts, expected_name, expected_retry) in cases {
            let policy = ContainerRestartPolicy {
                kind: *kind,
                max_attempts: *max_attempts,
                delay: None,
            };
            let translated = translate_restart_policy(&policy);
            assert_eq!(
                translated.name,
                Some(*expected_name),
                "bad name for {kind:?}"
            );
            assert_eq!(
                translated.maximum_retry_count, *expected_retry,
                "bad retry count for {kind:?}"
            );
        }
    }

    #[test]
    fn translate_restart_policy_ignores_delay_but_still_returns_policy() {
        use zlayer_spec::{ContainerRestartKind, ContainerRestartPolicy};
        // Docker has no per-restart delay knob, so `delay` must be dropped
        // without invalidating the rest of the policy.
        let translated = translate_restart_policy(&ContainerRestartPolicy {
            kind: ContainerRestartKind::Always,
            max_attempts: None,
            delay: Some("500ms".to_string()),
        });
        assert_eq!(translated.name, Some(RestartPolicyNameEnum::ALWAYS));
        assert!(translated.maximum_retry_count.is_none());
    }

    #[test]
    fn build_host_config_sets_restart_policy_when_specified() {
        let mut spec = create_test_spec(vec![]);
        spec.restart_policy = Some(zlayer_spec::ContainerRestartPolicy {
            kind: zlayer_spec::ContainerRestartKind::OnFailure,
            max_attempts: Some(3),
            delay: None,
        });
        let host_config = build_host_config(&spec, None, None);
        let rp = host_config
            .restart_policy
            .expect("restart_policy should be populated");
        assert_eq!(rp.name, Some(RestartPolicyNameEnum::ON_FAILURE));
        assert_eq!(rp.maximum_retry_count, Some(3));
    }

    #[test]
    fn build_host_config_omits_restart_policy_when_none() {
        let spec = create_test_spec(vec![]);
        let host_config = build_host_config(&spec, None, None);
        assert!(
            host_config.restart_policy.is_none(),
            "no restart_policy on spec should leave the field None"
        );
    }

    // --- parse_port_key / translate_inspect_details: Docker inspect mapping ---

    #[test]
    fn parse_port_key_accepts_tcp_and_udp() {
        use zlayer_spec::PortProtocol;
        assert_eq!(parse_port_key("80/tcp"), Some((80, PortProtocol::Tcp)));
        assert_eq!(parse_port_key("53/udp"), Some((53, PortProtocol::Udp)));
        assert_eq!(parse_port_key(""), None);
        assert_eq!(parse_port_key("80"), None);
        assert_eq!(parse_port_key("abc/tcp"), None);
        assert_eq!(parse_port_key("80/sctp"), None);
    }

    #[test]
    fn translate_inspect_details_translates_ports_and_networks() {
        use bollard::models::{
            ContainerInspectResponse, ContainerState, ContainerStateStatusEnum, EndpointSettings,
            Health, HealthStatusEnum, HealthcheckResult, NetworkSettings, PortBinding,
        };
        use std::collections::HashMap;
        // Published tcp port with host binding, plus an udp port bound to
        // loopback without a host port.
        let mut ports: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
        ports.insert(
            "80/tcp".to_string(),
            Some(vec![PortBinding {
                host_ip: Some("0.0.0.0".to_string()),
                host_port: Some("8080".to_string()),
            }]),
        );
        ports.insert(
            "53/udp".to_string(),
            Some(vec![PortBinding {
                host_ip: Some("127.0.0.1".to_string()),
                host_port: None,
            }]),
        );
        let mut networks: HashMap<String, EndpointSettings> = HashMap::new();
        networks.insert(
            "bridge".to_string(),
            EndpointSettings {
                aliases: Some(vec!["myapp".to_string()]),
                ip_address: Some("172.17.0.2".to_string()),
                ..Default::default()
            },
        );
        let inspect = ContainerInspectResponse {
            state: Some(ContainerState {
                status: Some(ContainerStateStatusEnum::RUNNING),
                health: Some(Health {
                    status: Some(HealthStatusEnum::HEALTHY),
                    failing_streak: Some(0),
                    log: Some(vec![HealthcheckResult {
                        output: Some("OK".to_string()),
                        ..Default::default()
                    }]),
                }),
                ..Default::default()
            }),
            network_settings: Some(NetworkSettings {
                ports: Some(ports),
                networks: Some(networks),
                ..Default::default()
            }),
            ..Default::default()
        };
        let details = translate_inspect_details(&inspect);
        assert_eq!(details.ports.len(), 2);
        let tcp80 = details
            .ports
            .iter()
            .find(|p| p.container_port == 80)
            .expect("80/tcp mapping");
        assert_eq!(tcp80.host_port, Some(8080));
        assert_eq!(tcp80.host_ip, "0.0.0.0");
        assert_eq!(tcp80.protocol, zlayer_spec::PortProtocol::Tcp);
        let udp53 = details
            .ports
            .iter()
            .find(|p| p.container_port == 53)
            .expect("53/udp mapping");
        assert_eq!(udp53.host_port, None);
        assert_eq!(udp53.host_ip, "127.0.0.1");
        assert_eq!(udp53.protocol, zlayer_spec::PortProtocol::Udp);
        assert_eq!(details.networks.len(), 1);
        assert_eq!(details.networks[0].network, "bridge");
        assert_eq!(details.networks[0].aliases, vec!["myapp".to_string()]);
        assert_eq!(details.networks[0].ipv4.as_deref(), Some("172.17.0.2"));
        assert_eq!(details.ipv4.as_deref(), Some("172.17.0.2"));
        let health = details.health.expect("health translated");
        assert_eq!(health.status, "healthy");
        assert_eq!(health.failing_streak, Some(0));
        assert_eq!(health.last_output.as_deref(), Some("OK"));
        assert!(details.exit_code.is_none());
    }

    #[test]
    fn translate_inspect_details_exit_code_only_on_exited() {
        use bollard::models::{ContainerInspectResponse, ContainerState, ContainerStateStatusEnum};
        // RUNNING containers must not surface a (stale) exit code.
        let running = ContainerInspectResponse {
            state: Some(ContainerState {
                status: Some(ContainerStateStatusEnum::RUNNING),
                exit_code: Some(0),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert!(translate_inspect_details(&running).exit_code.is_none());
        let exited = ContainerInspectResponse {
            state: Some(ContainerState {
                status: Some(ContainerStateStatusEnum::EXITED),
                exit_code: Some(137),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(translate_inspect_details(&exited).exit_code, Some(137));
        let dead = ContainerInspectResponse {
            state: Some(ContainerState {
                status: Some(ContainerStateStatusEnum::DEAD),
                exit_code: Some(255),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(translate_inspect_details(&dead).exit_code, Some(255));
    }

    #[test]
    fn translate_inspect_details_empty_response_yields_default() {
        // A completely empty inspect response translates to all-empty details.
        let inspect = bollard::models::ContainerInspectResponse::default();
        let details = translate_inspect_details(&inspect);
        assert!(details.ports.is_empty());
        assert!(details.networks.is_empty());
        assert!(details.ipv4.is_none());
        assert!(details.health.is_none());
        assert!(details.exit_code.is_none());
    }
}