Skip to main content

rust_memex/tui/
monitor.rs

1//! Live system monitor for the indexing dashboard.
2
3use std::ffi::{OsStr, OsString};
4use std::time::{Duration, Instant};
5
6use sysinfo::{Pid, ProcessesToUpdate, System};
7use tokio::sync::watch;
8use tokio::task::JoinHandle;
9
/// IORegistry class names passed to `ioreg -c` when probing for GPU telemetry.
///
/// The probe walks this list in order and stops at the first class that yields metrics.
const GPU_CLASSES: &[&str] = &[
    // Tried first.
    "AGXAcceleratorG15X",
    // Tried only when the class above yields no telemetry.
    "IOAccelerator",
];

/// Substrings that mark a (lower-cased) process name as an embedder backend.
const EMBEDDER_PROCESS_NAMES: &[&str] = &[
    "ollama",
    "llama-server",
    // Underscore and hyphen spellings are both listed.
    "mlx_server",
    "mlx-server",
];
12
/// Outcome of the most recent GPU telemetry probe, as reported to the dashboard.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GpuStatus {
    /// Telemetry was read; `class_name` names the IORegistry class that supplied it.
    Available { class_name: String },
    /// No telemetry could be read; `reason` carries a human-readable explanation.
    Unavailable { reason: String },
}

impl Default for GpuStatus {
    /// Starts as [`GpuStatus::Unavailable`] with the reason `"not sampled yet"`.
    fn default() -> Self {
        let reason = String::from("not sampled yet");
        Self::Unavailable { reason }
    }
}
27
28/// Latest system monitor snapshot for the dashboard.
29#[derive(Debug, Clone)]
30pub struct MonitorSnapshot {
31    pub system_cpu_percent: f32,
32    pub system_ram_used: u64,
33    pub system_ram_total: u64,
34    pub rust_memex_cpu: f32,
35    pub rust_memex_rss: u64,
36    pub embedder_cpu_aggregate: f32,
37    pub embedder_rss_aggregate: u64,
38    pub gpu_util_percent: Option<f32>,
39    pub gpu_memory_used: Option<u64>,
40    pub gpu_memory_total: Option<u64>,
41    pub gpu_status: GpuStatus,
42    pub sampled_at: Instant,
43}
44
45impl Default for MonitorSnapshot {
46    fn default() -> Self {
47        Self {
48            system_cpu_percent: 0.0,
49            system_ram_used: 0,
50            system_ram_total: 0,
51            rust_memex_cpu: 0.0,
52            rust_memex_rss: 0,
53            embedder_cpu_aggregate: 0.0,
54            embedder_rss_aggregate: 0,
55            gpu_util_percent: None,
56            gpu_memory_used: None,
57            gpu_memory_total: None,
58            gpu_status: GpuStatus::default(),
59            sampled_at: Instant::now(),
60        }
61    }
62}
63
64impl MonitorSnapshot {
65    pub fn format_bytes(bytes: u64) -> String {
66        const KB: f64 = 1024.0;
67        const MB: f64 = KB * 1024.0;
68        const GB: f64 = MB * 1024.0;
69
70        match bytes {
71            0..=1023 => format!("{bytes} B"),
72            1_024..=1_048_575 => format!("{:.0} KB", bytes as f64 / KB),
73            1_048_576..=1_073_741_823 => format!("{:.0} MB", bytes as f64 / MB),
74            _ => format!("{:.1} GB", bytes as f64 / GB),
75        }
76    }
77}
78
79/// Spawn the system monitor sampler with a latest-value watch channel.
80pub fn spawn_monitor(interval: Duration) -> (watch::Receiver<MonitorSnapshot>, JoinHandle<()>) {
81    let (sender, receiver) = watch::channel(MonitorSnapshot::default());
82    let handle = tokio::spawn(async move {
83        let my_pid = Pid::from_u32(std::process::id());
84        let mut system = System::new_all();
85        system.refresh_all();
86        tokio::time::sleep(Duration::from_millis(250)).await;
87
88        loop {
89            system.refresh_cpu_usage();
90            system.refresh_memory();
91            system.refresh_processes(ProcessesToUpdate::All, true);
92
93            let snapshot = build_snapshot(&system, my_pid);
94            if sender.send(snapshot).is_err() {
95                break;
96            }
97            tokio::time::sleep(interval).await;
98        }
99    });
100
101    (receiver, handle)
102}
103
104fn build_snapshot(system: &System, my_pid: Pid) -> MonitorSnapshot {
105    let mut snapshot = MonitorSnapshot {
106        system_cpu_percent: system.global_cpu_usage(),
107        system_ram_used: system.used_memory(),
108        system_ram_total: system.total_memory(),
109        sampled_at: Instant::now(),
110        ..MonitorSnapshot::default()
111    };
112
113    if let Some(process) = system.process(my_pid) {
114        snapshot.rust_memex_cpu = process.cpu_usage();
115        snapshot.rust_memex_rss = process.memory();
116    }
117
118    for process in system.processes().values() {
119        if is_embedder_process(process.name(), process.cmd()) {
120            snapshot.embedder_cpu_aggregate += process.cpu_usage();
121            snapshot.embedder_rss_aggregate += process.memory();
122        }
123    }
124
125    match probe_gpu() {
126        Ok(metrics) => {
127            snapshot.gpu_util_percent = Some(metrics.device_util as f32);
128            snapshot.gpu_memory_used = metrics.memory_used;
129            snapshot.gpu_memory_total = metrics.memory_total;
130            snapshot.gpu_status = GpuStatus::Available {
131                class_name: metrics.class_name,
132            };
133        }
134        Err(status) => {
135            snapshot.gpu_status = status;
136        }
137    }
138
139    snapshot
140}
141
/// Raw numbers pulled out of one `ioreg` dump for a single accelerator class.
#[derive(Clone, Debug, Eq, PartialEq)]
struct GpuMetrics {
    /// IORegistry class the dump was requested for.
    class_name: String,
    /// Value of `Device Utilization %`, or `Renderer Utilization %` when the former is absent.
    device_util: u64,
    /// Value of `In use system memory`, if the dump contains it.
    memory_used: Option<u64>,
    /// Value of `Alloc system memory`, if the dump contains it.
    memory_total: Option<u64>,
}
149
150fn probe_gpu() -> Result<GpuMetrics, GpuStatus> {
151    #[cfg(not(target_os = "macos"))]
152    return Err(GpuStatus::Unavailable {
153        reason: "GPU telemetry only supported on macOS via ioreg".to_string(),
154    });
155
156    let mut reasons = Vec::new();
157
158    for class_name in GPU_CLASSES {
159        match std::process::Command::new("ioreg")
160            .args(["-l", "-w", "0", "-r", "-c", class_name, "-d", "1"])
161            .output()
162        {
163            Ok(output) if output.status.success() => {
164                let stdout = String::from_utf8_lossy(&output.stdout);
165                if let Some(metrics) = parse_ioreg_output(&stdout, class_name) {
166                    return Ok(metrics);
167                }
168                reasons.push(format!("{class_name}: telemetry keys not found"));
169            }
170            Ok(output) => {
171                let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
172                if stderr.is_empty() {
173                    reasons.push(format!("{class_name}: ioreg exited with {}", output.status));
174                } else {
175                    reasons.push(format!("{class_name}: {stderr}"));
176                }
177            }
178            Err(error) => {
179                reasons.push(format!("{class_name}: {error}"));
180            }
181        }
182    }
183
184    Err(GpuStatus::Unavailable {
185        reason: reasons.join(" | "),
186    })
187}
188
189fn parse_ioreg_output(output: &str, class_name: &str) -> Option<GpuMetrics> {
190    let device_util = extract_ioreg_value(output, "Device Utilization %")
191        .or_else(|| extract_ioreg_value(output, "Renderer Utilization %"))?;
192
193    Some(GpuMetrics {
194        class_name: class_name.to_string(),
195        device_util,
196        memory_used: extract_ioreg_value(output, "In use system memory"),
197        memory_total: extract_ioreg_value(output, "Alloc system memory"),
198    })
199}
200
/// Read the unsigned integer assigned to `"key"` in `ioreg` output.
///
/// Matches `"key"`, optional whitespace, `=`, optional whitespace, then a run of ASCII
/// digits. Every occurrence of the quoted key is tried in order and the first one that
/// has this exact shape wins. An occurrence whose value is not a plain number is
/// skipped rather than borrowing digits that belong to some later key.
///
/// Returns `None` when the key is absent, never followed by `= <digits>`, or the digits
/// do not fit in a `u64`.
fn extract_ioreg_value(output: &str, key: &str) -> Option<u64> {
    let quoted_key = format!("\"{key}\"");

    output
        .match_indices(quoted_key.as_str())
        .find_map(|(start, matched)| {
            let after_key = &output[start + matched.len()..];
            let value = after_key.trim_start().strip_prefix('=')?.trim_start();
            let digits_end = value
                .find(|ch: char| !ch.is_ascii_digit())
                .unwrap_or(value.len());
            // An empty digit run fails to parse, which rejects this occurrence.
            value[..digits_end].parse().ok()
        })
}
219
220fn is_embedder_process(name: &OsStr, cmdline: &[OsString]) -> bool {
221    let name = name.to_string_lossy().to_lowercase();
222    if EMBEDDER_PROCESS_NAMES
223        .iter()
224        .any(|candidate| name.contains(candidate))
225    {
226        return true;
227    }
228
229    if name.contains("python") {
230        let cmdline = cmdline
231            .iter()
232            .map(|segment| segment.to_string_lossy())
233            .collect::<Vec<_>>()
234            .join(" ")
235            .to_lowercase();
236        return cmdline.contains("mlx") || cmdline.contains("embed");
237    }
238
239    false
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses the `ioreg` dump stored at `tests/fixtures/ioreg_m3_ultra.txt`.
    #[test]
    fn parse_ioreg_fixture_file() {
        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
        let raw = std::fs::read_to_string(manifest_dir.join("tests/fixtures/ioreg_m3_ultra.txt"))
            .expect("read ioreg fixture");

        let parsed = parse_ioreg_output(&raw, "AGXAcceleratorG15X").expect("parse ioreg");

        assert_eq!(parsed.class_name, "AGXAcceleratorG15X");
        assert!(parsed.device_util <= 100);
        assert!(parsed.memory_total.is_some());
    }

    /// Values packed into a single-line statistics dictionary are still found.
    #[test]
    fn extract_ioreg_value_handles_inline_statistics() {
        let sample = "\"PerformanceStatistics\" = {\"Device Utilization %\"=13,\"Alloc system memory\"=477265838080,\"In use system memory\"=1874984960}";
        let expectations = [
            ("Device Utilization %", 13),
            ("Alloc system memory", 477_265_838_080),
            ("In use system memory", 1_874_984_960),
        ];

        for (key, expected) in expectations {
            assert_eq!(extract_ioreg_value(sample, key), Some(expected), "key {key:?}");
        }
    }

    /// Known binaries and a Python MLX invocation match; an unrelated name does not.
    #[test]
    fn embedder_process_detection_matches_expected_names() {
        for known in ["ollama", "llama-server", "mlx_server"] {
            assert!(is_embedder_process(OsStr::new(known), &[]), "{known}");
        }

        let python_cmd = ["python3", "-m", "mlx.embed"].map(OsString::from);
        assert!(is_embedder_process(OsStr::new("python3"), &python_cmd));

        assert!(!is_embedder_process(OsStr::new("nginx"), &[]));
    }
}