Skip to main content

gam_runtime/
process_monitor.rs

1//! Process-wide liveness monitor with per-thread scope stacks.
2//!
3//! Each heartbeat (≈once/minute) reports three things a user watching a long
4//! compute log actually needs:
5//!   1. The currently-active operation — the label of the longest-running
6//!      instrumented scope on any thread and how long it has been running, so a
7//!      multi-minute silent window shows `active="BMS coord_corrections …" for 142s`
8//!      instead of nothing.
9//!   2. A TRUE busy signal — process-wide CPU utilization in cores-busy,
10//!      computed from `/proc/self/stat` (utime+stime) deltas between heartbeats.
11//!      A rayon fan-out saturating ~70 cores reads as `cpu=68.3 cores`, where
12//!      the old `active_threads` counter only ever saw the handful of threads
13//!      inside an instrumented `track_scope` (rayon workers are not) and so
14//!      reported a misleading `0`.
15//!   3. Progress — when a long scope registers a progress counter (via
16//!      [`track_scope_with_progress`]) the heartbeat surfaces `progress=a/b (X%)`.
17
18use std::cell::RefCell;
19use std::collections::BTreeMap;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, Mutex, OnceLock};
22use std::thread;
23use std::time::{Duration, Instant};
24
/// Cadence of the background heartbeat: one dump every 60 seconds.
const PROCESS_MONITOR_INTERVAL: Duration = Duration::from_secs(60);

/// Upper bound on per-thread phase lines printed in a single dump. Threads are
/// listed with the oldest innermost frame first; everything past the cap is
/// collapsed into one "... and N more" line so dumps stay readable when many
/// threads are instrumented.
const PROCESS_MONITOR_MAX_PHASE_LINES: usize = 8;

/// Age of a thread's innermost instrumented frame at or beyond which the
/// heartbeat additionally emits a loud `[process-monitor][STALL]` warning for
/// that thread, so a long unlogged phase cannot go unnoticed.
const PROCESS_MONITOR_STALL_THRESHOLD: Duration = Duration::from_secs(2 * 60);
36
37static PROCESS_MONITOR: OnceLock<Arc<ProcessMonitorState>> = OnceLock::new();
38
39thread_local! {
40    static THREAD_STACK: RefCell<ThreadStack> = RefCell::new(ThreadStack::new());
41}
42
/// A shared progress counter a long-running scope can expose to the heartbeat.
///
/// Created by [`track_scope_with_progress`]; the compute loop then calls
/// [`ScopeProgress::set`], [`ScopeProgress::inc`] or [`ScopeProgress::add`] as
/// it advances. The heartbeat reads the counters and renders a percentage on
/// the active-scope line, so the loop never has to log on the monitor's
/// cadence itself.
///
/// Cloning is cheap (an `Arc` bump) and every clone updates the same counter,
/// so it can be moved into worker closures.
#[derive(Clone, Debug)]
pub struct ScopeProgress {
    inner: Arc<ProgressCounter>,
}

/// Backing storage shared by all clones of a [`ScopeProgress`].
#[derive(Debug)]
struct ProgressCounter {
    current: AtomicU64,
    total: AtomicU64,
}

impl ScopeProgress {
    /// Record progress as `current` out of `total` units (e.g. rows processed
    /// out of rows total). `total == 0` is treated as "total unknown".
    ///
    /// The two values are stored with separate relaxed writes, so a
    /// concurrent heartbeat may briefly pair the new `current` with the
    /// previous `total`. This is acceptable for a diagnostic display.
    pub fn set(&self, current: u64, total: u64) {
        self.inner.current.store(current, Ordering::Relaxed);
        self.inner.total.store(total, Ordering::Relaxed);
    }

    /// Advance the current count by one, leaving the total unchanged.
    pub fn inc(&self) {
        self.add(1);
    }

    /// Advance the current count by `delta`, leaving the total unchanged.
    /// This is useful when work completes in chunks rather than one unit at
    /// a time. Like [`AtomicU64::fetch_add`], it wraps on overflow.
    pub fn add(&self, delta: u64) {
        self.inner.current.fetch_add(delta, Ordering::Relaxed);
    }

    /// Current `(current, total)` pair as read by the heartbeat. The two
    /// loads are independent, so the pair is not read as one atomic unit.
    fn snapshot(&self) -> (u64, u64) {
        (
            self.inner.current.load(Ordering::Relaxed),
            self.inner.total.load(Ordering::Relaxed),
        )
    }
}
80
81#[derive(Clone)]
82struct FrameSnapshot {
83    label: String,
84    entered: Instant,
85    progress: Option<ScopeProgress>,
86}
87
88struct ThreadSnapshot {
89    name: Option<String>,
90    stack: Vec<FrameSnapshot>,
91    updated: Instant,
92}
93
94struct ProcessMonitorState {
95    started: Instant,
96    threads: Mutex<BTreeMap<String, ThreadSnapshot>>,
97    cpu: Mutex<CpuSampler>,
98}
99
100struct ThreadStack {
101    id: String,
102    name: Option<String>,
103    stack: Vec<FrameSnapshot>,
104}
105
/// RAII guard for an instrumented scope opened by [`track_scope`] or
/// [`track_scope_with_progress`].
///
/// Dropping the guard pops the innermost frame of the thread that drops it.
/// So the guard must be bound to a named variable (`let _guard = ...`), kept
/// on the thread that created it, and dropped in LIFO order with any nested
/// guards. Discarding it (or binding it to `_`) closes the scope immediately.
#[must_use = "the scope closes as soon as the guard is dropped; bind it with `let _guard = ...`"]
pub struct ProcessScopeGuard;
107
108impl ThreadStack {
109    fn new() -> Self {
110        let thread = thread::current();
111        Self {
112            id: format!("{:?}", thread.id()),
113            name: thread.name().map(str::to_string),
114            stack: Vec::new(),
115        }
116    }
117}
118
119impl ProcessMonitorState {
120    fn update_thread(&self, thread: &ThreadStack) {
121        let mut threads = self
122            .threads
123            .lock()
124            .expect("process monitor registry poisoned");
125        if thread.stack.is_empty() {
126            threads.remove(&thread.id);
127        } else {
128            threads.insert(
129                thread.id.clone(),
130                ThreadSnapshot {
131                    name: thread.name.clone(),
132                    stack: thread.stack.clone(),
133                    updated: Instant::now(),
134                },
135            );
136        }
137    }
138
139    fn emit(&self) {
140        let threads = self
141            .threads
142            .lock()
143            .expect("process monitor registry poisoned");
144        let resource = ProcessResourceSnapshot::read();
145
146        // TRUE busy signal: process-wide cores-busy averaged over the interval
147        // since the last heartbeat, read from /proc/self/stat. Independent of
148        // whether the busy threads happen to sit inside an instrumented scope,
149        // so a rayon fan-out over ~70 cores reads as ~70 here. The old
150        // `active_threads` counter (now `instrumented_threads`, kept as a
151        // diagnostic) only ever counted threads inside a `track_scope` and so
152        // reported a misleading 0 during rayon-heavy windows.
153        let cpu = self
154            .cpu
155            .lock()
156            .expect("process monitor cpu sampler poisoned")
157            .sample();
158        let instrumented_threads = threads.len();
159
160        // Build a per-thread view keyed on the DEEPEST frame (the innermost
161        // phase the thread is actually executing) and how long it has been
162        // there. Order oldest-first so the most-likely-stalled phases sort to
163        // the top of the (capped) dump.
164        struct ThreadPhase<'a> {
165            thread_label: String,
166            depth: usize,
167            updated_ago: Duration,
168            deepest_label: &'a str,
169            deepest_age: Duration,
170            progress: Option<(u64, u64)>,
171        }
172        let mut phases: Vec<ThreadPhase<'_>> = Vec::with_capacity(threads.len());
173        for (thread_id, thread) in threads.iter() {
174            let Some(deepest) = thread.stack.last() else {
175                continue;
176            };
177            let thread_label = match &thread.name {
178                Some(name) => format!("{thread_id}/{name}"),
179                None => thread_id.clone(),
180            };
181            // Surface the first progress counter found walking the stack from
182            // the innermost frame outward (the innermost reporting scope is the
183            // most specific "what is it doing right now").
184            let progress = thread
185                .stack
186                .iter()
187                .rev()
188                .find_map(|frame| frame.progress.as_ref())
189                .map(ScopeProgress::snapshot);
190            phases.push(ThreadPhase {
191                thread_label,
192                depth: thread.stack.len(),
193                updated_ago: thread.updated.elapsed(),
194                deepest_label: deepest.label.as_str(),
195                deepest_age: deepest.entered.elapsed(),
196                progress,
197            });
198        }
199        phases.sort_by(|a, b| b.deepest_age.cmp(&a.deepest_age));
200
201        // Headline line: total elapsed, resource snapshot, true CPU busy
202        // signal, and — front and center — the longest-running active scope so
203        // a user instantly sees "what is it doing and for how long".
204        let active = match phases.first() {
205            Some(phase) => {
206                let progress = match phase.progress {
207                    Some((cur, total)) if total > 0 => {
208                        let pct = (cur as f64 / total as f64) * 100.0;
209                        format!(" progress={cur}/{total} ({pct:.0}%)")
210                    }
211                    Some((cur, _)) => format!(" progress={cur}/?"),
212                    None => String::new(),
213                };
214                format!(
215                    " active={:?} for {}{}",
216                    phase.deepest_label,
217                    format_duration(phase.deepest_age),
218                    progress,
219                )
220            }
221            None => " active=<idle>".to_string(),
222        };
223
224        log::info!(
225            "[process-monitor] elapsed={} {} {} instrumented_threads={}{}",
226            format_duration(self.started.elapsed()),
227            resource.format(),
228            cpu.format(),
229            instrumented_threads,
230            active,
231        );
232
233        // STALL warnings first, loud and unconditional (not subject to the
234        // per-dump phase-line cap) so a long unlogged phase is never silent.
235        for phase in phases
236            .iter()
237            .filter(|p| p.deepest_age >= PROCESS_MONITOR_STALL_THRESHOLD)
238        {
239            log::warn!(
240                "[process-monitor][STALL] thread={} phase={:?} stuck={}",
241                phase.thread_label,
242                phase.deepest_label,
243                format_duration(phase.deepest_age),
244            );
245        }
246
247        // Compact per-thread phase summary: deepest frame label + age, capped.
248        for phase in phases.iter().take(PROCESS_MONITOR_MAX_PHASE_LINES) {
249            let progress = match phase.progress {
250                Some((cur, total)) if total > 0 => {
251                    let pct = (cur as f64 / total as f64) * 100.0;
252                    format!(" progress={cur}/{total} ({pct:.0}%)")
253                }
254                Some((cur, _)) => format!(" progress={cur}/?"),
255                None => String::new(),
256            };
257            log::info!(
258                "[process-monitor] phase thread={} depth={} deepest={:?} in_frame={} updated_ago={}{}",
259                phase.thread_label,
260                phase.depth,
261                phase.deepest_label,
262                format_duration(phase.deepest_age),
263                format_duration(phase.updated_ago),
264                progress,
265            );
266        }
267        if phases.len() > PROCESS_MONITOR_MAX_PHASE_LINES {
268            log::info!(
269                "[process-monitor] phase ... and {} more active thread(s) omitted",
270                phases.len() - PROCESS_MONITOR_MAX_PHASE_LINES,
271            );
272        }
273    }
274}
275
276impl Drop for ProcessScopeGuard {
277    fn drop(&mut self) {
278        let state = process_monitor();
279        THREAD_STACK.with(|stack| {
280            let mut stack = stack.borrow_mut();
281            stack.stack.pop();
282            state.update_thread(&stack);
283        });
284    }
285}
286
287/// Start the background process monitor thread if it is not already running.
288pub fn start() {
289    process_monitor();
290}
291
292pub fn track_scope(label: impl Into<String>) -> ProcessScopeGuard {
293    push_scope(label.into(), None)
294}
295
296/// Open a tracked scope that also exposes a live progress counter to the
297/// heartbeat. The returned [`ScopeProgress`] is cheap to clone into worker
298/// closures; bump it as the loop advances and the heartbeat will surface
299/// `progress=a/b (X%)` on the active-scope line. The scope closes when the
300/// returned guard is dropped, exactly like [`track_scope`].
301pub fn track_scope_with_progress(
302    label: impl Into<String>,
303    total: u64,
304) -> (ProcessScopeGuard, ScopeProgress) {
305    let progress = ScopeProgress {
306        inner: Arc::new(ProgressCounter {
307            current: AtomicU64::new(0),
308            total: AtomicU64::new(total),
309        }),
310    };
311    let guard = push_scope(label.into(), Some(progress.clone()));
312    (guard, progress)
313}
314
315fn push_scope(label: String, progress: Option<ScopeProgress>) -> ProcessScopeGuard {
316    let state = process_monitor();
317    THREAD_STACK.with(|stack| {
318        let mut stack = stack.borrow_mut();
319        stack.stack.push(FrameSnapshot {
320            label,
321            entered: Instant::now(),
322            progress,
323        });
324        state.update_thread(&stack);
325    });
326    ProcessScopeGuard
327}
328
329fn process_monitor() -> Arc<ProcessMonitorState> {
330    PROCESS_MONITOR
331        .get_or_init(|| {
332            let state = Arc::new(ProcessMonitorState {
333                started: Instant::now(),
334                threads: Mutex::new(BTreeMap::new()),
335                cpu: Mutex::new(CpuSampler::new()),
336            });
337            start_process_monitor_thread(Arc::clone(&state));
338            state
339        })
340        .clone()
341}
342
343fn start_process_monitor_thread(state: Arc<ProcessMonitorState>) {
344    let builder = thread::Builder::new().name("gam-process-monitor".to_string());
345    match builder.spawn(move || {
346        loop {
347            thread::park_timeout(PROCESS_MONITOR_INTERVAL);
348            state.emit();
349        }
350    }) {
351        Ok(handle) => drop(handle),
352        Err(err) => log::warn!("failed to start process monitor thread: {err}"),
353    }
354}
355
/// Render a duration truncated to whole seconds. The largest nonzero unit
/// picks the form: `1h02m03s` with hours, `2m03s` with minutes, else `3s`.
fn format_duration(duration: Duration) -> String {
    let secs = duration.as_secs();
    let (h, rem) = (secs / 3600, secs % 3600);
    let (m, s) = (rem / 60, rem % 60);
    match (h, m) {
        (0, 0) => format!("{s}s"),
        (0, _) => format!("{m}m{s:02}s"),
        _ => format!("{h}h{m:02}m{s:02}s"),
    }
}
369
370/// Process-wide CPU utilization sampler.
371///
372/// On Linux it reads cumulative user+system CPU jiffies from `/proc/self/stat`
373/// and, between consecutive heartbeats, computes the average number of cores
374/// kept busy over the interval: `Δ(utime+stime)/clock_hz / Δwall`. The first
375/// heartbeat has no prior sample and reports the busy figure as unknown.
376struct CpuSampler {
377    prev_total_ticks: Option<u64>,
378    prev_wall: Option<Instant>,
379    last_cores: Option<f64>,
380}
381
382impl CpuSampler {
383    fn new() -> Self {
384        Self {
385            prev_total_ticks: None,
386            prev_wall: None,
387            last_cores: None,
388        }
389    }
390
391    fn sample(&mut self) -> CpuSnapshot {
392        let now = Instant::now();
393        let ticks = read_self_cpu_ticks();
394        let cores = match (ticks, self.prev_total_ticks, self.prev_wall) {
395            (Some(ticks), Some(prev_ticks), Some(prev_wall)) => {
396                let delta_ticks = ticks.saturating_sub(prev_ticks) as f64;
397                let delta_wall = now.duration_since(prev_wall).as_secs_f64();
398                let hz = clock_ticks_per_second();
399                if delta_wall > 0.0 && hz > 0.0 {
400                    let cores = delta_ticks / hz / delta_wall;
401                    self.last_cores = Some(cores);
402                    Some(cores)
403                } else {
404                    self.last_cores
405                }
406            }
407            _ => None,
408        };
409        if let Some(ticks) = ticks {
410            self.prev_total_ticks = Some(ticks);
411            self.prev_wall = Some(now);
412        }
413        CpuSnapshot {
414            cores,
415            ncpu: available_parallelism(),
416            window: PROCESS_MONITOR_INTERVAL,
417        }
418    }
419}
420
421struct CpuSnapshot {
422    cores: Option<f64>,
423    ncpu: Option<usize>,
424    window: Duration,
425}
426
427impl CpuSnapshot {
428    fn format(&self) -> String {
429        match self.cores {
430            Some(cores) => {
431                let of = match self.ncpu {
432                    Some(n) => format!("/{n}"),
433                    None => String::new(),
434                };
435                format!(
436                    "cpu={:.1}{} cores (avg over {})",
437                    cores,
438                    of,
439                    format_duration(self.window),
440                )
441            }
442            None => "cpu=<warming-up>".to_string(),
443        }
444    }
445}
446
/// Total CPU time (user + system) this process has consumed so far, in clock
/// ticks, parsed from `/proc/self/stat`.
///
/// The stat line is space-separated, but field 2 (the command name) is
/// wrapped in parentheses and may itself contain spaces or parentheses. So
/// parsing starts after the LAST `)`. From there the first token is field 3
/// (`state`), which puts field 14 (`utime`) and field 15 (`stime`) at offsets
/// 11 and 12. Returns `None` if the file cannot be read or the fields do not
/// parse.
#[cfg(target_os = "linux")]
fn read_self_cpu_ticks() -> Option<u64> {
    let stat = std::fs::read_to_string("/proc/self/stat").ok()?;
    let (_, after_comm) = stat.rsplit_once(')')?;
    let mut tail = after_comm.split_whitespace().skip(11);
    let user_ticks: u64 = tail.next()?.parse().ok()?;
    let system_ticks: u64 = tail.next()?.parse().ok()?;
    Some(user_ticks.saturating_add(system_ticks))
}

/// CPU tick counters are only read on Linux; elsewhere there is never a sample.
#[cfg(not(target_os = "linux"))]
fn read_self_cpu_ticks() -> Option<u64> {
    None
}
469
/// Unit of the `utime`/`stime` counters in `/proc/self/stat`: USER_HZ ticks
/// per second, i.e. what `sysconf(_SC_CLK_TCK)` would return. It is pinned to
/// 100, the value used by mainstream Linux architectures, to avoid linking
/// libc for a single `sysconf` call.
#[cfg(target_os = "linux")]
fn clock_ticks_per_second() -> f64 {
    const USER_HZ: f64 = 100.0;
    USER_HZ
}

/// No tick counter is sampled off Linux; 0 makes the CPU computation skip.
#[cfg(not(target_os = "linux"))]
fn clock_ticks_per_second() -> f64 {
    const NO_TICKS: f64 = 0.0;
    NO_TICKS
}
483
/// Number of CPUs the standard library reports as usable, or `None` if it
/// cannot tell.
fn available_parallelism() -> Option<usize> {
    match thread::available_parallelism() {
        Ok(count) => Some(usize::from(count)),
        Err(_) => None,
    }
}
487
488#[derive(Default)]
489struct ProcessResourceSnapshot {
490    rss_kb: Option<u64>,
491    peak_rss_kb: Option<u64>,
492    threads: Option<u64>,
493    read_bytes: Option<u64>,
494    write_bytes: Option<u64>,
495}
496
497impl ProcessResourceSnapshot {
498    fn read() -> Self {
499        #[cfg(target_os = "linux")]
500        {
501            return Self::read_linux();
502        }
503        #[cfg(not(target_os = "linux"))]
504        {
505            Self::default()
506        }
507    }
508
509    fn format(&self) -> String {
510        format!(
511            "rss={} peak_rss={} process_threads={} read_bytes={} write_bytes={}",
512            format_kb(self.rss_kb),
513            format_kb(self.peak_rss_kb),
514            format_count(self.threads),
515            format_bytes(self.read_bytes),
516            format_bytes(self.write_bytes),
517        )
518    }
519
520    #[cfg(target_os = "linux")]
521    fn read_linux() -> Self {
522        let mut snapshot = Self::default();
523        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
524            for line in status.lines() {
525                if let Some(value) = parse_status_kb(line, "VmRSS:") {
526                    snapshot.rss_kb = Some(value);
527                } else if let Some(value) = parse_status_kb(line, "VmHWM:") {
528                    snapshot.peak_rss_kb = Some(value);
529                } else if let Some(value) = parse_status_count(line, "Threads:") {
530                    snapshot.threads = Some(value);
531                }
532            }
533        }
534        if let Ok(io) = std::fs::read_to_string("/proc/self/io") {
535            for line in io.lines() {
536                if let Some(value) = parse_io_bytes(line, "read_bytes:") {
537                    snapshot.read_bytes = Some(value);
538                } else if let Some(value) = parse_io_bytes(line, "write_bytes:") {
539                    snapshot.write_bytes = Some(value);
540                }
541            }
542        }
543        snapshot
544    }
545}
546
/// Value from a `/proc/self/status` line such as `VmRSS:   123456 kB`: the
/// first whitespace-separated token after `key`, parsed as an integer.
/// `None` if `line` does not start with `key` or that token is not a number.
#[cfg(target_os = "linux")]
fn parse_status_kb(line: &str, key: &str) -> Option<u64> {
    line.strip_prefix(key)
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(|token| token.parse().ok())
}
552
/// Count from a `/proc/self/status` line such as `Threads:  42`: the first
/// whitespace-separated token after `key`. `None` if `line` does not start
/// with `key` or no unsigned integer follows it.
#[cfg(target_os = "linux")]
fn parse_status_count(line: &str, key: &str) -> Option<u64> {
    let value = line.strip_prefix(key)?;
    let first_token = value.split_whitespace().next()?;
    first_token.parse::<u64>().ok()
}
558
/// Byte count from a `/proc/self/io` line such as `read_bytes: 4096`. The
/// whole remainder after `key` (trimmed) must be the number; otherwise `None`.
#[cfg(target_os = "linux")]
fn parse_io_bytes(line: &str, key: &str) -> Option<u64> {
    line.strip_prefix(key)
        .map(str::trim)
        .and_then(|value| value.parse().ok())
}
564
/// Decimal rendering of a count, or `<unknown>` when it is absent.
fn format_count(value: Option<u64>) -> String {
    match value {
        Some(count) => count.to_string(),
        None => String::from("<unknown>"),
    }
}
570
/// Render a count of kB (1024-byte units, as `/proc/self/status` reports
/// them) via `format_bytes`, or `<unknown>` when absent.
fn format_kb(value: Option<u64>) -> String {
    match value {
        Some(kb) => format_bytes(Some(kb.saturating_mul(1024))),
        None => "<unknown>".to_string(),
    }
}

/// Human-readable byte count in binary units. It uses the largest of
/// GiB/MiB/KiB the value reaches, with one decimal place, prints plain `NB`
/// below 1 KiB, and returns `<unknown>` when absent.
fn format_bytes(value: Option<u64>) -> String {
    const UNITS: [(f64, &str); 3] = [
        (1024.0 * 1024.0 * 1024.0, "GiB"),
        (1024.0 * 1024.0, "MiB"),
        (1024.0, "KiB"),
    ];
    let Some(bytes) = value else {
        return "<unknown>".to_string();
    };
    let as_float = bytes as f64;
    UNITS
        .iter()
        .find(|(scale, _)| as_float >= *scale)
        .map_or_else(
            || format!("{bytes}B"),
            |(scale, suffix)| format!("{:.1}{}", as_float / scale, suffix),
        )
}