use crate::cmd::run_cmd;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
/// Process-wide cache of the most recent telemetry snapshot together with the
/// `Instant` recorded alongside it; read and refreshed by `Telemetry::capture`.
static TELEMETRY_CACHE: Mutex<Option<(Telemetry, std::time::Instant)>> = Mutex::new(None);
/// Age, in whole seconds, below which `Telemetry::capture` returns the cached
/// snapshot instead of re-running every probe.
const CACHE_TTL_SECS: u64 = 30;
/// Point-in-time snapshot of host state, as produced by `Telemetry::capture`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Telemetry {
/// Capture time in whole seconds since the Unix epoch (0 if the system clock
/// reads earlier than the epoch).
pub timestamp: u64,
/// CPU, memory, root-filesystem, uptime and load figures.
pub system: SystemInfo,
/// Detected accelerators and JAX availability.
pub hardware: HardwareInfo,
/// Services identified from listening TCP ports.
pub services: ServiceInfo,
/// Public IP address and cloudflared tunnel status.
pub network: NetworkInfo,
}
/// Host resource figures gathered from `/proc/cpuinfo`, `free`, `df /` and `uptime`.
///
/// The `String` fields carry the tools' own human-readable output; the
/// `*_bytes` / `*_numeric` fields are parsed numbers that fall back to 0 when
/// a probe fails or prints something unparsable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
/// First "model name" entry from `/proc/cpuinfo`.
pub cpu_model: String,
/// Total RAM as printed by `free -h`.
pub ram_total: String,
/// RAM in the "free" column of `free -h` (not the "available" column).
pub ram_free: String,
/// Size of the filesystem mounted at `/`, as printed by `df -h`.
pub disk_total: String,
/// Available space on `/`, as printed by `df -h`.
pub disk_free: String,
/// `Use%` of `/` as reported by `df`, with the `%` sign removed.
pub disk_used_percent: String,
/// Output of `uptime -p`.
pub uptime: String,
/// Text following "load average:" in `uptime` output.
pub load_average: String,
/// Total RAM in bytes (0 if the probe fails).
pub ram_total_bytes: u64,
/// RAM in the "free" column, in bytes (0 if the probe fails).
pub ram_free_bytes: u64,
/// Size of `/` in bytes (0 if the probe fails).
pub disk_total_bytes: u64,
/// Available space on `/` in bytes (0 if the probe fails).
pub disk_free_bytes: u64,
/// `disk_used_percent` parsed as a number (0.0 if unparsable).
pub disk_used_percent_numeric: f64,
}
/// Accelerator inventory and JAX availability for the host.
///
/// Every field is `#[serde(default)]`, so records serialized before a field
/// existed (e.g. ones carrying only `tpu_devices`/`gpu_devices`, as exercised
/// by the old-WAL-event test) still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HardwareInfo {
/// One entry per detected device family (TPU, NVIDIA GPU, AMD GPU, DRM render node).
#[serde(default)]
pub accelerators: Vec<AcceleratorInfo>,
/// Whether `import jax` succeeded under `python3`.
#[serde(default)]
pub jax_available: bool,
/// `jax.__version__`, when JAX is available.
#[serde(default)]
pub jax_version: Option<String>,
/// `len(jax.devices())`, when JAX is available and the output parsed.
#[serde(default)]
pub jax_device_count: Option<usize>,
/// Sum of `count` over accelerators whose kind is "tpu".
/// NOTE(review): appears to be kept for older consumers of the flat counts.
#[serde(default)]
pub tpu_devices: usize,
/// Sum of `count` over accelerators whose kind is "gpu".
#[serde(default)]
pub gpu_devices: usize,
}
/// A group of same-kind accelerators reported by one hardware probe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcceleratorInfo {
/// Device class; `HardwareInfo::capture` emits "tpu" or "gpu".
pub kind: String,
/// Number of devices in this group (capture only records groups with count > 0).
pub count: usize,
/// Vendor label such as "google", "nvidia" or "amd"; `None` when unknown.
#[serde(default)]
pub vendor: Option<String>,
/// Device model name, if the probe reported one.
#[serde(default)]
pub model: Option<String>,
}
/// Services inferred from the host's listening TCP ports.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
/// One entry per distinct service name.
#[serde(default)]
pub detected_services: Vec<DetectedService>,
/// Version of the detected service named "vllm", if any.
#[serde(default)]
pub vllm_version: Option<String>,
/// Whether a service named "vllm" was detected as running.
#[serde(default)]
pub vllm_running: bool,
/// Whether the detected "vllm" service lists port 8200.
#[serde(default)]
pub vllm_port_bound: bool,
}
/// A service identified from a well-known listening port.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectedService {
/// Short service name, e.g. "ssh" or "nginx".
pub name: String,
/// Version string extracted from the service's CLI, if any.
#[serde(default)]
pub version: Option<String>,
/// Whether the service is running; live detection sets this for every bound port.
#[serde(default)]
pub running: bool,
/// Listening ports attributed to this service.
#[serde(default)]
pub ports: Vec<u16>,
}
/// Public address and cloudflared tunnel status of the host.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkInfo {
/// Address returned by ifconfig.me, or "unknown" when the request fails.
pub public_ip: String,
/// Whether a `cloudflared` process was found.
pub tunnel_running: bool,
/// `pgrep -fa` listing (PID and command line) of the matching cloudflared
/// process(es) when running; despite the name this is not a parsed tunnel name.
/// NOTE(review): command lines can carry credentials (e.g. `--token`) unless
/// redacted at capture time — check before persisting or transmitting.
pub tunnel_name: Option<String>,
}
impl Telemetry {
    /// Returns a snapshot of host telemetry, reusing the cached one while fresh.
    ///
    /// A cached snapshot younger than `CACHE_TTL_SECS` seconds is cloned and
    /// returned unchanged (including its original `timestamp`). Otherwise every
    /// sub-probe is re-run and the result replaces the cache entry. A poisoned
    /// cache mutex is recovered with `into_inner` instead of panicking.
    pub fn capture() -> Self {
        {
            let cache = TELEMETRY_CACHE.lock().unwrap_or_else(|e| e.into_inner());
            if let Some((cached, stored_at)) = cache.as_ref() {
                if stored_at.elapsed().as_secs() < CACHE_TTL_SECS {
                    return cached.clone();
                }
            }
        }
        // The lock is released while probing so other callers are not blocked
        // for the probes' duration; concurrent cache misses may each probe.
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let telemetry = Self {
            timestamp,
            system: SystemInfo::capture(),
            hardware: HardwareInfo::capture(),
            services: ServiceInfo::capture(),
            network: NetworkInfo::capture(),
        };
        // Stamp the entry when the probes *finish*. Several probes use 5-10 s
        // timeouts, so stamping with the start time could store an entry that
        // is already past the TTL, defeating the cache.
        let mut cache = TELEMETRY_CACHE.lock().unwrap_or_else(|e| e.into_inner());
        *cache = Some((telemetry.clone(), std::time::Instant::now()));
        telemetry
    }

    /// Prints a human-readable, sectioned report of this snapshot to stdout.
    ///
    /// Missing optional values are shown as "unknown" (accelerator model, JAX
    /// version, tunnel name) or "?" (service version).
    pub fn print_report(&self) {
        println!("\n{}", "=".repeat(60));
        println!(" RUNTIMO TELEMETRY [{}]", self.timestamp);
        println!("{}", "=".repeat(60));

        println!("\n--- SYSTEM ---");
        let sys = &self.system;
        println!(" CPU : {}", sys.cpu_model);
        println!(" RAM : {} total, {} free", sys.ram_total, sys.ram_free);
        println!(
            " Disk : {} total, {} free ({}% used)",
            sys.disk_total, sys.disk_free, sys.disk_used_percent
        );
        println!(" Uptime: {}", sys.uptime);
        println!(" Load : {}", sys.load_average);

        println!("\n--- HARDWARE ---");
        let hw = &self.hardware;
        if hw.accelerators.is_empty() {
            println!(" Accelerators: none detected");
        }
        for acc in &hw.accelerators {
            let vendor = acc
                .vendor
                .as_ref()
                .map(|v| format!(" ({})", v))
                .unwrap_or_default();
            println!(
                " {}: {}x {}{}",
                acc.kind,
                acc.count,
                acc.model.as_deref().unwrap_or("unknown"),
                vendor
            );
        }
        if hw.jax_available {
            println!(
                " JAX: v{} ({} devices)",
                hw.jax_version.as_deref().unwrap_or("unknown"),
                hw.jax_device_count.unwrap_or(0)
            );
        }

        println!("\n--- SERVICES ---");
        if self.services.detected_services.is_empty() {
            println!(" Services: none detected");
        }
        for svc in &self.services.detected_services {
            let ports_str = if svc.ports.is_empty() {
                String::new()
            } else {
                let ports: Vec<String> = svc.ports.iter().map(|p| p.to_string()).collect();
                format!(" ports=[{}]", ports.join(","))
            };
            println!(
                " {}: v{} ({}){}",
                svc.name,
                svc.version.as_deref().unwrap_or("?"),
                if svc.running { "running" } else { "stopped" },
                ports_str
            );
        }

        println!("\n--- NETWORK ---");
        println!(" Public IP: {}", self.network.public_ip);
        let tunnel_state = if self.network.tunnel_running {
            "running"
        } else {
            "not running"
        };
        println!(
            " Tunnel: {} ({})",
            tunnel_state,
            self.network.tunnel_name.as_deref().unwrap_or("unknown")
        );
        println!("\n{}", "=".repeat(60));
    }
}
impl SystemInfo {
    /// Captures CPU, memory, root-filesystem, uptime and load figures.
    ///
    /// Human-readable fields hold the tools' own formatting (`free -h`,
    /// `df -h`, `uptime -p`); numeric fields are parsed from byte-exact
    /// invocations and fall back to 0 when a probe fails or prints something
    /// unparsable. Never fails.
    fn capture() -> Self {
        // Parses a probe's whole output as an integer, defaulting to 0.
        let probe_u64 = |cmd: &str| -> u64 { run_cmd(cmd).parse().unwrap_or(0) };

        // `df /` reports Use% as e.g. "42%"; keep the bare number.
        let disk_used_percent = run_cmd("df / | tail -1 | awk '{print $5}'").replace('%', "");
        let disk_used_percent_numeric = disk_used_percent.parse::<f64>().unwrap_or(0.0);

        Self {
            cpu_model: run_cmd("cat /proc/cpuinfo | grep 'model name' | head -1 | cut -d: -f2"),
            // Column $4 of the `free` Mem line is "free", not "available".
            ram_total: run_cmd("free -h | grep Mem | awk '{print $2}'"),
            ram_free: run_cmd("free -h | grep Mem | awk '{print $4}'"),
            disk_total: run_cmd("df -h / | tail -1 | awk '{print $2}'"),
            disk_free: run_cmd("df -h / | tail -1 | awk '{print $4}'"),
            disk_used_percent,
            uptime: run_cmd("uptime -p"),
            load_average: run_cmd("uptime | awk -F'load average:' '{print $2}'"),
            ram_total_bytes: probe_u64("free -b | grep Mem | awk '{print $2}'"),
            ram_free_bytes: probe_u64("free -b | grep Mem | awk '{print $4}'"),
            // GNU df has no `--bytes` option (the previous invocation always
            // failed and yielded 0); `-B1` selects 1-byte blocks.
            disk_total_bytes: probe_u64("df -B1 / | tail -1 | awk '{print $2}'"),
            disk_free_bytes: probe_u64("df -B1 / | tail -1 | awk '{print $4}'"),
            disk_used_percent_numeric,
        }
    }
}
impl HardwareInfo {
    /// Probes the host for accelerators and for a usable JAX installation.
    ///
    /// Every probe shells out through `run_cmd`; a missing tool or unparsable
    /// output counts as "no devices", so this never fails. Generic DRM render
    /// nodes are only consulted when neither NVIDIA nor AMD tooling found a GPU
    /// (presumably so one physical GPU is not reported twice).
    fn capture() -> Self {
        let mut accelerators = Vec::new();

        // NOTE(review): every /dev/accel* entry is assumed to be a Google TPU
        // chip; other accel-class devices would be counted too — confirm.
        let tpu_count: usize = run_cmd("ls /dev/accel* 2>/dev/null | wc -l")
            .parse()
            .unwrap_or(0);
        if tpu_count > 0 {
            accelerators.push(AcceleratorInfo {
                kind: "tpu".into(),
                count: tpu_count,
                vendor: Some("google".into()),
                model: None,
            });
        }

        let nvidia_gpu_count: usize = run_cmd("nvidia-smi --list-gpus 2>/dev/null | wc -l")
            .parse()
            .unwrap_or(0);
        if nvidia_gpu_count > 0 {
            let model = run_cmd(
                "nvidia-smi --query-gpu=name --format=csv,noheader 2>/dev/null | head -1",
            );
            accelerators.push(AcceleratorInfo {
                kind: "gpu".into(),
                count: nvidia_gpu_count,
                vendor: Some("nvidia".into()),
                model: Some(model).filter(|m| !m.is_empty()),
            });
        }

        // `rocm-smi --showproductname` prints several lines per device, so
        // counting matching lines overcounts; count distinct GPU indices.
        let amd_gpu_count = count_rocm_gpus(&run_cmd("rocm-smi --showproductname 2>/dev/null"));
        if amd_gpu_count > 0 {
            accelerators.push(AcceleratorInfo {
                kind: "gpu".into(),
                count: amd_gpu_count,
                vendor: Some("amd".into()),
                model: None,
            });
        }

        if nvidia_gpu_count == 0 && amd_gpu_count == 0 {
            let dri_count: usize = run_cmd("ls /dev/dri/render* 2>/dev/null | wc -l")
                .parse()
                .unwrap_or(0);
            if dri_count > 0 {
                accelerators.push(AcceleratorInfo {
                    kind: "gpu".into(),
                    count: dri_count,
                    vendor: None,
                    model: Some("drm-render".into()),
                });
            }
        }

        // Each JAX probe runs in its own interpreter under a 10 s timeout.
        // stderr is discarded so log warnings cannot end up in parsed stdout.
        let jax_available =
            run_cmd("timeout 10 python3 -c 'import jax' 2>/dev/null && echo yes || echo no") == "yes";
        let (jax_version, jax_device_count) = if jax_available {
            let version = run_cmd(
                "timeout 10 python3 -c 'import jax; print(jax.__version__)' 2>/dev/null",
            );
            let devices = run_cmd(
                "timeout 10 python3 -c 'import jax; print(len(jax.devices()))' 2>/dev/null",
            )
            .parse::<usize>()
            .ok();
            // A timed-out version probe prints nothing; report that as unknown.
            (Some(version).filter(|v| !v.is_empty()), devices)
        } else {
            (None, None)
        };

        let total_of_kind = |kind: &str| -> usize {
            accelerators
                .iter()
                .filter(|a| a.kind == kind)
                .map(|a| a.count)
                .sum()
        };
        let tpu_devices = total_of_kind("tpu");
        let gpu_devices = total_of_kind("gpu");

        Self {
            accelerators,
            jax_available,
            jax_version,
            jax_device_count,
            tpu_devices,
            gpu_devices,
        }
    }
}

/// Counts distinct GPUs in `rocm-smi` output.
///
/// Device lines are prefixed with `GPU[<index>]`; each index is counted once
/// no matter how many lines mention it. Banner and separator lines are ignored.
fn count_rocm_gpus(output: &str) -> usize {
    output
        .lines()
        .filter_map(|line| {
            let rest = line.trim_start().strip_prefix("GPU[")?;
            let (index, _) = rest.split_once(']')?;
            index.parse::<u32>().ok()
        })
        .collect::<std::collections::BTreeSet<u32>>()
        .len()
}
impl ServiceInfo {
    /// Builds the service inventory from the host's listening TCP ports.
    ///
    /// Each listening port is mapped to a known service with
    /// `detect_service_for_port`. A service reachable on several ports (e.g. a
    /// web server on 80 and 443) is reported once, with all of its ports merged.
    /// The `vllm_*` summary fields come from the detected service named "vllm".
    fn capture() -> Self {
        let mut detected: Vec<DetectedService> = Vec::new();
        for port in parse_listening_ports() {
            if let Some(svc) = detect_service_for_port(port) {
                match detected.iter().position(|s| s.name == svc.name) {
                    Some(idx) => {
                        // Previously later ports of an already-seen service were dropped.
                        let existing = &mut detected[idx];
                        for p in svc.ports {
                            if !existing.ports.contains(&p) {
                                existing.ports.push(p);
                            }
                        }
                    }
                    None => detected.push(svc),
                }
            }
        }

        let vllm = detected.iter().find(|s| s.name == "vllm");
        let vllm_version = vllm.and_then(|s| s.version.clone());
        let vllm_running = vllm.map(|s| s.running).unwrap_or(false);
        let vllm_port_bound = vllm.map(|s| s.ports.contains(&8200)).unwrap_or(false);

        Self {
            detected_services: detected,
            vllm_version,
            vllm_running,
            vllm_port_bound,
        }
    }
}
/// Returns the distinct TCP ports with a listening socket on this host, sorted.
///
/// Runs `ss -ltnp`; yields an empty list when `ss` is unavailable or prints
/// nothing.
fn parse_listening_ports() -> Vec<u16> {
    parse_ss_listening_ports(&run_cmd("ss -ltnp 2>/dev/null"))
}

/// Extracts listening ports from `ss -ltn[p]` output.
///
/// Reads the *Local Address:Port* column (4th field). The 5th field is the
/// *Peer* column (`0.0.0.0:*` for listeners), which never holds a port number.
/// The header line needs no special casing because its 4th field ("Local")
/// does not parse as a port. Ports are sorted and de-duplicated because the
/// same port usually appears once per address family (IPv4 and IPv6).
fn parse_ss_listening_ports(output: &str) -> Vec<u16> {
    let mut ports: Vec<u16> = output
        .lines()
        .filter_map(|line| {
            let local = line.split_whitespace().nth(3)?;
            // rsplit handles "[::]:22", "*:80" and "127.0.0.53%lo:53" alike.
            local.rsplit(':').next()?.parse().ok()
        })
        .collect();
    ports.sort_unstable();
    ports.dedup();
    ports
}
/// Maps a well-known listening port to the service conventionally bound to it.
///
/// Identification is by port number only; the process owning the socket is
/// not inspected, so e.g. any server on 80/443 is reported as "nginx". Each
/// version probe keeps only the first version token it finds; `version` is
/// `None` when the probe prints nothing. Returns `None` for unmapped ports.
///
/// NOTE(review): no arm produces "vllm", so `ServiceInfo`'s `vllm_*` fields
/// are never set by live detection — confirm whether that is intended.
fn detect_service_for_port(port: u16) -> Option<DetectedService> {
    let (name, version_cmd) = match port {
        // Older sshd rejects -V but still prints "OpenSSH_x.y" in its usage
        // text, so extract that token rather than taking the first line.
        22 => ("ssh", "sshd -V 2>&1 | grep -oP 'OpenSSH_\\K[^ ,]+' | head -1"),
        80 | 443 => ("nginx", "nginx -v 2>&1 | grep -oP 'nginx/\\K[0-9.]+' | head -1"),
        3306 => (
            "mysql",
            "mysql --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+\\.[0-9]+' | head -1",
        ),
        5432 => (
            "postgres",
            "postgres --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+' | head -1",
        ),
        // `\K` drops the "v=" prefix so the value is a bare version like the others.
        6379 => (
            "redis",
            "redis-server --version 2>/dev/null | grep -oP 'v=\\K[0-9]+\\.[0-9]+\\.[0-9]+' | head -1",
        ),
        27017 => (
            "mongodb",
            "mongod --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+\\.[0-9]+' | head -1",
        ),
        _ => return None,
    };
    Some(DetectedService {
        name: name.into(),
        version: detect_version(version_cmd),
        running: true,
        ports: vec![port],
    })
}
/// Runs a version-probe command and returns what it printed.
///
/// Empty output (tool missing, pattern not matched) is reported as `None`
/// rather than as an empty version string.
fn detect_version(cmd: &str) -> Option<String> {
    Some(run_cmd(cmd)).filter(|output| !output.is_empty())
}
impl NetworkInfo {
    /// Captures the public IP address and the state of local `cloudflared` processes.
    ///
    /// `public_ip` is the body returned by ifconfig.me, or `"unknown"` when the
    /// request fails (connection error, timeout, or HTTP error status).
    /// `tunnel_name` holds the `pgrep -fa` listing (PID + command line) of each
    /// running cloudflared process, with credential arguments redacted so that
    /// tokens are never stored in the snapshot.
    fn capture() -> Self {
        // -f makes HTTP error statuses fail, so an error page is never reported as the IP.
        let public_ip = run_cmd(
            "curl -fs --connect-timeout 5 --max-time 5 ifconfig.me 2>/dev/null || echo 'unknown'",
        );
        // run_cmd presumably wraps commands in a shell (other probes rely on
        // pipes), and that shell's command line contains the pattern text; the
        // `[c]` bracket expression matches "cloudflared" but not itself.
        let tunnel_output = run_cmd("pgrep -fa '[c]loudflared'");
        let tunnel_running = !tunnel_output.is_empty();
        let tunnel_name = tunnel_running.then(|| {
            tunnel_output
                .lines()
                .map(redact_cmdline_secrets)
                .collect::<Vec<_>>()
                .join("\n")
        });
        Self {
            public_ip,
            tunnel_running,
            tunnel_name,
        }
    }
}

/// Replaces the values of credential-bearing flags in a process command line
/// with `<redacted>`.
///
/// Handles `--flag value`, `--flag=value` and single-dash spellings for the
/// cloudflared flags that carry secrets. Arguments are re-joined with single
/// spaces.
fn redact_cmdline_secrets(line: &str) -> String {
    const SECRET_FLAGS: [&str; 2] = ["token", "credentials-contents"];
    let mut redact_next = false;
    let mut args: Vec<String> = Vec::new();
    for arg in line.split_whitespace() {
        if redact_next {
            args.push("<redacted>".to_string());
            redact_next = false;
            continue;
        }
        let (flag, has_inline_value) = match arg.split_once('=') {
            Some((flag, _)) => (flag, true),
            None => (arg, false),
        };
        let is_secret =
            flag.starts_with('-') && SECRET_FLAGS.contains(&flag.trim_start_matches('-'));
        if is_secret && has_inline_value {
            args.push(format!("{}=<redacted>", flag));
        } else {
            redact_next = is_secret;
            args.push(arg.to_string());
        }
    }
    args.join(" ")
}
// Unit tests for snapshot capture, caching and serde (de)serialization compatibility.
#[cfg(test)]
mod tests {
use super::*;
/// Runs the real probes end to end and checks basic invariants of the snapshot.
///
/// NOTE(review): host-dependent — needs the shell tools the probes call and a
/// `/proc/cpuinfo` containing a "model name" line; it also populates the
/// process-wide cache that other tests in this module share.
#[test]
fn test_telemetry_capture() {
let telemetry = Telemetry::capture();
assert!(telemetry.timestamp > 0, "timestamp must be positive");
let s = &telemetry.system;
assert!(!s.cpu_model.is_empty(), "cpu_model must not be empty");
assert!(s.ram_total_bytes > 0, "ram_total_bytes must be > 0");
assert!(!s.ram_total.is_empty(), "ram_total must not be empty");
assert!(!s.disk_total.is_empty(), "disk_total must not be empty");
let h = &telemetry.hardware;
assert!(
h.accelerators.iter().all(|a| !a.kind.is_empty()),
"accelerator kind must not be empty"
);
assert!(
h.accelerators.iter().all(|a| a.count > 0),
"accelerator count must be > 0"
);
let svc = &telemetry.services;
assert!(
svc.detected_services.iter().all(|s| !s.name.is_empty()),
"service name must not be empty"
);
let net = &telemetry.network;
// public_ip falls back to "unknown" when the lookup fails, so this holds offline too.
assert!(!net.public_ip.is_empty(), "public_ip must not be empty");
}
/// Two back-to-back captures must return the same cached snapshot.
///
/// Relies on the second call landing within `CACHE_TTL_SECS` of the stored entry.
#[test]
fn test_telemetry_cache_works() {
let t1 = Telemetry::capture();
let t2 = Telemetry::capture();
assert_eq!(t1.timestamp, t2.timestamp, "cached telemetry should be identical");
}
/// Checks that per-kind sums over `accelerators` give the legacy flat counts.
///
/// NOTE(review): the sums are recomputed inside the test; the struct's own
/// `tpu_devices`/`gpu_devices` (set to 0 here) are never asserted, so this does
/// not exercise `HardwareInfo::capture`.
#[test]
fn test_accelerators_back_compat() {
let hw = HardwareInfo {
accelerators: vec![
AcceleratorInfo {
kind: "gpu".into(),
count: 4,
vendor: Some("nvidia".into()),
model: Some("A100".into()),
},
AcceleratorInfo {
kind: "tpu".into(),
count: 8,
vendor: Some("google".into()),
model: None,
},
],
jax_available: false,
jax_version: None,
jax_device_count: None,
tpu_devices: 0,
gpu_devices: 0,
};
let total_tpu: usize = hw
.accelerators
.iter()
.filter(|a| a.kind == "tpu")
.map(|a| a.count)
.sum();
let total_gpu: usize = hw
.accelerators
.iter()
.filter(|a| a.kind == "gpu")
.map(|a| a.count)
.sum();
assert_eq!(total_tpu, 8, "back-compat tpu_devices should be 8");
assert_eq!(total_gpu, 4, "back-compat gpu_devices should be 4");
}
/// An empty accelerator list with zero legacy counts is a valid value.
#[test]
fn test_accelerators_empty_is_valid() {
let hw = HardwareInfo {
accelerators: vec![],
jax_available: false,
jax_version: None,
jax_device_count: None,
tpu_devices: 0,
gpu_devices: 0,
};
assert!(hw.accelerators.is_empty());
assert_eq!(hw.tpu_devices, 0);
assert_eq!(hw.gpu_devices, 0);
}
/// Checks the fields of a hand-built "vllm" `DetectedService`.
///
/// NOTE(review): only reads back the values it constructed; the `vllm_*`
/// summary fields (left false/None here) are not derived or checked.
#[test]
fn test_service_back_compat() {
let svc = ServiceInfo {
detected_services: vec![DetectedService {
name: "vllm".into(),
version: Some("0.6.0".into()),
running: true,
ports: vec![8200],
}],
vllm_version: None,
vllm_running: false,
vllm_port_bound: false,
};
let vllm = &svc.detected_services[0];
assert_eq!(vllm.name, "vllm");
assert_eq!(vllm.version.as_deref(), Some("0.6.0"));
assert!(vllm.running);
assert_eq!(vllm.ports, vec![8200]);
}
/// An empty service list is a valid value.
#[test]
fn test_services_empty_is_valid() {
let svc = ServiceInfo {
detected_services: vec![],
vllm_version: None,
vllm_running: false,
vllm_port_bound: false,
};
assert!(svc.detected_services.is_empty());
}
/// `HardwareInfo` and `ServiceInfo` survive a JSON round trip via serde_json.
#[test]
fn test_telemetry_serialization_roundtrip() {
let hw = HardwareInfo {
accelerators: vec![
AcceleratorInfo {
kind: "gpu".into(),
count: 2,
vendor: Some("nvidia".into()),
model: Some("H100".into()),
},
],
jax_available: true,
jax_version: Some("0.4.30".into()),
jax_device_count: Some(2),
tpu_devices: 0,
gpu_devices: 2,
};
let svc = ServiceInfo {
detected_services: vec![DetectedService {
name: "docker".into(),
version: Some("26.0.0".into()),
running: true,
ports: vec![],
}],
vllm_version: None,
vllm_running: false,
vllm_port_bound: false,
};
let json = serde_json::to_string(&hw).unwrap();
let parsed: HardwareInfo = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.accelerators.len(), 1);
assert_eq!(parsed.accelerators[0].kind, "gpu");
assert_eq!(parsed.accelerators[0].model.as_deref(), Some("H100"));
let json = serde_json::to_string(&svc).unwrap();
let parsed: ServiceInfo = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.detected_services.len(), 1);
assert_eq!(parsed.detected_services[0].name, "docker");
}
/// A legacy `HardwareInfo` record without `accelerators` still deserializes,
/// thanks to the `#[serde(default)]` field attributes.
#[test]
fn test_telemetry_deserialize_old_wal_event() {
let old_json = r#"{
"tpu_devices": 8,
"gpu_devices": 4,
"jax_available": true,
"jax_version": "0.4.25",
"jax_device_count": 8
}"#;
let parsed: HardwareInfo = serde_json::from_str(old_json).unwrap();
assert_eq!(parsed.tpu_devices, 8);
assert_eq!(parsed.gpu_devices, 4);
assert!(parsed.accelerators.is_empty(),
"old WAL events deserialize with empty accelerators (backwards compat)");
assert!(parsed.jax_available);
}
}