use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{
signal_name_from_exit_code, ArchivePutOptions, ArchiveStream, CommitOptions, CommitOutcome,
ContainerId, ContainerInspectDetails, ContainerResourceUpdate, ContainerState,
ContainerUpdateOutcome, ExecEvent, ExecEventStream, ExecHandle, ExecOptions, ExecPtyStream,
HealthDetail, ImageExportStream, ImageHistoryEntry, ImageInfo, ImageInspectInfo,
ImageSearchResult, LoadProgress, LoadProgressStream, LogChannel, LogChunk, LogsStream,
LogsStreamOptions, NetworkAttachmentDetail, PathStat, PruneResult, PullProgress,
PullProgressStream, Runtime, StatsSample, StatsStream, WaitCondition, WaitOutcome, WaitReason,
};
use bollard::auth::DockerCredentials;
use bollard::errors::Error as BollardError;
use bollard::exec::{CreateExecOptions, ResizeExecOptions, StartExecOptions, StartExecResults};
use bollard::models::{
ContainerCreateBody, ContainerStatsResponse, ContainerUpdateBody, CreateImageInfo,
DeviceMapping, DeviceRequest, ExecInspectResponse, HostConfig, ImageInspect, PortBinding,
RestartPolicy, RestartPolicyNameEnum,
};
use bollard::query_parameters::{
CreateContainerOptions, CreateImageOptions, LogsOptions, RemoveContainerOptions,
RenameContainerOptionsBuilder, StartContainerOptions, StatsOptions, StopContainerOptions,
WaitContainerOptions,
};
use bollard::Docker;
use futures_util::{Stream, StreamExt};
use std::collections::HashMap;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::instrument;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{PullPolicy, RegistryAuth, RegistryAuthType, ServiceSpec};
use zlayer_types::ImageReference;
/// Docker-backed implementation of the agent's [`Runtime`] trait, speaking
/// to the local daemon through the `bollard` client.
pub struct DockerRuntime {
    // Handle to the Docker daemon; cloned freely into spawned tasks
    // (see `exec_stream` / `exec_pty`).
    docker: Docker,
    // Optional auth material injected into created containers
    // (API URL, minted token, agent socket bind-mount).
    auth_context: Option<crate::runtime::ContainerAuthContext>,
}
// Hand-written Debug: prints only the type name with a `..` marker
// (`DockerRuntime { .. }`) instead of dumping client internals.
impl std::fmt::Debug for DockerRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DockerRuntime").finish_non_exhaustive()
    }
}
impl DockerRuntime {
    /// Connect to the local Docker daemon using bollard's platform defaults
    /// and verify it is reachable with a `ping`.
    ///
    /// `auth_context`, when present, is used later by `create_container` to
    /// mint per-container tokens and mount the agent socket.
    ///
    /// # Errors
    /// Returns `AgentError::Internal` when the connection cannot be
    /// established or the daemon does not answer the ping.
    pub async fn new(auth_context: Option<crate::runtime::ContainerAuthContext>) -> Result<Self> {
        let docker = Docker::connect_with_local_defaults()
            .map_err(|e| AgentError::Internal(format!("Failed to connect to Docker: {e}")))?;
        // Fail fast at startup rather than on the first container operation.
        docker
            .ping()
            .await
            .map_err(|e| AgentError::Internal(format!("Docker ping failed: {e}")))?;
        tracing::info!("Connected to Docker daemon");
        Ok(Self {
            docker,
            auth_context,
        })
    }
    /// Wrap an existing client (no ping round-trip, no auth context).
    #[must_use]
    pub fn with_client(docker: Docker) -> Self {
        Self {
            docker,
            auth_context: None,
        }
    }
    /// Pull `image`, draining the daemon's progress stream to completion.
    ///
    /// The reference is normalised to `registry/repository` plus tag via
    /// `ImageReference`; references that fail to parse are passed through
    /// verbatim with an implicit `latest` tag. Inline credentials, when
    /// given, are forwarded to the registry.
    ///
    /// # Errors
    /// Returns `AgentError::PullFailed` if any item of the progress stream
    /// is an error.
    async fn do_pull(&self, image: &str, auth: Option<&RegistryAuth>) -> Result<()> {
        let (name, tag) = match ImageReference::from_str(image) {
            Ok(r) => (
                format!("{}/{}", r.registry(), r.repository()),
                r.tag().unwrap_or("latest").to_string(),
            ),
            Err(_) => (image.to_string(), "latest".to_string()),
        };
        tracing::info!(
            image = %image,
            name = %name,
            tag = %tag,
            inline_auth = auth.is_some(),
            "pulling image"
        );
        let options = CreateImageOptions {
            from_image: Some(name.clone()),
            tag: if tag.is_empty() {
                None
            } else {
                Some(tag.clone())
            },
            ..Default::default()
        };
        let credentials = auth.map(docker_credentials_from_registry_auth);
        let mut stream = self.docker.create_image(Some(options), None, credentials);
        // The stream must be drained to completion; the first Err aborts the pull.
        while let Some(result) = stream.next().await {
            match result {
                Ok(info) => {
                    if let Some(status) = info.status {
                        tracing::debug!(status = %status, "pull progress");
                    }
                }
                Err(e) => {
                    return Err(AgentError::PullFailed {
                        image: image.to_string(),
                        reason: e.to_string(),
                    });
                }
            }
        }
        tracing::info!(image = %image, "image pulled successfully");
        Ok(())
    }
}
/// Derive the deterministic Docker container name for a replica:
/// `zlayer-<service>-<replica>`.
fn container_name(id: &ContainerId) -> String {
    format!(
        "zlayer-{service}-{replica}",
        service = id.service,
        replica = id.replica
    )
}
/// Split `chunk` into complete lines (joined with any carried-over partial
/// line) and send each as a Stdout/Stderr `ExecEvent`; an incomplete
/// trailing line is stashed back into `partial` for the next call.
///
/// Returns `Err(())` when the receiver has been dropped, signalling the
/// caller to stop producing.
async fn emit_lines(
    tx: &tokio::sync::mpsc::Sender<ExecEvent>,
    partial: &mut String,
    chunk: &str,
    is_stdout: bool,
) -> std::result::Result<(), ()> {
    // Prepend the carried-over partial line (if any) to this chunk.
    let mut combined = std::mem::take(partial);
    combined.push_str(chunk);
    let terminated = combined.ends_with('\n');
    let mut segments: Vec<&str> = combined.split('\n').collect();
    if terminated {
        // Drop the empty segment produced after the final newline.
        segments.pop();
    } else if let Some(tail) = segments.pop() {
        // Carry the unterminated tail over to the next chunk.
        partial.push_str(tail);
    }
    for segment in segments {
        let event = if is_stdout {
            ExecEvent::Stdout(segment.to_string())
        } else {
            ExecEvent::Stderr(segment.to_string())
        };
        // send() fails only when the receiver is gone.
        tx.send(event).await.map_err(|_| ())?;
    }
    Ok(())
}
/// Convert spec-level registry auth into bollard `DockerCredentials`.
fn docker_credentials_from_registry_auth(auth: &RegistryAuth) -> DockerCredentials {
    // Exhaustiveness guard: adding a new RegistryAuthType variant forces a
    // compile error here so this mapping gets revisited.
    match auth.auth_type {
        RegistryAuthType::Basic | RegistryAuthType::Token => {}
    }
    // NOTE(review): both Basic and Token currently map to username/password;
    // presumably token auth stores the token in `password` — confirm against
    // zlayer_spec, otherwise `identitytoken` may be the appropriate field.
    DockerCredentials {
        username: Some(auth.username.clone()),
        password: Some(auth.password.clone()),
        ..Default::default()
    }
}
/// Find the locally-recorded digest for `repo_name` among the image's
/// `RepoDigests` entries (each of the form `repo@digest`).
///
/// Returns `None` when the inspect carries no repo digests or no entry
/// matches the repository name.
fn extract_local_digest(inspect: &ImageInspect, repo_name: &str) -> Option<String> {
    for entry in inspect.repo_digests.as_ref()? {
        // Entries without an '@' separator are skipped.
        if let Some((repo, digest)) = entry.split_once('@') {
            if repo == repo_name {
                return Some(digest.to_string());
            }
        }
    }
    None
}
/// Clone the spec's label map for Docker, mapping an empty set to `None`.
fn build_labels(spec: &ServiceSpec) -> Option<HashMap<String, String>> {
    (!spec.labels.is_empty()).then(|| spec.labels.clone())
}
/// Collect the container ports to expose, as Docker `port/protocol` keys.
///
/// Endpoints (always TCP) come first, followed by explicit port mappings;
/// duplicates keep their first occurrence.
fn build_exposed_ports(spec: &ServiceSpec) -> Vec<String> {
    let mut seen = std::collections::HashSet::new();
    let endpoint_keys = spec
        .endpoints
        .iter()
        .map(|endpoint| format!("{}/tcp", endpoint.target_port()));
    let mapping_keys = spec
        .port_mappings
        .iter()
        .map(|mapping| format!("{}/{}", mapping.container_port, mapping.protocol.as_str()));
    endpoint_keys
        .chain(mapping_keys)
        // insert() returns false for keys already seen, deduplicating in order.
        .filter(|key| seen.insert(key.clone()))
        .collect()
}
/// Translate the service spec's resource/network/device requirements into a
/// Docker `HostConfig`.
///
/// Covers: port bindings (endpoints bind TCP on 0.0.0.0 at the endpoint's
/// own port; explicit mappings may pin host IP/port, with `Some(0)`/`None`
/// meaning "let Docker choose"), memory/CPU limits, device passthrough
/// (including vendor-specific GPU wiring), added capabilities, the optional
/// agent auth-socket bind-mount, DNS/extra-hosts (dropped under host
/// networking), and the restart policy.
#[allow(clippy::too_many_lines)]
fn build_host_config(
    spec: &ServiceSpec,
    auth_socket_path: Option<&str>,
    gpu_indices: Option<&[u32]>,
) -> HostConfig {
    let mut port_bindings: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
    // Endpoints always bind TCP on all interfaces, host port == endpoint port.
    for endpoint in &spec.endpoints {
        let key = format!("{}/tcp", endpoint.target_port());
        let binding = PortBinding {
            host_ip: Some("0.0.0.0".to_string()),
            host_port: Some(endpoint.port.to_string()),
        };
        port_bindings.insert(key, Some(vec![binding]));
    }
    // Explicit mappings append further bindings after any endpoint binding
    // for the same container port/protocol.
    for mapping in &spec.port_mappings {
        let key = format!("{}/{}", mapping.container_port, mapping.protocol.as_str());
        let host_ip = if mapping.host_ip.is_empty() {
            "0.0.0.0".to_string()
        } else {
            mapping.host_ip.clone()
        };
        // Some(0) and None both mean "let Docker pick an ephemeral port".
        let host_port = match mapping.host_port {
            Some(0) | None => None,
            Some(p) => Some(p.to_string()),
        };
        let binding = PortBinding {
            host_ip: Some(host_ip),
            host_port,
        };
        port_bindings
            .entry(key)
            .or_insert_with(|| Some(Vec::new()))
            .as_mut()
            .expect("entry initialised to Some")
            .push(binding);
    }
    let memory = spec.resources.memory.as_ref().and_then(|m| parse_memory(m));
    // Docker expresses CPU limits in units of 1e-9 CPUs.
    #[allow(clippy::cast_possible_truncation)]
    let nano_cpus = spec.resources.cpu.map(|c| (c * 1_000_000_000.0) as i64);
    let mut devices: Vec<DeviceMapping> = spec
        .devices
        .iter()
        .map(|d| {
            // Build the cgroup permission string; default to "rw" when the
            // spec grants nothing explicitly.
            let mut permissions = String::new();
            if d.read {
                permissions.push('r');
            }
            if d.write {
                permissions.push('w');
            }
            if d.mknod {
                permissions.push('m');
            }
            if permissions.is_empty() {
                permissions = "rw".to_string();
            }
            DeviceMapping {
                path_on_host: Some(d.path.clone()),
                path_in_container: Some(d.path.clone()),
                cgroup_permissions: Some(permissions),
            }
        })
        .collect();
    let mut device_requests: Option<Vec<DeviceRequest>> = None;
    if let Some(ref gpu) = spec.resources.gpu {
        // Use scheduler-assigned indices when given, otherwise 0..count.
        let indices: Vec<u32> =
            gpu_indices.map_or_else(|| (0..gpu.count).collect(), <[u32]>::to_vec);
        match gpu.vendor.as_str() {
            // NVIDIA goes through the container-toolkit device-request path.
            "nvidia" => {
                device_requests = Some(vec![DeviceRequest {
                    driver: Some("nvidia".into()),
                    device_ids: Some(indices.iter().map(ToString::to_string).collect()),
                    count: None,
                    capabilities: Some(vec![vec!["gpu".into()]]),
                    ..Default::default()
                }]);
            }
            // AMD ROCm needs /dev/kfd plus the DRI render and card nodes.
            "amd" => {
                devices.push(DeviceMapping {
                    path_on_host: Some("/dev/kfd".into()),
                    path_in_container: Some("/dev/kfd".into()),
                    cgroup_permissions: Some("rwm".into()),
                });
                push_dri_devices(&mut devices, &indices, true);
            }
            // Intel GPU runtimes use the plain DRI render and card nodes.
            "intel" => {
                push_dri_devices(&mut devices, &indices, true);
            }
            other => {
                tracing::warn!(
                    vendor = %other,
                    "Unknown GPU vendor for Docker, attempting DRI device passthrough"
                );
                // Best effort: render nodes only, no card nodes.
                push_dri_devices(&mut devices, &indices, false);
            }
        }
    }
    let cap_add = if spec.capabilities.is_empty() {
        None
    } else {
        Some(spec.capabilities.clone())
    };
    let mut binds: Vec<String> = Vec::new();
    // Expose the agent auth socket read-only at the well-known in-container path.
    if let Some(socket) = auth_socket_path {
        binds.push(format!(
            "{socket}:{}:ro",
            zlayer_paths::ZLayerDirs::default_socket_path()
        ));
    }
    // DNS overrides and extra hosts are meaningless under host networking.
    let (dns, extra_hosts) = if spec.host_network {
        (None, None)
    } else {
        let dns = if spec.dns.is_empty() {
            None
        } else {
            Some(spec.dns.clone())
        };
        let extra_hosts = if spec.extra_hosts.is_empty() {
            None
        } else {
            Some(spec.extra_hosts.clone())
        };
        (dns, extra_hosts)
    };
    let restart_policy = spec.restart_policy.as_ref().map(translate_restart_policy);
    HostConfig {
        port_bindings: Some(port_bindings),
        privileged: Some(spec.privileged),
        memory,
        nano_cpus,
        devices: if devices.is_empty() {
            None
        } else {
            Some(devices)
        },
        device_requests,
        cap_add,
        binds: if binds.is_empty() { None } else { Some(binds) },
        dns,
        extra_hosts,
        restart_policy,
        ..Default::default()
    }
}
/// Append `/dev/dri/renderD{128 + i}` (and, when `include_card` is set,
/// `/dev/dri/card{i}`) device mappings for each GPU index.
fn push_dri_devices(devices: &mut Vec<DeviceMapping>, indices: &[u32], include_card: bool) {
    for i in indices {
        let render_path = format!("/dev/dri/renderD{}", 128 + i);
        devices.push(DeviceMapping {
            path_on_host: Some(render_path.clone()),
            path_in_container: Some(render_path),
            cgroup_permissions: Some("rwm".into()),
        });
        if include_card {
            let card_path = format!("/dev/dri/card{i}");
            devices.push(DeviceMapping {
                path_on_host: Some(card_path.clone()),
                path_in_container: Some(card_path),
                cgroup_permissions: Some("rwm".into()),
            });
        }
    }
}
/// Map the spec's restart policy onto Docker's `RestartPolicy`.
///
/// Docker has no configurable restart delay, so a configured delay is only
/// warned about; `max_attempts` is forwarded solely for `OnFailure`, the
/// one kind where Docker supports a retry cap.
fn translate_restart_policy(policy: &zlayer_spec::ContainerRestartPolicy) -> RestartPolicy {
    use zlayer_spec::ContainerRestartKind;
    if policy.delay.is_some() {
        tracing::warn!(
            kind = ?policy.kind,
            delay = ?policy.delay,
            "ContainerRestartPolicy.delay is not supported by the Docker backend; \
             Docker applies its own exponential backoff starting at 100ms"
        );
    }
    let (name, maximum_retry_count) = match policy.kind {
        ContainerRestartKind::No => (RestartPolicyNameEnum::NO, None),
        ContainerRestartKind::Always => (RestartPolicyNameEnum::ALWAYS, None),
        ContainerRestartKind::UnlessStopped => (RestartPolicyNameEnum::UNLESS_STOPPED, None),
        ContainerRestartKind::OnFailure => (
            RestartPolicyNameEnum::ON_FAILURE,
            policy.max_attempts.map(i64::from),
        ),
    };
    RestartPolicy {
        name: Some(name),
        maximum_retry_count,
    }
}
/// Parse a human-readable memory size (e.g. "512M", "1.5G", "2GiB", "1024")
/// into a byte count.
///
/// Bare integers are taken as bytes. Units are case-insensitive and accept
/// both SI-style ("K", "KB") and IEC-style ("Ki", "KiB") suffixes, all
/// interpreted as powers of 1024. Whitespace around the value and between
/// number and unit ("512 MB") is tolerated. Returns `None` for unknown
/// units or malformed numbers.
fn parse_memory(memory: &str) -> Option<i64> {
    let memory = memory.trim();
    // Index of the first character that is not part of the numeric prefix;
    // 0 means either "all digits" or "starts with a non-digit" — in both
    // cases fall back to parsing the whole string as a plain byte count.
    let split_idx = memory
        .char_indices()
        .find(|&(_, c)| !c.is_ascii_digit() && c != '.')
        .map_or(0, |(i, _)| i);
    if split_idx == 0 {
        return memory.parse::<i64>().ok();
    }
    let (num_str, unit) = memory.split_at(split_idx);
    let num: f64 = num_str.trim().parse().ok()?;
    // Trim the unit so "512 MB" (space before the unit) parses too.
    let multiplier: i64 = match unit.trim().to_uppercase().as_str() {
        "B" | "" => 1,
        "K" | "KB" | "KI" | "KIB" => 1024,
        "M" | "MB" | "MI" | "MIB" => 1024 * 1024,
        "G" | "GB" | "GI" | "GIB" => 1024 * 1024 * 1024,
        "T" | "TB" | "TI" | "TIB" => 1024 * 1024 * 1024 * 1024,
        _ => return None,
    };
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)]
    Some((num * multiplier as f64) as i64)
}
#[async_trait::async_trait]
impl Runtime for DockerRuntime {
/// Pull with the default `IfNotPresent` policy and no inline credentials.
#[instrument(
    skip(self),
    fields(
        otel.name = "image.pull",
        container.image.name = %image,
    )
)]
async fn pull_image(&self, image: &str) -> Result<()> {
    self.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
        .await
}
/// Pull `image` according to `policy`:
/// - `Never`: skip entirely.
/// - `IfNotPresent`: pull only when the image is missing locally; when
///   present and tagged, compare the local repo digest against the registry
///   and re-pull on mismatch. Digest-pinned references that are present are
///   never re-pulled. Registry lookup failures fall back to the local image
///   without error.
/// - any other policy (`Always`): unconditional pull.
#[instrument(
    skip(self, auth),
    fields(
        otel.name = "image.pull",
        container.image.name = %image,
        pull_policy = ?policy,
        inline_auth = auth.is_some(),
    )
)]
async fn pull_image_with_policy(
    &self,
    image: &str,
    policy: PullPolicy,
    auth: Option<&RegistryAuth>,
) -> Result<()> {
    if matches!(policy, PullPolicy::Never) {
        tracing::debug!(image = %image, "pull policy is Never, skipping pull");
        return Ok(());
    }
    if matches!(policy, PullPolicy::IfNotPresent) {
        let local_inspect = match self.docker.inspect_image(image).await {
            Ok(inspect) => inspect,
            // 404 => image not present locally: plain pull.
            Err(BollardError::DockerResponseServerError {
                status_code: 404, ..
            }) => {
                tracing::debug!(image = %image, "image not present locally, pulling");
                return self.do_pull(image, auth).await;
            }
            Err(e) => {
                return Err(AgentError::Internal(format!(
                    "failed to inspect image '{image}': {e}"
                )));
            }
        };
        let parsed = ImageReference::from_str(image).ok();
        let name = parsed.as_ref().map_or_else(
            || image.to_string(),
            |r| format!("{}/{}", r.registry(), r.repository()),
        );
        // A reference without a tag is digest-pinned; its content can never
        // drift, so local presence alone is sufficient.
        let pinned_by_digest = parsed.as_ref().is_some_and(|r| r.tag().is_none());
        if pinned_by_digest {
            tracing::debug!(
                image = %image,
                "image pinned by digest and present, skipping pull"
            );
            return Ok(());
        }
        let local_digest = extract_local_digest(&local_inspect, &name);
        match self.docker.inspect_registry_image(image, None).await {
            Ok(dist) => {
                let remote_digest = dist.descriptor.digest;
                match (&local_digest, &remote_digest) {
                    // Both digests known and equal: nothing to do.
                    (Some(local), Some(remote)) if local == remote => {
                        tracing::debug!(
                            image = %image,
                            digest = %local,
                            "image up-to-date, skipping pull"
                        );
                        return Ok(());
                    }
                    // Both known but different: tag moved on the registry.
                    (Some(local), Some(remote)) => {
                        tracing::info!(
                            image = %image,
                            local = %local,
                            remote = %remote,
                            "image digests differ, re-pulling"
                        );
                        return self.do_pull(image, auth).await;
                    }
                    // Either side unknown: re-pull defensively.
                    _ => {
                        tracing::info!(
                            image = %image,
                            "local or remote digest missing, re-pulling to be safe"
                        );
                        return self.do_pull(image, auth).await;
                    }
                }
            }
            // Registry unreachable: best effort, keep using the local image.
            Err(e) => {
                tracing::warn!(
                    image = %image,
                    error = %e,
                    "failed to query registry digest, using local image"
                );
                return Ok(());
            }
        }
    }
    // Remaining policies (e.g. Always): unconditional pull.
    self.do_pull(image, auth).await
}
/// Create (but do not start) the container for `id` from `spec`.
///
/// Any pre-existing container occupying the deterministic name is treated
/// as stale: stopped best-effort (5s grace) and force-removed first.
/// Assembles environment (spec env, agent auth variables, GPU visibility
/// variables, distributed-training bootstrap variables), exposed ports,
/// and the host config, then issues the create call.
#[instrument(
    skip(self, spec),
    fields(
        otel.name = "container.create",
        container.id = %container_name(id),
        service.name = %id.service,
        service.replica = %id.replica,
        container.image.name = %spec.image.name,
    )
)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
    let name = container_name(id);
    // A successful inspect means something already holds our name.
    if self.docker.inspect_container(&name, None).await.is_ok() {
        tracing::warn!(
            container = %name,
            "stale container already exists, removing before re-create"
        );
        // Best-effort stop; errors are ignored since removal is forced below.
        let _ = self
            .docker
            .stop_container(
                &name,
                Some(StopContainerOptions {
                    t: Some(5),
                    ..Default::default()
                }),
            )
            .await;
        self.docker
            .remove_container(
                &name,
                Some(RemoveContainerOptions {
                    force: true,
                    ..Default::default()
                }),
            )
            .await
            .map_err(|e| {
                AgentError::Internal(format!("failed to remove stale container {name}: {e}"))
            })?;
        tracing::info!(container = %name, "stale container removed");
    }
    let mut env: Vec<String> = spec.env.iter().map(|(k, v)| format!("{k}={v}")).collect();
    // Agent connectivity/auth variables, only when an auth context exists.
    if let Some(ref auth_ctx) = self.auth_context {
        // Token lifetime: one year.
        let token = crate::auth::mint_container_token(
            &auth_ctx.jwt_secret,
            &id.service,
            &format!("{}-{}", id.service, id.replica),
            std::time::Duration::from_secs(86400 * 365),
        )
        .map_err(|e| crate::error::AgentError::CreateFailed {
            id: id.to_string(),
            reason: format!("Failed to mint container token: {e}"),
        })?;
        env.push(format!("ZLAYER_API_URL={}", auth_ctx.api_url));
        env.push(format!("ZLAYER_TOKEN={token}"));
        env.push(format!(
            "ZLAYER_SOCKET={}",
            zlayer_paths::ZLayerDirs::default_socket_path()
        ));
    }
    // Vendor-specific GPU visibility variables for indices 0..count.
    if let Some(ref gpu) = spec.resources.gpu {
        let indices: Vec<String> = (0..gpu.count).map(|i| i.to_string()).collect();
        let device_list = indices.join(",");
        match gpu.vendor.as_str() {
            "nvidia" => {
                env.push(format!("NVIDIA_VISIBLE_DEVICES={device_list}"));
                env.push(format!("CUDA_VISIBLE_DEVICES={device_list}"));
            }
            "amd" => {
                env.push(format!("ROCR_VISIBLE_DEVICES={device_list}"));
                env.push(format!("HIP_VISIBLE_DEVICES={device_list}"));
            }
            "intel" => {
                env.push(format!("ZE_AFFINITY_MASK={device_list}"));
            }
            _ => {}
        }
    }
    // Distributed-training bootstrap variables (single-node defaults).
    if let Some(ref gpu) = spec.resources.gpu {
        if let Some(ref dist) = gpu.distributed {
            env.push(format!("MASTER_PORT={}", dist.master_port));
            env.push(format!("MASTER_ADDR={}", id.service));
            env.push("WORLD_SIZE=1".to_string());
            env.push("RANK=0".to_string());
            env.push("LOCAL_RANK=0".to_string());
            match dist.backend.as_str() {
                "nccl" => env.push("NCCL_SOCKET_IFNAME=eth0".to_string()),
                "gloo" => env.push("GLOO_SOCKET_IFNAME=eth0".to_string()),
                _ => {}
            }
        }
    }
    let exposed_ports = build_exposed_ports(spec);
    let auth_socket = self
        .auth_context
        .as_ref()
        .map(|ctx| ctx.socket_path.as_str());
    let host_config = build_host_config(spec, auth_socket, None);
    let cmd = spec.command.args.clone();
    let entrypoint = spec.command.entrypoint.clone();
    let working_dir = spec.command.workdir.clone();
    // Custom hostnames do not apply under host networking.
    let hostname = if spec.host_network {
        None
    } else {
        spec.hostname.clone()
    };
    let config = ContainerCreateBody {
        image: Some(spec.image.name.to_string()),
        hostname,
        env: if env.is_empty() { None } else { Some(env) },
        cmd,
        entrypoint,
        working_dir,
        exposed_ports: if exposed_ports.is_empty() {
            None
        } else {
            Some(exposed_ports)
        },
        labels: build_labels(spec),
        host_config: Some(host_config),
        ..Default::default()
    };
    let options = CreateContainerOptions {
        name: Some(name.clone()),
        platform: String::new(),
    };
    tracing::info!(container = %name, image = %spec.image.name, "creating container");
    self.docker
        .create_container(Some(options), config)
        .await
        .map_err(|e| AgentError::CreateFailed {
            id: name.clone(),
            reason: e.to_string(),
        })?;
    tracing::info!(container = %name, "container created successfully");
    Ok(())
}
/// Start a previously created container.
///
/// # Errors
/// Returns `AgentError::StartFailed` when the daemon rejects the start.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.start",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, "starting container");
    if let Err(e) = self
        .docker
        .start_container(&name, None::<StartContainerOptions>)
        .await
    {
        return Err(AgentError::StartFailed {
            id: name,
            reason: e.to_string(),
        });
    }
    tracing::info!(container = %name, "container started successfully");
    Ok(())
}
/// Stop a container, giving it `timeout` to exit gracefully before the
/// daemon kills it.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.stop",
        container.id = %container_name(id),
        service.name = %id.service,
        timeout_ms = %timeout.as_millis(),
    )
)]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, timeout = ?timeout, "stopping container");
    // Docker takes the grace period in whole seconds; sub-second timeouts
    // truncate toward zero.
    #[allow(clippy::cast_possible_truncation)]
    let options = StopContainerOptions {
        t: Some(timeout.as_secs() as i32),
        signal: None,
    };
    self.docker
        .stop_container(&name, Some(options))
        .await
        // NOTE(review): every stop failure is surfaced as NotFound, even
        // daemon-side errors — confirm callers expect that classification.
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to stop container: {e}"),
        })?;
    tracing::info!(container = %name, "container stopped successfully");
    Ok(())
}
/// Force-remove a container (running containers are killed first by the
/// daemon because `force: true` is set).
#[instrument(
    skip(self),
    fields(
        otel.name = "container.remove",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    tracing::info!(container = %name, "removing container");
    let removal = self
        .docker
        .remove_container(
            &name,
            Some(RemoveContainerOptions {
                force: true,
                ..Default::default()
            }),
        )
        .await;
    match removal {
        Ok(()) => {
            tracing::info!(container = %name, "container removed successfully");
            Ok(())
        }
        Err(e) => Err(AgentError::NotFound {
            container: name,
            reason: format!("failed to remove container: {e}"),
        }),
    }
}
/// Map Docker's container status onto the agent's `ContainerState`.
///
/// Paused counts as Running; Restarting as Initializing; Removing as
/// Stopping; a missing or empty status as Pending.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.state",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
    let name = container_name(id);
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        })?;
    let state = inspect.state.ok_or_else(|| {
        AgentError::Internal(format!("Container {name} has no state information"))
    })?;
    let container_state = match state.status {
        Some(bollard::models::ContainerStateStatusEnum::CREATED) => ContainerState::Pending,
        Some(
            bollard::models::ContainerStateStatusEnum::RUNNING
            | bollard::models::ContainerStateStatusEnum::PAUSED,
        ) => ContainerState::Running,
        Some(bollard::models::ContainerStateStatusEnum::RESTARTING) => {
            ContainerState::Initializing
        }
        Some(bollard::models::ContainerStateStatusEnum::REMOVING) => ContainerState::Stopping,
        Some(bollard::models::ContainerStateStatusEnum::EXITED) => {
            // Docker reports i64; the agent-facing code is i32.
            #[allow(clippy::cast_possible_truncation)]
            let code = state.exit_code.unwrap_or(0) as i32;
            ContainerState::Exited { code }
        }
        Some(bollard::models::ContainerStateStatusEnum::DEAD) => {
            let error = state
                .error
                .unwrap_or_else(|| "container is dead".to_string());
            ContainerState::Failed { reason: error }
        }
        None | Some(bollard::models::ContainerStateStatusEnum::EMPTY) => {
            ContainerState::Pending
        }
    };
    tracing::debug!(container = %name, state = ?container_state, "got container state");
    Ok(container_state)
}
/// Fetch up to `tail` lines of stdout+stderr from the container.
///
/// Timestamps are assigned at read time (`Utc::now()`), not taken from the
/// daemon, since the request sets `timestamps: false`.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.logs",
        container.id = %container_name(id),
        service.name = %id.service,
        tail = %tail,
    )
)]
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
    let name = container_name(id);
    let options = LogsOptions {
        stdout: true,
        stderr: true,
        tail: tail.to_string(),
        timestamps: false,
        ..Default::default()
    };
    let mut stream = self.docker.logs(&name, Some(options));
    let mut entries = Vec::new();
    let source = LogSource::Container(name.clone());
    while let Some(result) = stream.next().await {
        match result {
            Ok(log_output) => {
                // Classify the frame; any other kind (stdin echo, console)
                // is folded into stdout.
                let (stream_type, text) = match &log_output {
                    bollard::container::LogOutput::StdOut { message } => {
                        (LogStream::Stdout, String::from_utf8_lossy(message))
                    }
                    bollard::container::LogOutput::StdErr { message } => {
                        (LogStream::Stderr, String::from_utf8_lossy(message))
                    }
                    _ => (LogStream::Stdout, log_output.to_string().into()),
                };
                // A single frame may hold several newline-separated lines.
                for line in text.lines() {
                    entries.push(LogEntry {
                        timestamp: chrono::Utc::now(),
                        stream: stream_type,
                        message: line.to_string(),
                        source: source.clone(),
                        service: Some(id.service.clone()),
                        deployment: None,
                    });
                }
            }
            Err(e) => {
                return Err(AgentError::NotFound {
                    container: name.clone(),
                    reason: format!("failed to get logs: {e}"),
                });
            }
        }
    }
    tracing::debug!(container = %name, entry_count = entries.len(), "got container logs");
    Ok(entries)
}
/// Run `cmd` inside the container, buffering all output, and return
/// `(exit_code, stdout, stderr)` once the command finishes.
#[instrument(
    skip(self, cmd),
    fields(
        otel.name = "container.exec",
        container.id = %container_name(id),
        service.name = %id.service,
        cmd = ?cmd,
    )
)]
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
    let name = container_name(id);
    let exec_options = CreateExecOptions {
        cmd: Some(cmd.to_vec()),
        attach_stdout: Some(true),
        attach_stderr: Some(true),
        ..Default::default()
    };
    let exec_created = self
        .docker
        .create_exec(&name, exec_options)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to create exec: {e}"),
        })?;
    let start_result = self
        .docker
        .start_exec(&exec_created.id, None)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to start exec: {e}")))?;
    let mut stdout = String::new();
    let mut stderr = String::new();
    match start_result {
        StartExecResults::Attached { mut output, .. } => {
            // Drain the multiplexed output stream into the two buffers.
            while let Some(result) = output.next().await {
                match result {
                    Ok(bollard::container::LogOutput::StdOut { message }) => {
                        stdout.push_str(&String::from_utf8_lossy(&message));
                    }
                    Ok(bollard::container::LogOutput::StdErr { message }) => {
                        stderr.push_str(&String::from_utf8_lossy(&message));
                    }
                    // Other frame kinds (stdin echo, console) are ignored.
                    Ok(_) => {}
                    // Read errors are logged but do not abort draining.
                    Err(e) => {
                        tracing::warn!(error = %e, "error reading exec output");
                    }
                }
            }
        }
        StartExecResults::Detached => {
            tracing::warn!("exec started in detached mode unexpectedly");
        }
    }
    // The exit code only becomes available via inspect after completion.
    let exec_inspect = self
        .docker
        .inspect_exec(&exec_created.id)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to inspect exec: {e}")))?;
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = exec_inspect.exit_code.unwrap_or(0) as i32;
    tracing::debug!(
        container = %name,
        exit_code = exit_code,
        stdout_len = stdout.len(),
        stderr_len = stderr.len(),
        "exec completed"
    );
    Ok((exit_code, stdout, stderr))
}
/// Run `cmd` and return a stream of line-oriented `ExecEvent`s:
/// Stdout/Stderr per complete line, then a final `Exit(code)`.
///
/// A background task drains the daemon stream; `emit_lines` buffers partial
/// lines across chunks and any trailing unterminated data is flushed before
/// the exit event. If the receiver is dropped the task stops early.
#[instrument(
    skip(self, cmd),
    fields(
        otel.name = "container.exec_stream",
        container.id = %container_name(id),
        service.name = %id.service,
        cmd = ?cmd,
    )
)]
async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
    let name = container_name(id);
    let exec_options = CreateExecOptions {
        cmd: Some(cmd.to_vec()),
        attach_stdout: Some(true),
        attach_stderr: Some(true),
        ..Default::default()
    };
    let exec_created = self
        .docker
        .create_exec(&name, exec_options)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to create exec: {e}"),
        })?;
    let start_result = self
        .docker
        .start_exec(&exec_created.id, None)
        .await
        .map_err(|e| AgentError::Internal(format!("failed to start exec: {e}")))?;
    let (tx, rx) = tokio::sync::mpsc::channel::<ExecEvent>(256);
    let docker = self.docker.clone();
    let exec_id = exec_created.id.clone();
    let container_name_for_task = name.clone();
    tokio::spawn(async move {
        // Carry-over buffers for lines split across stream chunks.
        let mut stdout_partial = String::new();
        let mut stderr_partial = String::new();
        if let StartExecResults::Attached { mut output, .. } = start_result {
            while let Some(result) = output.next().await {
                match result {
                    Ok(bollard::container::LogOutput::StdOut { message }) => {
                        let text = String::from_utf8_lossy(&message).into_owned();
                        // Err means the receiver is gone; stop draining.
                        if emit_lines(&tx, &mut stdout_partial, &text, true)
                            .await
                            .is_err()
                        {
                            return;
                        }
                    }
                    Ok(bollard::container::LogOutput::StdErr { message }) => {
                        let text = String::from_utf8_lossy(&message).into_owned();
                        if emit_lines(&tx, &mut stderr_partial, &text, false)
                            .await
                            .is_err()
                        {
                            return;
                        }
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(
                            container = %container_name_for_task,
                            error = %e,
                            "error reading exec stream output"
                        );
                    }
                }
            }
        } else {
            tracing::warn!(
                container = %container_name_for_task,
                "exec started in detached mode unexpectedly"
            );
        }
        // Flush any unterminated trailing output before reporting exit.
        if !stdout_partial.is_empty()
            && tx
                .send(ExecEvent::Stdout(std::mem::take(&mut stdout_partial)))
                .await
                .is_err()
        {
            return;
        }
        if !stderr_partial.is_empty()
            && tx
                .send(ExecEvent::Stderr(std::mem::take(&mut stderr_partial)))
                .await
                .is_err()
        {
            return;
        }
        // The exit code is only available via inspect; default to 0 when the
        // inspect itself fails.
        let exit_code = match docker.inspect_exec(&exec_id).await {
            Ok(inspect) => {
                #[allow(clippy::cast_possible_truncation)]
                let code = inspect.exit_code.unwrap_or(0) as i32;
                code
            }
            Err(e) => {
                tracing::warn!(
                    container = %container_name_for_task,
                    error = %e,
                    "failed to inspect exec for exit code"
                );
                0
            }
        };
        let _ = tx.send(ExecEvent::Exit(exit_code)).await;
    });
    Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
/// Start an interactive (optionally TTY) exec session and return a handle
/// bundling the bidirectional byte stream, a resize channel, and a future
/// that resolves to the exit code.
///
/// Resize requests are applied by a background task; failures there are
/// only logged at debug level since they are expected once the exec ends.
/// The exit future polls `inspect_exec` every 100ms until the session is
/// no longer running.
#[instrument(
    skip(self, opts),
    fields(
        otel.name = "container.exec_pty",
        container.id = %container_name(id),
        service.name = %id.service,
        tty = opts.tty,
    )
)]
async fn exec_pty(&self, id: &ContainerId, opts: ExecOptions) -> Result<ExecHandle> {
    let name = container_name(id);
    let create_opts = build_create_exec_options(&opts);
    let exec_created = self
        .docker
        .create_exec(&name, create_opts)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to create exec: {e}"),
        })?;
    let start_result = self
        .docker
        .start_exec(
            &exec_created.id,
            Some(StartExecOptions {
                detach: false,
                tty: opts.tty,
                output_capacity: None,
            }),
        )
        .await
        .map_err(|e| AgentError::Internal(format!("failed to start exec: {e}")))?;
    let (output, input) = match start_result {
        StartExecResults::Attached { output, input } => (output, input),
        StartExecResults::Detached => {
            return Err(AgentError::Internal(
                "exec started in detached mode despite detach=false".into(),
            ));
        }
    };
    // Adapter joining the output stream and input sink into one duplex.
    let duplex = ExecPtyDuplex {
        output,
        input,
        current_chunk: bytes::Bytes::new(),
        output_done: false,
    };
    let stream: ExecPtyStream = Box::new(duplex);
    let (resize_tx, mut resize_rx) = tokio::sync::mpsc::channel::<(u16, u16)>(8);
    let docker_for_resize = self.docker.clone();
    let exec_id_for_resize = exec_created.id.clone();
    let container_for_resize = name.clone();
    // Forward (rows, cols) resize requests to the daemon until the sender
    // side of the channel is dropped.
    tokio::spawn(async move {
        while let Some((rows, cols)) = resize_rx.recv().await {
            let opts = resize_options_for(rows, cols);
            if let Err(e) = docker_for_resize
                .resize_exec(&exec_id_for_resize, opts)
                .await
            {
                tracing::debug!(
                    container = %container_for_resize,
                    rows = rows,
                    cols = cols,
                    error = %e,
                    "resize_exec failed (exec may have exited)"
                );
            }
        }
    });
    let docker_for_exit = self.docker.clone();
    let exec_id_for_exit = exec_created.id.clone();
    // Poll for completion; Docker exposes no native "wait for exec" call.
    let exit: crate::runtime::ExecExitFuture = Box::pin(async move {
        let interval = Duration::from_millis(100);
        loop {
            match docker_for_exit.inspect_exec(&exec_id_for_exit).await {
                Ok(inspect) => {
                    if inspect.running == Some(false) {
                        return Ok(extract_exec_exit_code(&inspect));
                    }
                }
                Err(e) => {
                    return Err(AgentError::Internal(format!(
                        "failed to inspect exec for exit code: {e}"
                    )));
                }
            }
            tokio::time::sleep(interval).await;
        }
    });
    Ok(ExecHandle {
        stream,
        resize: resize_tx,
        exit,
    })
}
/// Take a single one-shot stats sample for the container.
///
/// CPU time is converted from Docker's cumulative nanoseconds to
/// microseconds. A missing memory limit is reported as `u64::MAX`
/// (effectively "unlimited"); missing counters default to 0.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.stats",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
    let name = container_name(id);
    let options = StatsOptions {
        stream: false,
        one_shot: true,
    };
    let mut stream = self.docker.stats(&name, Some(options));
    // one_shot: the stream yields exactly one sample (or an error).
    let stats = stream
        .next()
        .await
        .ok_or_else(|| AgentError::NotFound {
            container: name.clone(),
            reason: "no stats available".to_string(),
        })?
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to get stats: {e}"),
        })?;
    // total_usage is cumulative nanoseconds; divide down to microseconds.
    let cpu_usage_usec = stats
        .cpu_stats
        .and_then(|s| s.cpu_usage)
        .and_then(|u| u.total_usage)
        .unwrap_or(0)
        / 1000;
    let memory_bytes = stats
        .memory_stats
        .as_ref()
        .and_then(|s| s.usage)
        .unwrap_or(0);
    let memory_limit = stats.memory_stats.and_then(|s| s.limit).unwrap_or(u64::MAX);
    let container_stats = ContainerStats {
        cpu_usage_usec,
        memory_bytes,
        memory_limit,
        timestamp: Instant::now(),
    };
    tracing::debug!(
        container = %name,
        cpu_usec = cpu_usage_usec,
        memory_bytes = memory_bytes,
        memory_limit = memory_limit,
        "got container stats"
    );
    Ok(container_stats)
}
/// Block until the container is no longer running and return its exit code.
///
/// Uses Docker's "not-running" wait condition; the daemon's i64 status code
/// is truncated to i32 for the agent interface.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.wait",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
    let name = container_name(id);
    tracing::debug!(container = %name, "waiting for container to exit");
    let options = WaitContainerOptions {
        condition: "not-running".to_string(),
    };
    let mut stream = self.docker.wait_container(&name, Some(options));
    // The wait stream yields one response when the condition is reached.
    let wait_response = stream
        .next()
        .await
        .ok_or_else(|| AgentError::NotFound {
            container: name.clone(),
            reason: "wait stream closed unexpectedly".to_string(),
        })?
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to wait for container: {e}"),
        })?;
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = wait_response.status_code as i32;
    tracing::info!(container = %name, exit_code = exit_code, "container exited");
    Ok(exit_code)
}
/// Wait for exit, then inspect the container to classify WHY it exited.
///
/// Classification precedence: OOM kill, then signal death (decoded from the
/// exit code), then runtime error (non-empty daemon error string with exit
/// code 0), then plain exit. `finished_at` is omitted when Docker reports
/// its zero time ("0001-...").
#[instrument(
    skip(self),
    fields(
        otel.name = "container.wait_outcome",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
    let name = container_name(id);
    let exit_code_from_wait = self.wait_container(id).await?;
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container after wait: {e}"),
        })?;
    let state = inspect.state.unwrap_or_default();
    // Prefer the inspected exit code; fall back to the wait status code.
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = state.exit_code.map_or(exit_code_from_wait, |c| c as i32);
    let oom = state.oom_killed.unwrap_or(false);
    let error = state.error.unwrap_or_default();
    let error_trimmed = error.trim();
    let (reason, signal) = if oom {
        (WaitReason::OomKilled, None)
    } else if let Some(sig) = signal_name_from_exit_code(exit_code) {
        (WaitReason::Signal, Some(sig))
    } else if !error_trimmed.is_empty() && exit_code == 0 {
        (WaitReason::RuntimeError, None)
    } else {
        (WaitReason::Exited, None)
    };
    let finished_at = state.finished_at.as_deref().and_then(|s| {
        // "0001-..." is Docker's zero time: the container never finished.
        if s.starts_with("0001-") || s.is_empty() {
            None
        } else {
            chrono::DateTime::parse_from_rfc3339(s)
                .ok()
                .map(|dt| dt.with_timezone(&chrono::Utc))
        }
    });
    tracing::info!(
        container = %name,
        exit_code,
        reason = ?reason,
        signal = signal.as_deref().unwrap_or(""),
        oom_killed = oom,
        "container wait_outcome resolved",
    );
    Ok(WaitOutcome {
        exit_code,
        reason,
        signal,
        finished_at,
    })
}
/// Like `wait_outcome`, but waits using a caller-supplied Docker wait
/// condition (e.g. not-running, next-exit, removed).
#[instrument(
    skip(self),
    fields(
        otel.name = "container.wait_outcome_with_condition",
        container.id = %container_name(id),
        service.name = %id.service,
        wait.condition = %condition.as_wire_str(),
    )
)]
async fn wait_outcome_with_condition(
    &self,
    id: &ContainerId,
    condition: WaitCondition,
) -> Result<WaitOutcome> {
    let name = container_name(id);
    let options = WaitContainerOptions {
        condition: condition.as_wire_str().to_string(),
    };
    let mut stream = self.docker.wait_container(&name, Some(options));
    let wait_response = stream
        .next()
        .await
        .ok_or_else(|| AgentError::NotFound {
            container: name.clone(),
            reason: "wait stream closed unexpectedly".to_string(),
        })?
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to wait for container: {e}"),
        })?;
    #[allow(clippy::cast_possible_truncation)]
    let exit_code_from_wait = wait_response.status_code as i32;
    // After a "removed" wait the container no longer exists, so the
    // post-wait inspect below would fail; return a plain exit outcome.
    if matches!(condition, WaitCondition::Removed) {
        return Ok(WaitOutcome::exited(exit_code_from_wait));
    }
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container after wait: {e}"),
        })?;
    let state = inspect.state.unwrap_or_default();
    #[allow(clippy::cast_possible_truncation)]
    let exit_code = state.exit_code.map_or(exit_code_from_wait, |c| c as i32);
    let oom = state.oom_killed.unwrap_or(false);
    let error = state.error.unwrap_or_default();
    let error_trimmed = error.trim();
    // Same precedence as `wait_outcome`: OOM > signal exit > daemon
    // error with clean exit > plain exit.
    let (reason, signal) = if oom {
        (WaitReason::OomKilled, None)
    } else if let Some(sig) = signal_name_from_exit_code(exit_code) {
        (WaitReason::Signal, Some(sig))
    } else if !error_trimmed.is_empty() && exit_code == 0 {
        (WaitReason::RuntimeError, None)
    } else {
        (WaitReason::Exited, None)
    };
    // "0001-..." is Docker's zero time; treat it as unset.
    let finished_at = state.finished_at.as_deref().and_then(|s| {
        if s.starts_with("0001-") || s.is_empty() {
            None
        } else {
            chrono::DateTime::parse_from_rfc3339(s)
                .ok()
                .map(|dt| dt.with_timezone(&chrono::Utc))
        }
    });
    Ok(WaitOutcome {
        exit_code,
        reason,
        signal,
        finished_at,
    })
}
/// Renames the Docker container for `id` to `new_name`.
///
/// # Errors
/// Returns `AgentError::NotFound` when the daemon answers 404 for the
/// current name, and `AgentError::Internal` for any other daemon error.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.rename",
        container.id = %container_name(id),
        service.name = %id.service,
        new_name = %new_name,
    )
)]
async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
    let current = container_name(id);
    let options = RenameContainerOptionsBuilder::default()
        .name(new_name)
        .build();
    self.docker
        // Fix: pass a proper borrow of the current name; the previous
        // token here was a corrupted/mis-encoded `&current`.
        .rename_container(&current, options)
        .await
        .map_err(|e| match &e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: current.clone(),
                reason: format!("rename target not found: {e}"),
            },
            _ => AgentError::Internal(format!("failed to rename container: {e}")),
        })?;
    tracing::info!(
        container = %current,
        new_name = %new_name,
        "renamed Docker container",
    );
    Ok(())
}
/// Fetches the full stdout+stderr log tail of a container as structured
/// `LogEntry` records, one per line.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_logs",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
    let name = container_name(id);
    // Whole tail, no daemon timestamps (entries are stamped locally below).
    let options = LogsOptions {
        stdout: true,
        stderr: true,
        tail: "all".to_string(),
        timestamps: false,
        ..Default::default()
    };
    let mut stream = self.docker.logs(&name, Some(options));
    let mut entries = Vec::new();
    let source = LogSource::Container(name.clone());
    while let Some(result) = stream.next().await {
        match result {
            Ok(log_output) => {
                // Demux stdout/stderr; any other frame type (e.g. console
                // output from a TTY container) is treated as stdout.
                // Invalid UTF-8 is replaced rather than rejected.
                let (stream_type, text) = match &log_output {
                    bollard::container::LogOutput::StdOut { message } => {
                        (LogStream::Stdout, String::from_utf8_lossy(message))
                    }
                    bollard::container::LogOutput::StdErr { message } => {
                        (LogStream::Stderr, String::from_utf8_lossy(message))
                    }
                    _ => (LogStream::Stdout, log_output.to_string().into()),
                };
                // One LogEntry per line; a multi-line frame is split up.
                for line in text.lines() {
                    entries.push(LogEntry {
                        // NOTE(review): this is the read time, not the
                        // original emit time (daemon timestamps disabled).
                        timestamp: chrono::Utc::now(),
                        stream: stream_type,
                        message: line.to_string(),
                        source: source.clone(),
                        service: Some(id.service.clone()),
                        deployment: None,
                    });
                }
            }
            Err(e) => {
                return Err(AgentError::NotFound {
                    container: name.clone(),
                    reason: format!("failed to get logs: {e}"),
                });
            }
        }
    }
    tracing::debug!(container = %name, entry_count = entries.len(), "got container logs");
    Ok(entries)
}
/// Returns the host PID of the container's main process; non-positive
/// PIDs from the daemon are treated as "no running process".
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_pid",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
    let name = container_name(id);
    let details = match self.docker.inspect_container(&name, None).await {
        Ok(d) => d,
        Err(e) => {
            return Err(AgentError::NotFound {
                container: name.clone(),
                reason: format!("failed to inspect container: {e}"),
            })
        }
    };
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    let pid = match details.state.and_then(|s| s.pid) {
        Some(p) if p > 0 => Some(p as u32),
        _ => None,
    };
    tracing::debug!(container = %name, pid = ?pid, "got container PID");
    Ok(pid)
}
/// Resolves the container's IPv4/IPv6 address on the default "bridge"
/// network; other networks are not consulted here.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.get_ip",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<std::net::IpAddr>> {
    let name = container_name(id);
    let details = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        })?;
    let mut ip = None;
    if let Some(networks) = details.network_settings.and_then(|ns| ns.networks) {
        if let Some(bridge) = networks.get("bridge") {
            // Empty strings and unparseable addresses both yield None.
            ip = bridge
                .ip_address
                .as_deref()
                .filter(|s| !s.is_empty())
                .and_then(|s| s.parse::<std::net::IpAddr>().ok());
        }
    }
    tracing::debug!(container = %name, ip = ?ip, "got container IP from Docker inspect");
    Ok(ip)
}
#[instrument(skip(self), fields(otel.name = "image.list"))]
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
use bollard::query_parameters::ListImagesOptionsBuilder;
let options = ListImagesOptionsBuilder::default().all(true).build();
let summaries = self
.docker
.list_images(Some(options))
.await
.map_err(|e| AgentError::Internal(format!("failed to list images: {e}")))?;
let mut infos = Vec::with_capacity(summaries.len());
for summary in summaries {
let reference = summary
.repo_tags
.iter()
.find(|t| !t.is_empty() && t.as_str() != "<none>:<none>")
.cloned()
.unwrap_or_else(|| summary.id.clone());
let digest = summary
.repo_digests
.iter()
.find_map(|rd| rd.split_once('@').map(|(_, d)| d.to_string()));
let size_bytes = u64::try_from(summary.size).ok();
infos.push(ImageInfo {
reference,
digest,
size_bytes,
});
}
Ok(infos)
}
#[instrument(skip(self), fields(otel.name = "image.remove", container.image.name = %image, force))]
async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
use bollard::query_parameters::RemoveImageOptionsBuilder;
let options = RemoveImageOptionsBuilder::default().force(force).build();
self.docker
.remove_image(image, Some(options), None)
.await
.map_err(|e| AgentError::Internal(format!("failed to remove image '{image}': {e}")))?;
Ok(())
}
#[instrument(skip(self), fields(otel.name = "image.prune"))]
async fn prune_images(&self) -> Result<PruneResult> {
let response = self
.docker
.prune_images(None::<bollard::query_parameters::PruneImagesOptions>)
.await
.map_err(|e| AgentError::Internal(format!("failed to prune images: {e}")))?;
let deleted: Vec<String> = response
.images_deleted
.into_iter()
.flatten()
.filter_map(|item| item.deleted.or(item.untagged))
.collect();
let space_reclaimed = response
.space_reclaimed
.and_then(|v| u64::try_from(v).ok())
.unwrap_or(0);
Ok(PruneResult {
deleted,
space_reclaimed,
})
}
/// Sends a signal (default SIGKILL) to the container after validating
/// and canonicalizing the signal name.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.kill",
        container.id = %container_name(id),
        service.name = %id.service,
        signal = ?signal,
    )
)]
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
    use bollard::query_parameters::KillContainerOptionsBuilder;
    let canonical = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
    let name = container_name(id);
    tracing::info!(container = %name, signal = %canonical, "killing container");
    let options = KillContainerOptionsBuilder::default()
        .signal(&canonical)
        .build();
    if let Err(e) = self.docker.kill_container(&name, Some(options)).await {
        return Err(match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => AgentError::Internal(format!(
                "failed to send {canonical} to container '{name}': {other}"
            )),
        });
    }
    Ok(())
}
/// Pauses all processes inside the container.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.pause",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn pause_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    if let Err(e) = self.docker.pause_container(&name).await {
        // 404 maps to NotFound; everything else is an internal failure.
        return Err(match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => {
                AgentError::Internal(format!("failed to pause container '{name}': {other}"))
            }
        });
    }
    Ok(())
}
/// Resumes a previously paused container.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.unpause",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn unpause_container(&self, id: &ContainerId) -> Result<()> {
    let name = container_name(id);
    if let Err(e) = self.docker.unpause_container(&name).await {
        // 404 maps to NotFound; everything else is an internal failure.
        return Err(match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => {
                AgentError::Internal(format!("failed to unpause container '{name}': {other}"))
            }
        });
    }
    Ok(())
}
/// Applies a live resource update (CPU, memory, pids, restart policy) to
/// a running container via the Docker update endpoint.
#[instrument(
    skip(self, update),
    fields(
        otel.name = "container.update",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn update_container_resources(
    &self,
    id: &ContainerId,
    update: &ContainerResourceUpdate,
) -> Result<ContainerUpdateOutcome> {
    let name = container_name(id);
    // Nothing to change -> succeed without touching the daemon.
    if update.is_empty() {
        return Ok(ContainerUpdateOutcome::default());
    }
    // Map the string policy name to bollard's enum; unknown names are
    // dropped (policy sent with `name: None`) rather than rejected.
    let restart_policy = update.restart_policy.as_ref().map(|rp| {
        let name_enum = rp.name.as_deref().and_then(|s| match s {
            "" => Some(RestartPolicyNameEnum::EMPTY),
            "no" => Some(RestartPolicyNameEnum::NO),
            "always" => Some(RestartPolicyNameEnum::ALWAYS),
            "unless-stopped" => Some(RestartPolicyNameEnum::UNLESS_STOPPED),
            "on-failure" => Some(RestartPolicyNameEnum::ON_FAILURE),
            _ => None,
        });
        RestartPolicy {
            name: name_enum,
            maximum_retry_count: rp.maximum_retry_count,
        }
    });
    // Field-for-field translation into the wire body; unset fields keep
    // their daemon-side values. kernel_memory is intentionally omitted
    // (deprecated upstream; see warning below).
    let body = ContainerUpdateBody {
        cpu_shares: update.cpu_shares,
        memory: update.memory,
        cpu_period: update.cpu_period,
        cpu_quota: update.cpu_quota,
        cpu_realtime_period: update.cpu_realtime_period,
        cpu_realtime_runtime: update.cpu_realtime_runtime,
        cpuset_cpus: update.cpuset_cpus.clone(),
        cpuset_mems: update.cpuset_mems.clone(),
        memory_reservation: update.memory_reservation,
        memory_swap: update.memory_swap,
        blkio_weight: update.blkio_weight,
        pids_limit: update.pids_limit,
        restart_policy,
        ..Default::default()
    };
    self.docker
        .update_container(&name, body)
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            // 400 means the daemon rejected the values themselves.
            BollardError::DockerResponseServerError {
                status_code: 400,
                message,
            } => AgentError::InvalidSpec(message),
            other => {
                AgentError::Internal(format!("failed to update container '{name}': {other}"))
            }
        })?;
    let mut warnings = Vec::new();
    if update.kernel_memory.is_some() {
        warnings.push(
            "KernelMemory is deprecated upstream and was not forwarded to the daemon".into(),
        );
    }
    Ok(ContainerUpdateOutcome { warnings })
}
/// Runs `docker top` for the container, optionally with custom ps args.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.top",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn top_container(
    &self,
    id: &ContainerId,
    ps_args: &[String],
) -> Result<crate::runtime::ContainerTopOutput> {
    use bollard::query_parameters::TopOptionsBuilder;
    let name = container_name(id);
    // Only forward ps options when the caller actually supplied any.
    let options = (!ps_args.is_empty())
        .then(|| TopOptionsBuilder::new().ps_args(&ps_args.join(" ")).build());
    let response = match self.docker.top_processes(&name, options).await {
        Ok(resp) => resp,
        Err(BollardError::DockerResponseServerError {
            status_code: 404, ..
        }) => {
            return Err(AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            })
        }
        Err(other) => {
            return Err(AgentError::Internal(format!(
                "failed to top container '{name}': {other}"
            )))
        }
    };
    Ok(crate::runtime::ContainerTopOutput {
        titles: response.titles.unwrap_or_default(),
        processes: response.processes.unwrap_or_default(),
    })
}
/// Lists filesystem changes (added/modified/deleted paths) relative to
/// the container's base image.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.changes",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn changes_container(
    &self,
    id: &ContainerId,
) -> Result<Vec<crate::runtime::FilesystemChangeEntry>> {
    use crate::runtime::{FilesystemChangeEntry, FilesystemChangeKind};
    use bollard::models::ChangeType;
    let name = container_name(id);
    let raw = self
        .docker
        .container_changes(&name)
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => AgentError::Internal(format!(
                "failed to fetch changes for container '{name}': {other}"
            )),
        })?;
    // The daemon may return null when there are no changes.
    let mut entries = Vec::new();
    for change in raw.unwrap_or_default() {
        // Docker encodes change kinds numerically: 0=modified, 1=added, 2=deleted.
        let kind = match change.kind {
            ChangeType::_0 => FilesystemChangeKind::Modified,
            ChangeType::_1 => FilesystemChangeKind::Added,
            ChangeType::_2 => FilesystemChangeKind::Deleted,
        };
        entries.push(FilesystemChangeEntry {
            path: change.path,
            kind,
        });
    }
    Ok(entries)
}
/// Enumerates the container's port mappings from its network settings.
/// Exposed-but-unbound ports are included with no host ip/port.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.port",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn port_mappings_container(
    &self,
    id: &ContainerId,
) -> Result<Vec<crate::runtime::PortMappingEntry>> {
    use crate::runtime::PortMappingEntry;
    let name = container_name(id);
    let inspect = self
        .docker
        .inspect_container(&name, None)
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            },
            other => {
                AgentError::Internal(format!("failed to inspect container '{name}': {other}"))
            }
        })?;
    let mut out = Vec::new();
    // No network settings / no port map -> no mappings to report.
    let Some(port_map) = inspect
        .network_settings
        .as_ref()
        .and_then(|ns| ns.ports.as_ref())
    else {
        return Ok(out);
    };
    for (key, maybe_bindings) in port_map {
        // Keys look like "8080/tcp"; unparseable keys are skipped.
        let Some((container_port, protocol)) = parse_port_key(key) else {
            continue;
        };
        let proto_str = protocol.as_str().to_string();
        match maybe_bindings {
            // Bound port: one entry per host binding.
            Some(bindings) if !bindings.is_empty() => {
                for binding in bindings {
                    // Empty host ip / unparseable host port become None.
                    let host_ip = binding.host_ip.as_ref().filter(|s| !s.is_empty()).cloned();
                    let host_port = binding
                        .host_port
                        .as_deref()
                        .and_then(|s| s.parse::<u16>().ok());
                    out.push(PortMappingEntry {
                        container_port,
                        protocol: proto_str.clone(),
                        host_ip,
                        host_port,
                    });
                }
            }
            // Exposed but not published: keep the entry with no host side.
            _ => {
                out.push(PortMappingEntry {
                    container_port,
                    protocol: proto_str,
                    host_ip: None,
                    host_port: None,
                });
            }
        }
    }
    Ok(out)
}
#[instrument(skip(self), fields(otel.name = "container.prune"))]
async fn prune_containers(&self) -> Result<crate::runtime::ContainerPruneResult> {
let response = self
.docker
.prune_containers(None::<bollard::query_parameters::PruneContainersOptions>)
.await
.map_err(|e| AgentError::Internal(format!("failed to prune containers: {e}")))?;
let deleted = response.containers_deleted.unwrap_or_default();
let space_reclaimed = response
.space_reclaimed
.and_then(|v| u64::try_from(v).ok())
.unwrap_or(0);
Ok(crate::runtime::ContainerPruneResult {
deleted,
space_reclaimed,
})
}
/// Tags `source` as `target`, splitting the target into repo and tag
/// (defaulting the tag to "latest").
#[instrument(
    skip(self),
    fields(
        otel.name = "image.tag",
        source = %source,
        target = %target,
    )
)]
async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
    use bollard::query_parameters::TagImageOptionsBuilder;
    if source.trim().is_empty() || target.trim().is_empty() {
        return Err(AgentError::InvalidSpec(
            "source and target must be non-empty image references".to_string(),
        ));
    }
    // Split a trailing ":tag" only when the suffix contains no '/' —
    // a '/' after the last ':' means the colon was a registry port
    // (e.g. "host:5000/repo"), so the whole string is the repo and the
    // tag defaults to "latest".
    let (repo, tag) = match target.rsplit_once(':') {
        Some((r, t)) if !r.is_empty() && !t.is_empty() && !t.contains('/') => {
            (r.to_string(), t.to_string())
        }
        _ => (target.to_string(), "latest".to_string()),
    };
    let options = TagImageOptionsBuilder::default()
        .repo(&repo)
        .tag(&tag)
        .build();
    self.docker
        .tag_image(source, Some(options))
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: source.to_string(),
                reason: format!("source image '{source}' not found"),
            },
            other => AgentError::Internal(format!(
                "failed to tag image '{source}' -> '{target}': {other}"
            )),
        })?;
    tracing::info!(source = %source, target = %target, "tagged image");
    Ok(())
}
/// Inspects the container and translates the raw response into the
/// runtime's `ContainerInspectDetails`.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.inspect_detailed",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
    let name = container_name(id);
    match self.docker.inspect_container(&name, None).await {
        Ok(inspect) => Ok(translate_inspect_details(&inspect)),
        Err(e) => Err(AgentError::NotFound {
            container: name.clone(),
            reason: format!("failed to inspect container: {e}"),
        }),
    }
}
/// Streams container logs as `LogChunk`s through a bounded channel; a
/// background task forwards from bollard until the stream ends, an
/// error occurs, or the receiver is dropped.
#[instrument(
    skip(self, opts),
    fields(
        otel.name = "container.logs_stream",
        container.id = %container_name(id),
        service.name = %id.service,
        follow = opts.follow,
        tail = ?opts.tail,
        since = ?opts.since,
        until = ?opts.until,
        timestamps = opts.timestamps,
        stdout = opts.stdout,
        stderr = opts.stderr,
    )
)]
async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
    let name = container_name(id);
    let bollard_opts = build_logs_options(&opts);
    let want_timestamps = opts.timestamps;
    // Bounded channel (256) gives backpressure to the daemon stream.
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<LogChunk>>(256);
    let docker = self.docker.clone();
    let container_name_for_task = name.clone();
    tokio::spawn(async move {
        let mut stream = docker.logs(&container_name_for_task, Some(bollard_opts));
        while let Some(result) = stream.next().await {
            let send_result = match result {
                Ok(log_output) => {
                    let chunk = log_output_to_chunk(log_output, want_timestamps);
                    tx.send(Ok(chunk)).await
                }
                Err(e) => {
                    // Forward one terminal error, then stop reading.
                    let err = AgentError::NotFound {
                        container: container_name_for_task.clone(),
                        reason: format!("failed to read logs: {e}"),
                    };
                    let _ = tx.send(Err(err)).await;
                    break;
                }
            };
            // Receiver dropped -> abandon the stream and end the task.
            if send_result.is_err() {
                return;
            }
        }
    });
    Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
/// Streams live container stats samples through a bounded channel; a
/// background task forwards translated samples until the stream ends,
/// an error occurs, or the receiver is dropped.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.stats_stream",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
    let name = container_name(id);
    // Continuous streaming mode (not a single one-shot sample).
    let options = StatsOptions {
        stream: true,
        one_shot: false,
    };
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<StatsSample>>(64);
    let docker = self.docker.clone();
    let container_name_for_task = name.clone();
    tokio::spawn(async move {
        let mut stream = docker.stats(&container_name_for_task, Some(options));
        while let Some(result) = stream.next().await {
            let send_result = match result {
                Ok(response) => {
                    let sample = translate_stats_sample(&response);
                    tx.send(Ok(sample)).await
                }
                Err(e) => {
                    // Forward one terminal error, then stop reading.
                    let err = AgentError::NotFound {
                        container: container_name_for_task.clone(),
                        reason: format!("failed to read stats: {e}"),
                    };
                    let _ = tx.send(Err(err)).await;
                    break;
                }
            };
            // Receiver dropped -> abandon the stream and end the task.
            if send_result.is_err() {
                return;
            }
        }
    });
    Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
/// Pulls an image while streaming progress events; emits a terminal
/// `PullProgress::Done` carrying the last digest seen in status lines.
#[instrument(
    skip(self, auth),
    fields(
        otel.name = "image.pull_stream",
        container.image.name = %image,
        inline_auth = auth.is_some(),
    )
)]
async fn pull_image_stream(
    &self,
    image: &str,
    auth: Option<&RegistryAuth>,
) -> Result<PullProgressStream> {
    // Split the reference into registry/repo and tag; on parse failure
    // fall back to treating the whole string as the name with ":latest".
    let (name, tag) = match ImageReference::from_str(image) {
        Ok(r) => (
            format!("{}/{}", r.registry(), r.repository()),
            r.tag().unwrap_or("latest").to_string(),
        ),
        Err(_) => (image.to_string(), "latest".to_string()),
    };
    let options = CreateImageOptions {
        from_image: Some(name.clone()),
        tag: if tag.is_empty() { None } else { Some(tag) },
        ..Default::default()
    };
    let credentials = auth.map(docker_credentials_from_registry_auth);
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<PullProgress>>(64);
    let docker = self.docker.clone();
    let image_for_task = image.to_string();
    tokio::spawn(async move {
        let mut stream = docker.create_image(Some(options), None, credentials);
        // Track the most recent digest mentioned in a status line so the
        // final Done event can report it.
        let mut last_digest: Option<String> = None;
        while let Some(result) = stream.next().await {
            match result {
                Ok(info) => {
                    if let Some(digest) = extract_digest_from_status(info.status.as_deref()) {
                        last_digest = Some(digest);
                    }
                    let progress = translate_pull_progress(info);
                    // Receiver dropped -> stop pulling progress.
                    if tx.send(Ok(progress)).await.is_err() {
                        return;
                    }
                }
                Err(e) => {
                    // Terminal failure: report once and end the task
                    // without a Done event.
                    let err = AgentError::PullFailed {
                        image: image_for_task.clone(),
                        reason: e.to_string(),
                    };
                    let _ = tx.send(Err(err)).await;
                    return;
                }
            }
        }
        // Stream drained cleanly: emit the terminal Done event.
        let _ = tx
            .send(Ok(PullProgress::Done {
                reference: image_for_task.clone(),
                digest: last_digest,
            }))
            .await;
    });
    Ok(Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)))
}
/// Downloads a path from the container as a tar stream.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.archive_get",
        container.id = %container_name(id),
        service.name = %id.service,
        archive.path = %path,
    )
)]
async fn archive_get(&self, id: &ContainerId, path: &str) -> Result<ArchiveStream> {
    use bollard::query_parameters::DownloadFromContainerOptionsBuilder;
    let name = container_name(id);
    let options = DownloadFromContainerOptionsBuilder::default()
        .path(path)
        .build();
    // Stat the path first so a missing container/path surfaces as a
    // NotFound error up front instead of as a mid-stream failure.
    let _ = self.archive_head(id, path).await?;
    let stream = self.docker.download_from_container(&name, Some(options));
    let mapped = stream.map(|item| {
        item.map_err(|e| AgentError::Internal(format!("archive_get stream error: {e}")))
    });
    Ok(Box::pin(mapped))
}
/// Uploads a tar archive into the container at `path`.
#[instrument(
    skip(self, tar_bytes),
    fields(
        otel.name = "container.archive_put",
        container.id = %container_name(id),
        service.name = %id.service,
        archive.path = %path,
        archive.bytes = tar_bytes.len(),
    )
)]
async fn archive_put(
    &self,
    id: &ContainerId,
    path: &str,
    tar_bytes: bytes::Bytes,
    opts: ArchivePutOptions,
) -> Result<()> {
    use bollard::query_parameters::UploadToContainerOptionsBuilder;
    let name = container_name(id);
    // The Docker API takes these flags as the strings "1"/"0", not bools.
    let options = UploadToContainerOptionsBuilder::default()
        .path(path)
        .no_overwrite_dir_non_dir(if opts.no_overwrite_dir_non_dir {
            "1"
        } else {
            "0"
        })
        .copy_uidgid(if opts.copy_uid_gid { "1" } else { "0" })
        .build();
    self.docker
        .upload_to_container(&name, Some(options), bollard::body_full(tar_bytes))
        .await
        .map_err(|e| match e {
            BollardError::DockerResponseServerError {
                status_code: 404, ..
            } => AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' or path '{path}' not found"),
            },
            // 400/403 mean the daemon rejected the request itself (bad
            // path, read-only filesystem, overwrite conflict, ...).
            BollardError::DockerResponseServerError {
                status_code: 400 | 403,
                message,
            } => AgentError::InvalidSpec(message),
            other => AgentError::Internal(format!(
                "failed to upload archive to container '{name}': {other}"
            )),
        })?;
    Ok(())
}
/// Stats a path inside the container without downloading it.
#[instrument(
    skip(self),
    fields(
        otel.name = "container.archive_head",
        container.id = %container_name(id),
        service.name = %id.service,
        archive.path = %path,
    )
)]
async fn archive_head(&self, id: &ContainerId, path: &str) -> Result<PathStat> {
    use bollard::query_parameters::ContainerArchiveInfoOptionsBuilder;
    let name = container_name(id);
    let options = ContainerArchiveInfoOptionsBuilder::default()
        .path(path)
        .build();
    let info = match self
        .docker
        .get_container_archive_info(&name, Some(options))
        .await
    {
        Ok(stat) => stat,
        Err(BollardError::DockerResponseServerError {
            status_code: 404, ..
        }) => {
            return Err(AgentError::NotFound {
                container: name.clone(),
                reason: format!("path '{path}' not found in container '{name}'"),
            })
        }
        Err(other) => {
            return Err(AgentError::Internal(format!(
                "failed to stat path '{path}' in container '{name}': {other}"
            )))
        }
    };
    Ok(PathStat {
        name: info.name,
        size: info.size,
        mode: info.file_mode,
        mtime: info.modification_time,
        link_target: info.link_target,
    })
}
#[instrument(skip(self), fields(otel.name = "image.inspect", container.image.name = %image))]
async fn inspect_image_native(&self, image: &str) -> Result<ImageInspectInfo> {
let inspect = self
.docker
.inspect_image(image)
.await
.map_err(|e| match e {
BollardError::DockerResponseServerError {
status_code: 404, ..
} => AgentError::NotFound {
container: image.to_string(),
reason: format!("image '{image}' not found"),
},
other => {
AgentError::Internal(format!("failed to inspect image '{image}': {other}"))
}
})?;
Ok(translate_image_inspect(inspect))
}
#[instrument(skip(self), fields(otel.name = "image.history", container.image.name = %image))]
async fn image_history(&self, image: &str) -> Result<Vec<ImageHistoryEntry>> {
let history = self
.docker
.image_history(image)
.await
.map_err(|e| match e {
BollardError::DockerResponseServerError {
status_code: 404, ..
} => AgentError::NotFound {
container: image.to_string(),
reason: format!("image '{image}' not found"),
},
other => AgentError::Internal(format!("failed to fetch image history: {other}")),
})?;
Ok(history
.into_iter()
.map(|item| ImageHistoryEntry {
id: item.id,
created: item.created,
created_by: item.created_by,
tags: item.tags,
size: u64::try_from(item.size).unwrap_or(0),
comment: item.comment,
})
.collect())
}
#[instrument(skip(self), fields(otel.name = "image.search", search.term = %term, search.limit = %limit))]
async fn search_images(&self, term: &str, limit: u32) -> Result<Vec<ImageSearchResult>> {
use bollard::query_parameters::SearchImagesOptionsBuilder;
if term.trim().is_empty() {
return Err(AgentError::InvalidSpec(
"search term must not be empty".to_string(),
));
}
let mut builder = SearchImagesOptionsBuilder::default().term(term);
if limit > 0 {
builder = builder.limit(i32::try_from(limit).unwrap_or(i32::MAX));
}
let options = builder.build();
let results = self.docker.search_images(options).await.map_err(|e| {
AgentError::Internal(format!("failed to search images for '{term}': {e}"))
})?;
Ok(results
.into_iter()
.map(|item| ImageSearchResult {
name: item.name.unwrap_or_default(),
description: item.description.unwrap_or_default(),
star_count: u64::try_from(item.star_count.unwrap_or(0)).unwrap_or(0),
official: item.is_official.unwrap_or(false),
automated: item.is_automated.unwrap_or(false),
})
.collect())
}
#[instrument(skip(self, names), fields(otel.name = "image.save", count = names.len()))]
async fn save_images(&self, names: &[String]) -> Result<ImageExportStream> {
if names.is_empty() {
return Err(AgentError::InvalidSpec(
"save_images requires at least one image reference".to_string(),
));
}
let refs: Vec<&str> = names.iter().map(String::as_str).collect();
let stream = self.docker.export_images(&refs);
let mapped = stream.map(|res| {
res.map_err(|e| AgentError::Internal(format!("image export stream error: {e}")))
});
Ok(Box::pin(mapped))
}
#[instrument(skip(self, tar_bytes), fields(otel.name = "image.load", quiet = %quiet, bytes = tar_bytes.len()))]
async fn load_images(
&self,
tar_bytes: bytes::Bytes,
quiet: bool,
) -> Result<LoadProgressStream> {
use bollard::query_parameters::ImportImageOptionsBuilder;
let options = ImportImageOptionsBuilder::default().quiet(quiet).build();
let body = bollard::body_full(tar_bytes);
let stream = self.docker.import_image(options, body, None);
let mapped = stream.map(|res| {
res.map_err(|e| AgentError::Internal(format!("image load stream error: {e}")))
.map(|info| {
if let Some(stream_msg) = info.stream {
let trimmed = stream_msg.trim();
LoadProgress::Status {
id: None,
status: trimmed.to_string(),
}
} else if let Some(status_msg) = info.status {
LoadProgress::Status {
id: info.id,
status: status_msg,
}
} else {
LoadProgress::Status {
id: info.id,
status: String::new(),
}
}
})
});
Ok(Box::pin(mapped))
}
#[instrument(skip(self, tar_bytes), fields(otel.name = "image.import", repo = ?repo, tag = ?tag, bytes = tar_bytes.len()))]
async fn import_image(
&self,
tar_bytes: bytes::Bytes,
repo: Option<&str>,
tag: Option<&str>,
) -> Result<String> {
use bollard::query_parameters::ImportImageOptionsBuilder;
let options = ImportImageOptionsBuilder::default().build();
let body = bollard::body_full(tar_bytes);
let mut stream = self.docker.import_image(options, body, None);
let mut last_id: Option<String> = None;
while let Some(item) = stream.next().await {
let info =
item.map_err(|e| AgentError::Internal(format!("image import stream error: {e}")))?;
if let Some(id) = info.id {
last_id = Some(id);
} else if let Some(stream_msg) = info.stream {
if let Some(digest) = extract_loaded_id(&stream_msg) {
last_id = Some(digest);
}
}
}
let id = last_id.ok_or_else(|| {
AgentError::Internal("import_image stream produced no image id".to_string())
})?;
if let Some(repo) = repo.filter(|s| !s.trim().is_empty()) {
let target = match tag.filter(|s| !s.trim().is_empty()) {
Some(t) => format!("{repo}:{t}"),
None => format!("{repo}:latest"),
};
self.tag_image(&id, &target).await?;
}
Ok(id)
}
/// Exports the container's filesystem as a tar stream (`docker export`).
#[instrument(
    skip(self),
    fields(
        otel.name = "container.export",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn export_container_fs(&self, id: &ContainerId) -> Result<ImageExportStream> {
    let name = container_name(id);
    let tar_stream = self.docker.export_container(&name).map(|res| {
        res.map_err(|e| AgentError::Internal(format!("container export stream error: {e}")))
    });
    Ok(Box::pin(tar_stream))
}
/// Commits the container's current state to a new image, forwarding the
/// optional repo/tag/comment/author/changes fields when non-empty.
#[instrument(
    skip(self, opts),
    fields(
        otel.name = "container.commit",
        container.id = %container_name(id),
        service.name = %id.service,
    )
)]
async fn commit_container(
    &self,
    id: &ContainerId,
    opts: &CommitOptions,
) -> Result<CommitOutcome> {
    use bollard::query_parameters::CommitContainerOptionsBuilder;
    // Treat empty strings the same as absent optional fields.
    fn present(value: &Option<String>) -> Option<&str> {
        value.as_deref().filter(|s| !s.is_empty())
    }
    let name = container_name(id);
    let mut builder = CommitContainerOptionsBuilder::default()
        .container(&name)
        .pause(opts.pause);
    if let Some(repo) = present(&opts.repo) {
        builder = builder.repo(repo);
    }
    if let Some(tag) = present(&opts.tag) {
        builder = builder.tag(tag);
    }
    if let Some(comment) = present(&opts.comment) {
        builder = builder.comment(comment);
    }
    if let Some(author) = present(&opts.author) {
        builder = builder.author(author);
    }
    if let Some(changes) = present(&opts.changes) {
        builder = builder.changes(changes);
    }
    // No config overrides are applied; a default (empty) config is sent.
    let config = bollard::models::ContainerConfig::default();
    let resp = match self.docker.commit_container(builder.build(), config).await {
        Ok(r) => r,
        Err(BollardError::DockerResponseServerError {
            status_code: 404, ..
        }) => {
            return Err(AgentError::NotFound {
                container: name.clone(),
                reason: format!("container '{name}' not found"),
            })
        }
        Err(other) => {
            return Err(AgentError::Internal(format!(
                "failed to commit container '{name}': {other}"
            )))
        }
    };
    Ok(CommitOutcome { id: resp.id })
}
}
/// Parses a Docker port-map key such as `"8080/tcp"` into the container
/// port number and protocol. Returns `None` for malformed keys or for
/// protocols other than tcp/udp.
fn parse_port_key(key: &str) -> Option<(u16, zlayer_spec::PortProtocol)> {
    let mut parts = key.splitn(2, '/');
    let container_port: u16 = parts.next()?.parse().ok()?;
    let protocol = match parts.next()? {
        "tcp" => zlayer_spec::PortProtocol::Tcp,
        "udp" => zlayer_spec::PortProtocol::Udp,
        _ => return None,
    };
    Some((container_port, protocol))
}
/// Translates bollard's raw container-inspect response into the runtime's
/// `ContainerInspectDetails`: published ports, network attachments, a primary
/// IPv4 address, a health summary, and (for stopped containers) the exit code.
pub(crate) fn translate_inspect_details(
inspect: &bollard::models::ContainerInspectResponse,
) -> ContainerInspectDetails {
// Flatten the port map ("80/tcp" -> bindings) into PortMapping entries;
// keys that fail to parse (e.g. sctp) are skipped.
let ports = inspect
.network_settings
.as_ref()
.and_then(|ns| ns.ports.as_ref())
.map(|port_map| {
let mut out = Vec::with_capacity(port_map.len());
for (key, maybe_bindings) in port_map {
let Some((container_port, protocol)) = parse_port_key(key) else {
continue;
};
// Exposed-but-unpublished ports have no bindings; record them
// with no host port and an empty host IP.
let Some(bindings) = maybe_bindings else {
out.push(zlayer_spec::PortMapping {
host_port: None,
container_port,
protocol,
host_ip: String::new(),
});
continue;
};
for binding in bindings {
// Docker reports the host port as a string; unparsable -> None.
let host_port = binding.host_port.as_deref().and_then(|s| s.parse().ok());
let host_ip = binding.host_ip.clone().unwrap_or_default();
out.push(zlayer_spec::PortMapping {
host_port,
container_port,
protocol,
host_ip,
});
}
}
out
})
.unwrap_or_default();
// One attachment entry per network, keeping aliases and a non-empty IPv4.
let networks = inspect
.network_settings
.as_ref()
.and_then(|ns| ns.networks.as_ref())
.map(|nets| {
nets.iter()
.map(|(name, endpoint)| NetworkAttachmentDetail {
network: name.clone(),
aliases: endpoint.aliases.clone().unwrap_or_default(),
ipv4: endpoint
.ip_address
.as_ref()
.filter(|s| !s.is_empty())
.cloned(),
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
// Primary address: prefer the default "bridge" network, otherwise take
// the first attachment that has an address at all.
let ipv4 = networks
.iter()
.find(|n| n.network == "bridge" && n.ipv4.is_some())
.and_then(|n| n.ipv4.clone())
.or_else(|| networks.iter().find_map(|n| n.ipv4.clone()));
// Collapse Docker's health enum to a plain string plus the most recent
// probe output (only kept when non-empty).
let health = inspect
.state
.as_ref()
.and_then(|s| s.health.as_ref())
.map(|h| {
use bollard::models::HealthStatusEnum;
let status = match h.status {
Some(HealthStatusEnum::STARTING) => "starting",
Some(HealthStatusEnum::HEALTHY) => "healthy",
Some(HealthStatusEnum::UNHEALTHY) => "unhealthy",
Some(HealthStatusEnum::NONE | HealthStatusEnum::EMPTY) | None => "none",
}
.to_string();
let failing_streak = h.failing_streak.and_then(|n| u32::try_from(n).ok());
let last_output = h
.log
.as_ref()
.and_then(|entries| entries.last())
.and_then(|entry| entry.output.clone())
.filter(|s| !s.is_empty());
HealthDetail {
status,
failing_streak,
last_output,
}
});
// Exit code is only meaningful once the container has stopped; a running
// container's stale code is suppressed.
let exit_code = inspect.state.as_ref().and_then(|s| {
use bollard::models::ContainerStateStatusEnum;
match s.status {
Some(ContainerStateStatusEnum::EXITED | ContainerStateStatusEnum::DEAD) =>
{
#[allow(clippy::cast_possible_truncation)]
s.exit_code.map(|c| c as i32)
}
_ => None,
}
});
ContainerInspectDetails {
ports,
networks,
ipv4,
health,
exit_code,
}
}
/// Translates the runtime's log-stream options into bollard's `LogsOptions`.
///
/// Docker treats `0` as "no bound" for the since/until window, and a missing
/// tail becomes the literal string `"all"`. Timestamps that overflow the
/// API's `i32` field are clamped to `i32::MAX`.
fn build_logs_options(opts: &LogsStreamOptions) -> LogsOptions {
    // Clamp oversized epoch values instead of truncating.
    let clamp_ts = |v| i32::try_from(v).unwrap_or(i32::MAX);
    LogsOptions {
        follow: opts.follow,
        stdout: opts.stdout,
        stderr: opts.stderr,
        since: opts.since.map_or(0, clamp_ts),
        until: opts.until.map_or(0, clamp_ts),
        timestamps: opts.timestamps,
        tail: opts.tail.map_or_else(|| "all".to_string(), |n| n.to_string()),
    }
}
/// Converts one bollard log frame into a `LogChunk`, tagging it with the
/// channel it arrived on and, when requested, a receive-side timestamp.
///
/// NOTE(review): the timestamp is stamped at receipt (`Utc::now`), not parsed
/// from the frame, so it reflects when we saw the line rather than when the
/// container emitted it.
fn log_output_to_chunk(
    log_output: bollard::container::LogOutput,
    want_timestamps: bool,
) -> LogChunk {
    use bollard::container::LogOutput;
    // Console frames come from TTY-attached containers, where stdout and
    // stderr are already merged; classify them as stdout.
    let stream = match &log_output {
        LogOutput::StdErr { .. } => LogChannel::Stderr,
        LogOutput::StdIn { .. } => LogChannel::Stdin,
        LogOutput::StdOut { .. } | LogOutput::Console { .. } => LogChannel::Stdout,
    };
    LogChunk {
        stream,
        bytes: log_output.into_bytes(),
        timestamp: want_timestamps.then(chrono::Utc::now),
    }
}
/// Flattens a Docker stats response into the runtime's `StatsSample`.
///
/// Missing sections default to zero. Network counters are summed across all
/// interfaces and blkio counters across all devices (saturating, so a hostile
/// daemon can't overflow them). A pids limit of 0 means "unlimited" in Docker
/// and is normalised to `None`.
fn translate_stats_sample(response: &ContainerStatsResponse) -> StatsSample {
    let cpu = response.cpu_stats.as_ref();
    let cpu_total_ns = cpu
        .and_then(|c| c.cpu_usage.as_ref())
        .and_then(|u| u.total_usage)
        .unwrap_or(0);
    let cpu_system_ns = cpu.and_then(|c| c.system_cpu_usage).unwrap_or(0);
    let online_cpus = cpu.and_then(|c| c.online_cpus).unwrap_or(0);

    let mem = response.memory_stats.as_ref();
    let mem_used_bytes = mem.and_then(|m| m.usage).unwrap_or(0);
    let mem_limit_bytes = mem.and_then(|m| m.limit).unwrap_or(0);

    // Sum rx/tx over every attached interface.
    let mut net_rx_bytes = 0_u64;
    let mut net_tx_bytes = 0_u64;
    if let Some(nets) = response.networks.as_ref() {
        for n in nets.values() {
            net_rx_bytes = net_rx_bytes.saturating_add(n.rx_bytes.unwrap_or(0));
            net_tx_bytes = net_tx_bytes.saturating_add(n.tx_bytes.unwrap_or(0));
        }
    }

    // Sum blkio "Read"/"Write" entries (case-insensitive); other ops
    // (Sync, Async, Total, ...) are ignored.
    let mut blkio_read_bytes = 0_u64;
    let mut blkio_write_bytes = 0_u64;
    if let Some(entries) = response
        .blkio_stats
        .as_ref()
        .and_then(|b| b.io_service_bytes_recursive.as_ref())
    {
        for entry in entries {
            let v = entry.value.unwrap_or(0);
            match entry.op.as_deref() {
                Some(op) if op.eq_ignore_ascii_case("read") => {
                    blkio_read_bytes = blkio_read_bytes.saturating_add(v);
                }
                Some(op) if op.eq_ignore_ascii_case("write") => {
                    blkio_write_bytes = blkio_write_bytes.saturating_add(v);
                }
                _ => {}
            }
        }
    }

    let pids = response.pids_stats.as_ref();
    let pids_current = pids.and_then(|p| p.current).unwrap_or(0);
    // Docker reports "no limit" as 0; surface that as None.
    let pids_limit = pids.and_then(|p| p.limit).filter(|&l| l > 0);

    StatsSample {
        cpu_total_ns,
        cpu_system_ns,
        online_cpus,
        mem_used_bytes,
        mem_limit_bytes,
        net_rx_bytes,
        net_tx_bytes,
        blkio_read_bytes,
        blkio_write_bytes,
        pids_current,
        pids_limit,
        timestamp: chrono::Utc::now(),
    }
}
/// Converts one daemon pull-progress event into our `PullProgress::Status`.
///
/// Byte counters are only surfaced when Docker supplied them and they are
/// non-negative; a missing status string becomes empty. The preformatted
/// progress-bar text is not modelled (`progress: None`).
fn translate_pull_progress(info: CreateImageInfo) -> PullProgress {
    let (current, total) = match info.progress_detail {
        Some(detail) => (
            detail.current.and_then(|v| u64::try_from(v).ok()),
            detail.total.and_then(|v| u64::try_from(v).ok()),
        ),
        None => (None, None),
    };
    PullProgress::Status {
        id: info.id,
        status: info.status.unwrap_or_default(),
        progress: None,
        current,
        total,
    }
}
/// Extracts the image digest from a pull-status line such as
/// `"Digest: sha256:…"`.
///
/// Returns `None` when the line is not a digest line or the value after the
/// prefix is not a recognised (`sha256:`/`sha512:`) digest.
fn extract_digest_from_status(status: Option<&str>) -> Option<String> {
    // `strip_prefix` replaces the old starts_with + manual slice indexing.
    let candidate = status?.trim().strip_prefix("Digest:")?.trim();
    if candidate.starts_with("sha256:") || candidate.starts_with("sha512:") {
        Some(candidate.to_string())
    } else {
        None
    }
}
/// Parses the identifier out of a `docker load` output line.
///
/// Recognises both `"Loaded image ID: sha256:…"` (untagged image, must be a
/// sha256 digest) and `"Loaded image: repo:tag"` (any non-empty reference).
/// Any other line yields `None`.
fn extract_loaded_id(line: &str) -> Option<String> {
    let trimmed = line.trim();
    if let Some(rest) = trimmed.strip_prefix("Loaded image ID:") {
        let id = rest.trim();
        if id.starts_with("sha256:") {
            return Some(id.to_string());
        }
    }
    trimmed
        .strip_prefix("Loaded image:")
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
}
/// Flattens bollard's nested `ImageInspect` response into `ImageInspectInfo`.
///
/// Empty strings from Docker are normalised to `None`; a missing config yields
/// empty collections. Fields bollard does not expose here (`parent`,
/// `container`, `docker_version`) are always `None`.
fn translate_image_inspect(inspect: bollard::models::ImageInspect) -> ImageInspectInfo {
    let bollard::models::ImageInspect {
        id,
        repo_tags,
        repo_digests,
        comment,
        created,
        author,
        config,
        architecture,
        os,
        size,
        root_fs,
        ..
    } = inspect;
    // Pull runtime defaults out of the embedded container config, if any.
    let (env, cmd, entrypoint, working_dir, user, labels) =
        config.map_or_else(Default::default, |cfg| {
            (
                cfg.env.unwrap_or_default(),
                cfg.cmd.unwrap_or_default(),
                cfg.entrypoint.unwrap_or_default(),
                cfg.working_dir.filter(|s| !s.is_empty()),
                cfg.user.filter(|s| !s.is_empty()),
                cfg.labels
                    .map(|ls| ls.into_iter().collect::<std::collections::BTreeMap<_, _>>())
                    .unwrap_or_default(),
            )
        });
    ImageInspectInfo {
        id,
        repo_tags: repo_tags.unwrap_or_default(),
        repo_digests: repo_digests.unwrap_or_default(),
        parent: None,
        comment,
        created: created.map(|t| t.to_string()),
        container: None,
        docker_version: None,
        author,
        architecture,
        os,
        size: size.and_then(|v| u64::try_from(v).ok()),
        layers: root_fs.and_then(|fs| fs.layers).unwrap_or_default(),
        env,
        cmd,
        entrypoint,
        working_dir,
        user,
        labels,
    }
}
/// Maps the runtime's `ExecOptions` onto bollard's exec-create payload.
///
/// Empty command/env vectors collapse to `None` so the daemon applies its own
/// defaults instead of receiving an explicit empty list; boolean flags are
/// always sent explicitly.
fn build_create_exec_options(opts: &ExecOptions) -> CreateExecOptions<String> {
    // Empty collections are "unset" as far as the Docker API is concerned.
    fn non_empty(values: &[String]) -> Option<Vec<String>> {
        if values.is_empty() {
            None
        } else {
            Some(values.to_vec())
        }
    }
    CreateExecOptions {
        cmd: non_empty(&opts.command),
        env: non_empty(&opts.env),
        working_dir: opts.working_dir.clone(),
        user: opts.user.clone(),
        privileged: Some(opts.privileged),
        tty: Some(opts.tty),
        attach_stdin: Some(opts.attach_stdin),
        attach_stdout: Some(opts.attach_stdout),
        attach_stderr: Some(opts.attach_stderr),
        detach_keys: None,
    }
}
/// Builds the bollard exec-resize payload, translating terminal rows/cols
/// into the height/width fields the Docker API expects.
fn resize_options_for(rows: u16, cols: u16) -> ResizeExecOptions {
    ResizeExecOptions {
        width: cols,
        height: rows,
    }
}
/// Converts an exec-inspect exit code to `i32`.
///
/// Docker reports the code as `i64`; values outside the `i32` range are
/// saturated to `i32::MIN`/`i32::MAX`, and a missing code is treated as
/// success (0). Using `clamp` + `try_from` avoids the truncating `as` cast
/// (and the clippy allow it required).
fn extract_exec_exit_code(inspect: &ExecInspectResponse) -> i32 {
    match inspect.exit_code {
        None => 0,
        Some(code) => i32::try_from(code.clamp(i64::from(i32::MIN), i64::from(i32::MAX)))
            .expect("value was clamped into i32 range"),
    }
}
/// Bridges a Docker exec attachment — a stream of output frames plus a raw
/// stdin writer — into a single duplex object implementing
/// `AsyncRead + AsyncWrite`, suitable for driving a PTY session.
struct ExecPtyDuplex {
// Multiplexed stdout/stderr frames coming back from the exec session.
output: Pin<
Box<
dyn Stream<Item = std::result::Result<bollard::container::LogOutput, BollardError>>
+ Send,
>,
>,
// Raw stdin writer attached to the exec session.
input: Pin<Box<dyn AsyncWrite + Send>>,
// Remainder of the most recent frame not yet copied into a caller's buffer.
current_chunk: bytes::Bytes,
// Set once `output` returns `None`; subsequent reads then report EOF.
output_done: bool,
}
// Read half: drain any buffered bytes from the last frame first, then poll
// the underlying frame stream for more output.
impl AsyncRead for ExecPtyDuplex {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
loop {
// Serve leftover bytes from the previous frame before polling again;
// a frame larger than `buf` is handed out across multiple reads.
if !self.current_chunk.is_empty() {
let take = self.current_chunk.len().min(buf.remaining());
let chunk = self.current_chunk.split_to(take);
buf.put_slice(&chunk);
return Poll::Ready(Ok(()));
}
// Stream previously exhausted: Ready with no bytes written == EOF.
if self.output_done {
return Poll::Ready(Ok(()));
}
match self.output.as_mut().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
// Remember EOF so later reads don't poll a finished stream.
self.output_done = true;
return Poll::Ready(Ok(()));
}
Poll::Ready(Some(Ok(frame))) => {
// Buffer the frame; the top of the loop copies it out.
self.current_chunk = frame.into_bytes();
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Err(std::io::Error::other(e.to_string())));
}
}
}
}
}
// Write half: pure delegation to the exec session's stdin pipe.
impl AsyncWrite for ExecPtyDuplex {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.input.as_mut().poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.input.as_mut().poll_flush(cx)
}
// Shutting down the write half closes the container's stdin.
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.input.as_mut().poll_shutdown(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_container_name() {
let id = ContainerId {
service: "myservice".to_string(),
replica: 1,
};
assert_eq!(container_name(&id), "zlayer-myservice-1");
}
#[test]
fn test_container_name_with_different_replicas() {
let id1 = ContainerId {
service: "api".to_string(),
replica: 0,
};
let id2 = ContainerId {
service: "api".to_string(),
replica: 42,
};
assert_eq!(container_name(&id1), "zlayer-api-0");
assert_eq!(container_name(&id2), "zlayer-api-42");
}
#[test]
fn extract_local_digest_matches_repo() {
let inspect = ImageInspect {
repo_digests: Some(vec!["zachhandley/zlayer-manager@sha256:abc123".to_string()]),
..Default::default()
};
assert_eq!(
extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
Some("sha256:abc123".to_string())
);
}
#[test]
fn extract_local_digest_returns_none_for_non_matching_repo() {
let inspect = ImageInspect {
repo_digests: Some(vec!["otherrepo/image@sha256:def456".to_string()]),
..Default::default()
};
assert_eq!(
extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
None
);
}
#[test]
fn extract_local_digest_returns_none_when_repo_digests_missing() {
let inspect = ImageInspect {
repo_digests: None,
..Default::default()
};
assert_eq!(
extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
None
);
}
#[test]
fn extract_local_digest_skips_malformed_entries() {
let inspect = ImageInspect {
repo_digests: Some(vec![
"malformed-no-at-sign".to_string(),
"zachhandley/zlayer-manager@sha256:abc123".to_string(),
]),
..Default::default()
};
assert_eq!(
extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
Some("sha256:abc123".to_string())
);
}
#[test]
fn extract_local_digest_picks_matching_when_multiple() {
let inspect = ImageInspect {
repo_digests: Some(vec![
"otherrepo/image@sha256:111".to_string(),
"zachhandley/zlayer-manager@sha256:abc123".to_string(),
"thirdparty/image@sha256:222".to_string(),
]),
..Default::default()
};
assert_eq!(
extract_local_digest(&inspect, "zachhandley/zlayer-manager"),
Some("sha256:abc123".to_string())
);
}
#[test]
fn test_parse_memory_bytes() {
assert_eq!(parse_memory("1024"), Some(1024));
}
#[test]
fn test_parse_memory_kilobytes() {
assert_eq!(parse_memory("1Ki"), Some(1024));
assert_eq!(parse_memory("1K"), Some(1024));
assert_eq!(parse_memory("1KB"), Some(1024));
}
#[test]
fn test_parse_memory_megabytes() {
assert_eq!(parse_memory("512Mi"), Some(512 * 1024 * 1024));
assert_eq!(parse_memory("512M"), Some(512 * 1024 * 1024));
assert_eq!(parse_memory("512MB"), Some(512 * 1024 * 1024));
}
#[test]
fn test_parse_memory_gigabytes() {
assert_eq!(parse_memory("1Gi"), Some(1024 * 1024 * 1024));
assert_eq!(parse_memory("1G"), Some(1024 * 1024 * 1024));
assert_eq!(parse_memory("2GB"), Some(2 * 1024 * 1024 * 1024));
}
#[test]
fn test_parse_memory_with_decimals() {
assert_eq!(parse_memory("0.5Gi"), Some(512 * 1024 * 1024));
assert_eq!(parse_memory("1.5G"), Some(1536 * 1024 * 1024));
}
#[test]
fn test_parse_memory_invalid() {
assert_eq!(parse_memory("invalid"), None);
assert_eq!(parse_memory("XYZ"), None);
}
#[test]
fn test_build_exposed_ports_empty() {
let spec = create_test_spec(vec![]);
let ports = build_exposed_ports(&spec);
assert!(ports.is_empty());
}
#[test]
fn test_build_exposed_ports_single() {
let spec = create_test_spec(vec![8080]);
let ports = build_exposed_ports(&spec);
assert!(ports.contains(&"8080/tcp".to_string()));
assert_eq!(ports.len(), 1);
}
#[test]
fn test_build_exposed_ports_multiple() {
let spec = create_test_spec(vec![8080, 9090, 3000]);
let ports = build_exposed_ports(&spec);
assert!(ports.contains(&"8080/tcp".to_string()));
assert!(ports.contains(&"9090/tcp".to_string()));
assert!(ports.contains(&"3000/tcp".to_string()));
assert_eq!(ports.len(), 3);
}
#[test]
fn test_build_host_config_ports() {
let spec = create_test_spec(vec![8080]);
let host_config = build_host_config(&spec, None, None);
let port_bindings = host_config.port_bindings.unwrap();
let bindings = port_bindings.get("8080/tcp").unwrap().as_ref().unwrap();
assert_eq!(bindings.len(), 1);
assert_eq!(bindings[0].host_port.as_ref().unwrap(), "8080");
assert_eq!(bindings[0].host_ip.as_ref().unwrap(), "0.0.0.0");
}
#[test]
fn test_build_host_config_privileged() {
let mut spec = create_test_spec(vec![]);
spec.privileged = true;
let host_config = build_host_config(&spec, None, None);
assert_eq!(host_config.privileged, Some(true));
}
#[test]
fn build_labels_returns_none_for_empty_map() {
let spec = create_test_spec(vec![]);
assert!(spec.labels.is_empty(), "test fixture starts with no labels");
assert!(build_labels(&spec).is_none());
}
#[test]
fn build_labels_forwards_zlayer_container_id_label() {
let mut spec = create_test_spec(vec![]);
let hex = "deadbeef".repeat(8); spec.labels
.insert("com.zlayer.container_id".to_string(), hex.clone());
spec.labels
.insert("user-key".to_string(), "user-value".to_string());
let labels = build_labels(&spec).expect("non-empty labels must be Some");
assert_eq!(labels.get("com.zlayer.container_id"), Some(&hex));
assert_eq!(
labels.get("user-key"),
Some(&"user-value".to_string()),
"user-supplied labels must be preserved alongside the reserved key"
);
assert_eq!(labels.len(), 2, "no extra labels should appear");
}
fn create_test_spec(ports: Vec<u16>) -> ServiceSpec {
use zlayer_spec::*;
let endpoints: Vec<EndpointSpec> = ports
.into_iter()
.enumerate()
.map(|(i, port)| EndpointSpec {
name: format!("endpoint{i}"),
protocol: Protocol::Http,
port,
target_port: None,
path: None,
host: None,
expose: ExposeType::Internal,
stream: None,
tunnel: None,
})
.collect();
ServiceSpec {
rtype: ResourceType::Service,
schedule: None,
image: ImageSpec {
name: "test:latest".parse().expect("valid image reference"),
pull_policy: PullPolicy::IfNotPresent,
},
resources: ResourcesSpec::default(),
env: HashMap::new(),
command: CommandSpec::default(),
network: ServiceNetworkSpec::default(),
endpoints,
scale: ScaleSpec::default(),
depends: vec![],
health: HealthSpec {
start_grace: None,
interval: None,
timeout: None,
retries: 3,
check: HealthCheck::Tcp { port: 0 },
},
init: InitSpec::default(),
errors: ErrorsSpec::default(),
lifecycle: zlayer_spec::LifecycleSpec::default(),
devices: vec![],
storage: vec![],
port_mappings: vec![],
capabilities: vec![],
cap_drop: vec![],
privileged: false,
node_mode: NodeMode::default(),
node_selector: None,
service_type: ServiceType::default(),
wasm: None,
logs: None,
host_network: false,
hostname: None,
dns: Vec::new(),
extra_hosts: Vec::new(),
restart_policy: None,
platform: None,
labels: std::collections::HashMap::new(),
user: None,
stop_signal: None,
stop_grace_period: None,
sysctls: std::collections::HashMap::new(),
ulimits: std::collections::HashMap::new(),
security_opt: Vec::new(),
pid_mode: None,
ipc_mode: None,
network_mode: zlayer_spec::NetworkMode::default(),
extra_groups: Vec::new(),
read_only_root_fs: false,
init_container: None,
tty: false,
stdin_open: false,
userns_mode: None,
cgroup_parent: None,
expose: Vec::new(),
}
}
#[test]
fn test_build_host_config_port_mappings_static_and_ephemeral() {
use zlayer_spec::{PortMapping, PortProtocol};
let mut spec = create_test_spec(vec![]);
spec.port_mappings = vec![
PortMapping {
host_port: Some(8080),
container_port: 80,
protocol: PortProtocol::Tcp,
host_ip: "0.0.0.0".to_string(),
},
PortMapping {
host_port: None,
container_port: 53,
protocol: PortProtocol::Udp,
host_ip: "127.0.0.1".to_string(),
},
PortMapping {
host_port: Some(0),
container_port: 443,
protocol: PortProtocol::Tcp,
host_ip: String::new(), },
];
let host_config = build_host_config(&spec, None, None);
let port_bindings = host_config.port_bindings.expect("port_bindings set");
let tcp80 = port_bindings
.get("80/tcp")
.expect("80/tcp key")
.as_ref()
.expect("80/tcp bindings");
assert_eq!(tcp80.len(), 1);
assert_eq!(tcp80[0].host_port.as_deref(), Some("8080"));
assert_eq!(tcp80[0].host_ip.as_deref(), Some("0.0.0.0"));
let udp53 = port_bindings
.get("53/udp")
.expect("53/udp key")
.as_ref()
.expect("53/udp bindings");
assert_eq!(udp53.len(), 1);
assert!(udp53[0].host_port.is_none(), "ephemeral host_port is None");
assert_eq!(udp53[0].host_ip.as_deref(), Some("127.0.0.1"));
let tcp443 = port_bindings
.get("443/tcp")
.expect("443/tcp key")
.as_ref()
.expect("443/tcp bindings");
assert_eq!(tcp443.len(), 1);
assert!(
tcp443[0].host_port.is_none(),
"Some(0) should become None (ephemeral)"
);
assert_eq!(tcp443[0].host_ip.as_deref(), Some("0.0.0.0"));
let exposed = build_exposed_ports(&spec);
assert!(exposed.contains(&"80/tcp".to_string()));
assert!(exposed.contains(&"53/udp".to_string()));
assert!(exposed.contains(&"443/tcp".to_string()));
}
#[test]
fn translate_restart_policy_covers_all_kinds() {
use zlayer_spec::{ContainerRestartKind, ContainerRestartPolicy};
let cases: &[(
ContainerRestartKind,
Option<u32>,
RestartPolicyNameEnum,
Option<i64>,
)] = &[
(
ContainerRestartKind::No,
None,
RestartPolicyNameEnum::NO,
None,
),
(
ContainerRestartKind::Always,
None,
RestartPolicyNameEnum::ALWAYS,
None,
),
(
ContainerRestartKind::UnlessStopped,
None,
RestartPolicyNameEnum::UNLESS_STOPPED,
None,
),
(
ContainerRestartKind::OnFailure,
Some(7),
RestartPolicyNameEnum::ON_FAILURE,
Some(7),
),
(
ContainerRestartKind::OnFailure,
None,
RestartPolicyNameEnum::ON_FAILURE,
None,
),
];
for (kind, max_attempts, expected_name, expected_retry) in cases {
let policy = ContainerRestartPolicy {
kind: *kind,
max_attempts: *max_attempts,
delay: None,
};
let translated = translate_restart_policy(&policy);
assert_eq!(
translated.name,
Some(*expected_name),
"bad name for {kind:?}"
);
assert_eq!(
translated.maximum_retry_count, *expected_retry,
"bad retry count for {kind:?}"
);
}
}
#[test]
fn translate_restart_policy_ignores_delay_but_still_returns_policy() {
use zlayer_spec::{ContainerRestartKind, ContainerRestartPolicy};
let translated = translate_restart_policy(&ContainerRestartPolicy {
kind: ContainerRestartKind::Always,
max_attempts: None,
delay: Some("500ms".to_string()),
});
assert_eq!(translated.name, Some(RestartPolicyNameEnum::ALWAYS));
assert!(translated.maximum_retry_count.is_none());
}
#[test]
fn build_host_config_sets_restart_policy_when_specified() {
let mut spec = create_test_spec(vec![]);
spec.restart_policy = Some(zlayer_spec::ContainerRestartPolicy {
kind: zlayer_spec::ContainerRestartKind::OnFailure,
max_attempts: Some(3),
delay: None,
});
let host_config = build_host_config(&spec, None, None);
let rp = host_config
.restart_policy
.expect("restart_policy should be populated");
assert_eq!(rp.name, Some(RestartPolicyNameEnum::ON_FAILURE));
assert_eq!(rp.maximum_retry_count, Some(3));
}
#[test]
fn build_host_config_omits_restart_policy_when_none() {
let spec = create_test_spec(vec![]);
let host_config = build_host_config(&spec, None, None);
assert!(
host_config.restart_policy.is_none(),
"no restart_policy on spec should leave the field None"
);
}
#[test]
fn parse_port_key_accepts_tcp_and_udp() {
use zlayer_spec::PortProtocol;
assert_eq!(parse_port_key("80/tcp"), Some((80, PortProtocol::Tcp)));
assert_eq!(parse_port_key("53/udp"), Some((53, PortProtocol::Udp)));
assert_eq!(parse_port_key(""), None);
assert_eq!(parse_port_key("80"), None);
assert_eq!(parse_port_key("abc/tcp"), None);
assert_eq!(parse_port_key("80/sctp"), None);
}
#[test]
fn translate_inspect_details_translates_ports_and_networks() {
use bollard::models::{
ContainerInspectResponse, ContainerState, ContainerStateStatusEnum, EndpointSettings,
Health, HealthStatusEnum, HealthcheckResult, NetworkSettings, PortBinding,
};
use std::collections::HashMap;
let mut ports: HashMap<String, Option<Vec<PortBinding>>> = HashMap::new();
ports.insert(
"80/tcp".to_string(),
Some(vec![PortBinding {
host_ip: Some("0.0.0.0".to_string()),
host_port: Some("8080".to_string()),
}]),
);
ports.insert(
"53/udp".to_string(),
Some(vec![PortBinding {
host_ip: Some("127.0.0.1".to_string()),
host_port: None,
}]),
);
let mut networks: HashMap<String, EndpointSettings> = HashMap::new();
networks.insert(
"bridge".to_string(),
EndpointSettings {
aliases: Some(vec!["myapp".to_string()]),
ip_address: Some("172.17.0.2".to_string()),
..Default::default()
},
);
let inspect = ContainerInspectResponse {
state: Some(ContainerState {
status: Some(ContainerStateStatusEnum::RUNNING),
health: Some(Health {
status: Some(HealthStatusEnum::HEALTHY),
failing_streak: Some(0),
log: Some(vec![HealthcheckResult {
output: Some("OK".to_string()),
..Default::default()
}]),
}),
..Default::default()
}),
network_settings: Some(NetworkSettings {
ports: Some(ports),
networks: Some(networks),
..Default::default()
}),
..Default::default()
};
let details = translate_inspect_details(&inspect);
assert_eq!(details.ports.len(), 2);
let tcp80 = details
.ports
.iter()
.find(|p| p.container_port == 80)
.expect("80/tcp mapping");
assert_eq!(tcp80.host_port, Some(8080));
assert_eq!(tcp80.host_ip, "0.0.0.0");
assert_eq!(tcp80.protocol, zlayer_spec::PortProtocol::Tcp);
let udp53 = details
.ports
.iter()
.find(|p| p.container_port == 53)
.expect("53/udp mapping");
assert_eq!(udp53.host_port, None);
assert_eq!(udp53.host_ip, "127.0.0.1");
assert_eq!(udp53.protocol, zlayer_spec::PortProtocol::Udp);
assert_eq!(details.networks.len(), 1);
assert_eq!(details.networks[0].network, "bridge");
assert_eq!(details.networks[0].aliases, vec!["myapp".to_string()]);
assert_eq!(details.networks[0].ipv4.as_deref(), Some("172.17.0.2"));
assert_eq!(details.ipv4.as_deref(), Some("172.17.0.2"));
let health = details.health.expect("health translated");
assert_eq!(health.status, "healthy");
assert_eq!(health.failing_streak, Some(0));
assert_eq!(health.last_output.as_deref(), Some("OK"));
assert!(details.exit_code.is_none());
}
#[test]
fn translate_inspect_details_exit_code_only_on_exited() {
use bollard::models::{ContainerInspectResponse, ContainerState, ContainerStateStatusEnum};
let running = ContainerInspectResponse {
state: Some(ContainerState {
status: Some(ContainerStateStatusEnum::RUNNING),
exit_code: Some(0),
..Default::default()
}),
..Default::default()
};
assert!(translate_inspect_details(&running).exit_code.is_none());
let exited = ContainerInspectResponse {
state: Some(ContainerState {
status: Some(ContainerStateStatusEnum::EXITED),
exit_code: Some(137),
..Default::default()
}),
..Default::default()
};
assert_eq!(translate_inspect_details(&exited).exit_code, Some(137));
let dead = ContainerInspectResponse {
state: Some(ContainerState {
status: Some(ContainerStateStatusEnum::DEAD),
exit_code: Some(255),
..Default::default()
}),
..Default::default()
};
assert_eq!(translate_inspect_details(&dead).exit_code, Some(255));
}
#[test]
fn translate_inspect_details_empty_response_yields_default() {
let inspect = bollard::models::ContainerInspectResponse::default();
let details = translate_inspect_details(&inspect);
assert!(details.ports.is_empty());
assert!(details.networks.is_empty());
assert!(details.ipv4.is_none());
assert!(details.health.is_none());
assert!(details.exit_code.is_none());
}
#[test]
fn build_logs_options_maps_every_field() {
let opts = LogsStreamOptions {
follow: true,
tail: Some(50),
since: Some(1_700_000_000),
until: Some(1_700_000_500),
timestamps: true,
stdout: true,
stderr: false,
};
let bollard = build_logs_options(&opts);
assert!(bollard.follow);
assert!(bollard.stdout);
assert!(!bollard.stderr);
assert!(bollard.timestamps);
assert_eq!(bollard.tail, "50");
assert_eq!(bollard.since, 1_700_000_000_i32);
assert_eq!(bollard.until, 1_700_000_500_i32);
}
#[test]
fn build_logs_options_default_tail_is_all_and_zero_window() {
let opts = LogsStreamOptions::default();
let bollard = build_logs_options(&opts);
assert_eq!(bollard.tail, "all");
assert_eq!(bollard.since, 0);
assert_eq!(bollard.until, 0);
assert!(!bollard.follow);
assert!(!bollard.stdout);
assert!(!bollard.stderr);
assert!(!bollard.timestamps);
}
#[test]
fn log_output_to_chunk_classifies_each_variant() {
let stdout = log_output_to_chunk(
bollard::container::LogOutput::StdOut {
message: bytes::Bytes::from_static(b"hello\n"),
},
false,
);
assert_eq!(stdout.stream, LogChannel::Stdout);
assert_eq!(stdout.bytes.as_ref(), b"hello\n");
assert!(
stdout.timestamp.is_none(),
"no timestamp when not requested"
);
let console = log_output_to_chunk(
bollard::container::LogOutput::Console {
message: bytes::Bytes::from_static(b"tty\n"),
},
false,
);
assert_eq!(console.stream, LogChannel::Stdout);
let stderr = log_output_to_chunk(
bollard::container::LogOutput::StdErr {
message: bytes::Bytes::from_static(b"oops\n"),
},
true,
);
assert_eq!(stderr.stream, LogChannel::Stderr);
assert_eq!(stderr.bytes.as_ref(), b"oops\n");
assert!(
stderr.timestamp.is_some(),
"timestamp populated when requested"
);
let stdin = log_output_to_chunk(
bollard::container::LogOutput::StdIn {
message: bytes::Bytes::from_static(b"in\n"),
},
false,
);
assert_eq!(stdin.stream, LogChannel::Stdin);
}
#[test]
fn translate_stats_sample_maps_cpu_memory_and_pids() {
use bollard::models::{
ContainerCpuStats, ContainerCpuUsage, ContainerMemoryStats, ContainerPidsStats,
};
let response = ContainerStatsResponse {
cpu_stats: Some(ContainerCpuStats {
cpu_usage: Some(ContainerCpuUsage {
total_usage: Some(123_456_789),
..Default::default()
}),
system_cpu_usage: Some(987_654_321),
online_cpus: Some(8),
..Default::default()
}),
memory_stats: Some(ContainerMemoryStats {
usage: Some(50 * 1024 * 1024),
limit: Some(256 * 1024 * 1024),
..Default::default()
}),
pids_stats: Some(ContainerPidsStats {
current: Some(17),
limit: Some(0), }),
..Default::default()
};
let sample = translate_stats_sample(&response);
assert_eq!(sample.cpu_total_ns, 123_456_789);
assert_eq!(sample.cpu_system_ns, 987_654_321);
assert_eq!(sample.online_cpus, 8);
assert_eq!(sample.mem_used_bytes, 50 * 1024 * 1024);
assert_eq!(sample.mem_limit_bytes, 256 * 1024 * 1024);
assert_eq!(sample.pids_current, 17);
assert_eq!(sample.pids_limit, None, "Docker `limit: 0` -> None");
assert_eq!(sample.net_rx_bytes, 0);
assert_eq!(sample.net_tx_bytes, 0);
assert_eq!(sample.blkio_read_bytes, 0);
assert_eq!(sample.blkio_write_bytes, 0);
}
#[test]
fn translate_stats_sample_sums_networks_and_blkio() {
use bollard::models::{
ContainerBlkioStatEntry, ContainerBlkioStats, ContainerNetworkStats, ContainerPidsStats,
};
use std::collections::HashMap;
let mut nets: HashMap<String, ContainerNetworkStats> = HashMap::new();
nets.insert(
"eth0".to_string(),
ContainerNetworkStats {
rx_bytes: Some(1000),
tx_bytes: Some(2000),
..Default::default()
},
);
nets.insert(
"eth1".to_string(),
ContainerNetworkStats {
rx_bytes: Some(500),
tx_bytes: Some(750),
..Default::default()
},
);
let blkio = ContainerBlkioStats {
io_service_bytes_recursive: Some(vec![
ContainerBlkioStatEntry {
op: Some("Read".to_string()),
value: Some(4096),
..Default::default()
},
ContainerBlkioStatEntry {
op: Some("Read".to_string()),
value: Some(2048),
..Default::default()
},
ContainerBlkioStatEntry {
op: Some("Write".to_string()),
value: Some(8192),
..Default::default()
},
ContainerBlkioStatEntry {
op: Some("Sync".to_string()),
value: Some(99),
..Default::default()
},
]),
..Default::default()
};
let response = ContainerStatsResponse {
networks: Some(nets),
blkio_stats: Some(blkio),
pids_stats: Some(ContainerPidsStats {
current: Some(5),
limit: Some(4096),
}),
..Default::default()
};
let sample = translate_stats_sample(&response);
assert_eq!(sample.net_rx_bytes, 1500);
assert_eq!(sample.net_tx_bytes, 2750);
assert_eq!(sample.blkio_read_bytes, 4096 + 2048);
assert_eq!(sample.blkio_write_bytes, 8192);
assert_eq!(sample.pids_current, 5);
assert_eq!(sample.pids_limit, Some(4096));
}
#[test]
fn translate_pull_progress_maps_status_and_detail() {
use bollard::models::ProgressDetail;
let info = CreateImageInfo {
id: Some("layer-abc".to_string()),
status: Some("Downloading".to_string()),
progress_detail: Some(ProgressDetail {
current: Some(1024),
total: Some(2048),
}),
..Default::default()
};
match translate_pull_progress(info) {
PullProgress::Status {
id,
status,
progress,
current,
total,
} => {
assert_eq!(id.as_deref(), Some("layer-abc"));
assert_eq!(status, "Downloading");
assert!(progress.is_none(), "preformatted bar not modelled");
assert_eq!(current, Some(1024));
assert_eq!(total, Some(2048));
}
done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
}
}
#[test]
fn translate_pull_progress_handles_missing_status_and_negative_detail() {
use bollard::models::ProgressDetail;
let info = CreateImageInfo {
id: None,
status: None,
progress_detail: Some(ProgressDetail {
current: Some(-1),
total: Some(-2),
}),
..Default::default()
};
match translate_pull_progress(info) {
PullProgress::Status {
id,
status,
current,
total,
..
} => {
assert!(id.is_none());
assert!(status.is_empty());
assert!(current.is_none());
assert!(total.is_none());
}
done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
}
}
#[test]
fn extract_digest_from_status_recognises_sha256() {
assert_eq!(
extract_digest_from_status(Some("Digest: sha256:abc123")).as_deref(),
Some("sha256:abc123")
);
assert_eq!(
extract_digest_from_status(Some(" Digest: sha256:def456 ")).as_deref(),
Some("sha256:def456")
);
assert!(extract_digest_from_status(Some("Pulling fs layer")).is_none());
assert!(extract_digest_from_status(Some("Digest: not-a-hash")).is_none());
assert!(extract_digest_from_status(None).is_none());
}
#[test]
fn docker_credentials_from_registry_auth_basic_and_token() {
let basic = RegistryAuth {
username: "alice".to_string(),
password: "hunter2".to_string(),
auth_type: RegistryAuthType::Basic,
};
let creds = docker_credentials_from_registry_auth(&basic);
assert_eq!(creds.username.as_deref(), Some("alice"));
assert_eq!(creds.password.as_deref(), Some("hunter2"));
assert!(creds.serveraddress.is_none());
assert!(creds.identitytoken.is_none());
let token = RegistryAuth {
username: "<token>".to_string(),
password: "ghp_xxx".to_string(),
auth_type: RegistryAuthType::Token,
};
let creds = docker_credentials_from_registry_auth(&token);
assert_eq!(creds.username.as_deref(), Some("<token>"));
assert_eq!(creds.password.as_deref(), Some("ghp_xxx"));
}
#[test]
fn build_create_exec_options_translates_full_payload() {
let opts = ExecOptions {
command: vec!["sh".into(), "-lc".into(), "echo hi".into()],
env: vec!["PATH=/usr/bin".into(), "FOO=bar".into()],
working_dir: Some("/work".into()),
user: Some("1000:1000".into()),
privileged: true,
tty: true,
attach_stdin: true,
attach_stdout: true,
attach_stderr: true,
};
let bollard_opts = build_create_exec_options(&opts);
assert_eq!(
bollard_opts.cmd.as_deref(),
Some(&["sh".to_string(), "-lc".to_string(), "echo hi".to_string()][..])
);
assert_eq!(
bollard_opts.env.as_deref(),
Some(&["PATH=/usr/bin".to_string(), "FOO=bar".to_string()][..])
);
assert_eq!(bollard_opts.working_dir.as_deref(), Some("/work"));
assert_eq!(bollard_opts.user.as_deref(), Some("1000:1000"));
assert_eq!(bollard_opts.privileged, Some(true));
assert_eq!(bollard_opts.tty, Some(true));
assert_eq!(bollard_opts.attach_stdin, Some(true));
assert_eq!(bollard_opts.attach_stdout, Some(true));
assert_eq!(bollard_opts.attach_stderr, Some(true));
assert!(bollard_opts.detach_keys.is_none());
}
#[test]
fn build_create_exec_options_collapses_empty_collections_to_none() {
let opts = ExecOptions::default();
let bollard_opts = build_create_exec_options(&opts);
assert!(
bollard_opts.cmd.is_none(),
"empty command must translate to None"
);
assert!(
bollard_opts.env.is_none(),
"empty env must translate to None"
);
assert!(bollard_opts.working_dir.is_none());
assert!(bollard_opts.user.is_none());
assert_eq!(bollard_opts.privileged, Some(false));
assert_eq!(bollard_opts.tty, Some(false));
assert_eq!(bollard_opts.attach_stdin, Some(false));
}
#[test]
fn resize_options_for_maps_rows_to_h_and_cols_to_w() {
let opts = resize_options_for(40, 132);
assert_eq!(opts.height, 40, "rows -> h (height)");
assert_eq!(opts.width, 132, "cols -> w (width)");
let zero = resize_options_for(0, 0);
assert_eq!(zero.height, 0);
assert_eq!(zero.width, 0);
let max = resize_options_for(u16::MAX, u16::MAX);
assert_eq!(max.height, u16::MAX);
assert_eq!(max.width, u16::MAX);
}
#[test]
fn extract_exec_exit_code_handles_present_missing_and_out_of_range() {
let inspect = ExecInspectResponse {
exit_code: Some(0),
..Default::default()
};
assert_eq!(extract_exec_exit_code(&inspect), 0);
let nonzero = ExecInspectResponse {
exit_code: Some(137), ..Default::default()
};
assert_eq!(extract_exec_exit_code(&nonzero), 137);
let missing = ExecInspectResponse {
exit_code: None,
..Default::default()
};
assert_eq!(extract_exec_exit_code(&missing), 0);
let too_big = ExecInspectResponse {
exit_code: Some(i64::from(i32::MAX) + 1),
..Default::default()
};
assert_eq!(extract_exec_exit_code(&too_big), i32::MAX);
let too_small = ExecInspectResponse {
exit_code: Some(i64::from(i32::MIN) - 1),
..Default::default()
};
assert_eq!(extract_exec_exit_code(&too_small), i32::MIN);
}
#[ignore = "requires a running local Docker daemon"]
#[tokio::test]
async fn exec_pty_round_trip_against_real_docker() {
use crate::runtime::ContainerId;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let runtime = DockerRuntime::new(None)
.await
.expect("connect to local docker");
let id = ContainerId {
service: "exec-pty-smoke".to_string(),
replica: 0,
};
let opts = ExecOptions {
command: vec!["sh".into()],
tty: true,
attach_stdin: true,
attach_stdout: true,
attach_stderr: true,
..Default::default()
};
let mut handle = runtime.exec_pty(&id, opts).await.expect("exec_pty starts");
handle
.stream
.write_all(b"exit 7\n")
.await
.expect("send exit 7");
handle.stream.flush().await.expect("flush");
let mut buf = Vec::new();
let _ = handle.stream.read_to_end(&mut buf).await;
let code = handle.exit.await.expect("exit future resolves");
assert_eq!(code, 7, "shell exited with the value we asked for");
}
}