mecha10-diagnostics 0.1.25

Diagnostics and metrics collection for Mecha10 robotics framework
Documentation
//! Redis connection pool and operation metrics collector
//!
//! Collects Redis server statistics using the INFO command.

use crate::topics::*;
use crate::types::*;
use ::redis::aio::ConnectionManager;
use anyhow::{Context as _, Result};
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use std::collections::HashMap;

/// Redis metrics collector
///
/// Collects real-time Redis server statistics using the INFO command.
pub struct RedisCollector {
    source: String,
    /// Redis connection manager for INFO queries
    connection: ConnectionManager,
}

impl RedisCollector {
    /// Create a new Redis collector
    pub async fn new(source: impl Into<String>, redis_url: &str) -> Result<Self> {
        let client = ::redis::Client::open(redis_url).context("Failed to create Redis client")?;

        let connection = ConnectionManager::new(client)
            .await
            .context("Failed to create Redis connection manager")?;

        Ok(Self {
            source: source.into(),
            connection,
        })
    }

    /// Parse Redis INFO output into a HashMap
    fn parse_info(info: &str) -> HashMap<String, String> {
        let mut map = HashMap::new();
        for line in info.lines() {
            // Skip comments and section headers
            if line.starts_with('#') || line.is_empty() {
                continue;
            }

            // Parse key:value pairs
            if let Some((key, value)) = line.split_once(':') {
                map.insert(key.to_string(), value.to_string());
            }
        }
        map
    }

    /// Collect and publish Redis server info metrics
    pub async fn collect_server_info(&mut self, ctx: &Context) -> Result<()> {
        // Query Redis INFO command
        let info_result: ::redis::RedisResult<String> = ::redis::cmd("INFO").query_async(&mut self.connection).await;

        let info: String = match info_result {
            Ok(info) => info,
            Err(e) => {
                tracing::warn!("Failed to query Redis INFO: {}", e);
                return Ok(()); // Don't propagate error, just skip this collection cycle
            }
        };

        // Parse INFO response
        let info_map = Self::parse_info(&info);

        // Extract key metrics
        let redis_version = info_map
            .get("redis_version")
            .cloned()
            .unwrap_or_else(|| "unknown".to_string());

        let uptime_seconds = info_map
            .get("uptime_in_seconds")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let connected_clients = info_map
            .get("connected_clients")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let used_memory = info_map
            .get("used_memory")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let used_memory_rss = info_map
            .get("used_memory_rss")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let used_memory_peak = info_map
            .get("used_memory_peak")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let total_connections_received = info_map
            .get("total_connections_received")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let total_commands_processed = info_map
            .get("total_commands_processed")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let instantaneous_ops_per_sec = info_map
            .get("instantaneous_ops_per_sec")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let keyspace_hits = info_map
            .get("keyspace_hits")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        let keyspace_misses = info_map
            .get("keyspace_misses")
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        // Parse db0 keyspace info (format: "db0:keys=10,expires=2,avg_ttl=...")
        let (db0_keys, db0_expires) = if let Some(db0_info) = info_map.get("db0") {
            let mut keys = 0;
            let mut expires = 0;

            for part in db0_info.split(',') {
                if let Some((k, v)) = part.split_once('=') {
                    match k {
                        "keys" => keys = v.parse::<u64>().unwrap_or(0),
                        "expires" => expires = v.parse::<u64>().unwrap_or(0),
                        _ => {}
                    }
                }
            }

            (keys, expires)
        } else {
            (0, 0)
        };

        let metrics = RedisServerInfoMetrics {
            redis_version,
            uptime_seconds,
            connected_clients,
            used_memory,
            used_memory_rss,
            used_memory_peak,
            total_connections_received,
            total_commands_processed,
            instantaneous_ops_per_sec,
            keyspace_hits,
            keyspace_misses,
            db0_keys,
            db0_expires,
        };

        let msg = DiagnosticMessage::new(&self.source, metrics);
        ctx.publish_to(
            Topic::<DiagnosticMessage<RedisServerInfoMetrics>>::new(TOPIC_DIAGNOSTICS_REDIS_INFO),
            &msg,
        )
        .await?;

        Ok(())
    }

    /// Collect and publish all Redis metrics
    pub async fn collect_all(&mut self, ctx: &Context) -> Result<()> {
        self.collect_server_info(ctx).await?;
        Ok(())
    }
}