mecha10-nodes-diagnostics 0.1.25

Diagnostics node that publishes Docker, system resource, and Redis metrics
Documentation
//! Diagnostics node implementation

use anyhow::Result;
use mecha10_core::prelude::*;
use mecha10_diagnostics::collectors::{DockerCollector, RedisCollector, SystemCollector};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time;
use tracing::{info, warn};

/// Diagnostics node that publishes Docker, system, and Redis metrics
///
/// Holds the shared [`Context`] together with one collector per metrics
/// source. Every field is reference-counted so that `run` can clone it into
/// the background task that drives the corresponding collector.
pub struct DiagnosticsNode {
    /// Context created for this node; passed by reference to every collector call.
    ctx: Arc<Context>,
    /// Collector driven by the Docker metrics task (`collect_all_containers`).
    docker_collector: Arc<DockerCollector>,
    /// Collector driven by the system metrics task (`collect_metrics`).
    system_collector: Arc<SystemCollector>,
    /// Collector driven by the Redis metrics task (`collect_all`).
    ///
    /// Kept behind an async mutex; the task locks it and calls `collect_all`
    /// through a mutable guard (presumably `collect_all` takes `&mut self` -
    /// confirm against `RedisCollector`).
    redis_collector: Arc<Mutex<RedisCollector>>,
}

impl DiagnosticsNode {
    /// Create a new diagnostics node
    pub async fn new() -> Result<Self> {
        let ctx = Arc::new(Context::new("diagnostics-node").await?);

        // Get Redis URL from context's infrastructure config
        let redis_url = Context::get_redis_url()?;

        // Create collectors
        let docker_collector = Arc::new(DockerCollector::new("diagnostics-node").await);
        let system_collector = Arc::new(SystemCollector::new("diagnostics-node"));
        let redis_collector = Arc::new(Mutex::new(RedisCollector::new("diagnostics-node", &redis_url).await?));

        Ok(Self {
            ctx,
            docker_collector,
            system_collector,
            redis_collector,
        })
    }

    /// Run the diagnostics node
    pub async fn run(&self) -> Result<()> {
        info!("Starting diagnostics node");

        // Spawn Docker metrics task
        let ctx_docker = self.ctx.clone();
        let docker_collector = self.docker_collector.clone();
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(5));
            loop {
                interval.tick().await;
                if let Err(e) = docker_collector.collect_all_containers(&ctx_docker).await {
                    warn!("Failed to collect Docker metrics: {}", e);
                }
            }
        });

        // Spawn system metrics task
        let ctx_system = self.ctx.clone();
        let system_collector = self.system_collector.clone();
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(5));
            loop {
                interval.tick().await;
                if let Err(e) = system_collector.collect_metrics(&ctx_system).await {
                    warn!("Failed to collect system metrics: {}", e);
                }
            }
        });

        // Spawn Redis metrics task
        let ctx_redis = self.ctx.clone();
        let redis_collector = self.redis_collector.clone();
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_secs(5));
            loop {
                interval.tick().await;
                let mut collector = redis_collector.lock().await;
                if let Err(e) = collector.collect_all(&ctx_redis).await {
                    warn!("Failed to collect Redis metrics: {}", e);
                }
            }
        });

        info!("Diagnostics node running - publishing Docker, system, and Redis metrics every 5 seconds");

        // Keep the node running
        loop {
            time::sleep(Duration::from_secs(60)).await;
            info!("Diagnostics node alive - all metrics publishing");
        }
    }
}