1use std::cell::RefCell;
19use std::collections::BTreeMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, Mutex, OnceLock};
22use std::thread;
23use std::time::{Duration, Instant};
24
/// Period between two reports emitted by the monitor thread.
const PROCESS_MONITOR_INTERVAL: Duration = Duration::from_secs(60);

/// Innermost scopes open at least this long are additionally logged as `[STALL]`.
const PROCESS_MONITOR_STALL_THRESHOLD: Duration = Duration::from_secs(2 * 60);

/// Upper bound on per-thread `phase` lines written in one report; the rest are
/// summarised in a single "... and N more" line.
const PROCESS_MONITOR_MAX_PHASE_LINES: usize = 8;
36
37static PROCESS_MONITOR: OnceLock<Arc<ProcessMonitorState>> = OnceLock::new();
38
39thread_local! {
40 static THREAD_STACK: RefCell<ThreadStack> = RefCell::new(ThreadStack::new());
41}
42
/// Progress handle attached to a tracked scope.
///
/// Clones share one counter (it lives behind an `Arc`), so any clone can
/// advance it and the monitor reads the shared value when it reports.
/// `current` and `total` are two independent `Relaxed` atomics, so a reader
/// running concurrently with [`ScopeProgress::set`] may observe one updated
/// value together with the other's previous value.
#[derive(Clone)]
pub struct ScopeProgress {
    inner: Arc<ProgressCounter>,
}

/// Shared storage behind every clone of a [`ScopeProgress`].
struct ProgressCounter {
    current: AtomicU64,
    total: AtomicU64,
}

impl ScopeProgress {
    /// Replaces both the completed count and the expected total.
    pub fn set(&self, current: u64, total: u64) {
        let counter = &*self.inner;
        counter.current.store(current, Ordering::Relaxed);
        counter.total.store(total, Ordering::Relaxed);
    }

    /// Marks one more unit of work as completed.
    pub fn inc(&self) {
        let counter = &*self.inner;
        counter.current.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns `(current, total)` as currently stored.
    fn snapshot(&self) -> (u64, u64) {
        let ProgressCounter { current, total } = &*self.inner;
        (current.load(Ordering::Relaxed), total.load(Ordering::Relaxed))
    }
}
80
81#[derive(Clone)]
82struct FrameSnapshot {
83 label: String,
84 entered: Instant,
85 progress: Option<ScopeProgress>,
86}
87
88struct ThreadSnapshot {
89 name: Option<String>,
90 stack: Vec<FrameSnapshot>,
91 updated: Instant,
92}
93
94struct ProcessMonitorState {
95 started: Instant,
96 threads: Mutex<BTreeMap<String, ThreadSnapshot>>,
97 cpu: Mutex<CpuSampler>,
98}
99
100struct ThreadStack {
101 id: String,
102 name: Option<String>,
103 stack: Vec<FrameSnapshot>,
104}
105
/// Guard returned by [`track_scope`] and [`track_scope_with_progress`].
///
/// Dropping it pops the innermost frame from the current thread's scope stack.
/// The pop is strictly LIFO and does not identify which frame this guard
/// created, so guards should be dropped in reverse creation order (the natural
/// order for nested `let` bindings). Note that `let _ = track_scope(..)` drops
/// the guard immediately; bind it to a named variable such as `_guard`.
#[must_use = "the tracked scope ends as soon as the guard is dropped; bind it with `let _guard = ...`"]
pub struct ProcessScopeGuard;
107
108impl ThreadStack {
109 fn new() -> Self {
110 let thread = thread::current();
111 Self {
112 id: format!("{:?}", thread.id()),
113 name: thread.name().map(str::to_string),
114 stack: Vec::new(),
115 }
116 }
117}
118
119impl ProcessMonitorState {
120 fn update_thread(&self, thread: &ThreadStack) {
121 let mut threads = self
122 .threads
123 .lock()
124 .expect("process monitor registry poisoned");
125 if thread.stack.is_empty() {
126 threads.remove(&thread.id);
127 } else {
128 threads.insert(
129 thread.id.clone(),
130 ThreadSnapshot {
131 name: thread.name.clone(),
132 stack: thread.stack.clone(),
133 updated: Instant::now(),
134 },
135 );
136 }
137 }
138
139 fn emit(&self) {
140 let threads = self
141 .threads
142 .lock()
143 .expect("process monitor registry poisoned");
144 let resource = ProcessResourceSnapshot::read();
145
146 let cpu = self
154 .cpu
155 .lock()
156 .expect("process monitor cpu sampler poisoned")
157 .sample();
158 let instrumented_threads = threads.len();
159
160 struct ThreadPhase<'a> {
165 thread_label: String,
166 depth: usize,
167 updated_ago: Duration,
168 deepest_label: &'a str,
169 deepest_age: Duration,
170 progress: Option<(u64, u64)>,
171 }
172 let mut phases: Vec<ThreadPhase<'_>> = Vec::with_capacity(threads.len());
173 for (thread_id, thread) in threads.iter() {
174 let Some(deepest) = thread.stack.last() else {
175 continue;
176 };
177 let thread_label = match &thread.name {
178 Some(name) => format!("{thread_id}/{name}"),
179 None => thread_id.clone(),
180 };
181 let progress = thread
185 .stack
186 .iter()
187 .rev()
188 .find_map(|frame| frame.progress.as_ref())
189 .map(ScopeProgress::snapshot);
190 phases.push(ThreadPhase {
191 thread_label,
192 depth: thread.stack.len(),
193 updated_ago: thread.updated.elapsed(),
194 deepest_label: deepest.label.as_str(),
195 deepest_age: deepest.entered.elapsed(),
196 progress,
197 });
198 }
199 phases.sort_by(|a, b| b.deepest_age.cmp(&a.deepest_age));
200
201 let active = match phases.first() {
205 Some(phase) => {
206 let progress = match phase.progress {
207 Some((cur, total)) if total > 0 => {
208 let pct = (cur as f64 / total as f64) * 100.0;
209 format!(" progress={cur}/{total} ({pct:.0}%)")
210 }
211 Some((cur, _)) => format!(" progress={cur}/?"),
212 None => String::new(),
213 };
214 format!(
215 " active={:?} for {}{}",
216 phase.deepest_label,
217 format_duration(phase.deepest_age),
218 progress,
219 )
220 }
221 None => " active=<idle>".to_string(),
222 };
223
224 log::info!(
225 "[process-monitor] elapsed={} {} {} instrumented_threads={}{}",
226 format_duration(self.started.elapsed()),
227 resource.format(),
228 cpu.format(),
229 instrumented_threads,
230 active,
231 );
232
233 for phase in phases
236 .iter()
237 .filter(|p| p.deepest_age >= PROCESS_MONITOR_STALL_THRESHOLD)
238 {
239 log::warn!(
240 "[process-monitor][STALL] thread={} phase={:?} stuck={}",
241 phase.thread_label,
242 phase.deepest_label,
243 format_duration(phase.deepest_age),
244 );
245 }
246
247 for phase in phases.iter().take(PROCESS_MONITOR_MAX_PHASE_LINES) {
249 let progress = match phase.progress {
250 Some((cur, total)) if total > 0 => {
251 let pct = (cur as f64 / total as f64) * 100.0;
252 format!(" progress={cur}/{total} ({pct:.0}%)")
253 }
254 Some((cur, _)) => format!(" progress={cur}/?"),
255 None => String::new(),
256 };
257 log::info!(
258 "[process-monitor] phase thread={} depth={} deepest={:?} in_frame={} updated_ago={}{}",
259 phase.thread_label,
260 phase.depth,
261 phase.deepest_label,
262 format_duration(phase.deepest_age),
263 format_duration(phase.updated_ago),
264 progress,
265 );
266 }
267 if phases.len() > PROCESS_MONITOR_MAX_PHASE_LINES {
268 log::info!(
269 "[process-monitor] phase ... and {} more active thread(s) omitted",
270 phases.len() - PROCESS_MONITOR_MAX_PHASE_LINES,
271 );
272 }
273 }
274}
275
276impl Drop for ProcessScopeGuard {
277 fn drop(&mut self) {
278 let state = process_monitor();
279 THREAD_STACK.with(|stack| {
280 let mut stack = stack.borrow_mut();
281 stack.stack.pop();
282 state.update_thread(&stack);
283 });
284 }
285}
286
287pub fn start() {
289 process_monitor();
290}
291
292pub fn track_scope(label: impl Into<String>) -> ProcessScopeGuard {
293 push_scope(label.into(), None)
294}
295
296pub fn track_scope_with_progress(
302 label: impl Into<String>,
303 total: u64,
304) -> (ProcessScopeGuard, ScopeProgress) {
305 let progress = ScopeProgress {
306 inner: Arc::new(ProgressCounter {
307 current: AtomicU64::new(0),
308 total: AtomicU64::new(total),
309 }),
310 };
311 let guard = push_scope(label.into(), Some(progress.clone()));
312 (guard, progress)
313}
314
315fn push_scope(label: String, progress: Option<ScopeProgress>) -> ProcessScopeGuard {
316 let state = process_monitor();
317 THREAD_STACK.with(|stack| {
318 let mut stack = stack.borrow_mut();
319 stack.stack.push(FrameSnapshot {
320 label,
321 entered: Instant::now(),
322 progress,
323 });
324 state.update_thread(&stack);
325 });
326 ProcessScopeGuard
327}
328
329fn process_monitor() -> Arc<ProcessMonitorState> {
330 PROCESS_MONITOR
331 .get_or_init(|| {
332 let state = Arc::new(ProcessMonitorState {
333 started: Instant::now(),
334 threads: Mutex::new(BTreeMap::new()),
335 cpu: Mutex::new(CpuSampler::new()),
336 });
337 start_process_monitor_thread(Arc::clone(&state));
338 state
339 })
340 .clone()
341}
342
343fn start_process_monitor_thread(state: Arc<ProcessMonitorState>) {
344 let builder = thread::Builder::new().name("gam-process-monitor".to_string());
345 match builder.spawn(move || {
346 loop {
347 thread::park_timeout(PROCESS_MONITOR_INTERVAL);
348 state.emit();
349 }
350 }) {
351 Ok(handle) => drop(handle),
352 Err(err) => log::warn!("failed to start process monitor thread: {err}"),
353 }
354}
355
/// Formats `duration` with whole-second precision (fractions are truncated)
/// as `"42s"`, `"3m07s"` or `"2h05m09s"`.
fn format_duration(duration: Duration) -> String {
    let total = duration.as_secs();
    let (hours, rest) = (total / 3600, total % 3600);
    let (minutes, seconds) = (rest / 60, rest % 60);
    match (hours, minutes) {
        (0, 0) => format!("{seconds}s"),
        (0, _) => format!("{minutes}m{seconds:02}s"),
        _ => format!("{hours}h{minutes:02}m{seconds:02}s"),
    }
}
369
370struct CpuSampler {
377 prev_total_ticks: Option<u64>,
378 prev_wall: Option<Instant>,
379 last_cores: Option<f64>,
380}
381
382impl CpuSampler {
383 fn new() -> Self {
384 Self {
385 prev_total_ticks: None,
386 prev_wall: None,
387 last_cores: None,
388 }
389 }
390
391 fn sample(&mut self) -> CpuSnapshot {
392 let now = Instant::now();
393 let ticks = read_self_cpu_ticks();
394 let cores = match (ticks, self.prev_total_ticks, self.prev_wall) {
395 (Some(ticks), Some(prev_ticks), Some(prev_wall)) => {
396 let delta_ticks = ticks.saturating_sub(prev_ticks) as f64;
397 let delta_wall = now.duration_since(prev_wall).as_secs_f64();
398 let hz = clock_ticks_per_second();
399 if delta_wall > 0.0 && hz > 0.0 {
400 let cores = delta_ticks / hz / delta_wall;
401 self.last_cores = Some(cores);
402 Some(cores)
403 } else {
404 self.last_cores
405 }
406 }
407 _ => None,
408 };
409 if let Some(ticks) = ticks {
410 self.prev_total_ticks = Some(ticks);
411 self.prev_wall = Some(now);
412 }
413 CpuSnapshot {
414 cores,
415 ncpu: available_parallelism(),
416 window: PROCESS_MONITOR_INTERVAL,
417 }
418 }
419}
420
421struct CpuSnapshot {
422 cores: Option<f64>,
423 ncpu: Option<usize>,
424 window: Duration,
425}
426
427impl CpuSnapshot {
428 fn format(&self) -> String {
429 match self.cores {
430 Some(cores) => {
431 let of = match self.ncpu {
432 Some(n) => format!("/{n}"),
433 None => String::new(),
434 };
435 format!(
436 "cpu={:.1}{} cores (avg over {})",
437 cores,
438 of,
439 format_duration(self.window),
440 )
441 }
442 None => "cpu=<warming-up>".to_string(),
443 }
444 }
445}
446
/// Returns this process's cumulative user+system CPU time in clock ticks,
/// i.e. `utime + stime` (fields 14 and 15 of `/proc/self/stat`).
/// Returns `None` if the file cannot be read or parsed.
#[cfg(target_os = "linux")]
fn read_self_cpu_ticks() -> Option<u64> {
    let stat = std::fs::read_to_string("/proc/self/stat").ok()?;
    // The command name (field 2) is parenthesised and may itself contain
    // spaces or ')', so only split after the last ')'. The first token after
    // it is field 3 (state), which puts utime at index 11 and stime at 12.
    let (_, tail) = stat.rsplit_once(')')?;
    let mut fields = tail.split_whitespace().skip(11);
    let utime: u64 = fields.next()?.parse().ok()?;
    let stime: u64 = fields.next()?.parse().ok()?;
    Some(utime.saturating_add(stime))
}

/// CPU tick accounting is only implemented for Linux.
#[cfg(not(target_os = "linux"))]
fn read_self_cpu_ticks() -> Option<u64> {
    None
}
469
/// Clock ticks per second used to interpret `/proc/self/stat` CPU times.
/// NOTE(review): this is a hard-coded USER_HZ of 100, which is the common
/// Linux value. The authoritative figure is `sysconf(_SC_CLK_TCK)`; confirm
/// for the target architectures.
#[cfg(target_os = "linux")]
fn clock_ticks_per_second() -> f64 {
    const USER_HZ: f64 = 100.0;
    USER_HZ
}

/// Not applicable off Linux; `0.0` makes `CpuSampler` report no value.
#[cfg(not(target_os = "linux"))]
fn clock_ticks_per_second() -> f64 {
    0.0
}

/// Number of CPUs available to this process, if the OS reports it.
fn available_parallelism() -> Option<usize> {
    thread::available_parallelism().ok().map(usize::from)
}
487
488#[derive(Default)]
489struct ProcessResourceSnapshot {
490 rss_kb: Option<u64>,
491 peak_rss_kb: Option<u64>,
492 threads: Option<u64>,
493 read_bytes: Option<u64>,
494 write_bytes: Option<u64>,
495}
496
497impl ProcessResourceSnapshot {
498 fn read() -> Self {
499 #[cfg(target_os = "linux")]
500 {
501 return Self::read_linux();
502 }
503 #[cfg(not(target_os = "linux"))]
504 {
505 Self::default()
506 }
507 }
508
509 fn format(&self) -> String {
510 format!(
511 "rss={} peak_rss={} process_threads={} read_bytes={} write_bytes={}",
512 format_kb(self.rss_kb),
513 format_kb(self.peak_rss_kb),
514 format_count(self.threads),
515 format_bytes(self.read_bytes),
516 format_bytes(self.write_bytes),
517 )
518 }
519
520 #[cfg(target_os = "linux")]
521 fn read_linux() -> Self {
522 let mut snapshot = Self::default();
523 if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
524 for line in status.lines() {
525 if let Some(value) = parse_status_kb(line, "VmRSS:") {
526 snapshot.rss_kb = Some(value);
527 } else if let Some(value) = parse_status_kb(line, "VmHWM:") {
528 snapshot.peak_rss_kb = Some(value);
529 } else if let Some(value) = parse_status_count(line, "Threads:") {
530 snapshot.threads = Some(value);
531 }
532 }
533 }
534 if let Ok(io) = std::fs::read_to_string("/proc/self/io") {
535 for line in io.lines() {
536 if let Some(value) = parse_io_bytes(line, "read_bytes:") {
537 snapshot.read_bytes = Some(value);
538 } else if let Some(value) = parse_io_bytes(line, "write_bytes:") {
539 snapshot.write_bytes = Some(value);
540 }
541 }
542 }
543 snapshot
544 }
545}
546
/// Parses a `/proc/self/status` size line such as `"VmRSS:\t  1234 kB"`,
/// giving `Some(1234)`. The unit suffix is ignored; the value is returned as
/// written (kB).
#[cfg(target_os = "linux")]
fn parse_status_kb(line: &str, key: &str) -> Option<u64> {
    parse_status_leading_u64(line, key)
}

/// Parses a `/proc/self/status` count line such as `"Threads:\t12"`,
/// giving `Some(12)`.
#[cfg(target_os = "linux")]
fn parse_status_count(line: &str, key: &str) -> Option<u64> {
    parse_status_leading_u64(line, key)
}

/// Shared parser for both status helpers. Returns the first
/// whitespace-separated token after `key`, parsed as `u64`. Returns `None`
/// when `line` does not start with `key` or the token is missing or
/// non-numeric.
#[cfg(target_os = "linux")]
fn parse_status_leading_u64(line: &str, key: &str) -> Option<u64> {
    line.strip_prefix(key)?.split_whitespace().next()?.parse().ok()
}
558
/// Parses a `/proc/self/io` line such as `"read_bytes: 4096"`. The whole
/// trimmed remainder after `key` must be a `u64`, otherwise `None`.
#[cfg(target_os = "linux")]
fn parse_io_bytes(line: &str, key: &str) -> Option<u64> {
    line.strip_prefix(key)
        .map(str::trim)
        .and_then(|value| value.parse().ok())
}
564
/// Renders a plain count, or `<unknown>` when absent.
fn format_count(value: Option<u64>) -> String {
    match value {
        Some(count) => count.to_string(),
        None => "<unknown>".to_string(),
    }
}
570
/// Renders a size given in KiB using [`format_bytes`]. The conversion to
/// bytes saturates at `u64::MAX`.
fn format_kb(value: Option<u64>) -> String {
    // `format_bytes(None)` already yields `<unknown>`, so just convert units.
    format_bytes(value.map(|kb| kb.saturating_mul(1024)))
}

/// Renders a byte count with binary units and one decimal (`1.5KiB`,
/// `2.0GiB`). Values below 1 KiB are printed exactly, as in `512B`. Absent
/// values render as `<unknown>`.
fn format_bytes(value: Option<u64>) -> String {
    const KIB: f64 = 1024.0;
    const MIB: f64 = KIB * 1024.0;
    const GIB: f64 = MIB * 1024.0;
    match value {
        None => "<unknown>".to_string(),
        Some(bytes) => match bytes as f64 {
            b if b >= GIB => format!("{:.1}GiB", b / GIB),
            b if b >= MIB => format!("{:.1}MiB", b / MIB),
            b if b >= KIB => format!("{:.1}KiB", b / KIB),
            _ => format!("{bytes}B"),
        },
    }
}
594}