Skip to main content

runtimo_core/
telemetry.rs

1//! System Telemetry — Discovery-based environment awareness.
2//!
3//! Captures a full snapshot of the host machine: CPU, RAM, disk, accelerators
4//! (any kind — GPU, TPU, NPU), running services (detected via listening ports),
5//! and network state (public IP, tunnels).
6//!
7//! The telemetry is a **discovery protocol**: it reports what IS on the machine,
8//! not what the developer expects to find. No assumed hardware. Empty means
9//! nothing was found — not that the field is irrelevant.
10//! Every capability execution records before/after deltas.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use runtimo_core::Telemetry;
16//!
17//! let tel = Telemetry::capture();
18//! tel.print_report();
19//! // RUNTIMO TELEMETRY [1715800000]
20//! // CPU   : AMD EPYC 7T83
21//! // RAM   : 16Gi total, 13Gi free
22//! // ...
23//! ```
24
25use crate::cmd::run_cmd;
26use serde::{Deserialize, Serialize};
27use std::sync::Mutex;
28
/// Process-wide cache of the most recent [`Telemetry`] snapshot, paired with
/// the `Instant` used to age the entry. `None` until a capture has been stored.
static TELEMETRY_CACHE: Mutex<Option<(Telemetry, std::time::Instant)>> = Mutex::new(None);
/// Maximum age, in seconds, at which a cached snapshot is still returned by
/// [`Telemetry::capture`] instead of re-probing the machine.
const CACHE_TTL_SECS: u64 = 30;
31
/// Full system telemetry snapshot.
///
/// Contains four sub-structures: [`SystemInfo`], [`HardwareInfo`],
/// [`ServiceInfo`], and [`NetworkInfo`], plus a Unix timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct Telemetry {
    /// Unix timestamp (seconds) when the snapshot was taken; `0` if the
    /// system clock reports a time before the Unix epoch.
    pub timestamp: u64,
    /// Basic system information (CPU model, RAM, disk, uptime, load).
    pub system: SystemInfo,
    /// Detected accelerators of any kind, plus JAX availability.
    pub hardware: HardwareInfo,
    /// Known services found on listening TCP ports.
    pub services: ServiceInfo,
    /// Network state (public IP, tunnel status).
    pub network: NetworkInfo,
}
50
/// Basic system information from `/proc` and shell commands.
///
/// Every field holds the raw text produced by the corresponding command;
/// nothing is parsed into numbers.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct SystemInfo {
    /// CPU model string (first `model name` entry of `/proc/cpuinfo`).
    pub cpu_model: String,
    /// Total RAM as printed by `free -h` (human-readable, e.g. `"16Gi"`).
    pub ram_total: String,
    /// Free RAM as printed by `free -h` (human-readable, e.g. `"13Gi"`).
    pub ram_free: String,
    /// Total size of the root filesystem (human-readable, e.g. `"100G"`).
    pub disk_total: String,
    /// Free space on the root filesystem (human-readable).
    pub disk_free: String,
    /// Root filesystem usage percentage with the `%` sign removed
    /// (e.g. `"45"`).
    pub disk_used_percent: String,
    /// System uptime as printed by `uptime -p` (e.g. `"up 3 days, 2 hours"`).
    pub uptime: String,
    /// Load average (e.g. `" 0.50,  0.30,  0.20"`).
    pub load_average: String,
}
72
/// Special hardware device information.
///
/// Detects accelerators generically — GPUs (nvidia-smi, rocm-smi, /dev/dri),
/// TPUs (/dev/accel*), and JAX availability. Reports what exists, not what
/// was expected.
///
/// Every field is `#[serde(default)]`, so serialized data that lacks a field
/// still deserializes (the missing field takes its default value).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct HardwareInfo {
    /// Detected accelerator devices (any kind). Empty vec = no accelerators found.
    #[serde(default)]
    pub accelerators: Vec<AcceleratorInfo>,
    /// Whether the `jax` Python package is importable.
    #[serde(default)]
    pub jax_available: bool,
    /// JAX version string (e.g. `"0.4.25"`), if available.
    #[serde(default)]
    pub jax_version: Option<String>,
    /// Number of JAX-visible devices, if available.
    #[serde(default)]
    pub jax_device_count: Option<usize>,
}
94
/// A detected hardware accelerator.
///
/// One entry describes all devices of one kind from one detection source,
/// so `count` may be greater than one.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct AcceleratorInfo {
    /// Accelerator kind: "gpu", "tpu", "npu".
    pub kind: String,
    /// Number of devices of this kind detected.
    pub count: usize,
    /// Vendor name if identifiable (e.g. "nvidia", "amd", "google").
    /// Defaults to `None` when absent from serialized data.
    #[serde(default)]
    pub vendor: Option<String>,
    /// Device model string if available.
    /// Defaults to `None` when absent from serialized data.
    #[serde(default)]
    pub model: Option<String>,
}
110
/// Service status — port-based detection.
///
/// Scans for listening TCP ports and maps well-known ports to service names.
/// Only services with actively listening ports are reported. No service is
/// assumed to exist.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct ServiceInfo {
    /// Services detected on this machine. Empty vec = no known services found.
    /// Defaults to an empty vec when absent from serialized data.
    #[serde(default)]
    pub detected_services: Vec<DetectedService>,
}
123
/// A detected service running on the machine.
///
/// All fields except `name` are `#[serde(default)]` and may be missing from
/// serialized data.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct DetectedService {
    /// Service name (e.g. "vllm", "nginx", "postgres").
    pub name: String,
    /// Version string if detectable.
    #[serde(default)]
    pub version: Option<String>,
    /// Whether the service process is running.
    #[serde(default)]
    pub running: bool,
    /// Ports the service is listening on.
    #[serde(default)]
    pub ports: Vec<u16>,
}
140
/// Network state information.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::exhaustive_structs)]
pub struct NetworkInfo {
    /// Public IP address (from `ifconfig.me`), or `"unknown"`.
    pub public_ip: String,
    /// Whether a `cloudflared` tunnel process is running.
    pub tunnel_running: bool,
    /// Raw `pgrep -fa` output for the matching `cloudflared` process(es),
    /// if any are running; `None` otherwise.
    pub tunnel_name: Option<String>,
}
152
153impl Telemetry {
154    /// Captures a full system telemetry snapshot.
155    ///
156    /// Results are cached for 30 seconds to avoid running 15+ shell subprocesses
157    /// on repeated calls. Network queries (public_ip, tunnel) are skipped when
158    /// returning a cached value.
159    pub fn capture() -> Self {
160        let now = std::time::Instant::now();
161        {
162            let cache = TELEMETRY_CACHE.lock().unwrap_or_else(|e| e.into_inner());
163            if let Some((cached, instant)) = cache.as_ref() {
164                if now.duration_since(*instant).as_secs() < CACHE_TTL_SECS {
165                    return cached.clone();
166                }
167            }
168        }
169
170        let timestamp = std::time::SystemTime::now()
171            .duration_since(std::time::UNIX_EPOCH)
172            .map_or(0, |d| d.as_secs());
173
174        let telemetry = Self {
175            timestamp,
176            system: SystemInfo::capture(),
177            hardware: HardwareInfo::capture(),
178            services: ServiceInfo::capture(),
179            network: NetworkInfo::capture(),
180        };
181
182        let mut cache = TELEMETRY_CACHE.lock().unwrap_or_else(|e| e.into_inner());
183        *cache = Some((telemetry.clone(), now));
184        telemetry
185    }
186
187    /// Prints telemetry in a human-readable report to stdout.
188    pub fn print_report(&self) {
189        println!("\n{}", "=".repeat(60));
190        println!(" RUNTIMO TELEMETRY [{}]", self.timestamp);
191        println!("{}", "=".repeat(60));
192
193        println!("\n--- SYSTEM ---");
194        println!(" CPU   : {}", self.system.cpu_model);
195        println!(
196            " RAM   : {} total, {} free",
197            self.system.ram_total, self.system.ram_free
198        );
199        println!(
200            " Disk  : {} total, {} free ({}% used)",
201            self.system.disk_total, self.system.disk_free, self.system.disk_used_percent
202        );
203        println!(" Uptime: {}", self.system.uptime);
204        println!(" Load  : {}", self.system.load_average);
205
206        println!("\n--- HARDWARE ---");
207        if self.hardware.accelerators.is_empty() {
208            println!(" Accelerators: none detected");
209        } else {
210            for acc in &self.hardware.accelerators {
211                println!(
212                    " {}: {}x {}{}",
213                    acc.kind,
214                    acc.count,
215                    acc.model.as_deref().unwrap_or("unknown"),
216                    acc.vendor
217                        .as_ref()
218                        .map(|v| format!(" ({})", v))
219                        .unwrap_or_default()
220                );
221            }
222        }
223        if self.hardware.jax_available {
224            println!(
225                " JAX: v{} ({} devices)",
226                self.hardware
227                    .jax_version
228                    .clone()
229                    .unwrap_or_else(|| "unknown".into()),
230                self.hardware.jax_device_count.unwrap_or(0)
231            );
232        }
233
234        println!("\n--- SERVICES ---");
235        if self.services.detected_services.is_empty() {
236            println!(" Services: none detected");
237        } else {
238            for svc in &self.services.detected_services {
239                let ports_str = if svc.ports.is_empty() {
240                    String::new()
241                } else {
242                    format!(
243                        " ports=[{}]",
244                        svc.ports
245                            .iter()
246                            .map(|p| p.to_string())
247                            .collect::<Vec<_>>()
248                            .join(",")
249                    )
250                };
251                println!(
252                    " {}: v{} ({}){}",
253                    svc.name,
254                    svc.version.as_deref().unwrap_or("?"),
255                    if svc.running { "running" } else { "stopped" },
256                    ports_str
257                );
258            }
259        }
260
261        println!("\n--- NETWORK ---");
262        println!(" Public IP: {}", self.network.public_ip);
263        println!(
264            " Tunnel: {} ({})",
265            if self.network.tunnel_running {
266                "running"
267            } else {
268                "not running"
269            },
270            self.network
271                .tunnel_name
272                .clone()
273                .unwrap_or_else(|| "unknown".into())
274        );
275
276        println!("\n{}", "=".repeat(60));
277    }
278}
279
280impl SystemInfo {
281    fn capture() -> Self {
282        let ram_total = run_cmd("free -h | grep Mem | awk '{print $2}'");
283        let ram_free = run_cmd("free -h | grep Mem | awk '{print $4}'");
284        let disk_total = run_cmd("df -h / | tail -1 | awk '{print $2}'");
285        let disk_free = run_cmd("df -h / | tail -1 | awk '{print $4}'");
286        let disk_pct_str = run_cmd("df / | tail -1 | awk '{print $5}'");
287        let disk_used_percent = disk_pct_str.replace('%', "");
288
289        Self {
290            cpu_model: run_cmd("cat /proc/cpuinfo | grep 'model name' | head -1 | cut -d: -f2"),
291            ram_total,
292            ram_free,
293            disk_total,
294            disk_free,
295            disk_used_percent,
296            uptime: run_cmd("uptime -p"),
297            load_average: run_cmd("uptime | awk -F'load average:' '{print $2}'"),
298        }
299    }
300}
301
302impl HardwareInfo {
303    fn capture() -> Self {
304        let mut accelerators = Vec::new();
305
306        // TPU devices via /dev/accel*
307        let tpu_count: usize = run_cmd("ls /dev/accel* 2>/dev/null | wc -l")
308            .parse()
309            .unwrap_or(0);
310        if tpu_count > 0 {
311            accelerators.push(AcceleratorInfo {
312                kind: "tpu".into(),
313                count: tpu_count,
314                vendor: Some("google".into()),
315                model: None,
316            });
317        }
318
319        // NVIDIA GPUs via nvidia-smi
320        let nvidia_gpu_count: usize = run_cmd("nvidia-smi --list-gpus 2>/dev/null | wc -l")
321            .parse()
322            .unwrap_or(0);
323        if nvidia_gpu_count > 0 {
324            let model =
325                run_cmd("nvidia-smi --query-gpu=name --format=csv,noheader 2>/dev/null | head -1");
326            accelerators.push(AcceleratorInfo {
327                kind: "gpu".into(),
328                count: nvidia_gpu_count,
329                vendor: Some("nvidia".into()),
330                model: if model.is_empty() { None } else { Some(model) },
331            });
332        }
333
334        // AMD GPUs via rocm-smi
335        let amd_gpu_count: usize =
336            run_cmd("rocm-smi --showproductname 2>/dev/null | grep -c 'GPU\\['")
337                .parse()
338                .unwrap_or(0);
339        if amd_gpu_count > 0 {
340            accelerators.push(AcceleratorInfo {
341                kind: "gpu".into(),
342                count: amd_gpu_count,
343                vendor: Some("amd".into()),
344                model: None,
345            });
346        }
347
348        // Generic DRM devices (fallback for any GPU)
349        if nvidia_gpu_count == 0 && amd_gpu_count == 0 {
350            let dri_count: usize = run_cmd("ls /dev/dri/render* 2>/dev/null | wc -l")
351                .parse()
352                .unwrap_or(0);
353            if dri_count > 0 {
354                accelerators.push(AcceleratorInfo {
355                    kind: "gpu".into(),
356                    count: dri_count,
357                    vendor: None,
358                    model: Some("drm-render".into()),
359                });
360            }
361        }
362
363        let jax_available =
364            run_cmd("timeout 10 python3 -c 'import jax' 2>/dev/null && echo yes || echo no")
365                == "yes";
366        let jax_version = if jax_available {
367            Some(run_cmd(
368                "timeout 10 python3 -c 'import jax; print(jax.__version__)'",
369            ))
370        } else {
371            None
372        };
373        let jax_device_count = if jax_available {
374            run_cmd("timeout 10 python3 -c 'import jax; print(len(jax.devices()))'")
375                .parse()
376                .ok()
377        } else {
378            None
379        };
380
381        Self {
382            accelerators,
383            jax_available,
384            jax_version,
385            jax_device_count,
386        }
387    }
388}
389
390impl ServiceInfo {
391    fn capture() -> Self {
392        let mut detected = Vec::new();
393
394        // Scan listening TCP ports and map to known services
395        let listening = parse_listening_ports();
396
397        for &port in &listening {
398            if let Some(svc) = detect_service_for_port(port) {
399                // Avoid duplicates (e.g. nginx on both 80 and 443)
400                if !detected
401                    .iter()
402                    .any(|s: &DetectedService| s.name == svc.name)
403                {
404                    detected.push(svc);
405                }
406            }
407        }
408
409        Self {
410            detected_services: detected,
411        }
412    }
413}
414
415/// Parse `ss -ltnp` output into listening ports.
416#[allow(clippy::indexing_slicing)] // ss output has fixed column format
417fn parse_listening_ports() -> Vec<u16> {
418    let output = run_cmd("ss -ltnp 2>/dev/null");
419    let mut result = Vec::new();
420
421    for line in output.lines().skip(1) {
422        let parts: Vec<&str> = line.split_whitespace().collect();
423        if parts.len() < 5 {
424            continue;
425        }
426
427        let addr_port = parts[4];
428        let Some(port) = addr_port
429            .rsplit(':')
430            .next()
431            .and_then(|p| p.parse::<u16>().ok())
432        else {
433            continue;
434        };
435
436        result.push(port);
437    }
438
439    result
440}
441
442/// Well-known port → service name mapping.
443/// Only ports where we can confidently identify the service.
444fn detect_service_for_port(port: u16) -> Option<DetectedService> {
445    match port {
446        22 => Some(DetectedService {
447            name: "ssh".into(),
448            version: run_cmd("sshd -V 2>&1 | head -1").into(),
449            running: true,
450            ports: vec![22],
451        }),
452        80 | 443 => Some(DetectedService {
453            name: "nginx".into(),
454            version: detect_version("nginx -v 2>&1 | grep -oP 'nginx/\\K[0-9.]+'"),
455            running: true,
456            ports: vec![port],
457        }),
458        3306 => Some(DetectedService {
459            name: "mysql".into(),
460            version: detect_version(
461                "mysql --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+\\.[0-9]+'",
462            ),
463            running: true,
464            ports: vec![3306],
465        }),
466        5432 => Some(DetectedService {
467            name: "postgres".into(),
468            version: detect_version("postgres --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+'"),
469            running: true,
470            ports: vec![5432],
471        }),
472        6379 => Some(DetectedService {
473            name: "redis".into(),
474            version: detect_version(
475                "redis-server --version 2>/dev/null | grep -oP 'v=[0-9]+\\.[0-9]+\\.[0-9]+'",
476            ),
477            running: true,
478            ports: vec![6379],
479        }),
480        27017 => Some(DetectedService {
481            name: "mongodb".into(),
482            version: detect_version(
483                "mongod --version 2>/dev/null | grep -oP '[0-9]+\\.[0-9]+\\.[0-9]+'",
484            ),
485            running: true,
486            ports: vec![27017],
487        }),
488        _ => None,
489    }
490}
491
492/// Run a version-detection command, return the result or empty string.
493fn detect_version(cmd: &str) -> Option<String> {
494    let v = run_cmd(cmd);
495    if v.is_empty() {
496        None
497    } else {
498        Some(v)
499    }
500}
501
502impl NetworkInfo {
503    fn capture() -> Self {
504        let public_ip = run_cmd(
505            "curl -s --connect-timeout 5 --max-time 5 ifconfig.me 2>/dev/null || echo 'unknown'",
506        );
507        let tunnel_output = run_cmd("pgrep -fa cloudflared");
508        let tunnel_running = !tunnel_output.is_empty();
509        let tunnel_name = if tunnel_running {
510            Some(tunnel_output)
511        } else {
512            None
513        };
514
515        Self {
516            public_ip,
517            tunnel_running,
518            tunnel_name,
519        }
520    }
521}
522
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an [`AcceleratorInfo`] fixture from borrowed strings.
    fn accel(
        kind: &str,
        count: usize,
        vendor: Option<&str>,
        model: Option<&str>,
    ) -> AcceleratorInfo {
        AcceleratorInfo {
            kind: kind.to_owned(),
            count,
            vendor: vendor.map(str::to_owned),
            model: model.map(str::to_owned),
        }
    }

    /// Builds a running [`DetectedService`] fixture.
    fn running_service(name: &str, version: &str, ports: Vec<u16>) -> DetectedService {
        DetectedService {
            name: name.to_owned(),
            version: Some(version.to_owned()),
            running: true,
            ports,
        }
    }

    /// Sums the device counts of all accelerators of the given kind.
    fn total_of(hw: &HardwareInfo, kind: &str) -> usize {
        hw.accelerators
            .iter()
            .filter(|a| a.kind == kind)
            .map(|a| a.count)
            .sum()
    }

    /// A live capture yields a positive timestamp and non-empty core fields.
    #[test]
    fn test_telemetry_capture() {
        let snapshot = Telemetry::capture();
        assert!(snapshot.timestamp > 0, "timestamp must be positive");

        let sys = &snapshot.system;
        assert!(!sys.cpu_model.is_empty(), "cpu_model must not be empty");
        assert!(!sys.ram_total.is_empty(), "ram_total must not be empty");
        assert!(!sys.disk_total.is_empty(), "disk_total must not be empty");

        let accelerators = &snapshot.hardware.accelerators;
        assert!(
            !accelerators.iter().any(|a| a.kind.is_empty()),
            "accelerator kind must not be empty"
        );
        assert!(
            !accelerators.iter().any(|a| a.count == 0),
            "accelerator count must be > 0"
        );

        let services = &snapshot.services.detected_services;
        assert!(
            !services.iter().any(|s| s.name.is_empty()),
            "service name must not be empty"
        );

        assert!(
            !snapshot.network.public_ip.is_empty(),
            "public_ip must not be empty"
        );
    }

    /// Two back-to-back captures return the same cached snapshot.
    #[test]
    fn test_telemetry_cache_works() {
        let first = Telemetry::capture();
        let second = Telemetry::capture();
        assert_eq!(
            first.timestamp, second.timestamp,
            "cached telemetry should be identical"
        );
    }

    /// Per-kind totals can be derived from the accelerator list.
    #[test]
    fn test_accelerators_back_compat() {
        let hw = HardwareInfo {
            accelerators: vec![
                accel("gpu", 4, Some("nvidia"), Some("A100")),
                accel("tpu", 8, Some("google"), None),
            ],
            jax_available: false,
            jax_version: None,
            jax_device_count: None,
        };

        let total_tpu = total_of(&hw, "tpu");
        let total_gpu = total_of(&hw, "gpu");

        assert_eq!(total_tpu, 8, "total tpu should be 8");
        assert_eq!(total_gpu, 4, "total gpu should be 4");
    }

    /// A machine without accelerators is a valid state.
    #[test]
    fn test_accelerators_empty_is_valid() {
        let hw = HardwareInfo {
            accelerators: Vec::new(),
            jax_available: false,
            jax_version: None,
            jax_device_count: None,
        };

        assert!(hw.accelerators.is_empty());
    }

    /// Service fields are readable from the detected-services list.
    #[test]
    fn test_service_back_compat() {
        let svc = ServiceInfo {
            detected_services: vec![running_service("vllm", "0.6.0", vec![8200])],
        };

        let vllm = &svc.detected_services[0];
        assert_eq!(vllm.name, "vllm");
        assert_eq!(vllm.version.as_deref(), Some("0.6.0"));
        assert!(vllm.running);
        assert_eq!(vllm.ports, vec![8200]);
    }

    /// A machine without known services is a valid state.
    #[test]
    fn test_services_empty_is_valid() {
        let svc = ServiceInfo {
            detected_services: Vec::new(),
        };

        assert!(svc.detected_services.is_empty());
    }

    /// Hardware and service info survive a JSON round trip.
    #[test]
    fn test_telemetry_serialization_roundtrip() {
        let hw = HardwareInfo {
            accelerators: vec![accel("gpu", 2, Some("nvidia"), Some("H100"))],
            jax_available: true,
            jax_version: Some("0.4.30".into()),
            jax_device_count: Some(2),
        };
        let svc = ServiceInfo {
            detected_services: vec![running_service("docker", "26.0.0", Vec::new())],
        };

        let hw_json = serde_json::to_string(&hw).unwrap();
        let hw_back: HardwareInfo = serde_json::from_str(&hw_json).unwrap();
        assert_eq!(hw_back.accelerators.len(), 1);
        assert_eq!(hw_back.accelerators[0].kind, "gpu");
        assert_eq!(hw_back.accelerators[0].model.as_deref(), Some("H100"));

        let svc_json = serde_json::to_string(&svc).unwrap();
        let svc_back: ServiceInfo = serde_json::from_str(&svc_json).unwrap();
        assert_eq!(svc_back.detected_services.len(), 1);
        assert_eq!(svc_back.detected_services[0].name, "docker");
    }

    /// JSON written before the `accelerators` field existed still parses.
    #[test]
    fn test_telemetry_deserialize_old_wal_event() {
        let old_json = r#"{
            "jax_available": true,
            "jax_version": "0.4.25",
            "jax_device_count": 8
        }"#;

        let parsed: HardwareInfo = serde_json::from_str(old_json).unwrap();
        assert!(
            parsed.accelerators.is_empty(),
            "old WAL events deserialize with empty accelerators"
        );
        assert!(parsed.jax_available);
    }
}