use crate::commands::diagnostics::DiagnosticsArgs;
use crate::context::CliContext;
use crate::ui::{DiagnosticsState, DiagnosticsTui};
use anyhow::{Context as AnyhowContext, Result};
use crossterm::{
event::{self, Event},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use mecha10_core::prelude::*;
use mecha10_diagnostics::prelude::*;
use ratatui::{backend::CrosstermBackend, Terminal};
use std::io::stdout;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing_subscriber::layer::SubscriberExt;
pub async fn handle_diagnostics(_ctx: &mut CliContext, _args: &DiagnosticsArgs) -> Result<()> {
let _guard = tracing::subscriber::set_default(
tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(std::io::sink)),
);
let local_ctx = Arc::new(
Context::new("mecha10-cli-diagnostics")
.await
.context("Failed to connect to Redis. Is the control plane running?")?,
);
let redis_url = Context::get_redis_url()?;
let state = DiagnosticsState::new();
let _collector_handles = spawn_collectors(local_ctx.clone(), &redis_url, state.clone()).await?;
enable_raw_mode()?;
let mut stdout_handle = stdout();
execute!(stdout_handle, EnterAlternateScreen)?;
let backend = CrosstermBackend::new(stdout_handle);
let mut terminal = Terminal::new(backend)?;
let mut tui = DiagnosticsTui::new(state);
let result = run_event_loop(&mut terminal, &mut tui).await;
disable_raw_mode()?;
execute!(terminal.backend_mut(), LeaveAlternateScreen)?;
result
}
/// Drives the TUI: redraws, waits up to 100 ms for terminal input, forwards
/// key events to the TUI, and stops once the TUI reports it should quit.
///
/// A frame is always drawn before the quit flag is checked, so at least one
/// draw happens per call.
///
/// # Errors
///
/// Propagates failures from drawing the frame and from polling or reading
/// terminal events.
async fn run_event_loop(
    terminal: &mut Terminal<CrosstermBackend<std::io::Stdout>>,
    tui: &mut DiagnosticsTui,
) -> Result<()> {
    let poll_timeout = Duration::from_millis(100);

    loop {
        terminal.draw(|frame| tui.draw(frame))?;

        // Only read when poll reports that an event is available.
        let pending = if event::poll(poll_timeout)? {
            Some(event::read()?)
        } else {
            None
        };

        // Non-key events are dropped.
        if let Some(Event::Key(key)) = pending {
            tui.handle_key(key);
        }

        if tui.should_quit() {
            return Ok(());
        }
    }
}
async fn spawn_collectors(ctx: Arc<Context>, redis_url: &str, state: DiagnosticsState) -> Result<Vec<JoinHandle<()>>> {
let mut handles = Vec::new();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use sysinfo::{Disks, Networks, System};
let mut sys = System::new_all();
let mut networks = Networks::new_with_refreshed_list();
let disks = Disks::new_with_refreshed_list();
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
sys.refresh_all();
sys.refresh_cpu_usage();
networks.refresh(true);
let cpus = sys.cpus();
let cpu_per_core: Vec<f64> = cpus.iter().map(|cpu| cpu.cpu_usage() as f64).collect();
let cpu_percent = if !cpu_per_core.is_empty() {
cpu_per_core.iter().sum::<f64>() / cpu_per_core.len() as f64
} else {
0.0
};
let memory_total = sys.total_memory();
let memory_used = sys.used_memory();
let memory_percent = if memory_total > 0 {
(memory_used as f64 / memory_total as f64) * 100.0
} else {
0.0
};
let (disk_total, disk_used) = disks.iter().fold((0u64, 0u64), |(total, used), disk| {
(
total + disk.total_space(),
used + (disk.total_space() - disk.available_space()),
)
});
let disk_percent = if disk_total > 0 {
(disk_used as f64 / disk_total as f64) * 100.0
} else {
0.0
};
let (rx_bytes, tx_bytes) = networks.iter().fold((0u64, 0u64), |(rx, tx), (_, data)| {
(rx + data.received(), tx + data.transmitted())
});
let metrics = SystemResourceMetrics {
cpu_percent,
cpu_per_core,
memory_percent,
memory_used_bytes: memory_used,
memory_total_bytes: memory_total,
disk_percent,
disk_used_bytes: disk_used,
disk_total_bytes: disk_total,
network_rx_bytes_per_sec: rx_bytes,
network_tx_bytes_per_sec: tx_bytes,
};
let msg = DiagnosticMessage {
source: "mecha10-cli-diagnostics".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
payload: metrics,
};
state_clone.update_system(msg);
}
}));
let redis_url_owned = redis_url.to_string();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use ::redis::Client;
let mut interval = tokio::time::interval(Duration::from_secs(2));
let client = match Client::open(redis_url_owned.as_str()) {
Ok(c) => c,
Err(_) => return, };
loop {
interval.tick().await;
if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
if let Ok(info_str) = ::redis::cmd("INFO").query_async::<String>(&mut conn).await {
if let Some(metrics) = parse_redis_info(&info_str) {
let msg = DiagnosticMessage {
source: "mecha10-cli-diagnostics".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
payload: metrics,
};
state_clone.update_redis_info(msg);
}
}
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::health::HealthReport;
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone.subscribe(Topic::<HealthReport>::new("/system/health")).await {
while let Some(report) = rx.recv().await {
state_clone.update_node_health(report);
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone
.subscribe(Topic::<DiagnosticMessage<StreamingPipelineMetrics>>::new(
TOPIC_DIAGNOSTICS_STREAMING_PIPELINE,
))
.await
{
while let Some(msg) = rx.recv().await {
state_clone.update_streaming_pipeline(msg);
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone
.subscribe(Topic::<DiagnosticMessage<EncodingMetrics>>::new(
TOPIC_DIAGNOSTICS_STREAMING_ENCODING,
))
.await
{
while let Some(msg) = rx.recv().await {
state_clone.update_streaming_encoding(msg);
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone
.subscribe(Topic::<DiagnosticMessage<BandwidthMetrics>>::new(
TOPIC_DIAGNOSTICS_STREAMING_BANDWIDTH,
))
.await
{
while let Some(msg) = rx.recv().await {
state_clone.update_streaming_bandwidth(msg);
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone
.subscribe(Topic::<DiagnosticMessage<GodotConnectionMetrics>>::new(
TOPIC_DIAGNOSTICS_GODOT_CONNECTION,
))
.await
{
while let Some(msg) = rx.recv().await {
state_clone.update_godot_connection(msg);
}
}
}));
let ctx_clone = ctx.clone();
let state_clone = state.clone();
handles.push(tokio::spawn(async move {
use mecha10_core::topics::Topic;
if let Ok(mut rx) = ctx_clone
.subscribe(Topic::<DiagnosticMessage<GodotPerformanceMetrics>>::new(
TOPIC_DIAGNOSTICS_GODOT_PERFORMANCE,
))
.await
{
while let Some(msg) = rx.recv().await {
state_clone.update_godot_performance(msg);
}
}
}));
Ok(handles)
}
/// Extracts selected fields from the text of a Redis `INFO` reply.
///
/// The input is scanned line by line; each line is split at its first `:`
/// into a key and a value, and lines without a `:` are ignored. Numeric values
/// that fail to parse are stored as 0. The `db0` line is further split on `,`
/// to pick up its `keys=` and `expires=` entries.
///
/// Returns `None` when no non-empty `redis_version` value was found, and the
/// populated metrics otherwise.
fn parse_redis_info(info: &str) -> Option<RedisServerInfoMetrics> {
    /// Parses `raw` as a number, falling back to the type's zero value.
    fn int_or_zero<T>(raw: &str) -> T
    where
        T: std::str::FromStr + Default,
    {
        raw.parse().unwrap_or_default()
    }

    let mut parsed = RedisServerInfoMetrics {
        redis_version: String::new(),
        uptime_seconds: 0,
        connected_clients: 0,
        used_memory: 0,
        used_memory_rss: 0,
        used_memory_peak: 0,
        total_connections_received: 0,
        total_commands_processed: 0,
        instantaneous_ops_per_sec: 0,
        keyspace_hits: 0,
        keyspace_misses: 0,
        db0_keys: 0,
        db0_expires: 0,
    };

    let fields = info.lines().filter_map(|line| line.split_once(':'));
    for (name, raw) in fields {
        match name {
            "redis_version" => parsed.redis_version = raw.to_string(),
            "uptime_in_seconds" => parsed.uptime_seconds = int_or_zero(raw),
            "connected_clients" => parsed.connected_clients = int_or_zero(raw),
            "used_memory" => parsed.used_memory = int_or_zero(raw),
            "used_memory_rss" => parsed.used_memory_rss = int_or_zero(raw),
            "used_memory_peak" => parsed.used_memory_peak = int_or_zero(raw),
            "total_connections_received" => parsed.total_connections_received = int_or_zero(raw),
            "total_commands_processed" => parsed.total_commands_processed = int_or_zero(raw),
            "instantaneous_ops_per_sec" => parsed.instantaneous_ops_per_sec = int_or_zero(raw),
            "keyspace_hits" => parsed.keyspace_hits = int_or_zero(raw),
            "keyspace_misses" => parsed.keyspace_misses = int_or_zero(raw),
            "db0" => {
                // The value is a comma-separated list of `name=value` pairs.
                let stats = raw.split(',').filter_map(|entry| entry.split_once('='));
                for (stat, count) in stats {
                    match stat {
                        "keys" => parsed.db0_keys = int_or_zero(count),
                        "expires" => parsed.db0_expires = int_or_zero(count),
                        _ => {}
                    }
                }
            }
            _ => {}
        }
    }

    // A reply without a version line is treated as unusable.
    if parsed.redis_version.is_empty() {
        return None;
    }
    Some(parsed)
}