1use std::fs::OpenOptions;
2use std::io::Write;
3use std::result::Result as stdResult;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::thread::JoinHandle;
7use std::time::{Duration, Instant};
8
9use serde::{Deserialize, Serialize};
10use sysinfo::{Networks, Pid, Process, ProcessRefreshKind, RefreshKind, System};
11use thiserror::Error;
12use tracing::info;
13
14use crate::utils::TemplatedPathBuf;
15
/// Background monitor that periodically samples the resource usage of one process and reports
/// each sample as a JSON line.
///
/// Reports are appended to `log_path` when one is configured, otherwise they are emitted through
/// `tracing` at `info` level.
#[derive(Debug)]
pub struct SystemMonitor {
    /// Process to watch; `None` means the current process (resolved by the sampler).
    pid: Option<Pid>,
    /// Delay between consecutive samples taken by the background thread.
    sample_interval: Duration,
    /// Optional file the JSON reports are appended to; shared with the background thread.
    log_path: Arc<Option<TemplatedPathBuf>>,
    /// Handle of the background sampling thread; `None` before `start` and after `stop`.
    monitor_loop: Mutex<Option<JoinHandle<stdResult<(), SystemMonitorError>>>>,
    /// Set to `true` to ask the background thread to exit.
    stop: Arc<AtomicBool>,
}
50
/// Holds the sysinfo state for one monitored process and produces successive [`Metrics`].
#[derive(Debug)]
struct SystemSampler {
    /// sysinfo system snapshot, fully refreshed on every sample.
    system: System,
    /// Network interface counters, refreshed on every sample.
    network: Networks,
    /// Process being sampled (always `Some` after construction).
    pid: Option<Pid>,
    /// When the sampler was created; average rates are computed relative to this instant.
    start_measurement_time: Instant,
    /// When the previous sample was taken; instantaneous rates are computed relative to this.
    last_measurement_time: Instant,
    /// Most recent sample, `None` until the first call to `sample`.
    last_sample: Option<Metrics>,
    /// Sample taken at construction; its cumulative disk/network counters are the zero point
    /// that later samples subtract.
    baseline_sample: Metrics,
}
66
/// One resource-usage report for a single process, serialized to JSON by [`Metrics::to_json`].
///
/// Field names and declaration order define the JSON keys and their order (reports are also
/// parsed back into this type), so treat them as a stable schema.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Metrics {
    /// Id of the sampled process.
    pid: u32,
    /// Process name, converted lossily to UTF-8.
    name: String,
    /// Value of sysinfo's `Process::run_time()` (documented by sysinfo as seconds since start).
    run_time: u64,
    /// CPU usage of the process and of every CPU.
    cpu: CpuUsage,
    /// Memory usage of the process.
    memory: MemoryUsage,
    /// Disk I/O of the process.
    disk: DiskUsage,
    /// Network traffic summed over all interfaces (system-wide, not attributed to the process).
    network: NetworkUsage,
}
88
89impl Metrics {
90 pub fn create(
91 system: &System,
92 network: &Networks,
93 pid: Pid,
94 sample_interval: Duration,
95 total_duration: Duration,
96 last_sample: Option<Metrics>,
97 baseline: &Metrics,
98 ) -> Result<Self> {
99 let Some(process) = system.process(pid) else {
100 return Err(SystemMonitorError::NoProcess(pid.as_u32()));
101 };
102
103 Ok(Self {
104 pid: pid.as_u32(),
105 name: process.name().to_string_lossy().into(),
106 run_time: process.run_time(),
107 cpu: CpuUsage::from(process, system),
108 memory: MemoryUsage::from(process, system, last_sample.map(|s| s.memory)),
109 disk: DiskUsage::from(process, sample_interval, total_duration, &baseline.disk),
110 network: NetworkUsage::from(network, sample_interval, total_duration, &baseline.network),
111 })
112 }
113
114 pub fn baseline(system: &System, network: &Networks, pid: Pid) -> Result<Self> {
123 let Some(process) = system.process(pid) else {
124 return Err(SystemMonitorError::NoProcess(pid.as_u32()));
125 };
126
127 Ok(Self {
128 pid: pid.as_u32(),
129 name: process.name().to_string_lossy().into(),
130 run_time: process.run_time(),
131 cpu: CpuUsage::from(process, system),
132 memory: MemoryUsage::from(process, system, None),
133 disk: DiskUsage::baseline(process),
134 network: NetworkUsage::baseline(network),
135 })
136 }
137
138 pub fn to_json(&self) -> Result<String> {
139 Ok(serde_json::to_string(&self)?)
140 }
141}
142
/// CPU usage captured from sysinfo for one sample.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct CpuUsage {
    /// Value of `Process::cpu_usage()` for the sampled process.
    process_usage: f32,
    /// Number of CPUs listed by `System::cpus()`.
    ncpus: u32,
    /// `cpu_usage()` of every entry of `System::cpus()`, in that order.
    global_usage: Vec<f32>,
}
153
154impl CpuUsage {
155 pub fn from(process: &Process, system: &System) -> Self {
156 Self {
157 process_usage: process.cpu_usage(),
158 ncpus: system.cpus().len() as u32,
159 global_usage: system.cpus().iter().map(|c| c.cpu_usage()).collect(),
160 }
161 }
162}
163
/// Memory usage of the sampled process relative to the machine total.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct MemoryUsage {
    /// Value of `Process::memory()` at sample time.
    used_bytes: u64,
    /// Largest `used_bytes` seen across the samples of this monitoring run.
    peak_used_bytes: u64,
    /// `used_bytes / total_bytes` as a fraction (NOT multiplied by 100, despite the name).
    percentage: f64,
    /// Value of `System::total_memory()`.
    total_bytes: u64,
}
176
177impl MemoryUsage {
178 pub fn from(process: &Process, system: &System, last_sample: Option<MemoryUsage>) -> Self {
179 Self {
180 used_bytes: process.memory(),
181 peak_used_bytes: process.memory().max(last_sample.map(|s| s.peak_used_bytes).unwrap_or_default()),
182 percentage: process.memory() as f64 / system.total_memory() as f64,
183 total_bytes: system.total_memory(),
184 }
185 }
186}
187
/// Disk I/O of the sampled process. All speeds are in bytes per second.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct DiskUsage {
    /// Bytes written since the baseline (in the baseline itself: the raw cumulative counter).
    total_written_bytes: u64,
    /// sysinfo's per-refresh written-bytes counter, used as the bytes written during the last
    /// sample interval.
    written_bytes: u64,
    /// Bytes read since the baseline (in the baseline itself: the raw cumulative counter).
    total_read_bytes: u64,
    /// sysinfo's per-refresh read-bytes counter, used as the bytes read during the last
    /// sample interval.
    read_bytes: u64,

    /// `total_written_bytes` divided by the seconds elapsed since the baseline.
    average_write_speed: f64,
    /// `written_bytes` divided by the seconds elapsed since the previous sample.
    instant_write_speed: f64,
    /// `total_read_bytes` divided by the seconds elapsed since the baseline.
    average_read_speed: f64,
    /// `read_bytes` divided by the seconds elapsed since the previous sample.
    instant_read_speed: f64,
}
209
210impl DiskUsage {
211 pub fn baseline(process: &Process) -> Self {
217 let usage = process.disk_usage();
218 Self {
219 total_written_bytes: usage.total_written_bytes,
220 written_bytes: 0,
221 total_read_bytes: usage.total_read_bytes,
222 read_bytes: 0,
223 average_write_speed: 0.,
224 instant_write_speed: 0.,
225 average_read_speed: 0.,
226 instant_read_speed: 0.,
227 }
228 }
229
230 pub fn from(process: &Process, sample_interval: Duration, total_duration: Duration, baseline: &DiskUsage) -> Self {
231 let usage = process.disk_usage();
232
233 let total_written_bytes = usage.total_written_bytes - baseline.total_written_bytes;
235 let total_read_bytes = usage.total_read_bytes - baseline.total_read_bytes;
236
237 Self {
238 total_written_bytes,
239 written_bytes: usage.written_bytes,
240 total_read_bytes,
241 read_bytes: usage.read_bytes,
242 average_write_speed: total_written_bytes as f64 / total_duration.as_secs_f64(),
243 instant_write_speed: usage.written_bytes as f64 / sample_interval.as_secs_f64(),
244 average_read_speed: total_read_bytes as f64 / total_duration.as_secs_f64(),
245 instant_read_speed: usage.read_bytes as f64 / sample_interval.as_secs_f64(),
246 }
247 }
248}
249
/// Network traffic summed over every interface known to sysinfo.
///
/// These figures are system-wide: they are not attributed to the monitored process.
/// All speeds are in bytes per second.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct NetworkUsage {
    /// Bytes transmitted since the baseline (in the baseline itself: the raw cumulative sum).
    total_tx_bytes: u64,
    /// Sum of sysinfo's per-refresh `transmitted()` counters, i.e. bytes sent in the last
    /// sample interval.
    tx_bytes: u64,
    /// Bytes received since the baseline (in the baseline itself: the raw cumulative sum).
    total_rx_bytes: u64,
    /// Sum of sysinfo's per-refresh `received()` counters, i.e. bytes received in the last
    /// sample interval.
    rx_bytes: u64,

    /// `total_tx_bytes` divided by the seconds elapsed since the baseline.
    average_tx_speed: f64,
    /// `tx_bytes` divided by the seconds elapsed since the previous sample.
    instant_tx_speed: f64,
    /// `total_rx_bytes` divided by the seconds elapsed since the baseline.
    average_rx_speed: f64,
    /// `rx_bytes` divided by the seconds elapsed since the previous sample.
    instant_rx_speed: f64,
}
271
272impl NetworkUsage {
273 pub fn baseline(network: &Networks) -> Self {
279 let total_tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted());
280 let total_rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received());
281
282 Self {
283 total_tx_bytes,
284 tx_bytes: 0,
285 total_rx_bytes,
286 rx_bytes: 0,
287 average_tx_speed: 0.,
288 instant_tx_speed: 0.,
289 average_rx_speed: 0.,
290 instant_rx_speed: 0.,
291 }
292 }
293
294 pub fn from(
295 network: &Networks,
296 sample_interval: Duration,
297 total_duration: Duration,
298 baseline: &NetworkUsage,
299 ) -> Self {
300 let total_tx_bytes =
301 network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_transmitted()) - baseline.total_tx_bytes;
302 let tx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.transmitted());
303 let total_rx_bytes =
304 network.iter().fold(0u64, |sum, (_, nic)| sum + nic.total_received()) - baseline.total_rx_bytes;
305 let rx_bytes = network.iter().fold(0u64, |sum, (_, nic)| sum + nic.received());
306
307 Self {
308 total_tx_bytes,
309 tx_bytes,
310 total_rx_bytes,
311 rx_bytes,
312 average_tx_speed: total_tx_bytes as f64 / total_duration.as_secs_f64(),
313 instant_tx_speed: tx_bytes as f64 / sample_interval.as_secs_f64(),
314 average_rx_speed: total_rx_bytes as f64 / total_duration.as_secs_f64(),
315 instant_rx_speed: rx_bytes as f64 / sample_interval.as_secs_f64(),
316 }
317 }
318}
319
320impl SystemSampler {
321 pub fn new(pid: Option<Pid>) -> Result<Self> {
322 let Some(pid) = pid.or_else(|| sysinfo::get_current_pid().ok()) else {
323 return Err(SystemMonitorError::NoPid);
324 };
325
326 let system = System::new_all();
327 let network = Networks::new_with_refreshed_list();
328
329 let baseline = Metrics::baseline(&system, &network, pid)?;
330
331 let now = Instant::now();
332
333 Ok(Self {
334 system,
335 network,
336 pid: Some(pid),
337 start_measurement_time: now,
338 last_measurement_time: now,
339 last_sample: None,
340 baseline_sample: baseline,
341 })
342 }
343
344 pub fn sample(&mut self) -> Result<()> {
345 self.system.refresh_all();
347 self.network.refresh(true);
349
350 let Some(pid) = self.pid.or_else(|| sysinfo::get_current_pid().ok()) else {
351 return Err(SystemMonitorError::NoPid);
352 };
353
354 let sample_interval = self.last_measurement_time.elapsed();
355 self.last_measurement_time = Instant::now();
356 let total_duration = self.start_measurement_time.elapsed();
357
358 self.last_sample = Some(Metrics::create(
359 &self.system,
360 &self.network,
361 pid,
362 sample_interval,
363 total_duration,
364 self.last_sample.take(),
365 &self.baseline_sample,
366 )?);
367
368 Ok(())
369 }
370}
371
372#[derive(Error, Debug)]
374pub enum SystemMonitorError {
375 #[error("Failed to get pid")]
376 NoPid,
377
378 #[error("Failed to get process from pid {0}")]
379 NoProcess(u32),
380
381 #[error("IO Error: {0}")]
382 IOError(#[from] std::io::Error),
383
384 #[error("Serde Json error: {0}")]
385 Serde(#[from] serde_json::Error),
386
387 #[error("Internal error: {0}")]
388 Internal(String),
389}
390
391type Result<T> = std::result::Result<T, SystemMonitorError>;
392
393impl SystemMonitor {
394 pub fn follow_process(sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
403 sysinfo::get_current_pid().map_err(|_| SystemMonitorError::NoPid)?;
404 Self::new_impl(None, sample_interval, log_path)
405 }
406
407 pub fn with_pid(pid: Pid, sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
417 let system =
418 System::new_with_specifics(RefreshKind::nothing().with_processes(ProcessRefreshKind::everything()));
419 if system.process(pid).is_none() {
420 return Err(SystemMonitorError::NoProcess(pid.as_u32()));
421 };
422
423 Self::new_impl(Some(pid), sample_interval, log_path)
424 }
425
426 fn new_impl(pid: Option<Pid>, sample_interval: Duration, log_path: Option<TemplatedPathBuf>) -> Result<Self> {
427 let ret = Self {
428 pid,
429 sample_interval,
430 log_path: log_path.into(),
431 monitor_loop: Mutex::new(None),
432 stop: Arc::new(AtomicBool::new(false)),
433 };
434
435 ret.start()?;
436
437 Ok(ret)
438 }
439
440 pub fn start(&self) -> Result<()> {
452 if self.is_running()? {
453 return Ok(());
454 }
455
456 let mut sampler = SystemSampler::new(self.pid)?;
457
458 sampler.sample()?;
461
462 let mut inner_runner = self
463 .monitor_loop
464 .lock()
465 .map_err(|e| SystemMonitorError::Internal(e.to_string()))?;
466 self.stop.store(false, Ordering::Relaxed);
467
468 let sample_interval = self.sample_interval;
469 let log_path = self.log_path.clone();
470 let stop_clone = self.stop.clone();
471
472 *inner_runner = Some(std::thread::spawn(move || {
473 loop {
474 if stop_clone.load(Ordering::Relaxed) {
475 break;
476 }
477 std::thread::sleep(sample_interval);
478 sampler.sample()?;
479
480 if let Some(sample) = &sampler.last_sample {
481 Self::output_report(sample, &log_path)?;
482 }
483 }
484 Ok(())
485 }));
486
487 Ok(())
488 }
489
490 fn output_report(sample: &Metrics, log_path: &Option<TemplatedPathBuf>) -> Result<()> {
491 let json_report = sample.to_json()?;
492
493 if let Some(path) = log_path {
494 let path = path.as_path();
495 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
496 writeln!(file, "{json_report}")?;
497 } else {
498 info!(system_usage = json_report);
499 }
500
501 Ok(())
502 }
503
504 fn is_running(&self) -> Result<bool> {
505 let inner_runner = self
506 .monitor_loop
507 .lock()
508 .map_err(|e| SystemMonitorError::Internal(e.to_string()))?;
509 Ok(inner_runner.is_some())
510 }
511
512 pub fn stop(&self) -> Result<()> {
520 self.stop.store(true, Ordering::Relaxed);
521
522 if let Some(inner_runner) = self
523 .monitor_loop
524 .lock()
525 .map_err(|e| SystemMonitorError::Internal(e.to_string()))?
526 .take()
527 {
528 match inner_runner
529 .join()
530 .map_err(|_| SystemMonitorError::Internal("join error".to_owned()))?
531 {
532 Ok(_) => (),
533 Err(SystemMonitorError::NoProcess(_)) => (), e => e?,
535 }
536 }
537
538 Ok(())
539 }
540}
541
542impl Drop for SystemMonitor {
543 fn drop(&mut self) {
544 let _ = self.stop();
545 }
546}
547
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use std::time::Duration;

    use serial_test::serial;
    use tempfile::tempdir;

    use super::*;

    /// Reads the last JSON report written to `log_path` and parses it back into [`Metrics`].
    fn last_report(log_path: &TemplatedPathBuf) -> Result<Metrics> {
        let log_reader = BufReader::new(File::open(log_path.as_path())?);
        let last_message = log_reader
            .lines()
            .last()
            .expect("the monitor should have written at least one report")?;
        Ok(serde_json::from_str(&last_message)?)
    }

    #[test]
    #[serial(monitor_process)]
    #[cfg_attr(feature = "smoke-test", ignore)]
    fn test_monitor_self_disk_usage() -> Result<()> {
        const CHUNK_SIZE: usize = 1024 * 1024;
        const CHUNK_COUNT: usize = 10;

        let tempdir = tempdir()?;
        let tempdir_path = tempdir.path();
        let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt"));
        let sample_interval = Duration::from_millis(500);
        let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?;

        // Write a known amount of data while the monitor is running; the file is closed at the
        // end of this scope.
        let data_file = tempdir_path.join("data");
        {
            let buffer = vec![0u8; CHUNK_SIZE];
            let mut fd = std::fs::OpenOptions::new()
                .create(true)
                .truncate(true)
                .write(true)
                .open(&data_file)?;

            for _ in 0..CHUNK_COUNT {
                fd.write_all(&buffer)?;
            }
            fd.flush()?;
        }
        let total_written_bytes = (CHUNK_SIZE * CHUNK_COUNT) as u64;

        // Let several samples be taken after the writes.
        std::thread::sleep(Duration::from_secs(2));
        monitor.stop()?;

        let filesize = std::fs::metadata(data_file)?.len();
        assert_eq!(filesize, total_written_bytes);

        let metrics = last_report(&log_path)?;
        assert!(metrics.disk.total_written_bytes >= total_written_bytes);

        Ok(())
    }

    #[test]
    #[serial(monitor_process)]
    #[cfg_attr(feature = "smoke-test", ignore)]
    fn test_monitor_self_memory_usage() -> Result<()> {
        const PAGE_STRIDE: usize = 4 * 1024;

        let tempdir = tempdir()?;
        let tempdir_path = tempdir.path();
        let log_path = TemplatedPathBuf::from(tempdir_path.join("system_monitor_{pid}.txt"));
        let sample_interval = Duration::from_millis(500);
        let monitor = SystemMonitor::follow_process(sample_interval, Some(log_path.clone()))?;

        let peak_allocation_size = 512 * 1024 * 1024;
        let mut large_vec = vec![0u8; peak_allocation_size];
        // Write one byte every 4 KiB, presumably so every (assumed 4 KiB) page of the zeroed
        // allocation is actually committed and shows up in the process memory.
        for byte in large_vec.iter_mut().step_by(PAGE_STRIDE) {
            *byte = 1;
        }
        // The buffer is never read, so an optimizing build could elide the allocation and the
        // stores entirely; make it observable.
        std::hint::black_box(&mut large_vec);

        std::thread::sleep(Duration::from_secs(2));

        drop(large_vec);

        monitor.stop()?;

        let metrics = last_report(&log_path)?;
        assert!(metrics.memory.peak_used_bytes >= peak_allocation_size as u64);

        Ok(())
    }

    #[test]
    #[serial(monitor_process)]
    fn test_monitor_nonexist_process() -> Result<()> {
        let maybe_monitor = SystemMonitor::with_pid(Pid::from_u32(u32::MAX), Duration::from_secs(5), None);
        assert!(maybe_monitor.is_err());

        Ok(())
    }
}