mecha10-diagnostics 0.1.25

Diagnostics and metrics collection for the Mecha10 robotics framework
Documentation
//! Docker container metrics collector

use crate::topics::*;
use crate::types::*;
use anyhow::{Context as _, Result};
use bollard::container::StatsOptions;
use bollard::Docker;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use tracing::{debug, error, info};

/// Collects per-container resource metrics from the Docker daemon and publishes them
/// as diagnostic messages.
pub struct DockerCollector {
    /// Docker client handle; `None` when no daemon could be reached, in which case
    /// collection is skipped.
    docker: Option<Docker>,
    /// Source identifier attached to every published diagnostic message.
    source: String,
}

impl DockerCollector {
    /// Create a new Docker collector
    ///
    /// Attempts to connect to Docker daemon. If connection fails, collector will be disabled.
    pub async fn new(source: impl Into<String>) -> Self {
        let source = source.into();

        let docker = Self::connect_to_docker().await;

        Self { source, docker }
    }

    /// Attempt to connect to Docker daemon with platform-specific fallbacks
    async fn connect_to_docker() -> Option<Docker> {
        // Try 1: Local defaults (checks DOCKER_HOST env var, then platform defaults)
        match Docker::connect_with_local_defaults() {
            Ok(d) => {
                if Self::verify_connection(&d).await {
                    info!("✅ Connected to Docker daemon (local defaults)");
                    return Some(d);
                }
            }
            Err(e) => {
                debug!("Failed to connect with local defaults: {}", e);
            }
        }

        // Try 2: macOS Docker Desktop socket
        #[cfg(target_os = "macos")]
        {
            if let Some(home) = std::env::var_os("HOME") {
                let socket_path = format!("{}/.docker/run/docker.sock", home.to_string_lossy());
                match Docker::connect_with_socket(&socket_path, 120, bollard::API_DEFAULT_VERSION) {
                    Ok(d) => {
                        if Self::verify_connection(&d).await {
                            info!("✅ Connected to Docker daemon (macOS: {})", socket_path);
                            return Some(d);
                        }
                    }
                    Err(e) => {
                        debug!("Failed to connect to macOS socket {}: {}", socket_path, e);
                    }
                }
            }
        }

        // Try 3: Linux default socket
        #[cfg(target_os = "linux")]
        {
            let socket_path = "/var/run/docker.sock";
            match Docker::connect_with_socket(socket_path, 120, bollard::API_DEFAULT_VERSION) {
                Ok(d) => {
                    if Self::verify_connection(&d).await {
                        info!("✅ Connected to Docker daemon (Linux: {})", socket_path);
                        return Some(d);
                    }
                }
                Err(e) => {
                    debug!("Failed to connect to Linux socket {}: {}", socket_path, e);
                }
            }
        }

        error!("❌ Failed to connect to Docker daemon on any socket. Docker metrics disabled.");
        error!("   Tried: DOCKER_HOST env var, platform defaults, and platform-specific sockets");
        None
    }

    /// Verify Docker connection by pinging the daemon
    async fn verify_connection(docker: &Docker) -> bool {
        match docker.ping().await {
            Ok(_) => true,
            Err(e) => {
                debug!("Docker connection verification failed: {}", e);
                false
            }
        }
    }

    /// Check if Docker is available
    pub fn is_available(&self) -> bool {
        self.docker.is_some()
    }

    /// Collect metrics for all running containers
    pub async fn collect_all_containers(&self, ctx: &Context) -> Result<()> {
        let docker = match &self.docker {
            Some(d) => d,
            None => return Ok(()), // Docker not available, skip silently
        };

        // List running containers
        let containers = docker
            .list_containers::<String>(None)
            .await
            .context("Failed to list containers")?;

        debug!("📦 Found {} running containers", containers.len());

        // Collect stats for each container
        for container in containers {
            let id = container.id.as_ref().unwrap_or(&"unknown".to_string()).clone();
            let name = container
                .names
                .as_ref()
                .and_then(|n| n.first())
                .map(|n| n.trim_start_matches('/').to_string())
                .unwrap_or_else(|| id.clone());

            // Get single stats snapshot (stream=false)
            let stats_options = StatsOptions {
                stream: false,
                one_shot: true,
            };

            match docker.stats(&id, Some(stats_options)).try_next().await {
                Ok(Some(stats)) => {
                    let metrics = self.parse_stats(&id, &name, stats)?;
                    self.publish_container_metrics(ctx, metrics).await?;
                }
                Ok(None) => {
                    debug!("No stats available for container {}", name);
                }
                Err(e) => {
                    error!("Failed to get stats for container {}: {}", name, e);
                }
            }
        }

        Ok(())
    }

    /// Parse Docker stats into our metric format
    fn parse_stats(&self, id: &str, name: &str, stats: bollard::container::Stats) -> Result<DockerContainerMetrics> {
        // CPU percentage
        let cpu_percent = {
            let cpu_stats = &stats.cpu_stats;
            let precpu_stats = &stats.precpu_stats;
            let cpu_delta = cpu_stats.cpu_usage.total_usage as f64 - precpu_stats.cpu_usage.total_usage as f64;
            let system_delta =
                cpu_stats.system_cpu_usage.unwrap_or(0) as f64 - precpu_stats.system_cpu_usage.unwrap_or(0) as f64;
            let num_cpus = cpu_stats.online_cpus.unwrap_or(1) as f64;

            if system_delta > 0.0 && cpu_delta > 0.0 {
                (cpu_delta / system_delta) * num_cpus * 100.0
            } else {
                0.0
            }
        };

        // Memory
        let memory_stats = stats.memory_stats;
        let memory_usage_bytes = memory_stats.usage.unwrap_or(0);
        let memory_limit_bytes = memory_stats.limit.unwrap_or(0);
        let memory_percent = if memory_limit_bytes > 0 {
            (memory_usage_bytes as f64 / memory_limit_bytes as f64) * 100.0
        } else {
            0.0
        };

        // Network
        let (network_rx_bytes, network_tx_bytes) = if let Some(networks) = &stats.networks {
            let rx: u64 = networks.values().map(|n| n.rx_bytes).sum();
            let tx: u64 = networks.values().map(|n| n.tx_bytes).sum();
            (rx, tx)
        } else {
            (0, 0)
        };

        // Block I/O
        let (block_io_read_bytes, block_io_write_bytes) = {
            let blkio_stats = &stats.blkio_stats;
            let read: u64 = blkio_stats
                .io_service_bytes_recursive
                .as_ref()
                .map(|entries| {
                    entries
                        .iter()
                        .filter(|e| e.op == "read" || e.op == "Read")
                        .map(|e| e.value)
                        .sum()
                })
                .unwrap_or(0);

            let write: u64 = blkio_stats
                .io_service_bytes_recursive
                .as_ref()
                .map(|entries| {
                    entries
                        .iter()
                        .filter(|e| e.op == "write" || e.op == "Write")
                        .map(|e| e.value)
                        .sum()
                })
                .unwrap_or(0);

            (read, write)
        };

        Ok(DockerContainerMetrics {
            container_id: id.to_string(),
            container_name: name.to_string(),
            cpu_percent,
            memory_usage_bytes,
            memory_limit_bytes,
            memory_percent,
            network_rx_bytes,
            network_tx_bytes,
            block_io_read_bytes,
            block_io_write_bytes,
        })
    }

    /// Publish container metrics to diagnostic topic
    async fn publish_container_metrics(&self, ctx: &Context, metrics: DockerContainerMetrics) -> Result<()> {
        let msg = DiagnosticMessage::new(&self.source, metrics);
        ctx.publish_to(
            Topic::<DiagnosticMessage<DockerContainerMetrics>>::new(TOPIC_DIAGNOSTICS_DOCKER_CONTAINERS),
            &msg,
        )
        .await?;

        Ok(())
    }
}

// Import for stream handling
use futures::stream::TryStreamExt;