mecha10_nodes_diagnostics/
node.rs1use anyhow::Result;
4use mecha10_core::prelude::*;
5use mecha10_diagnostics::collectors::{DockerCollector, RedisCollector, SystemCollector};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::Mutex;
9use tokio::time;
10use tracing::{info, warn};
11
12pub struct DiagnosticsNode {
14 ctx: Arc<Context>,
15 docker_collector: Arc<DockerCollector>,
16 system_collector: Arc<SystemCollector>,
17 redis_collector: Arc<Mutex<RedisCollector>>,
18}
19
20impl DiagnosticsNode {
21 pub async fn new() -> Result<Self> {
23 let ctx = Arc::new(Context::new("diagnostics-node").await?);
24
25 let redis_url = Context::get_redis_url()?;
27
28 let docker_collector = Arc::new(DockerCollector::new("diagnostics-node").await);
30 let system_collector = Arc::new(SystemCollector::new("diagnostics-node"));
31 let redis_collector = Arc::new(Mutex::new(RedisCollector::new("diagnostics-node", &redis_url).await?));
32
33 Ok(Self {
34 ctx,
35 docker_collector,
36 system_collector,
37 redis_collector,
38 })
39 }
40
41 pub async fn run(&self) -> Result<()> {
43 info!("Starting diagnostics node");
44
45 let ctx_docker = self.ctx.clone();
47 let docker_collector = self.docker_collector.clone();
48 tokio::spawn(async move {
49 let mut interval = time::interval(Duration::from_secs(5));
50 loop {
51 interval.tick().await;
52 if let Err(e) = docker_collector.collect_all_containers(&ctx_docker).await {
53 warn!("Failed to collect Docker metrics: {}", e);
54 }
55 }
56 });
57
58 let ctx_system = self.ctx.clone();
60 let system_collector = self.system_collector.clone();
61 tokio::spawn(async move {
62 let mut interval = time::interval(Duration::from_secs(5));
63 loop {
64 interval.tick().await;
65 if let Err(e) = system_collector.collect_metrics(&ctx_system).await {
66 warn!("Failed to collect system metrics: {}", e);
67 }
68 }
69 });
70
71 let ctx_redis = self.ctx.clone();
73 let redis_collector = self.redis_collector.clone();
74 tokio::spawn(async move {
75 let mut interval = time::interval(Duration::from_secs(5));
76 loop {
77 interval.tick().await;
78 let mut collector = redis_collector.lock().await;
79 if let Err(e) = collector.collect_all(&ctx_redis).await {
80 warn!("Failed to collect Redis metrics: {}", e);
81 }
82 }
83 });
84
85 info!("Diagnostics node running - publishing Docker, system, and Redis metrics every 5 seconds");
86
87 loop {
89 time::sleep(Duration::from_secs(60)).await;
90 info!("Diagnostics node alive - all metrics publishing");
91 }
92 }
93}