solana_core/
system_monitor_service.rs

1#[cfg(target_arch = "x86")]
2use core::arch::x86::{CpuidResult, __cpuid, __cpuid_count, __get_cpuid_max};
3#[cfg(target_arch = "x86_64")]
4use core::arch::x86_64::{CpuidResult, __cpuid, __cpuid_count, __get_cpuid_max};
5#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
6use num_enum::{IntoPrimitive, TryFromPrimitive};
7#[cfg(target_os = "linux")]
8use std::{fs::File, io::BufReader};
9use {
10    solana_time_utils::AtomicInterval,
11    std::{
12        collections::HashMap,
13        io::BufRead,
14        sync::{
15            atomic::{AtomicBool, Ordering},
16            Arc,
17        },
18        thread::{self, sleep, Builder, JoinHandle},
19        time::Duration,
20    },
21    sys_info::{Error, LoadAvg},
22};
23
/// Milliseconds in one second.
const MS_PER_S: u64 = 1_000;
/// Milliseconds in one minute.
const MS_PER_M: u64 = 60 * MS_PER_S;
/// Milliseconds in one hour.
const MS_PER_H: u64 = 60 * MS_PER_M;

// Interval constants, in milliseconds, named after the statistics category they apply to.
const SAMPLE_INTERVAL_UDP_MS: u64 = MS_PER_S * 2;
const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_MEM_MS: u64 = MS_PER_S * 5;
const SAMPLE_INTERVAL_CPU_MS: u64 = MS_PER_S * 10;
const SAMPLE_INTERVAL_CPU_ID_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_DISK_MS: u64 = MS_PER_S * 5;
/// Fixed 500 ms sleep duration.
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);

/// UDP protocol counters (`Udp:` lines).
#[cfg(target_os = "linux")]
const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp";
/// Per-interface network device counters.
#[cfg(target_os = "linux")]
const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
/// Directory holding one entry per block device.
#[cfg(target_os = "linux")]
const SYS_BLOCK_PATH: &str = "/sys/block";
41
/// Owns the join handle of the `solSystemMonitr` background thread spawned by
/// `SystemMonitorService::new`.
pub struct SystemMonitorService {
    thread_hdl: JoinHandle<()>,
}

/// UDP counters taken from the `Udp:` lines of `/proc/net/snmp`, one field per
/// counter name looked up in `UdpStats::from_map` (e.g. `InDatagrams`).
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
struct UdpStats {
    in_datagrams: u64,
    no_ports: u64,
    in_errors: u64,
    out_datagrams: u64,
    rcvbuf_errors: u64,
    sndbuf_errors: u64,
    in_csum_errors: u64,
    ignored_multi: u64,
}

/// Network device counters summed across every interface except loopback.
#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
struct NetDevStats {
    /// Bytes received.
    rx_bytes: u64,
    /// Packets received.
    rx_packets: u64,
    /// Receive errors detected by the device driver.
    rx_errs: u64,
    /// Received packets dropped by the device driver (not part of the error count).
    rx_drops: u64,
    /// Receive FIFO buffer errors.
    rx_fifo: u64,
    /// Receive packet framing errors.
    rx_frame: u64,
    /// Compressed packets received.
    rx_compressed: u64,
    /// Multicast frames received by the device driver.
    rx_multicast: u64,
    /// Bytes transmitted.
    tx_bytes: u64,
    /// Packets transmitted.
    tx_packets: u64,
    /// Transmit errors detected by the device driver.
    tx_errs: u64,
    /// Transmit packets dropped by the device driver.
    tx_drops: u64,
    /// Transmit FIFO buffer errors.
    tx_fifo: u64,
    /// Transmit collisions detected.
    tx_colls: u64,
    /// Transmit carrier losses detected by the device driver.
    tx_carrier: u64,
    /// Compressed packets transmitted.
    tx_compressed: u64,
}

/// One combined sample of UDP protocol counters and network device counters.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
struct NetStats {
    udp_stats: UdpStats,
    net_dev_stats: NetDevStats,
}
101
102struct CpuInfo {
103    cpu_num: u32,
104    cpu_freq_mhz: u64,
105    load_avg: LoadAvg,
106    num_threads: u64,
107}
108
109#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
110#[derive(IntoPrimitive)]
111#[repr(i64)]
112enum CpuManufacturer {
113    Other,
114    Intel,
115    Amd,
116}
117
118#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
119#[derive(IntoPrimitive, TryFromPrimitive, PartialEq, PartialOrd)]
120#[repr(u32)]
121// The value passed into cpuid via eax, to control what the result means
122enum CpuidParamValue {
123    Manufacturer = 0,
124    Processor = 1,
125    Cache = 2,
126    SerialNumber = 3,
127    Topology = 4,
128    Unsupported = 5,
129    ThermalAndPower = 6,
130    Extended = 7,
131}
132#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
133const CPUID_PARAM_MAX_SUPPORTED_VALUE: u32 = 7;
134
/// Block-device I/O counters summed over the storage devices under `/sys/block`,
/// excluding `loop*` (loopback), `dm*` (e.g. dm-crypt) and `md*` (md-raid) devices.
///
/// Every field is cumulative since boot except `num_disks` and `io_in_progress`.
/// Fields appear in the same order as the columns of a device's `stat` file.
#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
struct DiskStats {
    reads_completed: u64,
    reads_merged: u64,
    sectors_read: u64,
    time_reading_ms: u64,
    writes_completed: u64,
    writes_merged: u64,
    sectors_written: u64,
    time_writing_ms: u64,
    io_in_progress: u64,
    time_io_ms: u64,
    /// Time spent doing I/O multiplied by the number of commands in the queue.
    time_io_weighted_ms: u64,
    discards_completed: u64,
    discards_merged: u64,
    sectors_discarded: u64,
    time_discarding: u64,
    flushes_completed: u64,
    time_flushing: u64,
    /// Number of devices whose counters were summed into this value.
    num_disks: u64,
}
160
161impl UdpStats {
162    fn from_map(udp_stats: &HashMap<String, u64>) -> Self {
163        Self {
164            in_datagrams: *udp_stats.get("InDatagrams").unwrap_or(&0),
165            no_ports: *udp_stats.get("NoPorts").unwrap_or(&0),
166            in_errors: *udp_stats.get("InErrors").unwrap_or(&0),
167            out_datagrams: *udp_stats.get("OutDatagrams").unwrap_or(&0),
168            rcvbuf_errors: *udp_stats.get("RcvbufErrors").unwrap_or(&0),
169            sndbuf_errors: *udp_stats.get("SndbufErrors").unwrap_or(&0),
170            in_csum_errors: *udp_stats.get("InCsumErrors").unwrap_or(&0),
171            ignored_multi: *udp_stats.get("IgnoredMulti").unwrap_or(&0),
172        }
173    }
174}
175
176impl DiskStats {
177    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
178    fn accumulate(&mut self, other: &DiskStats) {
179        self.reads_completed += other.reads_completed;
180        self.reads_merged += other.reads_merged;
181        self.sectors_read += other.sectors_read;
182        self.time_reading_ms += other.time_reading_ms;
183        self.writes_completed += other.writes_completed;
184        self.writes_merged += other.writes_merged;
185        self.sectors_written += other.sectors_written;
186        self.time_writing_ms += other.time_writing_ms;
187        self.io_in_progress += other.io_in_progress;
188        self.time_io_ms += other.time_io_ms;
189        self.time_io_weighted_ms += other.time_io_weighted_ms;
190        self.discards_completed += other.discards_completed;
191        self.discards_merged += other.discards_merged;
192        self.sectors_discarded += other.sectors_discarded;
193        self.time_discarding += other.time_discarding;
194        self.flushes_completed += other.flushes_completed;
195        self.time_flushing += other.time_flushing;
196    }
197}
198
/// Returns a `family/os/arch` identifier for the platform this binary was compiled
/// for, built from the `std::env::consts` values.
fn platform_id() -> String {
    use std::env::consts::{ARCH, FAMILY, OS};
    [FAMILY, OS, ARCH].join("/")
}
207
208#[cfg(target_os = "linux")]
209fn read_net_stats() -> Result<NetStats, String> {
210    let file_path_snmp = PROC_NET_SNMP_PATH;
211    let file_snmp = File::open(file_path_snmp).map_err(|e| e.to_string())?;
212    let mut reader_snmp = BufReader::new(file_snmp);
213
214    let file_path_dev = PROC_NET_DEV_PATH;
215    let file_dev = File::open(file_path_dev).map_err(|e| e.to_string())?;
216    let mut reader_dev = BufReader::new(file_dev);
217
218    let udp_stats = parse_udp_stats(&mut reader_snmp)?;
219    let net_dev_stats = parse_net_dev_stats(&mut reader_dev)?;
220    Ok(NetStats {
221        udp_stats,
222        net_dev_stats,
223    })
224}
225
226#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
227fn parse_udp_stats(reader_snmp: &mut impl BufRead) -> Result<UdpStats, String> {
228    let mut udp_lines = Vec::default();
229    for line in reader_snmp.lines() {
230        let line = line.map_err(|e| e.to_string())?;
231        if line.starts_with("Udp:") {
232            udp_lines.push(line);
233            if udp_lines.len() == 2 {
234                break;
235            }
236        }
237    }
238    if udp_lines.len() != 2 {
239        return Err(format!(
240            "parse error, expected 2 lines, num lines: {}",
241            udp_lines.len()
242        ));
243    }
244
245    let pairs: Vec<_> = udp_lines[0]
246        .split_ascii_whitespace()
247        .zip(udp_lines[1].split_ascii_whitespace())
248        .collect();
249    let udp_stats: HashMap<String, u64> = pairs[1..]
250        .iter()
251        .map(|(label, val)| (label.to_string(), val.parse::<u64>().unwrap()))
252        .collect();
253
254    let stats = UdpStats::from_map(&udp_stats);
255    Ok(stats)
256}
257
258#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
259fn parse_net_dev_stats(reader_dev: &mut impl BufRead) -> Result<NetDevStats, String> {
260    let mut stats = NetDevStats::default();
261    for (line_number, line) in reader_dev.lines().enumerate() {
262        if line_number < 2 {
263            // Skip first two lines with header information.
264            continue;
265        }
266
267        let line = line.map_err(|e| e.to_string())?;
268        let values: Vec<_> = line.split_ascii_whitespace().collect();
269
270        if values.len() != 17 {
271            return Err("parse error, expected exactly 17 stat elements".to_string());
272        }
273        if values[0] == "lo:" {
274            // Filter out the loopback network interface as we are only concerned with
275            // external traffic.
276            continue;
277        }
278
279        stats.rx_bytes += values[1].parse::<u64>().map_err(|e| e.to_string())?;
280        stats.rx_packets += values[2].parse::<u64>().map_err(|e| e.to_string())?;
281        stats.rx_errs += values[3].parse::<u64>().map_err(|e| e.to_string())?;
282        stats.rx_drops += values[4].parse::<u64>().map_err(|e| e.to_string())?;
283        stats.rx_fifo += values[5].parse::<u64>().map_err(|e| e.to_string())?;
284        stats.rx_frame += values[6].parse::<u64>().map_err(|e| e.to_string())?;
285        stats.rx_compressed += values[7].parse::<u64>().map_err(|e| e.to_string())?;
286        stats.rx_multicast += values[8].parse::<u64>().map_err(|e| e.to_string())?;
287        stats.tx_bytes += values[9].parse::<u64>().map_err(|e| e.to_string())?;
288        stats.tx_packets += values[10].parse::<u64>().map_err(|e| e.to_string())?;
289        stats.tx_errs += values[11].parse::<u64>().map_err(|e| e.to_string())?;
290        stats.tx_drops += values[12].parse::<u64>().map_err(|e| e.to_string())?;
291        stats.tx_fifo += values[13].parse::<u64>().map_err(|e| e.to_string())?;
292        stats.tx_colls += values[14].parse::<u64>().map_err(|e| e.to_string())?;
293        stats.tx_carrier += values[15].parse::<u64>().map_err(|e| e.to_string())?;
294        stats.tx_compressed += values[16].parse::<u64>().map_err(|e| e.to_string())?;
295    }
296
297    Ok(stats)
298}
299
300#[cfg(target_os = "linux")]
301pub fn verify_net_stats_access() -> Result<(), String> {
302    read_net_stats()?;
303    Ok(())
304}
305
306#[cfg(not(target_os = "linux"))]
307pub fn verify_net_stats_access() -> Result<(), String> {
308    Ok(())
309}
310
311#[cfg(target_os = "linux")]
312fn read_disk_stats() -> Result<DiskStats, String> {
313    let mut stats = DiskStats::default();
314    let mut num_disks = 0;
315    let blk_device_dir_iter = std::fs::read_dir(SYS_BLOCK_PATH).map_err(|e| e.to_string())?;
316    blk_device_dir_iter
317        .filter_map(|blk_device_dir| {
318            match blk_device_dir {
319                Ok(blk_device_dir) => {
320                    let blk_device_dir_name = &blk_device_dir.file_name();
321                    let blk_device_dir_name = blk_device_dir_name.to_string_lossy();
322                    if blk_device_dir_name.starts_with("loop")
323                        || blk_device_dir_name.starts_with("dm")
324                        || blk_device_dir_name.starts_with("md")
325                    {
326                        // Filter out loopback devices, dmcrypt volumes, and mdraid volumes
327                        return None;
328                    }
329                    let mut path = blk_device_dir.path();
330                    path.push("stat");
331                    File::open(path).ok()
332                }
333                Err(_) => None,
334            }
335        })
336        .for_each(|file_diskstats| {
337            let mut reader_diskstats = BufReader::new(file_diskstats);
338            stats.accumulate(&parse_disk_stats(&mut reader_diskstats).unwrap_or_default());
339            num_disks += 1;
340        });
341    stats.num_disks = num_disks;
342    Ok(stats)
343}
344
345#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
346fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result<DiskStats, String> {
347    let mut stats = DiskStats::default();
348    let mut line = String::new();
349    reader_diskstats
350        .read_line(&mut line)
351        .map_err(|e| e.to_string())?;
352    let values: Vec<_> = line.split_ascii_whitespace().collect();
353    let num_elements = values.len();
354
355    if num_elements != 11 && num_elements != 15 && num_elements != 17 {
356        return Err("parse error, unknown number of disk stat elements".to_string());
357    }
358
359    stats.reads_completed = values[0].parse::<u64>().map_err(|e| e.to_string())?;
360    stats.reads_merged = values[1].parse::<u64>().map_err(|e| e.to_string())?;
361    stats.sectors_read = values[2].parse::<u64>().map_err(|e| e.to_string())?;
362    stats.time_reading_ms = values[3].parse::<u64>().map_err(|e| e.to_string())?;
363    stats.writes_completed = values[4].parse::<u64>().map_err(|e| e.to_string())?;
364    stats.writes_merged = values[5].parse::<u64>().map_err(|e| e.to_string())?;
365    stats.sectors_written = values[6].parse::<u64>().map_err(|e| e.to_string())?;
366    stats.time_writing_ms = values[7].parse::<u64>().map_err(|e| e.to_string())?;
367    stats.io_in_progress = values[8].parse::<u64>().map_err(|e| e.to_string())?;
368    stats.time_io_ms = values[9].parse::<u64>().map_err(|e| e.to_string())?;
369    stats.time_io_weighted_ms = values[10].parse::<u64>().map_err(|e| e.to_string())?;
370    if num_elements > 11 {
371        // Kernel 4.18+ appends four more fields for discard
372        stats.discards_completed = values[11].parse::<u64>().map_err(|e| e.to_string())?;
373        stats.discards_merged = values[12].parse::<u64>().map_err(|e| e.to_string())?;
374        stats.sectors_discarded = values[13].parse::<u64>().map_err(|e| e.to_string())?;
375        stats.time_discarding = values[14].parse::<u64>().map_err(|e| e.to_string())?;
376    }
377    if num_elements > 15 {
378        // Kernel 5.5+ appends two more fields for flush requests
379        stats.flushes_completed = values[15].parse::<u64>().map_err(|e| e.to_string())?;
380        stats.time_flushing = values[16].parse::<u64>().map_err(|e| e.to_string())?;
381    }
382
383    Ok(stats)
384}
385
/// Selects which categories of OS statistics the service reports.
pub struct SystemMonitorStatsReportConfig {
    /// Report memory statistics.
    pub report_os_memory_stats: bool,
    /// Report network statistics.
    pub report_os_network_stats: bool,
    /// Report CPU statistics.
    pub report_os_cpu_stats: bool,
    /// Report disk statistics.
    pub report_os_disk_stats: bool,
}

/// How a queried OS limit is evaluated when it is reported.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
enum InterestingLimit {
    /// Warn if the current value is below this recommended minimum.
    Recommend(i64),
    /// Only log the current value.
    QueryOnly,
}

/// sysctl keys whose current values are queried and reported on Linux.
#[cfg(target_os = "linux")]
const INTERESTING_LIMITS: &[(&str, InterestingLimit)] = &[
    // 128 MiB
    ("net.core.rmem_max", InterestingLimit::Recommend(128 * 1024 * 1024)),
    // 128 MiB
    ("net.core.wmem_max", InterestingLimit::Recommend(128 * 1024 * 1024)),
    ("vm.max_map_count", InterestingLimit::Recommend(1_000_000)),
    ("net.core.optmem_max", InterestingLimit::QueryOnly),
    ("net.core.netdev_max_backlog", InterestingLimit::QueryOnly),
];
407
408impl SystemMonitorService {
409    pub fn new(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) -> Self {
410        info!("Starting SystemMonitorService");
411        let thread_hdl = Builder::new()
412            .name("solSystemMonitr".to_string())
413            .spawn(move || {
414                Self::run(exit, config);
415            })
416            .unwrap();
417
418        Self { thread_hdl }
419    }
420
421    #[cfg(target_os = "linux")]
422    fn linux_get_current_network_limits() -> Vec<(&'static str, &'static InterestingLimit, i64)> {
423        use sysctl::Sysctl;
424
425        fn sysctl_read(name: &str) -> Result<String, sysctl::SysctlError> {
426            let ctl = sysctl::Ctl::new(name)?;
427            let val = ctl.value_string()?;
428            Ok(val)
429        }
430
431        fn normalize_err<E: std::fmt::Display>(key: &str, error: E) -> String {
432            format!("Failed to query value for {}: {}", key, error)
433        }
434        INTERESTING_LIMITS
435            .iter()
436            .map(|(key, interesting_limit)| {
437                let current_value = sysctl_read(key)
438                    .map_err(|e| normalize_err(key, e))
439                    .and_then(|val| val.parse::<i64>().map_err(|e| normalize_err(key, e)))
440                    .unwrap_or_else(|e| {
441                        error!("{}", e);
442                        -1
443                    });
444                (*key, interesting_limit, current_value)
445            })
446            .collect::<Vec<_>>()
447    }
448
449    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
450    fn linux_report_network_limits(
451        current_limits: &[(&'static str, &'static InterestingLimit, i64)],
452    ) -> bool {
453        current_limits
454            .iter()
455            .all(|(key, interesting_limit, current_value)| {
456                datapoint_warn!("os-config", (key, *current_value, i64));
457                match interesting_limit {
458                    InterestingLimit::Recommend(recommended_value)
459                        if current_value < recommended_value =>
460                    {
461                        warn!(
462                            "  {key}: recommended={recommended_value}, current={current_value} \
463                             too small"
464                        );
465                        false
466                    }
467                    InterestingLimit::Recommend(recommended_value) => {
468                        info!("  {key}: recommended={recommended_value} current={current_value}");
469                        true
470                    }
471                    InterestingLimit::QueryOnly => {
472                        info!("  {key}: report-only --  current={current_value}");
473                        true
474                    }
475                }
476            })
477    }
478
479    #[cfg(not(target_os = "linux"))]
480    pub fn check_os_network_limits() -> bool {
481        datapoint_info!("os-config", ("platform", platform_id(), String));
482        true
483    }
484
485    #[cfg(target_os = "linux")]
486    pub fn check_os_network_limits() -> bool {
487        datapoint_info!("os-config", ("platform", platform_id(), String));
488        let current_limits = Self::linux_get_current_network_limits();
489        Self::linux_report_network_limits(&current_limits)
490    }
491
492    #[cfg(target_os = "linux")]
493    fn process_net_stats(net_stats: &mut Option<NetStats>) {
494        match read_net_stats() {
495            Ok(new_stats) => {
496                if let Some(old_stats) = net_stats {
497                    Self::report_net_stats(old_stats, &new_stats);
498                }
499                *net_stats = Some(new_stats);
500            }
501            Err(e) => warn!("read_net_stats: {}", e),
502        }
503    }
504
505    #[cfg(not(target_os = "linux"))]
506    fn process_net_stats(_net_stats: &mut Option<NetStats>) {}
507
508    #[cfg(target_os = "linux")]
509    fn report_net_stats(old_stats: &NetStats, new_stats: &NetStats) {
510        datapoint_info!(
511            "net-stats-validator",
512            (
513                "in_datagrams_delta",
514                new_stats.udp_stats.in_datagrams - old_stats.udp_stats.in_datagrams,
515                i64
516            ),
517            (
518                "no_ports_delta",
519                new_stats.udp_stats.no_ports - old_stats.udp_stats.no_ports,
520                i64
521            ),
522            (
523                "in_errors_delta",
524                new_stats.udp_stats.in_errors - old_stats.udp_stats.in_errors,
525                i64
526            ),
527            (
528                "out_datagrams_delta",
529                new_stats.udp_stats.out_datagrams - old_stats.udp_stats.out_datagrams,
530                i64
531            ),
532            (
533                "rcvbuf_errors_delta",
534                new_stats.udp_stats.rcvbuf_errors - old_stats.udp_stats.rcvbuf_errors,
535                i64
536            ),
537            (
538                "sndbuf_errors_delta",
539                new_stats.udp_stats.sndbuf_errors - old_stats.udp_stats.sndbuf_errors,
540                i64
541            ),
542            (
543                "in_csum_errors_delta",
544                new_stats.udp_stats.in_csum_errors - old_stats.udp_stats.in_csum_errors,
545                i64
546            ),
547            (
548                "ignored_multi_delta",
549                new_stats.udp_stats.ignored_multi - old_stats.udp_stats.ignored_multi,
550                i64
551            ),
552            ("in_errors", new_stats.udp_stats.in_errors, i64),
553            ("rcvbuf_errors", new_stats.udp_stats.rcvbuf_errors, i64),
554            ("sndbuf_errors", new_stats.udp_stats.sndbuf_errors, i64),
555            (
556                "rx_bytes_delta",
557                new_stats
558                    .net_dev_stats
559                    .rx_bytes
560                    .saturating_sub(old_stats.net_dev_stats.rx_bytes),
561                i64
562            ),
563            (
564                "rx_packets_delta",
565                new_stats
566                    .net_dev_stats
567                    .rx_packets
568                    .saturating_sub(old_stats.net_dev_stats.rx_packets),
569                i64
570            ),
571            (
572                "rx_errs_delta",
573                new_stats
574                    .net_dev_stats
575                    .rx_errs
576                    .saturating_sub(old_stats.net_dev_stats.rx_errs),
577                i64
578            ),
579            (
580                "rx_drops_delta",
581                new_stats
582                    .net_dev_stats
583                    .rx_drops
584                    .saturating_sub(old_stats.net_dev_stats.rx_drops),
585                i64
586            ),
587            (
588                "rx_fifo_delta",
589                new_stats
590                    .net_dev_stats
591                    .rx_fifo
592                    .saturating_sub(old_stats.net_dev_stats.rx_fifo),
593                i64
594            ),
595            (
596                "rx_frame_delta",
597                new_stats
598                    .net_dev_stats
599                    .rx_frame
600                    .saturating_sub(old_stats.net_dev_stats.rx_frame),
601                i64
602            ),
603            (
604                "tx_bytes_delta",
605                new_stats
606                    .net_dev_stats
607                    .tx_bytes
608                    .saturating_sub(old_stats.net_dev_stats.tx_bytes),
609                i64
610            ),
611            (
612                "tx_packets_delta",
613                new_stats
614                    .net_dev_stats
615                    .tx_packets
616                    .saturating_sub(old_stats.net_dev_stats.tx_packets),
617                i64
618            ),
619            (
620                "tx_errs_delta",
621                new_stats
622                    .net_dev_stats
623                    .tx_errs
624                    .saturating_sub(old_stats.net_dev_stats.tx_errs),
625                i64
626            ),
627            (
628                "tx_drops_delta",
629                new_stats
630                    .net_dev_stats
631                    .tx_drops
632                    .saturating_sub(old_stats.net_dev_stats.tx_drops),
633                i64
634            ),
635            (
636                "tx_fifo_delta",
637                new_stats
638                    .net_dev_stats
639                    .tx_fifo
640                    .saturating_sub(old_stats.net_dev_stats.tx_fifo),
641                i64
642            ),
643            (
644                "tx_colls_delta",
645                new_stats
646                    .net_dev_stats
647                    .tx_colls
648                    .saturating_sub(old_stats.net_dev_stats.tx_colls),
649                i64
650            ),
651        );
652    }
653
654    fn calc_percent(numerator: u64, denom: u64) -> f64 {
655        if denom == 0 {
656            0.0
657        } else {
658            (numerator as f64 / denom as f64) * 100.0
659        }
660    }
661
662    fn report_mem_stats() {
663        // get mem info (in kb)
664        if let Ok(info) = sys_info::mem_info() {
665            const KB: u64 = 1_024;
666            datapoint_info!(
667                "memory-stats",
668                ("total", info.total * KB, i64),
669                ("swap_total", info.swap_total * KB, i64),
670                ("buffers_bytes", info.buffers * KB, i64),
671                ("cached_bytes", info.cached * KB, i64),
672                (
673                    "free_percent",
674                    Self::calc_percent(info.free, info.total),
675                    f64
676                ),
677                (
678                    "used_bytes",
679                    info.total.saturating_sub(info.avail) * KB,
680                    i64
681                ),
682                (
683                    "avail_percent",
684                    Self::calc_percent(info.avail, info.total),
685                    f64
686                ),
687                (
688                    "buffers_percent",
689                    Self::calc_percent(info.buffers, info.total),
690                    f64
691                ),
692                (
693                    "cached_percent",
694                    Self::calc_percent(info.cached, info.total),
695                    f64
696                ),
697                (
698                    "swap_free_percent",
699                    Self::calc_percent(info.swap_free, info.swap_total),
700                    f64
701                ),
702            )
703        }
704    }
705
706    fn cpu_info() -> Result<CpuInfo, Error> {
707        let cpu_num = sys_info::cpu_num()?;
708        let cpu_freq_mhz = sys_info::cpu_speed()?;
709        let load_avg = sys_info::loadavg()?;
710        let num_threads = sys_info::proc_total()?;
711
712        Ok(CpuInfo {
713            cpu_num,
714            cpu_freq_mhz,
715            load_avg,
716            num_threads,
717        })
718    }
719
720    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
721    fn report_cpuid_values() {
722        const CPUID_MANUFACTURER_EBX_INTEL: u32 = 0x756e6547;
723        const CPUID_MANUFACTURER_EDX_INTEL: u32 = 0x49656e69;
724        const CPUID_MANUFACTURER_ECX_INTEL: u32 = 0x6c65746e;
725        const CPUID_MANUFACTURER_EBX_AMD: u32 = 0x68747541;
726        const CPUID_MANUFACTURER_EDX_AMD: u32 = 0x69746e65;
727        const CPUID_MANUFACTURER_ECX_AMD: u32 = 0x444d4163;
728
729        unsafe {
730            let cpuid_mfr = __cpuid(0);
731            let cpuid_empty = CpuidResult {
732                eax: 0,
733                ebx: 0,
734                ecx: 0,
735                edx: 0,
736            };
737
738            let max_leaf = match CpuidParamValue::try_from(std::cmp::min(
739                cpuid_mfr.eax,
740                CPUID_PARAM_MAX_SUPPORTED_VALUE,
741            )) {
742                Ok(val) => val,
743                Err(_err) => CpuidParamValue::Manufacturer,
744            };
745
746            let mfr_id = if cpuid_mfr.ebx == CPUID_MANUFACTURER_EBX_INTEL
747                && cpuid_mfr.edx == CPUID_MANUFACTURER_EDX_INTEL
748                && cpuid_mfr.ecx == CPUID_MANUFACTURER_ECX_INTEL
749            {
750                CpuManufacturer::Intel // GenuineIntel
751            } else if cpuid_mfr.ebx == CPUID_MANUFACTURER_EBX_AMD
752                && cpuid_mfr.edx == CPUID_MANUFACTURER_EDX_AMD
753                && cpuid_mfr.ecx == CPUID_MANUFACTURER_ECX_AMD
754            {
755                CpuManufacturer::Amd // AuthenticAMD
756            } else {
757                CpuManufacturer::Other // anything else
758            };
759
760            let cpuid_processor = if CpuidParamValue::Processor <= max_leaf {
761                __cpuid(CpuidParamValue::Processor.into())
762            } else {
763                cpuid_empty
764            };
765            let cpuid_cache = if CpuidParamValue::Cache <= max_leaf {
766                __cpuid(CpuidParamValue::Cache.into())
767            } else {
768                cpuid_empty
769            };
770            let cpuid_topology = if CpuidParamValue::Topology <= max_leaf {
771                __cpuid(CpuidParamValue::Topology.into())
772            } else {
773                cpuid_empty
774            };
775            let cpuid_extended_0 = if CpuidParamValue::Extended <= max_leaf {
776                __cpuid_count(CpuidParamValue::Extended.into(), 0)
777            } else {
778                cpuid_empty
779            };
780            let cpuid_extended_1 = if CpuidParamValue::Extended <= max_leaf {
781                if 1 <= __get_cpuid_max(CpuidParamValue::Extended.into()).1 {
782                    __cpuid_count(CpuidParamValue::Extended.into(), 1)
783                } else {
784                    cpuid_empty
785                }
786            } else {
787                cpuid_empty
788            };
789
790            datapoint_info!(
791                "cpuid-values",
792                ("manufacturer_id", i64::from(mfr_id), i64),
793                ("cpuid_processor_eax", i64::from(cpuid_processor.eax), i64),
794                ("cpuid_processor_ebx", i64::from(cpuid_processor.ebx), i64),
795                ("cpuid_processor_ecx", i64::from(cpuid_processor.ecx), i64),
796                ("cpuid_processor_edx", i64::from(cpuid_processor.edx), i64),
797                ("cpuid_cache_eax", i64::from(cpuid_cache.eax), i64),
798                ("cpuid_cache_ebx", i64::from(cpuid_cache.ebx), i64),
799                ("cpuid_cache_ecx", i64::from(cpuid_cache.ecx), i64),
800                ("cpuid_cache_edx", i64::from(cpuid_cache.edx), i64),
801                ("cpuid_topology_eax", i64::from(cpuid_topology.eax), i64),
802                ("cpuid_topology_ebx", i64::from(cpuid_topology.ebx), i64),
803                ("cpuid_topology_ecx", i64::from(cpuid_topology.ecx), i64),
804                ("cpuid_topology_edx", i64::from(cpuid_topology.edx), i64),
805                ("cpuid_extended_0_ebx", i64::from(cpuid_extended_0.ebx), i64),
806                ("cpuid_extended_0_ecx", i64::from(cpuid_extended_0.ecx), i64),
807                ("cpuid_extended_0_edx", i64::from(cpuid_extended_0.edx), i64),
808                ("cpuid_extended_1_eax", i64::from(cpuid_extended_1.eax), i64),
809            );
810        };
811    }
812
813    fn report_cpu_stats() {
814        if let Ok(info) = Self::cpu_info() {
815            datapoint_info!(
816                "cpu-stats",
817                ("cpu_num", info.cpu_num as i64, i64),
818                ("cpu0_freq_mhz", info.cpu_freq_mhz as i64, i64),
819                ("average_load_one_minute", info.load_avg.one, f64),
820                ("average_load_five_minutes", info.load_avg.five, f64),
821                ("average_load_fifteen_minutes", info.load_avg.fifteen, f64),
822                ("total_num_threads", info.num_threads as i64, i64),
823            )
824        }
825    }
826
827    #[cfg(target_os = "linux")]
828    fn process_disk_stats(disk_stats: &mut Option<DiskStats>) {
829        match read_disk_stats() {
830            Ok(new_stats) => {
831                if let Some(old_stats) = disk_stats {
832                    Self::report_disk_stats(old_stats, &new_stats);
833                }
834                *disk_stats = Some(new_stats);
835            }
836            Err(e) => warn!("read_disk_stats: {}", e),
837        }
838    }
839
840    #[cfg(not(target_os = "linux"))]
841    fn process_disk_stats(_disk_stats: &mut Option<DiskStats>) {}
842
843    #[cfg(target_os = "linux")]
844    fn report_disk_stats(old_stats: &DiskStats, new_stats: &DiskStats) {
845        datapoint_info!(
846            "disk-stats",
847            (
848                "reads_completed",
849                new_stats
850                    .reads_completed
851                    .saturating_sub(old_stats.reads_completed),
852                i64
853            ),
854            (
855                "reads_merged",
856                new_stats
857                    .reads_merged
858                    .saturating_sub(old_stats.reads_merged),
859                i64
860            ),
861            (
862                "sectors_read",
863                new_stats
864                    .sectors_read
865                    .saturating_sub(old_stats.sectors_read),
866                i64
867            ),
868            (
869                "time_reading_ms",
870                new_stats
871                    .time_reading_ms
872                    .saturating_sub(old_stats.time_reading_ms),
873                i64
874            ),
875            (
876                "writes_completed",
877                new_stats
878                    .writes_completed
879                    .saturating_sub(old_stats.writes_completed),
880                i64
881            ),
882            (
883                "writes_merged",
884                new_stats
885                    .writes_merged
886                    .saturating_sub(old_stats.writes_merged),
887                i64
888            ),
889            (
890                "sectors_written",
891                new_stats
892                    .sectors_written
893                    .saturating_sub(old_stats.sectors_written),
894                i64
895            ),
896            (
897                "time_writing_ms",
898                new_stats
899                    .time_writing_ms
900                    .saturating_sub(old_stats.time_writing_ms),
901                i64
902            ),
903            ("io_in_progress", new_stats.io_in_progress, i64),
904            (
905                "time_io_ms",
906                new_stats.time_io_ms.saturating_sub(old_stats.time_io_ms),
907                i64
908            ),
909            (
910                "time_io_weighted_ms",
911                new_stats
912                    .time_io_weighted_ms
913                    .saturating_sub(old_stats.time_io_weighted_ms),
914                i64
915            ),
916            (
917                "discards_completed",
918                new_stats
919                    .discards_completed
920                    .saturating_sub(old_stats.discards_completed),
921                i64
922            ),
923            (
924                "discards_merged",
925                new_stats
926                    .discards_merged
927                    .saturating_sub(old_stats.discards_merged),
928                i64
929            ),
930            (
931                "sectors_discarded",
932                new_stats
933                    .sectors_discarded
934                    .saturating_sub(old_stats.sectors_discarded),
935                i64
936            ),
937            (
938                "time_discarding",
939                new_stats
940                    .time_discarding
941                    .saturating_sub(old_stats.time_discarding),
942                i64
943            ),
944            (
945                "flushes_completed",
946                new_stats
947                    .flushes_completed
948                    .saturating_sub(old_stats.flushes_completed),
949                i64
950            ),
951            (
952                "time_flushing",
953                new_stats
954                    .time_flushing
955                    .saturating_sub(old_stats.time_flushing),
956                i64
957            ),
958            ("num_disks", new_stats.num_disks, i64),
959        )
960    }
961
962    pub fn run(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) {
963        let mut udp_stats = None;
964        let mut disk_stats = None;
965        let network_limits_timer = AtomicInterval::default();
966        let udp_timer = AtomicInterval::default();
967        let mem_timer = AtomicInterval::default();
968        let cpu_timer = AtomicInterval::default();
969        let cpuid_timer = AtomicInterval::default();
970        let disk_timer = AtomicInterval::default();
971
972        loop {
973            if exit.load(Ordering::Relaxed) {
974                break;
975            }
976            if config.report_os_network_stats {
977                if network_limits_timer.should_update(SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS) {
978                    Self::check_os_network_limits();
979                }
980                if udp_timer.should_update(SAMPLE_INTERVAL_UDP_MS) {
981                    Self::process_net_stats(&mut udp_stats);
982                }
983            }
984            if config.report_os_memory_stats && mem_timer.should_update(SAMPLE_INTERVAL_MEM_MS) {
985                Self::report_mem_stats();
986            }
987            if config.report_os_cpu_stats {
988                if cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
989                    Self::report_cpu_stats();
990                }
991                if cpuid_timer.should_update(SAMPLE_INTERVAL_CPU_ID_MS) {
992                    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
993                    Self::report_cpuid_values();
994                }
995            }
996            if config.report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
997                Self::process_disk_stats(&mut disk_stats);
998            }
999            sleep(SLEEP_INTERVAL);
1000        }
1001    }
1002
1003    pub fn join(self) -> thread::Result<()> {
1004        self.thread_hdl.join()
1005    }
1006}
1007
// Unit tests for the text parsers and for `SystemMonitorService::calc_percent`.
// The parser tests use in-memory mock data instead of the real /proc and /sys
// files. `&[u8]` implements `BufRead`, so a mutable byte slice can be handed to
// the parsers directly.
#[cfg(test)]
mod tests {
    use super::*;

    // Parses a mock /proc/net/snmp dump and checks two counters taken from the
    // `Udp:` row (the `UdpLite:` row is all zeros). Input lacking that layout
    // must yield an error.
    #[test]
    fn test_parse_udp_stats() {
        const MOCK_SNMP: &[u8] =
b"Ip: Forwarding DefaultTTL InReceives InHdrErrors InAddrErrors ForwDatagrams InUnknownProtos InDiscards InDelivers OutRequests OutDiscards OutNoRoutes ReasmTimeout ReasmReqds ReasmOKs ReasmFails FragOKs FragFails FragCreates
Ip: 1 64 357 0 2 0 0 0 355 315 0 6 0 0 0 0 0 0 0
Icmp: InMsgs InErrors InCsumErrors InDestUnreachs InTimeExcds InParmProbs InSrcQuenchs InRedirects InEchos InEchoReps InTimestamps InTimestampReps InAddrMasks InAddrMaskReps OutMsgs OutErrors OutDestUnreachs OutTimeExcds OutParmProbs OutSrcQuenchs OutRedirects OutEchos OutEchoReps OutTimestamps OutTimestampReps OutAddrMasks OutAddrMaskReps
Icmp: 3 0 0 3 0 0 0 0 0 0 0 0 0 0 7 0 7 0 0 0 0 0 0 0 0 0 0
IcmpMsg: InType3 OutType3
IcmpMsg: 3 7
Tcp: RtoAlgorithm RtoMin RtoMax MaxConn ActiveOpens PassiveOpens AttemptFails EstabResets CurrEstab InSegs OutSegs RetransSegs InErrs OutRsts InCsumErrors
Tcp: 1 200 120000 -1 29 1 0 0 5 318 279 0 0 4 0
Udp: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti
Udp: 27 7 0 30 0 0 0 0
UdpLite: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti
UdpLite: 0 0 0 0 0 0 0 0" as &[u8];
        // Input with none of the expected section headers.
        const UNEXPECTED_DATA: &[u8] = b"unexpected data" as &[u8];

        let mut mock_snmp = MOCK_SNMP;
        let stats = parse_udp_stats(&mut mock_snmp).unwrap();
        // Values from the `Udp:` row: OutDatagrams = 30, NoPorts = 7.
        assert_eq!(stats.out_datagrams, 30);
        assert_eq!(stats.no_ports, 7);

        mock_snmp = UNEXPECTED_DATA;
        let stats = parse_udp_stats(&mut mock_snmp);
        assert!(stats.is_err());
    }

    // Parses a mock /proc/net/dev dump. The expected totals are sums over the
    // `eno1` and `ens4` rows only (e.g. rx_bytes = 100 + 400, tx_errs = 2 + 0),
    // so the loopback `lo` row must be excluded from the aggregate.
    #[test]
    fn test_parse_net_dev_stats() {
        const MOCK_DEV: &[u8] =
b"Inter-|   Receive                                                |  Transmit
face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
lo: 50     1    0    0    0     0          0         0 100 2    1    0    0     0       0          0
eno1: 100     1    0    0    0     0          0         0 200 3    2    0    0     0       0          0
ens4: 400     4    0    1    0     0          0         0 250 5    0    0    0     0       0          0" as &[u8];
        // Multi-line input that does not match the /proc/net/dev layout.
        const UNEXPECTED_DATA: &[u8] = b"un
expected
data" as &[u8];

        let mut mock_dev = MOCK_DEV;
        let stats = parse_net_dev_stats(&mut mock_dev).unwrap();
        assert_eq!(stats.rx_bytes, 500);
        assert_eq!(stats.rx_packets, 5);
        assert_eq!(stats.rx_errs, 0);
        assert_eq!(stats.rx_drops, 1);
        assert_eq!(stats.tx_bytes, 450);
        assert_eq!(stats.tx_packets, 8);
        assert_eq!(stats.tx_errs, 2);
        assert_eq!(stats.tx_drops, 0);

        let mut mock_dev = UNEXPECTED_DATA;
        let stats = parse_net_dev_stats(&mut mock_dev);
        assert!(stats.is_err());
    }

    // Parses single mock disk-stat lines in each layout exercised here (11, 15
    // and 17 whitespace-separated fields). It then checks that a 16-field line
    // and non-numeric multi-line input are rejected.
    #[test]
    fn test_parse_disk_stats() {
        // Pre-4.18 kernel format (11 fields)
        const MOCK_DISK_11: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738" as &[u8];
        // Matches kernel 4.18+ format
        const MOCK_DISK_15: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0" as &[u8];
        // Matches kernel 5.5+ format
        const MOCK_DISK_17: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0    70715     2922" as &[u8];
        // 16 fields: falls between the 15- and 17-field layouts, so it must be rejected.
        const UNEXPECTED_DATA_1: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0    70715" as &[u8];

        // Non-numeric, multi-line input.
        const UNEXPECTED_DATA_2: &[u8] = b"un
ex
pec
ted
data" as &[u8];

        let mut mock_disk = MOCK_DISK_11;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        // First and last (11th) fields of the line.
        assert_eq!(stats.reads_completed, 2095701);
        assert_eq!(stats.time_io_weighted_ms, 285220738);

        let mut mock_disk = MOCK_DISK_15;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        assert_eq!(stats.reads_completed, 2095701);
        // 15th field of the line.
        assert_eq!(stats.time_discarding, 0);

        let mut mock_disk = MOCK_DISK_17;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        assert_eq!(stats.reads_completed, 2095701);
        // 17th (last) field of the line.
        assert_eq!(stats.time_flushing, 2922);

        let mut mock_disk = UNEXPECTED_DATA_1;
        let stats = parse_disk_stats(&mut mock_disk);
        assert!(stats.is_err());

        let mut mock_disk = UNEXPECTED_DATA_2;
        let stats = parse_disk_stats(&mut mock_disk);
        assert!(stats.is_err());
    }

    // A value one below the total must stay strictly below 100%. This is checked
    // for small inputs and for a 1 TiB total expressed in KiB (2^30).
    // NOTE(review): this presumably guards against precision loss in
    // calc_percent's float math, which could otherwise round the large case up
    // to exactly 100.0.
    #[test]
    fn test_calc_percent() {
        assert!(SystemMonitorService::calc_percent(99, 100) < 100.0);
        let one_tb_as_kb = (1u64 << 40) >> 10;
        assert!(SystemMonitorService::calc_percent(one_tb_as_kb - 1, one_tb_as_kb) < 100.0);
    }
}