mecha10-cli 0.1.47

Mecha10 CLI tool
Documentation
//! Diagnostics command handlers
//!
//! Orchestrates real-time diagnostic monitoring with TUI dashboard.

use crate::commands::diagnostics::DiagnosticsArgs;
use crate::context::CliContext;
use crate::ui::{DiagnosticsState, DiagnosticsTui};
use anyhow::{Context as AnyhowContext, Result};
use crossterm::{
    event::{self, Event},
    execute,
    terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use mecha10_core::prelude::*;
use mecha10_diagnostics::prelude::*;
use ratatui::{backend::CrosstermBackend, Terminal};
use std::io::stdout;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing_subscriber::layer::SubscriberExt;

/// Handle diagnostics command - TUI dashboard
pub async fn handle_diagnostics(_ctx: &mut CliContext, _args: &DiagnosticsArgs) -> Result<()> {
    // Suppress all tracing output during TUI mode to prevent log corruption
    // We use a null/noop layer that drops all events
    let _guard = tracing::subscriber::set_default(
        tracing_subscriber::registry().with(tracing_subscriber::fmt::layer().with_writer(std::io::sink)),
    );

    // Create Mecha10 context for local Redis pub/sub
    let local_ctx = Arc::new(
        Context::new("mecha10-cli-diagnostics")
            .await
            .context("Failed to connect to Redis. Is the control plane running?")?,
    );

    // Get Redis URL for collectors
    let redis_url = Context::get_redis_url()?;

    // Create shared state
    let state = DiagnosticsState::new();

    // Spawn collectors to actively gather metrics (no subscriptions needed for collectors)
    let _collector_handles = spawn_collectors(local_ctx.clone(), &redis_url, state.clone()).await?;

    // Setup terminal for TUI
    enable_raw_mode()?;
    let mut stdout_handle = stdout();
    execute!(stdout_handle, EnterAlternateScreen)?;
    let backend = CrosstermBackend::new(stdout_handle);
    let mut terminal = Terminal::new(backend)?;

    // Create TUI
    let mut tui = DiagnosticsTui::new(state);

    // Event loop
    let result = run_event_loop(&mut terminal, &mut tui).await;

    // Cleanup terminal
    disable_raw_mode()?;
    execute!(terminal.backend_mut(), LeaveAlternateScreen)?;

    result
}

/// Run the TUI event loop
async fn run_event_loop(
    terminal: &mut Terminal<CrosstermBackend<std::io::Stdout>>,
    tui: &mut DiagnosticsTui,
) -> Result<()> {
    loop {
        // Draw the UI
        terminal.draw(|f| tui.draw(f))?;

        // Poll for events with timeout
        if event::poll(Duration::from_millis(100))? {
            if let Event::Key(key) = event::read()? {
                tui.handle_key(key);
            }
        }

        // Check if we should quit
        if tui.should_quit() {
            break;
        }
    }

    Ok(())
}

/// Spawn collectors that directly update state (no pub/sub to avoid log noise)
async fn spawn_collectors(ctx: Arc<Context>, redis_url: &str, state: DiagnosticsState) -> Result<Vec<JoinHandle<()>>> {
    let mut handles = Vec::new();

    // System metrics collector - runs every second using sysinfo directly
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use sysinfo::{Disks, Networks, System};
        let mut sys = System::new_all();
        let mut networks = Networks::new_with_refreshed_list();
        let disks = Disks::new_with_refreshed_list();
        let mut interval = tokio::time::interval(Duration::from_secs(1));

        loop {
            interval.tick().await;

            // Refresh system info
            sys.refresh_all();
            sys.refresh_cpu_usage();
            networks.refresh(true);

            // CPU metrics
            let cpus = sys.cpus();
            let cpu_per_core: Vec<f64> = cpus.iter().map(|cpu| cpu.cpu_usage() as f64).collect();
            let cpu_percent = if !cpu_per_core.is_empty() {
                cpu_per_core.iter().sum::<f64>() / cpu_per_core.len() as f64
            } else {
                0.0
            };

            // Memory metrics
            let memory_total = sys.total_memory();
            let memory_used = sys.used_memory();
            let memory_percent = if memory_total > 0 {
                (memory_used as f64 / memory_total as f64) * 100.0
            } else {
                0.0
            };

            // Disk metrics (use first disk or sum all)
            let (disk_total, disk_used) = disks.iter().fold((0u64, 0u64), |(total, used), disk| {
                (
                    total + disk.total_space(),
                    used + (disk.total_space() - disk.available_space()),
                )
            });
            let disk_percent = if disk_total > 0 {
                (disk_used as f64 / disk_total as f64) * 100.0
            } else {
                0.0
            };

            // Network metrics
            let (rx_bytes, tx_bytes) = networks.iter().fold((0u64, 0u64), |(rx, tx), (_, data)| {
                (rx + data.received(), tx + data.transmitted())
            });

            let metrics = SystemResourceMetrics {
                cpu_percent,
                cpu_per_core,
                memory_percent,
                memory_used_bytes: memory_used,
                memory_total_bytes: memory_total,
                disk_percent,
                disk_used_bytes: disk_used,
                disk_total_bytes: disk_total,
                network_rx_bytes_per_sec: rx_bytes,
                network_tx_bytes_per_sec: tx_bytes,
            };

            let msg = DiagnosticMessage {
                source: "mecha10-cli-diagnostics".to_string(),
                timestamp: std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_millis() as u64)
                    .unwrap_or(0),
                payload: metrics,
            };

            state_clone.update_system(msg);
        }
    }));

    // Redis metrics collector - runs every 2 seconds
    let redis_url_owned = redis_url.to_string();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use ::redis::Client;
        let mut interval = tokio::time::interval(Duration::from_secs(2));

        // Try to connect to Redis
        let client = match Client::open(redis_url_owned.as_str()) {
            Ok(c) => c,
            Err(_) => return, // Can't connect, exit silently
        };

        loop {
            interval.tick().await;

            // Get Redis INFO
            if let Ok(mut conn) = client.get_multiplexed_async_connection().await {
                if let Ok(info_str) = ::redis::cmd("INFO").query_async::<String>(&mut conn).await {
                    if let Some(metrics) = parse_redis_info(&info_str) {
                        let msg = DiagnosticMessage {
                            source: "mecha10-cli-diagnostics".to_string(),
                            timestamp: std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .map(|d| d.as_millis() as u64)
                                .unwrap_or(0),
                            payload: metrics,
                        };
                        state_clone.update_redis_info(msg);
                    }
                }
            }
        }
    }));

    // Node health subscriber - subscribes to /system/health topic
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::health::HealthReport;
        use mecha10_core::topics::Topic;

        // Subscribe to health topic
        if let Ok(mut rx) = ctx_clone.subscribe(Topic::<HealthReport>::new("/system/health")).await {
            while let Some(report) = rx.recv().await {
                state_clone.update_node_health(report);
            }
        }
    }));

    // Streaming pipeline metrics subscriber
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::topics::Topic;

        if let Ok(mut rx) = ctx_clone
            .subscribe(Topic::<DiagnosticMessage<StreamingPipelineMetrics>>::new(
                TOPIC_DIAGNOSTICS_STREAMING_PIPELINE,
            ))
            .await
        {
            while let Some(msg) = rx.recv().await {
                state_clone.update_streaming_pipeline(msg);
            }
        }
    }));

    // Streaming encoding metrics subscriber
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::topics::Topic;

        if let Ok(mut rx) = ctx_clone
            .subscribe(Topic::<DiagnosticMessage<EncodingMetrics>>::new(
                TOPIC_DIAGNOSTICS_STREAMING_ENCODING,
            ))
            .await
        {
            while let Some(msg) = rx.recv().await {
                state_clone.update_streaming_encoding(msg);
            }
        }
    }));

    // Streaming bandwidth metrics subscriber
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::topics::Topic;

        if let Ok(mut rx) = ctx_clone
            .subscribe(Topic::<DiagnosticMessage<BandwidthMetrics>>::new(
                TOPIC_DIAGNOSTICS_STREAMING_BANDWIDTH,
            ))
            .await
        {
            while let Some(msg) = rx.recv().await {
                state_clone.update_streaming_bandwidth(msg);
            }
        }
    }));

    // Godot connection metrics subscriber
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::topics::Topic;

        if let Ok(mut rx) = ctx_clone
            .subscribe(Topic::<DiagnosticMessage<GodotConnectionMetrics>>::new(
                TOPIC_DIAGNOSTICS_GODOT_CONNECTION,
            ))
            .await
        {
            while let Some(msg) = rx.recv().await {
                state_clone.update_godot_connection(msg);
            }
        }
    }));

    // Godot performance metrics subscriber
    let ctx_clone = ctx.clone();
    let state_clone = state.clone();
    handles.push(tokio::spawn(async move {
        use mecha10_core::topics::Topic;

        if let Ok(mut rx) = ctx_clone
            .subscribe(Topic::<DiagnosticMessage<GodotPerformanceMetrics>>::new(
                TOPIC_DIAGNOSTICS_GODOT_PERFORMANCE,
            ))
            .await
        {
            while let Some(msg) = rx.recv().await {
                state_clone.update_godot_performance(msg);
            }
        }
    }));

    Ok(handles)
}

/// Parse Redis INFO output into metrics
fn parse_redis_info(info: &str) -> Option<RedisServerInfoMetrics> {
    let mut metrics = RedisServerInfoMetrics {
        redis_version: String::new(),
        uptime_seconds: 0,
        connected_clients: 0,
        used_memory: 0,
        used_memory_rss: 0,
        used_memory_peak: 0,
        total_connections_received: 0,
        total_commands_processed: 0,
        instantaneous_ops_per_sec: 0,
        keyspace_hits: 0,
        keyspace_misses: 0,
        db0_keys: 0,
        db0_expires: 0,
    };

    for line in info.lines() {
        if let Some((key, value)) = line.split_once(':') {
            match key {
                "redis_version" => metrics.redis_version = value.to_string(),
                "uptime_in_seconds" => metrics.uptime_seconds = value.parse().unwrap_or(0),
                "connected_clients" => metrics.connected_clients = value.parse().unwrap_or(0),
                "used_memory" => metrics.used_memory = value.parse().unwrap_or(0),
                "used_memory_rss" => metrics.used_memory_rss = value.parse().unwrap_or(0),
                "used_memory_peak" => metrics.used_memory_peak = value.parse().unwrap_or(0),
                "total_connections_received" => metrics.total_connections_received = value.parse().unwrap_or(0),
                "total_commands_processed" => metrics.total_commands_processed = value.parse().unwrap_or(0),
                "instantaneous_ops_per_sec" => metrics.instantaneous_ops_per_sec = value.parse().unwrap_or(0),
                "keyspace_hits" => metrics.keyspace_hits = value.parse().unwrap_or(0),
                "keyspace_misses" => metrics.keyspace_misses = value.parse().unwrap_or(0),
                "db0" => {
                    // Format: keys=123,expires=0,avg_ttl=0
                    for part in value.split(',') {
                        if let Some(keys_val) = part.strip_prefix("keys=") {
                            metrics.db0_keys = keys_val.parse().unwrap_or(0);
                        } else if let Some(expires_val) = part.strip_prefix("expires=") {
                            metrics.db0_expires = expires_val.parse().unwrap_or(0);
                        }
                    }
                }
                _ => {}
            }
        }
    }

    if metrics.redis_version.is_empty() {
        None
    } else {
        Some(metrics)
    }
}