mecha10_nodes_diagnostics/
node.rs

1//! Diagnostics node implementation
2
3use anyhow::Result;
4use mecha10_core::prelude::*;
5use mecha10_diagnostics::collectors::{DockerCollector, RedisCollector, SystemCollector};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::Mutex;
9use tokio::time;
10use tracing::{info, warn};
11
12/// Diagnostics node that publishes Docker, system, and Redis metrics
13pub struct DiagnosticsNode {
14    ctx: Arc<Context>,
15    docker_collector: Arc<DockerCollector>,
16    system_collector: Arc<SystemCollector>,
17    redis_collector: Arc<Mutex<RedisCollector>>,
18}
19
20impl DiagnosticsNode {
21    /// Create a new diagnostics node
22    pub async fn new() -> Result<Self> {
23        let ctx = Arc::new(Context::new("diagnostics-node").await?);
24
25        // Get Redis URL from context's infrastructure config
26        let redis_url = Context::get_redis_url()?;
27
28        // Create collectors
29        let docker_collector = Arc::new(DockerCollector::new("diagnostics-node").await);
30        let system_collector = Arc::new(SystemCollector::new("diagnostics-node"));
31        let redis_collector = Arc::new(Mutex::new(RedisCollector::new("diagnostics-node", &redis_url).await?));
32
33        Ok(Self {
34            ctx,
35            docker_collector,
36            system_collector,
37            redis_collector,
38        })
39    }
40
41    /// Run the diagnostics node
42    pub async fn run(&self) -> Result<()> {
43        info!("Starting diagnostics node");
44
45        // Spawn Docker metrics task
46        let ctx_docker = self.ctx.clone();
47        let docker_collector = self.docker_collector.clone();
48        tokio::spawn(async move {
49            let mut interval = time::interval(Duration::from_secs(5));
50            loop {
51                interval.tick().await;
52                if let Err(e) = docker_collector.collect_all_containers(&ctx_docker).await {
53                    warn!("Failed to collect Docker metrics: {}", e);
54                }
55            }
56        });
57
58        // Spawn system metrics task
59        let ctx_system = self.ctx.clone();
60        let system_collector = self.system_collector.clone();
61        tokio::spawn(async move {
62            let mut interval = time::interval(Duration::from_secs(5));
63            loop {
64                interval.tick().await;
65                if let Err(e) = system_collector.collect_metrics(&ctx_system).await {
66                    warn!("Failed to collect system metrics: {}", e);
67                }
68            }
69        });
70
71        // Spawn Redis metrics task
72        let ctx_redis = self.ctx.clone();
73        let redis_collector = self.redis_collector.clone();
74        tokio::spawn(async move {
75            let mut interval = time::interval(Duration::from_secs(5));
76            loop {
77                interval.tick().await;
78                let mut collector = redis_collector.lock().await;
79                if let Err(e) = collector.collect_all(&ctx_redis).await {
80                    warn!("Failed to collect Redis metrics: {}", e);
81                }
82            }
83        });
84
85        info!("Diagnostics node running - publishing Docker, system, and Redis metrics every 5 seconds");
86
87        // Keep the node running
88        loop {
89            time::sleep(Duration::from_secs(60)).await;
90            info!("Diagnostics node alive - all metrics publishing");
91        }
92    }
93}