use crate::topics::*;
use crate::types::*;
use ::redis::aio::ConnectionManager;
use anyhow::{Context as _, Result};
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use std::collections::HashMap;
pub struct RedisCollector {
source: String,
connection: ConnectionManager,
}
impl RedisCollector {
pub async fn new(source: impl Into<String>, redis_url: &str) -> Result<Self> {
let client = ::redis::Client::open(redis_url).context("Failed to create Redis client")?;
let connection = ConnectionManager::new(client)
.await
.context("Failed to create Redis connection manager")?;
Ok(Self {
source: source.into(),
connection,
})
}
fn parse_info(info: &str) -> HashMap<String, String> {
let mut map = HashMap::new();
for line in info.lines() {
if line.starts_with('#') || line.is_empty() {
continue;
}
if let Some((key, value)) = line.split_once(':') {
map.insert(key.to_string(), value.to_string());
}
}
map
}
pub async fn collect_server_info(&mut self, ctx: &Context) -> Result<()> {
let info_result: ::redis::RedisResult<String> = ::redis::cmd("INFO").query_async(&mut self.connection).await;
let info: String = match info_result {
Ok(info) => info,
Err(e) => {
tracing::warn!("Failed to query Redis INFO: {}", e);
return Ok(()); }
};
let info_map = Self::parse_info(&info);
let redis_version = info_map
.get("redis_version")
.cloned()
.unwrap_or_else(|| "unknown".to_string());
let uptime_seconds = info_map
.get("uptime_in_seconds")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let connected_clients = info_map
.get("connected_clients")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let used_memory = info_map
.get("used_memory")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let used_memory_rss = info_map
.get("used_memory_rss")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let used_memory_peak = info_map
.get("used_memory_peak")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let total_connections_received = info_map
.get("total_connections_received")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let total_commands_processed = info_map
.get("total_commands_processed")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let instantaneous_ops_per_sec = info_map
.get("instantaneous_ops_per_sec")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let keyspace_hits = info_map
.get("keyspace_hits")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let keyspace_misses = info_map
.get("keyspace_misses")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let (db0_keys, db0_expires) = if let Some(db0_info) = info_map.get("db0") {
let mut keys = 0;
let mut expires = 0;
for part in db0_info.split(',') {
if let Some((k, v)) = part.split_once('=') {
match k {
"keys" => keys = v.parse::<u64>().unwrap_or(0),
"expires" => expires = v.parse::<u64>().unwrap_or(0),
_ => {}
}
}
}
(keys, expires)
} else {
(0, 0)
};
let metrics = RedisServerInfoMetrics {
redis_version,
uptime_seconds,
connected_clients,
used_memory,
used_memory_rss,
used_memory_peak,
total_connections_received,
total_commands_processed,
instantaneous_ops_per_sec,
keyspace_hits,
keyspace_misses,
db0_keys,
db0_expires,
};
let msg = DiagnosticMessage::new(&self.source, metrics);
ctx.publish_to(
Topic::<DiagnosticMessage<RedisServerInfoMetrics>>::new(TOPIC_DIAGNOSTICS_REDIS_INFO),
&msg,
)
.await?;
Ok(())
}
pub async fn collect_all(&mut self, ctx: &Context) -> Result<()> {
self.collect_server_info(ctx).await?;
Ok(())
}
}