1use std::ffi::{OsStr, OsString};
4use std::time::{Duration, Instant};
5
6use sysinfo::{Pid, ProcessesToUpdate, System};
7use tokio::sync::watch;
8use tokio::task::JoinHandle;
9
/// IORegistry class names handed to `ioreg -c` by `probe_gpu`, tried in this order;
/// the first class that yields telemetry wins.
const GPU_CLASSES: &[&str] = &["AGXAcceleratorG15X", "IOAccelerator"];
/// Lowercase substrings that `is_embedder_process` searches for in a (lowercased)
/// process name to decide whether the process counts as an embedder.
const EMBEDDER_PROCESS_NAMES: &[&str] = &["ollama", "llama-server", "mlx_server", "mlx-server"];
12
/// Outcome of a GPU telemetry probe.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GpuStatus {
    /// Telemetry was read; `class_name` is the IORegistry class that produced it.
    Available { class_name: String },
    /// No telemetry could be read; `reason` is a human-readable explanation.
    Unavailable { reason: String },
}

impl Default for GpuStatus {
    /// Placeholder status used before any probe has run.
    fn default() -> Self {
        let reason = String::from("not sampled yet");
        GpuStatus::Unavailable { reason }
    }
}
27
28#[derive(Debug, Clone)]
30pub struct MonitorSnapshot {
31 pub system_cpu_percent: f32,
32 pub system_ram_used: u64,
33 pub system_ram_total: u64,
34 pub rust_memex_cpu: f32,
35 pub rust_memex_rss: u64,
36 pub embedder_cpu_aggregate: f32,
37 pub embedder_rss_aggregate: u64,
38 pub gpu_util_percent: Option<f32>,
39 pub gpu_memory_used: Option<u64>,
40 pub gpu_memory_total: Option<u64>,
41 pub gpu_status: GpuStatus,
42 pub sampled_at: Instant,
43}
44
45impl Default for MonitorSnapshot {
46 fn default() -> Self {
47 Self {
48 system_cpu_percent: 0.0,
49 system_ram_used: 0,
50 system_ram_total: 0,
51 rust_memex_cpu: 0.0,
52 rust_memex_rss: 0,
53 embedder_cpu_aggregate: 0.0,
54 embedder_rss_aggregate: 0,
55 gpu_util_percent: None,
56 gpu_memory_used: None,
57 gpu_memory_total: None,
58 gpu_status: GpuStatus::default(),
59 sampled_at: Instant::now(),
60 }
61 }
62}
63
64impl MonitorSnapshot {
65 pub fn format_bytes(bytes: u64) -> String {
66 const KB: f64 = 1024.0;
67 const MB: f64 = KB * 1024.0;
68 const GB: f64 = MB * 1024.0;
69
70 match bytes {
71 0..=1023 => format!("{bytes} B"),
72 1_024..=1_048_575 => format!("{:.0} KB", bytes as f64 / KB),
73 1_048_576..=1_073_741_823 => format!("{:.0} MB", bytes as f64 / MB),
74 _ => format!("{:.1} GB", bytes as f64 / GB),
75 }
76 }
77}
78
79pub fn spawn_monitor(interval: Duration) -> (watch::Receiver<MonitorSnapshot>, JoinHandle<()>) {
81 let (sender, receiver) = watch::channel(MonitorSnapshot::default());
82 let handle = tokio::spawn(async move {
83 let my_pid = Pid::from_u32(std::process::id());
84 let mut system = System::new_all();
85 system.refresh_all();
86 tokio::time::sleep(Duration::from_millis(250)).await;
87
88 loop {
89 system.refresh_cpu_usage();
90 system.refresh_memory();
91 system.refresh_processes(ProcessesToUpdate::All, true);
92
93 let snapshot = build_snapshot(&system, my_pid);
94 if sender.send(snapshot).is_err() {
95 break;
96 }
97 tokio::time::sleep(interval).await;
98 }
99 });
100
101 (receiver, handle)
102}
103
104fn build_snapshot(system: &System, my_pid: Pid) -> MonitorSnapshot {
105 let mut snapshot = MonitorSnapshot {
106 system_cpu_percent: system.global_cpu_usage(),
107 system_ram_used: system.used_memory(),
108 system_ram_total: system.total_memory(),
109 sampled_at: Instant::now(),
110 ..MonitorSnapshot::default()
111 };
112
113 if let Some(process) = system.process(my_pid) {
114 snapshot.rust_memex_cpu = process.cpu_usage();
115 snapshot.rust_memex_rss = process.memory();
116 }
117
118 for process in system.processes().values() {
119 if is_embedder_process(process.name(), process.cmd()) {
120 snapshot.embedder_cpu_aggregate += process.cpu_usage();
121 snapshot.embedder_rss_aggregate += process.memory();
122 }
123 }
124
125 match probe_gpu() {
126 Ok(metrics) => {
127 snapshot.gpu_util_percent = Some(metrics.device_util as f32);
128 snapshot.gpu_memory_used = metrics.memory_used;
129 snapshot.gpu_memory_total = metrics.memory_total;
130 snapshot.gpu_status = GpuStatus::Available {
131 class_name: metrics.class_name,
132 };
133 }
134 Err(status) => {
135 snapshot.gpu_status = status;
136 }
137 }
138
139 snapshot
140}
141
/// GPU telemetry extracted from one `ioreg` invocation by `parse_ioreg_output`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct GpuMetrics {
    /// IORegistry class the values were read from.
    class_name: String,
    /// Value of "Device Utilization %", or "Renderer Utilization %" as a fallback.
    device_util: u64,
    /// Value of "In use system memory", if present.
    memory_used: Option<u64>,
    /// Value of "Alloc system memory", if present.
    memory_total: Option<u64>,
}
149
150fn probe_gpu() -> Result<GpuMetrics, GpuStatus> {
151 #[cfg(not(target_os = "macos"))]
152 return Err(GpuStatus::Unavailable {
153 reason: "GPU telemetry only supported on macOS via ioreg".to_string(),
154 });
155
156 let mut reasons = Vec::new();
157
158 for class_name in GPU_CLASSES {
159 match std::process::Command::new("ioreg")
160 .args(["-l", "-w", "0", "-r", "-c", class_name, "-d", "1"])
161 .output()
162 {
163 Ok(output) if output.status.success() => {
164 let stdout = String::from_utf8_lossy(&output.stdout);
165 if let Some(metrics) = parse_ioreg_output(&stdout, class_name) {
166 return Ok(metrics);
167 }
168 reasons.push(format!("{class_name}: telemetry keys not found"));
169 }
170 Ok(output) => {
171 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
172 if stderr.is_empty() {
173 reasons.push(format!("{class_name}: ioreg exited with {}", output.status));
174 } else {
175 reasons.push(format!("{class_name}: {stderr}"));
176 }
177 }
178 Err(error) => {
179 reasons.push(format!("{class_name}: {error}"));
180 }
181 }
182 }
183
184 Err(GpuStatus::Unavailable {
185 reason: reasons.join(" | "),
186 })
187}
188
189fn parse_ioreg_output(output: &str, class_name: &str) -> Option<GpuMetrics> {
190 let device_util = extract_ioreg_value(output, "Device Utilization %")
191 .or_else(|| extract_ioreg_value(output, "Renderer Utilization %"))?;
192
193 Some(GpuMetrics {
194 class_name: class_name.to_string(),
195 device_util,
196 memory_used: extract_ioreg_value(output, "In use system memory"),
197 memory_total: extract_ioreg_value(output, "Alloc system memory"),
198 })
199}
200
/// Extracts the unsigned integer assigned to `key` in `ioreg` text.
///
/// Looks for `"key"` (with the surrounding double quotes) followed, after
/// optional whitespace, by `=`. The value is everything up to the next `,`,
/// `}` or newline; the first run of ASCII digits inside it is parsed as `u64`.
/// Occurrences of the key are tried in order and the first one that yields a
/// number wins.
///
/// Returns `None` when the key is absent, is not followed by `=`, has no digits
/// in its own value, or the digits do not fit in a `u64`.
fn extract_ioreg_value(output: &str, key: &str) -> Option<u64> {
    let quoted_key = format!("\"{key}\"");

    output
        .match_indices(quoted_key.as_str())
        .find_map(|(key_index, matched)| {
            let after_key = output[key_index + matched.len()..].trim_start();
            let after_equals = after_key.strip_prefix('=')?;

            // Confine the scan to this key's own value so a non-numeric value cannot
            // pick up digits belonging to a later, unrelated key.
            let value_end = after_equals
                .find(|ch: char| matches!(ch, ',' | '}' | '\n'))
                .unwrap_or(after_equals.len());

            let digits: String = after_equals[..value_end]
                .chars()
                .skip_while(|ch| !ch.is_ascii_digit())
                .take_while(|ch| ch.is_ascii_digit())
                .collect();

            // An empty string fails to parse, so no separate emptiness check is needed.
            digits.parse().ok()
        })
}
219
220fn is_embedder_process(name: &OsStr, cmdline: &[OsString]) -> bool {
221 let name = name.to_string_lossy().to_lowercase();
222 if EMBEDDER_PROCESS_NAMES
223 .iter()
224 .any(|candidate| name.contains(candidate))
225 {
226 return true;
227 }
228
229 if name.contains("python") {
230 let cmdline = cmdline
231 .iter()
232 .map(|segment| segment.to_string_lossy())
233 .collect::<Vec<_>>()
234 .join(" ")
235 .to_lowercase();
236 return cmdline.contains("mlx") || cmdline.contains("embed");
237 }
238
239 false
240}
241
// Unit tests for the ioreg parsing helpers and embedder process detection.
#[cfg(test)]
mod tests {
    use super::*;

    // Parses a captured ioreg dump stored under the crate's `tests/fixtures` directory
    // and checks that utilisation and allocated memory are found.
    #[test]
    fn parse_ioreg_fixture_file() {
        let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("tests/fixtures/ioreg_m3_ultra.txt");
        let fixture = std::fs::read_to_string(&fixture_path).expect("read ioreg fixture");
        let metrics = parse_ioreg_output(&fixture, "AGXAcceleratorG15X").expect("parse ioreg");
        assert_eq!(metrics.class_name, "AGXAcceleratorG15X");
        assert!(metrics.device_util <= 100);
        assert!(metrics.memory_total.is_some());
    }

    // Values packed on one line inside a `{...}` dictionary, with no spaces around `=`,
    // must each be extracted by key.
    #[test]
    fn extract_ioreg_value_handles_inline_statistics() {
        let sample = "\"PerformanceStatistics\" = {\"Device Utilization %\"=13,\"Alloc system memory\"=477265838080,\"In use system memory\"=1874984960}";
        assert_eq!(
            extract_ioreg_value(sample, "Device Utilization %"),
            Some(13)
        );
        assert_eq!(
            extract_ioreg_value(sample, "Alloc system memory"),
            Some(477_265_838_080)
        );
        assert_eq!(
            extract_ioreg_value(sample, "In use system memory"),
            Some(1_874_984_960)
        );
    }

    // Known embedder binaries match by name; a python host matches only through its
    // command line; an unrelated process does not match.
    #[test]
    fn embedder_process_detection_matches_expected_names() {
        assert!(is_embedder_process(OsStr::new("ollama"), &[]));
        assert!(is_embedder_process(OsStr::new("llama-server"), &[]));
        assert!(is_embedder_process(OsStr::new("mlx_server"), &[]));
        assert!(is_embedder_process(
            OsStr::new("python3"),
            &[
                OsString::from("python3"),
                OsString::from("-m"),
                OsString::from("mlx.embed")
            ]
        ));
        assert!(!is_embedder_process(OsStr::new("nginx"), &[]));
    }
}