use crate::topics::*;
use crate::types::*;
use anyhow::{Context as _, Result};
use bollard::container::StatsOptions;
use bollard::Docker;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use tracing::{debug, error, info};
/// Collects per-container resource metrics (CPU, memory, network, block I/O)
/// from the local Docker daemon and publishes them as diagnostic messages.
pub struct DockerCollector {
    /// Source label attached to every published `DiagnosticMessage`.
    source: String,
    /// `None` when no Docker daemon could be reached at construction time;
    /// collection then becomes a no-op (see `is_available`).
    docker: Option<Docker>,
}
impl DockerCollector {
pub async fn new(source: impl Into<String>) -> Self {
let source = source.into();
let docker = Self::connect_to_docker().await;
Self { source, docker }
}
async fn connect_to_docker() -> Option<Docker> {
match Docker::connect_with_local_defaults() {
Ok(d) => {
if Self::verify_connection(&d).await {
info!("✅ Connected to Docker daemon (local defaults)");
return Some(d);
}
}
Err(e) => {
debug!("Failed to connect with local defaults: {}", e);
}
}
#[cfg(target_os = "macos")]
{
if let Some(home) = std::env::var_os("HOME") {
let socket_path = format!("{}/.docker/run/docker.sock", home.to_string_lossy());
match Docker::connect_with_socket(&socket_path, 120, bollard::API_DEFAULT_VERSION) {
Ok(d) => {
if Self::verify_connection(&d).await {
info!("✅ Connected to Docker daemon (macOS: {})", socket_path);
return Some(d);
}
}
Err(e) => {
debug!("Failed to connect to macOS socket {}: {}", socket_path, e);
}
}
}
}
#[cfg(target_os = "linux")]
{
let socket_path = "/var/run/docker.sock";
match Docker::connect_with_socket(socket_path, 120, bollard::API_DEFAULT_VERSION) {
Ok(d) => {
if Self::verify_connection(&d).await {
info!("✅ Connected to Docker daemon (Linux: {})", socket_path);
return Some(d);
}
}
Err(e) => {
debug!("Failed to connect to Linux socket {}: {}", socket_path, e);
}
}
}
error!("❌ Failed to connect to Docker daemon on any socket. Docker metrics disabled.");
error!(" Tried: DOCKER_HOST env var, platform defaults, and platform-specific sockets");
None
}
async fn verify_connection(docker: &Docker) -> bool {
match docker.ping().await {
Ok(_) => true,
Err(e) => {
debug!("Docker connection verification failed: {}", e);
false
}
}
}
pub fn is_available(&self) -> bool {
self.docker.is_some()
}
pub async fn collect_all_containers(&self, ctx: &Context) -> Result<()> {
let docker = match &self.docker {
Some(d) => d,
None => return Ok(()), };
let containers = docker
.list_containers::<String>(None)
.await
.context("Failed to list containers")?;
debug!("📦 Found {} running containers", containers.len());
for container in containers {
let id = container.id.as_ref().unwrap_or(&"unknown".to_string()).clone();
let name = container
.names
.as_ref()
.and_then(|n| n.first())
.map(|n| n.trim_start_matches('/').to_string())
.unwrap_or_else(|| id.clone());
let stats_options = StatsOptions {
stream: false,
one_shot: true,
};
match docker.stats(&id, Some(stats_options)).try_next().await {
Ok(Some(stats)) => {
let metrics = self.parse_stats(&id, &name, stats)?;
self.publish_container_metrics(ctx, metrics).await?;
}
Ok(None) => {
debug!("No stats available for container {}", name);
}
Err(e) => {
error!("Failed to get stats for container {}: {}", name, e);
}
}
}
Ok(())
}
fn parse_stats(&self, id: &str, name: &str, stats: bollard::container::Stats) -> Result<DockerContainerMetrics> {
let cpu_percent = {
let cpu_stats = &stats.cpu_stats;
let precpu_stats = &stats.precpu_stats;
let cpu_delta = cpu_stats.cpu_usage.total_usage as f64 - precpu_stats.cpu_usage.total_usage as f64;
let system_delta =
cpu_stats.system_cpu_usage.unwrap_or(0) as f64 - precpu_stats.system_cpu_usage.unwrap_or(0) as f64;
let num_cpus = cpu_stats.online_cpus.unwrap_or(1) as f64;
if system_delta > 0.0 && cpu_delta > 0.0 {
(cpu_delta / system_delta) * num_cpus * 100.0
} else {
0.0
}
};
let memory_stats = stats.memory_stats;
let memory_usage_bytes = memory_stats.usage.unwrap_or(0);
let memory_limit_bytes = memory_stats.limit.unwrap_or(0);
let memory_percent = if memory_limit_bytes > 0 {
(memory_usage_bytes as f64 / memory_limit_bytes as f64) * 100.0
} else {
0.0
};
let (network_rx_bytes, network_tx_bytes) = if let Some(networks) = &stats.networks {
let rx: u64 = networks.values().map(|n| n.rx_bytes).sum();
let tx: u64 = networks.values().map(|n| n.tx_bytes).sum();
(rx, tx)
} else {
(0, 0)
};
let (block_io_read_bytes, block_io_write_bytes) = {
let blkio_stats = &stats.blkio_stats;
let read: u64 = blkio_stats
.io_service_bytes_recursive
.as_ref()
.map(|entries| {
entries
.iter()
.filter(|e| e.op == "read" || e.op == "Read")
.map(|e| e.value)
.sum()
})
.unwrap_or(0);
let write: u64 = blkio_stats
.io_service_bytes_recursive
.as_ref()
.map(|entries| {
entries
.iter()
.filter(|e| e.op == "write" || e.op == "Write")
.map(|e| e.value)
.sum()
})
.unwrap_or(0);
(read, write)
};
Ok(DockerContainerMetrics {
container_id: id.to_string(),
container_name: name.to_string(),
cpu_percent,
memory_usage_bytes,
memory_limit_bytes,
memory_percent,
network_rx_bytes,
network_tx_bytes,
block_io_read_bytes,
block_io_write_bytes,
})
}
async fn publish_container_metrics(&self, ctx: &Context, metrics: DockerContainerMetrics) -> Result<()> {
let msg = DiagnosticMessage::new(&self.source, metrics);
ctx.publish_to(
Topic::<DiagnosticMessage<DockerContainerMetrics>>::new(TOPIC_DIAGNOSTICS_DOCKER_CONTAINERS),
&msg,
)
.await?;
Ok(())
}
}
use futures::stream::TryStreamExt;