Skip to main content

xet_runtime/logging/
system_monitor.rs

1use std::fs::OpenOptions;
2use std::io::Write;
3use std::result::Result as stdResult;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::JoinHandle;
7use std::time::{Duration, Instant};
8
9use serde::{Deserialize, Serialize};
10use sysinfo::{Networks, Pid, Process, ProcessRefreshKind, RefreshKind, System};
11use thiserror::Error;
12use tracing::info;
13
14use crate::utils::TemplatedPathBuf;
15
16/// A utility for monitoring system resource usage of a process.
17///
18/// `SystemMonitor` can be configured to track a specific process ID or the current process.
19/// It periodically samples CPU usage, memory usage, disk I/O, and network I/O,
20/// and writes the metrics to a specified output file or to the tracing log.
21///
22/// # Example
23///
24/// ```no_run
25/// use std::time::Duration;
26///
27/// use xet_runtime::logging::SystemMonitor;
28/// use xet_runtime::utils::TemplatedPathBuf;
29///
30/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
31/// let monitor = SystemMonitor::follow_process(
32///     Duration::from_secs(5),
33///     Some(TemplatedPathBuf::from("monitor_{PID}_{TIMESTAMP}.log")),
34/// )?;
35///
36/// // ... application logic ...
37///
38/// monitor.stop()?;
39/// # Ok(())
40/// # }
41/// ```
42#[derive(Debug)]
43pub struct SystemMonitor {
44    pid: Option<Pid>,
45    sample_interval: Duration,
46    log_path: Arc<Option<TemplatedPathBuf>>,
47    monitor_loop: Mutex<Option<JoinHandle<stdResult<(), SystemMonitorError>>>>,
48    stop: Arc<AtomicBool>,
49}
50
51/// Internal state for sampling system metrics.
52///
53/// This struct holds the `sysinfo` `System` and `Networks` objects, which are refreshed
54/// at each sampling interval. It also tracks the process ID being monitored and timing
55/// information to calculate rates and averages.
56#[derive(Debug)]
57struct SystemSampler {
58    system: System,
59    network: Networks,
60    pid: Option<Pid>,
61    start_measurement_time: Instant,
62    last_measurement_time: Instant,
63    last_sample: Option<Metrics>,
64    baseline_sample: Metrics,
65}
66
67/// A snapshot of system metrics at a specific point in time.
68///
69/// This struct contains detailed information about CPU, memory, disk, and network usage
70/// for the monitored process.
71#[derive(Debug, Serialize, Deserialize, Clone)]
72struct Metrics {
73    /// Process ID of the monitored process
74    pid: u32,
75    /// Name of the process
76    name: String,
77    /// Total run time of the process in seconds
78    run_time: u64,
79    /// CPU usage metrics
80    cpu: CpuUsage,
81    /// Memory usage metrics
82    memory: MemoryUsage,
83    /// Disk I/O numbers and speed
84    disk: DiskUsage,
85    /// Network I/O numbers and speed
86    network: NetworkUsage,
87}
88
89impl Metrics {
90    pub fn create(
91        system: &System,
92        network: &Networks,
93        pid: Pid,
94        sample_interval: Duration,
95        total_duration: Duration,
96        last_sample: Option<Metrics>,
97        baseline: &Metrics,
98    ) -> Result<Self> {
99        let Some(process) = system.process(pid) else {
100            return Err(SystemMonitorError::NoProcess(pid.as_u32()));
101        };
102
103        Ok(Self {
104            pid: pid.as_u32(),
105            name: process.name().to_string_lossy().into(),
106            run_time: process.run_time(),
107            cpu: CpuUsage::from(process, system),
108            memory: MemoryUsage::from(process, system, last_sample.map(|s| s.memory)),
109            disk: DiskUsage::from(process, sample_interval, total_duration, &baseline.disk),
110            network: NetworkUsage::from(network, sample_interval, total_duration, &baseline.network),
111        })
112    }
113
114    /// Creates a baseline `Metrics` snapshot at the start of monitoring.
115    ///
116    /// This captures the initial state of disk and network I/O, which are reported
117    /// as cumulative values by the underlying system library. This baseline allows
118    /// for calculating the delta of resource usage during the monitoring session.
119    ///
120    /// This helps provide useful information when used by hf_xet in a long running
121    /// Python process, e.g. a iPython notebook
122    pub fn baseline(system: &System, network: &Networks, pid: Pid) -> Result<Self> {
123        let Some(process) = system.process(pid) else {
124            return Err(SystemMonitorError::NoProcess(pid.as_u32()));
125        };
126
127        Ok(Self {
128            pid: pid.as_u32(),
129            name: process.name().to_string_lossy().into(),
130            run_time: process.run_time(),
131            cpu: CpuUsage::from(process, system),
132            memory: MemoryUsage::from(process, system, None),
133            disk: DiskUsage::baseline(process),
134            network: NetworkUsage::baseline(network),
135        })
136    }
137
138    pub fn to_json(&self) -> Result<String> {
139        Ok(serde_json::to_string(&self)?)
140    }
141}
142
143/// Represents CPU usage metrics.
144#[derive(Debug, Serialize, Deserialize, Clone)]
145struct CpuUsage {
146    /// CPU usage of the monitored process as a percentage.
147    process_usage: f32,
148    /// Total number of CPUs in the system.
149    ncpus: u32,
150    /// Usage of individual CPUs as a percentage.
151    global_usage: Vec<f32>,
152}
153
154impl CpuUsage {
155    pub fn from(process: &Process, system: &System) -> Self {
156        Self {
157            process_usage: process.cpu_usage(),
158            ncpus: system.cpus().len() as u32,
159            global_usage: system.cpus().iter().map(|c| c.cpu_usage()).collect(),
160        }
161    }
162}
163
164/// Represents memory usage metrics.
165#[derive(Debug, Serialize, Deserialize, Clone)]
166struct MemoryUsage {
167    /// Current memory usage in bytes of the monitored process.
168    used_bytes: u64,
169    /// Peak memory usage in bytes observed for the monitored process during the session.
170    peak_used_bytes: u64,
171    /// Memory usage of the monitored process as a percentage of total system RAM.
172    percentage: f64,
173    /// Total system RAM size in bytes.
174    total_bytes: u64,
175}
176
177impl MemoryUsage {
178    pub fn from(process: &Process, system: &System, last_sample: Option<MemoryUsage>) -> Self {
179        Self {
180            used_bytes: process.memory(),
181            peak_used_bytes: process.memory().max(last_sample.map(|s| s.peak_used_bytes).unwrap_or_default()),
182            percentage: process.memory() as f64 / system.total_memory() as f64,
183            total_bytes: system.total_memory(),
184        }
185    }
186}
187
188/// Represents disk I/O metrics.
189#[derive(Debug, Serialize, Deserialize, Clone)]
190struct DiskUsage {
191    /// Total number of bytes written by the process since the monitor started.
192    total_written_bytes: u64,
193    /// Number of bytes written by the process since the last sample.
194    written_bytes: u64,
195    /// Total number of bytes read by the process since the monitor started.
196    total_read_bytes: u64,
197    /// Number of bytes read by the process since the last sample.
198    read_bytes: u64,
199
200    /// Average write speed in bytes per second over the entire monitoring duration.
201    average_write_speed: f64,
202    /// Instantaneous write speed in bytes per second over the last sample interval.
203    instant_write_speed: f64,
204    /// Average read speed in bytes per second over the entire monitoring duration.
205    average_read_speed: f64,
206    /// Instantaneous read speed in bytes per second over the last sample interval.
207    instant_read_speed: f64,
208}
209
210impl DiskUsage {
211    /// Creates a baseline for disk usage at the start of monitoring.
212    ///
213    /// This is necessary because `sysinfo` provides cumulative disk I/O statistics
214    /// since the process started. To measure usage only during the monitoring period,
215    /// we capture this initial state and subtract it from later samples.
216    pub fn baseline(process: &Process) -> Self {
217        let usage = process.disk_usage();
218        Self {
219            total_written_bytes: usage.total_written_bytes,
220            written_bytes: 0,
221            total_read_bytes: usage.total_read_bytes,
222            read_bytes: 0,
223            average_write_speed: 0.,
224            instant_write_speed: 0.,
225            average_read_speed: 0.,
226            instant_read_speed: 0.,
227        }
228    }
229
230    pub fn from(process: &Process, sample_interval: Duration, total_duration: Duration, baseline: &DiskUsage) -> Self {
231        let usage = process.disk_usage();
232
233        // Subtract stats before the monitor
234        let total_written_bytes = usage.total_written_bytes - baseline.total_written_bytes;
235        let total_read_bytes = usage.total_read_bytes - baseline.total_read_bytes;
236
237        Self {
238            total_written_bytes,
239            written_bytes: usage.written_bytes,
240            total_read_bytes,
241            read_bytes: usage.read_bytes,
242            average_write_speed: total_written_bytes as f64 / total_duration.as_secs_f64(),
243            instant_write_speed: usage.written_bytes as f64 / sample_interval.as_secs_f64(),
244            average_read_speed: total_read_bytes as f64 / total_duration.as_secs_f64(),
245            instant_read_speed: usage.read_bytes as f64 / sample_interval.as_secs_f64(),
246        }
247    }
248}
249
250/// Represents network I/O metrics for all interfaces combined.
251#[derive(Debug, Serialize, Deserialize, Clone)]
252struct NetworkUsage {
253    /// Total number of bytes transmitted across all network interfaces since the monitor started.
254    total_tx_bytes: u64,
255    /// Number of bytes transmitted across all network interfaces since the last sample.
256    tx_bytes: u64,
257    /// Total number of bytes received across all network interfaces since the monitor started.
258    total_rx_bytes: u64,
259    /// Number of bytes received across all network interfaces since the last sample.
260    rx_bytes: u64,
261
262    /// Average transmit speed in bytes per second over the entire monitoring duration.
263    average_tx_speed: f64,
264    /// Instantaneous transmit speed in bytes per second over the last sample interval.
265    instant_tx_speed: f64,
266    /// Average receive speed in bytes per second over the entire monitoring duration.
267    average_rx_speed: f64,
268    /// Instantaneous receive speed in bytes per second over the last sample interval.
269    instant_rx_speed: f64,
270}
271
272impl NetworkUsage {
273    /// Creates a baseline for network usage at the start of monitoring.
274    ///
275    /// This is necessary because `sysinfo` provides cumulative network I/O statistics
276    /// since the system booted. To measure usage only during the monitoring period,
277    /// we capture this initial state and subtract it from later samples.
278    pub fn baseline(network: &Networks) -> Self {
279        let total_tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted());
280        let total_rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received());
281
282        Self {
283            total_tx_bytes,
284            tx_bytes: 0,
285            total_rx_bytes,
286            rx_bytes: 0,
287            average_tx_speed: 0.,
288            instant_tx_speed: 0.,
289            average_rx_speed: 0.,
290            instant_rx_speed: 0.,
291        }
292    }
293
294    pub fn from(
295        network: &Networks,
296        sample_interval: Duration,
297        total_duration: Duration,
298        baseline: &NetworkUsage,
299    ) -> Self {
300        let total_tx_bytes =
301            network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted()) - baseline.total_tx_bytes;
302        let tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.transmitted());
303        let total_rx_bytes =
304            network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received()) - baseline.total_rx_bytes;
305        let rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.received());
306
307        Self {
308            total_tx_bytes,
309            tx_bytes,
310            total_rx_bytes,
311            rx_bytes,
312            average_tx_speed: total_tx_bytes as f64 / total_duration.as_secs_f64(),
313            instant_tx_speed: tx_bytes as f64 / sample_interval.as_secs_f64(),
314            average_rx_speed: total_rx_bytes as f64 / total_duration.as_secs_f64(),
315            instant_rx_speed: rx_bytes as f64 / sample_interval.as_secs_f64(),
316        }
317    }
318}
319
320impl SystemSampler {
321    pub fn new(pid: Option<Pid>) -> Result<Self> {
322        let Some(pid) = pid.or_else(|| sysinfo::get_current_pid().ok()) else {
323            return Err(SystemMonitorError::NoPid);
324        };
325
326        let system = System::new_all();
327        let network = Networks::new_with_refreshed_list();
328
329        let baseline = Metrics::baseline(&system, &network, pid)?;
330
331        let now = Instant::now();
332
333        Ok(Self {
334            system,
335            network,
336            pid: Some(pid),
337            start_measurement_time: now,
338            last_measurement_time: now,
339            last_sample: None,
340            baseline_sample: baseline,
341        })
342    }
343
344    pub fn sample(&mut self) -> Result<()> {
345        // refresh process, cpu, memory and disk usage
346        self.system.refresh_all();
347        // refresh network interface usage
348        self.network.refresh(true);
349
350        let Some(pid) = self.pid.or_else(|| sysinfo::get_current_pid().ok()) else {
351            return Err(SystemMonitorError::NoPid);
352        };
353
354        let sample_interval = self.last_measurement_time.elapsed();
355        self.last_measurement_time = Instant::now();
356        let total_duration = self.start_measurement_time.elapsed();
357
358        self.last_sample = Some(Metrics::create(
359            &self.system,
360            &self.network,
361            pid,
362            sample_interval,
363            total_duration,
364            self.last_sample.take(),
365            &self.baseline_sample,
366        )?);
367
368        Ok(())
369    }
370}
371
372/// Errors that can occur during system monitoring.
373#[derive(Error, Debug)]
374pub enum SystemMonitorError {
375    #[error("Failed to get pid")]
376    NoPid,
377
378    #[error("Failed to get process from pid {0}")]
379    NoProcess(u32),
380
381    #[error("IO Error: {0}")]
382    IOError(#[from] std::io::Error),
383
384    #[error("Serde Json error: {0}")]
385    Serde(#[from] serde_json::Error),
386
387    #[error("Internal error: {0}")]
388    Internal(String),
389}
390
391type Result<T> = std::result::Result<T, SystemMonitorError>;
392
393impl SystemMonitor {
394    /// Creates a new SystemMonitor that follows the current process.
395    ///
396    /// Monitoring starts immediately upon creation. The background thread begins
397    /// sampling system metrics at the specified interval.
398    ///
399    /// # Arguments
400    /// * `sample_interval` - The interval at which to sample system metrics.
401    /// * `log_path` - Optional path template for the output log file. If None, logs to tracing at INFO level.
402    pub fn follow_process(sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
403        sysinfo::get_current_pid().map_err(|_| SystemMonitorError::NoPid)?;
404        Self::new_impl(None, sample_interval, log_path)
405    }
406
407    /// Creates a new SystemMonitor that follows a specific process ID.
408    ///
409    /// Monitoring starts immediately upon creation. The background thread begins
410    /// sampling system metrics at the specified interval.
411    ///
412    /// # Arguments
413    /// * `pid` - The process ID to monitor.
414    /// * `sample_interval` - The interval at which to sample system metrics.
415    /// * `log_path` - Optional path template for the output log file. If None, logs to tracing at INFO level.
416    pub fn with_pid(pid: Pid, sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
417        let system =
418            System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything()));
419        if system.process(pid).is_none() {
420            return Err(SystemMonitorError::NoProcess(pid.as_u32()));
421        };
422
423        Self::new_impl(Some(pid), sample_interval, log_path)
424    }
425
426    fn new_impl(pid: Option<Pid>, sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
427        let ret = Self {
428            pid,
429            sample_interval,
430            log_path: log_path.into(),
431            monitor_loop: Mutex::new(None),
432            stop: Arc::new(AtomicBool::new(false)),
433        };
434
435        ret.start()?;
436
437        Ok(ret)
438    }
439
440    /// Starts the monitoring thread.
441    ///
442    /// This function is called automatically by `follow_process()` and `with_pid()`,
443    /// so it typically doesn't need to be called manually. If the monitor is already
444    /// running, this is a no-op.
445    ///
446    /// # Errors
447    /// Returns an error if:
448    /// - The log path is invalid or cannot be written to
449    /// - The monitored process no longer exists
450    /// - Internal synchronization fails
451    pub fn start(&self) -> Result<()> {
452        if self.is_running()? {
453            return Ok(());
454        }
455
456        let mut sampler = SystemSampler::new(self.pid)?;
457
458        // Take a sample before the thread starts so that errors like a bad log_path
459        // show up immediately in the caller.
460        sampler.sample()?;
461
462        let mut inner_runner = self
463            .monitor_loop
464            .lock()
465            .map_err(|e| SystemMonitorError::Internal(e.to_string()))?;
466        self.stop.store(false, Ordering::Relaxed);
467
468        let sample_interval = self.sample_interval;
469        let log_path = self.log_path.clone();
470        let stop_clone = self.stop.clone();
471
472        *inner_runner = Some(std::thread::spawn(move || {
473            loop {
474                if stop_clone.load(Ordering::Relaxed) {
475                    break;
476                }
477                std::thread::sleep(sample_interval);
478                sampler.sample()?;
479
480                if let Some(sample) = &sampler.last_sample {
481                    Self::output_report(sample, &log_path)?;
482                }
483            }
484            Ok(())
485        }));
486
487        Ok(())
488    }
489
490    fn output_report(sample: &Metrics, log_path: &Option<TemplatedPathBuf>) -> Result<()> {
491        let json_report = sample.to_json()?;
492
493        if let Some(path) = log_path {
494            let path = path.as_path();
495            let mut file = OpenOptions::new().create(true).append(true).open(path)?;
496            writeln!(file, "{json_report}")?;
497        } else {
498            info!(system_usage = json_report);
499        }
500
501        Ok(())
502    }
503
504    fn is_running(&self) -> Result<bool> {
505        let inner_runner = self
506            .monitor_loop
507            .lock()
508            .map_err(|e| SystemMonitorError::Internal(e.to_string()))?;
509        Ok(inner_runner.is_some())
510    }
511
512    /// Stops the monitoring thread.
513    ///
514    /// Signals the background thread to stop and waits for it to join.
515    ///
516    /// # Errors
517    /// Returns an error if there is an issue stopping the thread, such as if the thread
518    /// panicked or if there are internal synchronization issues.
519    pub fn stop(&self) -> Result<()> {
520        self.stop.store(true, Ordering::Relaxed);
521
522        if let Some(inner_runner) = self
523            .monitor_loop
524            .lock()
525            .map_err(|e| SystemMonitorError::Internal(e.to_string()))?
526            .take()
527        {
528            match inner_runner
529                .join()
530                .map_err(|_| SystemMonitorError::Internal("join error".to_owned()))?
531            {
532                Ok(_) => (),
533                Err(SystemMonitorError::NoProcess(_)) => (), // monitored process naturally died
534                e => e?,
535            }
536        }
537
538        Ok(())
539    }
540}
541
542impl Drop for SystemMonitor {
543    fn drop(&mut self) {
544        let _ = self.stop();
545    }
546}
547
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use std::time::Duration;

    use serial_test::serial;
    use tempfile::tempdir;

    use super::*;

    #[test]
    #[serial(monitor_process)]
    #[cfg_attr(feature = "smoke-test", ignore)]
    fn test_monitor_self_disk_usage() -> Result<()> {
        // Verifies that the system monitor correctly tracks and reports disk usage of this process

        let tempdir = tempdir()?;
        let tempdir_path = tempdir.path();
        let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt"));
        let sample_interval = Duration::from_millis(500);
        let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?;

        // produce some disk usage
        let data_file = tempdir_path.join("data");
        let total_written_bytes = {
            let buffer = vec![0; 1024 * 1024]; // 1MiB
            let mut fd = std::fs::OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(&data_file)?;

            for _ in 0..10 {
                fd.write_all(&buffer)?;
            }
            fd.flush()?;

            10 * 1024 * 1024 // 10MiB
        };

        // wait for the last sample and abort monitor
        std::thread::sleep(Duration::from_secs(2));
        monitor.stop()?;

        // check monitor logs
        let filesize = std::fs::metadata(data_file)?.len();
        assert_eq!(filesize, total_written_bytes);

        let log_reader = BufReader::new(File::open(log_path.as_path())?);
        let last_message = log_reader.lines().last().unwrap()?;
        let metrics: Metrics = serde_json::from_str(&last_message)?;

        // The total_written_bytes should be at least the size of the file created by this process.
        assert!(metrics.disk.total_written_bytes >= total_written_bytes);

        Ok(())
    }

    #[test]
    #[serial(monitor_process)]
    #[cfg_attr(feature = "smoke-test", ignore)]
    fn test_monitor_self_memory_usage() -> Result<()> {
        // Verifies that the system monitor correctly tracks and reports peak memory usage.
        let tempdir = tempdir()?;
        let tempdir_path = tempdir.path();
        let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt"));
        let sample_interval = Duration::from_millis(500);
        let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?;

        let peak_allocation_size = 512 * 1024 * 1024; // 512 MiB
        let page_size = 4 * 1024;

        // Allocate a large chunk of memory and touch one byte per page to commit it.
        let mut large_vec = vec![0u8; peak_allocation_size];
        large_vec.iter_mut().step_by(page_size).for_each(|byte| *byte = 1);
        // The buffer is never read, so an optimizing build could elide the allocation and the
        // writes entirely. Routing it through `black_box` keeps the memory really committed.
        let large_vec = std::hint::black_box(large_vec);

        // Wait for a sample to be taken while memory usage is high.
        std::thread::sleep(Duration::from_secs(2));

        // Drop the large allocation.
        drop(large_vec);

        monitor.stop()?;

        // Check monitor logs.
        let log_reader = BufReader::new(File::open(log_path.as_path())?);
        let last_message = log_reader.lines().last().unwrap()?;
        let metrics: Metrics = serde_json::from_str(&last_message)?;

        // The peak memory usage should be at least the size of our large allocation.
        assert!(metrics.memory.peak_used_bytes >= peak_allocation_size as u64);

        Ok(())
    }

    #[test]
    #[serial(monitor_process)]
    fn test_monitor_nonexist_process() -> Result<()> {
        // Verifies that the system monitor fails to initiate if targeted at an invalid pid

        let maybe_monitor = SystemMonitor::with_pid(Pid::from_u32(u32::MAX), Duration::from_secs(5), None);
        assert!(maybe_monitor.is_err());

        Ok(())
    }
}