solana_core/
system_monitor_service.rs

1#[cfg(target_arch = "x86")]
2use core::arch::x86::{CpuidResult, __cpuid, __cpuid_count, __get_cpuid_max};
3#[cfg(target_arch = "x86_64")]
4use core::arch::x86_64::{CpuidResult, __cpuid, __cpuid_count, __get_cpuid_max};
5#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
6use num_enum::{IntoPrimitive, TryFromPrimitive};
7#[cfg(target_os = "linux")]
8use std::{fs::File, io::BufReader};
9use {
10    solana_sdk::timing::AtomicInterval,
11    std::{
12        collections::HashMap,
13        io::BufRead,
14        sync::{
15            atomic::{AtomicBool, Ordering},
16            Arc,
17        },
18        thread::{self, sleep, Builder, JoinHandle},
19        time::Duration,
20    },
21    sys_info::{Error, LoadAvg},
22};
23
/// Milliseconds in one second.
const MS_PER_S: u64 = 1_000;
/// Milliseconds in one minute.
const MS_PER_M: u64 = MS_PER_S * 60;
/// Milliseconds in one hour.
const MS_PER_H: u64 = MS_PER_M * 60;
// Sampling periods (in milliseconds) for each group of statistics.
// NOTE(review): these are presumably consumed by `run` (not visible in this chunk);
// SAMPLE_INTERVAL_UDP_MS likely also paces the /proc/net/dev sampling — confirm there.
const SAMPLE_INTERVAL_UDP_MS: u64 = 2 * MS_PER_S;
const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_MEM_MS: u64 = 5 * MS_PER_S;
const SAMPLE_INTERVAL_CPU_MS: u64 = 10 * MS_PER_S;
const SAMPLE_INTERVAL_CPU_ID_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_DISK_MS: u64 = 5 * MS_PER_S;
// NOTE(review): presumably the pause between iterations of the monitor loop; confirm in `run`.
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);

/// File holding the `Udp:` counter rows read by `read_net_stats`.
#[cfg(target_os = "linux")]
const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp";
/// File holding the per-interface counters read by `read_net_stats`.
#[cfg(target_os = "linux")]
const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
/// Directory whose entries `read_disk_stats` scans for block devices.
#[cfg(target_os = "linux")]
const SYS_BLOCK_PATH: &str = "/sys/block";
41
/// Owns the background monitoring thread spawned by `SystemMonitorService::new`.
pub struct SystemMonitorService {
    // Handle of the thread running `SystemMonitorService::run`.
    thread_hdl: JoinHandle<()>,
}
45
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
/// UDP counters taken from the `Udp:` rows of /proc/net/snmp (see `parse_udp_stats`).
///
/// Each field maps to the label of the same name (e.g. `InDatagrams`); labels missing from
/// the input default to 0 (see `UdpStats::from_map`).
struct UdpStats {
    in_datagrams: u64,
    no_ports: u64,
    in_errors: u64,
    out_datagrams: u64,
    rcvbuf_errors: u64,
    sndbuf_errors: u64,
    in_csum_errors: u64,
    ignored_multi: u64,
}
57
#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
// These stats are aggregated across all network devices excluding the loopback interface.
// They are parsed from /proc/net/dev by `parse_net_dev_stats`, which sums each column over
// every non-loopback interface; `report_net_stats` reports their deltas between samples.
struct NetDevStats {
    // Number of bytes received
    rx_bytes: u64,
    // Number of packets received
    rx_packets: u64,
    // Number of receive errors detected by device driver
    rx_errs: u64,
    // Number of receive packets dropped by the device driver (not included in error count)
    rx_drops: u64,
    // Number of receive FIFO buffer errors
    rx_fifo: u64,
    // Number of receive packet framing errors
    rx_frame: u64,
    // Number of compressed packets received
    rx_compressed: u64,
    // Number of multicast frames received by device driver
    rx_multicast: u64,
    // Number of bytes transmitted
    tx_bytes: u64,
    // Number of packets transmitted
    tx_packets: u64,
    // Number of transmit errors detected by device driver
    tx_errs: u64,
    // Number of transmit packets dropped by device driver
    tx_drops: u64,
    // Number of transmit FIFO buffer errors
    tx_fifo: u64,
    // Number of transmit collisions detected
    tx_colls: u64,
    // Number of transmit carrier losses detected by device driver
    tx_carrier: u64,
    // Number of compressed packets transmitted
    tx_compressed: u64,
}
95
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
/// One network sample as produced by `read_net_stats`.
struct NetStats {
    // Counters from /proc/net/snmp.
    udp_stats: UdpStats,
    // Counters from /proc/net/dev, summed over non-loopback interfaces.
    net_dev_stats: NetDevStats,
}
101
/// CPU snapshot gathered by `SystemMonitorService::cpu_info` via the `sys_info` crate.
struct CpuInfo {
    // From `sys_info::cpu_num`.
    cpu_num: u32,
    // From `sys_info::cpu_speed`; reported as "cpu0_freq_mhz".
    cpu_freq_mhz: u64,
    // From `sys_info::loadavg` (one/five/fifteen minute averages).
    load_avg: LoadAvg,
    // From `sys_info::proc_total`; reported as "total_num_threads".
    num_threads: u64,
}
108
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[derive(IntoPrimitive)]
#[repr(i64)]
/// CPU vendor detected from the `cpuid` leaf-0 vendor string; its implicit discriminant
/// (Other = 0, Intel = 1, Amd = 2) is reported as "manufacturer_id".
enum CpuManufacturer {
    Other,
    Intel,
    Amd,
}
117
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
#[derive(IntoPrimitive, TryFromPrimitive, PartialEq, PartialOrd)]
#[repr(u32)]
// The value passed into cpuid via eax, to control what the result means.
// Variants are declared in ascending numeric order, so the derived `PartialOrd` matches the
// numeric leaf order; `report_cpuid_values` relies on this for its `<= max_leaf` checks.
// NOTE(review): Intel's SDM describes leaf 4 as "deterministic cache parameters" and leaf 5
// as MONITOR/MWAIT; the `Topology` / `Unsupported` names here may be loose — confirm.
enum CpuidParamValue {
    Manufacturer = 0,
    Processor = 1,
    Cache = 2,
    SerialNumber = 3,
    Topology = 4,
    Unsupported = 5,
    ThermalAndPower = 6,
    Extended = 7,
}
// Highest leaf this code knows how to name; the CPU-reported maximum is capped to this so
// the conversion back into `CpuidParamValue` always succeeds.
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
const CPUID_PARAM_MAX_SUPPORTED_VALUE: u32 = 7;
134
#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
// These stats are aggregated across all storage devices excluding internal loopbacks.
// Fields are cumulative since boot with the exception of 'num_disks' and 'io_in_progress'
// `read_disk_stats` sums them over every /sys/block entry whose name does not start with
// "loop", "dm" or "md"; each field except `num_disks` is one column of a device's `stat`
// file as parsed by `parse_disk_stats`.
struct DiskStats {
    // Columns 1-11: present in every layout `parse_disk_stats` accepts.
    reads_completed: u64,
    reads_merged: u64,
    sectors_read: u64,
    time_reading_ms: u64,
    writes_completed: u64,
    writes_merged: u64,
    sectors_written: u64,
    time_writing_ms: u64,
    io_in_progress: u64,
    time_io_ms: u64,
    // weighted time multiplies time performing IO by number of commands in the queue
    time_io_weighted_ms: u64,
    // Columns 12-15: only set when the kernel reports discard fields (4.18+), else 0.
    discards_completed: u64,
    discards_merged: u64,
    sectors_discarded: u64,
    // NOTE(review): presumably milliseconds like the other time fields — confirm.
    time_discarding: u64,
    // Columns 16-17: only set when the kernel reports flush fields (5.5+), else 0.
    flushes_completed: u64,
    // NOTE(review): presumably milliseconds like the other time fields — confirm.
    time_flushing: u64,
    // Number of devices aggregated; set by `read_disk_stats`, not by `accumulate`.
    num_disks: u64,
}
160
161impl UdpStats {
162    fn from_map(udp_stats: &HashMap<String, u64>) -> Self {
163        Self {
164            in_datagrams: *udp_stats.get("InDatagrams").unwrap_or(&0),
165            no_ports: *udp_stats.get("NoPorts").unwrap_or(&0),
166            in_errors: *udp_stats.get("InErrors").unwrap_or(&0),
167            out_datagrams: *udp_stats.get("OutDatagrams").unwrap_or(&0),
168            rcvbuf_errors: *udp_stats.get("RcvbufErrors").unwrap_or(&0),
169            sndbuf_errors: *udp_stats.get("SndbufErrors").unwrap_or(&0),
170            in_csum_errors: *udp_stats.get("InCsumErrors").unwrap_or(&0),
171            ignored_multi: *udp_stats.get("IgnoredMulti").unwrap_or(&0),
172        }
173    }
174}
175
176impl DiskStats {
177    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
178    fn accumulate(&mut self, other: &DiskStats) {
179        self.reads_completed += other.reads_completed;
180        self.reads_merged += other.reads_merged;
181        self.sectors_read += other.sectors_read;
182        self.time_reading_ms += other.time_reading_ms;
183        self.writes_completed += other.writes_completed;
184        self.writes_merged += other.writes_merged;
185        self.sectors_written += other.sectors_written;
186        self.time_writing_ms += other.time_writing_ms;
187        self.io_in_progress += other.io_in_progress;
188        self.time_io_ms += other.time_io_ms;
189        self.time_io_weighted_ms += other.time_io_weighted_ms;
190        self.discards_completed += other.discards_completed;
191        self.discards_merged += other.discards_merged;
192        self.sectors_discarded += other.sectors_discarded;
193        self.time_discarding += other.time_discarding;
194        self.flushes_completed += other.flushes_completed;
195        self.time_flushing += other.time_flushing;
196    }
197}
198
/// Returns "<family>/<os>/<arch>" for the compilation target, e.g. "unix/linux/x86_64".
fn platform_id() -> String {
    use std::env::consts::{ARCH, FAMILY, OS};
    [FAMILY, OS, ARCH].join("/")
}
207
208#[cfg(target_os = "linux")]
209fn read_net_stats() -> Result<NetStats, String> {
210    let file_path_snmp = PROC_NET_SNMP_PATH;
211    let file_snmp = File::open(file_path_snmp).map_err(|e| e.to_string())?;
212    let mut reader_snmp = BufReader::new(file_snmp);
213
214    let file_path_dev = PROC_NET_DEV_PATH;
215    let file_dev = File::open(file_path_dev).map_err(|e| e.to_string())?;
216    let mut reader_dev = BufReader::new(file_dev);
217
218    let udp_stats = parse_udp_stats(&mut reader_snmp)?;
219    let net_dev_stats = parse_net_dev_stats(&mut reader_dev)?;
220    Ok(NetStats {
221        udp_stats,
222        net_dev_stats,
223    })
224}
225
226#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
227fn parse_udp_stats(reader_snmp: &mut impl BufRead) -> Result<UdpStats, String> {
228    let mut udp_lines = Vec::default();
229    for line in reader_snmp.lines() {
230        let line = line.map_err(|e| e.to_string())?;
231        if line.starts_with("Udp:") {
232            udp_lines.push(line);
233            if udp_lines.len() == 2 {
234                break;
235            }
236        }
237    }
238    if udp_lines.len() != 2 {
239        return Err(format!(
240            "parse error, expected 2 lines, num lines: {}",
241            udp_lines.len()
242        ));
243    }
244
245    let pairs: Vec<_> = udp_lines[0]
246        .split_ascii_whitespace()
247        .zip(udp_lines[1].split_ascii_whitespace())
248        .collect();
249    let udp_stats: HashMap<String, u64> = pairs[1..]
250        .iter()
251        .map(|(label, val)| (label.to_string(), val.parse::<u64>().unwrap()))
252        .collect();
253
254    let stats = UdpStats::from_map(&udp_stats);
255    Ok(stats)
256}
257
258#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
259fn parse_net_dev_stats(reader_dev: &mut impl BufRead) -> Result<NetDevStats, String> {
260    let mut stats = NetDevStats::default();
261    for (line_number, line) in reader_dev.lines().enumerate() {
262        if line_number < 2 {
263            // Skip first two lines with header information.
264            continue;
265        }
266
267        let line = line.map_err(|e| e.to_string())?;
268        let values: Vec<_> = line.split_ascii_whitespace().collect();
269
270        if values.len() != 17 {
271            return Err("parse error, expected exactly 17 stat elements".to_string());
272        }
273        if values[0] == "lo:" {
274            // Filter out the loopback network interface as we are only concerned with
275            // external traffic.
276            continue;
277        }
278
279        stats.rx_bytes += values[1].parse::<u64>().map_err(|e| e.to_string())?;
280        stats.rx_packets += values[2].parse::<u64>().map_err(|e| e.to_string())?;
281        stats.rx_errs += values[3].parse::<u64>().map_err(|e| e.to_string())?;
282        stats.rx_drops += values[4].parse::<u64>().map_err(|e| e.to_string())?;
283        stats.rx_fifo += values[5].parse::<u64>().map_err(|e| e.to_string())?;
284        stats.rx_frame += values[6].parse::<u64>().map_err(|e| e.to_string())?;
285        stats.rx_compressed += values[7].parse::<u64>().map_err(|e| e.to_string())?;
286        stats.rx_multicast += values[8].parse::<u64>().map_err(|e| e.to_string())?;
287        stats.tx_bytes += values[9].parse::<u64>().map_err(|e| e.to_string())?;
288        stats.tx_packets += values[10].parse::<u64>().map_err(|e| e.to_string())?;
289        stats.tx_errs += values[11].parse::<u64>().map_err(|e| e.to_string())?;
290        stats.tx_drops += values[12].parse::<u64>().map_err(|e| e.to_string())?;
291        stats.tx_fifo += values[13].parse::<u64>().map_err(|e| e.to_string())?;
292        stats.tx_colls += values[14].parse::<u64>().map_err(|e| e.to_string())?;
293        stats.tx_carrier += values[15].parse::<u64>().map_err(|e| e.to_string())?;
294        stats.tx_compressed += values[16].parse::<u64>().map_err(|e| e.to_string())?;
295    }
296
297    Ok(stats)
298}
299
300#[cfg(target_os = "linux")]
301pub fn verify_net_stats_access() -> Result<(), String> {
302    read_net_stats()?;
303    Ok(())
304}
305
306#[cfg(not(target_os = "linux"))]
307pub fn verify_net_stats_access() -> Result<(), String> {
308    Ok(())
309}
310
311#[cfg(target_os = "linux")]
312fn read_disk_stats() -> Result<DiskStats, String> {
313    let mut stats = DiskStats::default();
314    let mut num_disks = 0;
315    let blk_device_dir_iter = std::fs::read_dir(SYS_BLOCK_PATH).map_err(|e| e.to_string())?;
316    blk_device_dir_iter
317        .filter_map(|blk_device_dir| {
318            match blk_device_dir {
319                Ok(blk_device_dir) => {
320                    let blk_device_dir_name = &blk_device_dir.file_name();
321                    let blk_device_dir_name = blk_device_dir_name.to_string_lossy();
322                    if blk_device_dir_name.starts_with("loop")
323                        || blk_device_dir_name.starts_with("dm")
324                        || blk_device_dir_name.starts_with("md")
325                    {
326                        // Filter out loopback devices, dmcrypt volumes, and mdraid volumes
327                        return None;
328                    }
329                    let mut path = blk_device_dir.path();
330                    path.push("stat");
331                    match File::open(path) {
332                        Ok(file_diskstats) => Some(file_diskstats),
333                        Err(_) => None,
334                    }
335                }
336                Err(_) => None,
337            }
338        })
339        .for_each(|file_diskstats| {
340            let mut reader_diskstats = BufReader::new(file_diskstats);
341            stats.accumulate(&parse_disk_stats(&mut reader_diskstats).unwrap_or_default());
342            num_disks += 1;
343        });
344    stats.num_disks = num_disks;
345    Ok(stats)
346}
347
348#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
349fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result<DiskStats, String> {
350    let mut stats = DiskStats::default();
351    let mut line = String::new();
352    reader_diskstats
353        .read_line(&mut line)
354        .map_err(|e| e.to_string())?;
355    let values: Vec<_> = line.split_ascii_whitespace().collect();
356    let num_elements = values.len();
357
358    if num_elements != 11 && num_elements != 15 && num_elements != 17 {
359        return Err("parse error, unknown number of disk stat elements".to_string());
360    }
361
362    stats.reads_completed = values[0].parse::<u64>().map_err(|e| e.to_string())?;
363    stats.reads_merged = values[1].parse::<u64>().map_err(|e| e.to_string())?;
364    stats.sectors_read = values[2].parse::<u64>().map_err(|e| e.to_string())?;
365    stats.time_reading_ms = values[3].parse::<u64>().map_err(|e| e.to_string())?;
366    stats.writes_completed = values[4].parse::<u64>().map_err(|e| e.to_string())?;
367    stats.writes_merged = values[5].parse::<u64>().map_err(|e| e.to_string())?;
368    stats.sectors_written = values[6].parse::<u64>().map_err(|e| e.to_string())?;
369    stats.time_writing_ms = values[7].parse::<u64>().map_err(|e| e.to_string())?;
370    stats.io_in_progress = values[8].parse::<u64>().map_err(|e| e.to_string())?;
371    stats.time_io_ms = values[9].parse::<u64>().map_err(|e| e.to_string())?;
372    stats.time_io_weighted_ms = values[10].parse::<u64>().map_err(|e| e.to_string())?;
373    if num_elements > 11 {
374        // Kernel 4.18+ appends four more fields for discard
375        stats.discards_completed = values[11].parse::<u64>().map_err(|e| e.to_string())?;
376        stats.discards_merged = values[12].parse::<u64>().map_err(|e| e.to_string())?;
377        stats.sectors_discarded = values[13].parse::<u64>().map_err(|e| e.to_string())?;
378        stats.time_discarding = values[14].parse::<u64>().map_err(|e| e.to_string())?;
379    }
380    if num_elements > 15 {
381        // Kernel 5.5+ appends two more fields for flush requests
382        stats.flushes_completed = values[15].parse::<u64>().map_err(|e| e.to_string())?;
383        stats.time_flushing = values[16].parse::<u64>().map_err(|e| e.to_string())?;
384    }
385
386    Ok(stats)
387}
388
/// Selects which groups of OS statistics the monitor thread reports.
///
/// NOTE(review): the flags are presumably consumed by `SystemMonitorService::run`, which is
/// not visible here — confirm which report each flag gates.
pub struct SystemMonitorStatsReportConfig {
    pub report_os_memory_stats: bool,
    pub report_os_network_stats: bool,
    pub report_os_cpu_stats: bool,
    pub report_os_disk_stats: bool,
}
395
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
/// How a sysctl value is treated by `linux_report_network_limits`.
enum InterestingLimit {
    // Warn (and fail the check) when the current value is below this recommended minimum.
    Recommend(i64),
    // Report the current value only; never fails the check.
    QueryOnly,
}

/// Sysctl keys queried by `linux_get_current_network_limits`, with their treatment.
#[cfg(target_os = "linux")]
const INTERESTING_LIMITS: &[(&str, InterestingLimit)] = &[
    // 134217728 bytes = 128 MiB.
    ("net.core.rmem_max", InterestingLimit::Recommend(134217728)),
    ("net.core.wmem_max", InterestingLimit::Recommend(134217728)),
    ("vm.max_map_count", InterestingLimit::Recommend(1000000)),
    ("net.core.optmem_max", InterestingLimit::QueryOnly),
    ("net.core.netdev_max_backlog", InterestingLimit::QueryOnly),
];
410
411impl SystemMonitorService {
412    pub fn new(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) -> Self {
413        info!("Starting SystemMonitorService");
414        let thread_hdl = Builder::new()
415            .name("solSystemMonitr".to_string())
416            .spawn(move || {
417                Self::run(exit, config);
418            })
419            .unwrap();
420
421        Self { thread_hdl }
422    }
423
424    #[cfg(target_os = "linux")]
425    fn linux_get_current_network_limits() -> Vec<(&'static str, &'static InterestingLimit, i64)> {
426        use sysctl::Sysctl;
427
428        fn sysctl_read(name: &str) -> Result<String, sysctl::SysctlError> {
429            let ctl = sysctl::Ctl::new(name)?;
430            let val = ctl.value_string()?;
431            Ok(val)
432        }
433
434        fn normalize_err<E: std::fmt::Display>(key: &str, error: E) -> String {
435            format!("Failed to query value for {}: {}", key, error)
436        }
437        INTERESTING_LIMITS
438            .iter()
439            .map(|(key, interesting_limit)| {
440                let current_value = sysctl_read(key)
441                    .map_err(|e| normalize_err(key, e))
442                    .and_then(|val| val.parse::<i64>().map_err(|e| normalize_err(key, e)))
443                    .unwrap_or_else(|e| {
444                        error!("{}", e);
445                        -1
446                    });
447                (*key, interesting_limit, current_value)
448            })
449            .collect::<Vec<_>>()
450    }
451
452    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
453    fn linux_report_network_limits(
454        current_limits: &[(&'static str, &'static InterestingLimit, i64)],
455    ) -> bool {
456        current_limits
457            .iter()
458            .all(|(key, interesting_limit, current_value)| {
459                datapoint_warn!("os-config", (key, *current_value, i64));
460                match interesting_limit {
461                    InterestingLimit::Recommend(recommended_value)
462                        if current_value < recommended_value =>
463                    {
464                        warn!(
465                            "  {key}: recommended={recommended_value}, current={current_value} \
466                             too small"
467                        );
468                        false
469                    }
470                    InterestingLimit::Recommend(recommended_value) => {
471                        info!("  {key}: recommended={recommended_value} current={current_value}");
472                        true
473                    }
474                    InterestingLimit::QueryOnly => {
475                        info!("  {key}: report-only --  current={current_value}");
476                        true
477                    }
478                }
479            })
480    }
481
482    #[cfg(not(target_os = "linux"))]
483    pub fn check_os_network_limits() -> bool {
484        datapoint_info!("os-config", ("platform", platform_id(), String));
485        true
486    }
487
488    #[cfg(target_os = "linux")]
489    pub fn check_os_network_limits() -> bool {
490        datapoint_info!("os-config", ("platform", platform_id(), String));
491        let current_limits = Self::linux_get_current_network_limits();
492        Self::linux_report_network_limits(&current_limits)
493    }
494
495    #[cfg(target_os = "linux")]
496    fn process_net_stats(net_stats: &mut Option<NetStats>) {
497        match read_net_stats() {
498            Ok(new_stats) => {
499                if let Some(old_stats) = net_stats {
500                    Self::report_net_stats(old_stats, &new_stats);
501                }
502                *net_stats = Some(new_stats);
503            }
504            Err(e) => warn!("read_net_stats: {}", e),
505        }
506    }
507
    // Network statistics are only sampled on Linux; elsewhere this is a no-op.
    #[cfg(not(target_os = "linux"))]
    fn process_net_stats(_net_stats: &mut Option<NetStats>) {}
510
511    #[cfg(target_os = "linux")]
512    fn report_net_stats(old_stats: &NetStats, new_stats: &NetStats) {
513        datapoint_info!(
514            "net-stats-validator",
515            (
516                "in_datagrams_delta",
517                new_stats.udp_stats.in_datagrams - old_stats.udp_stats.in_datagrams,
518                i64
519            ),
520            (
521                "no_ports_delta",
522                new_stats.udp_stats.no_ports - old_stats.udp_stats.no_ports,
523                i64
524            ),
525            (
526                "in_errors_delta",
527                new_stats.udp_stats.in_errors - old_stats.udp_stats.in_errors,
528                i64
529            ),
530            (
531                "out_datagrams_delta",
532                new_stats.udp_stats.out_datagrams - old_stats.udp_stats.out_datagrams,
533                i64
534            ),
535            (
536                "rcvbuf_errors_delta",
537                new_stats.udp_stats.rcvbuf_errors - old_stats.udp_stats.rcvbuf_errors,
538                i64
539            ),
540            (
541                "sndbuf_errors_delta",
542                new_stats.udp_stats.sndbuf_errors - old_stats.udp_stats.sndbuf_errors,
543                i64
544            ),
545            (
546                "in_csum_errors_delta",
547                new_stats.udp_stats.in_csum_errors - old_stats.udp_stats.in_csum_errors,
548                i64
549            ),
550            (
551                "ignored_multi_delta",
552                new_stats.udp_stats.ignored_multi - old_stats.udp_stats.ignored_multi,
553                i64
554            ),
555            ("in_errors", new_stats.udp_stats.in_errors, i64),
556            ("rcvbuf_errors", new_stats.udp_stats.rcvbuf_errors, i64),
557            ("sndbuf_errors", new_stats.udp_stats.sndbuf_errors, i64),
558            (
559                "rx_bytes_delta",
560                new_stats
561                    .net_dev_stats
562                    .rx_bytes
563                    .saturating_sub(old_stats.net_dev_stats.rx_bytes),
564                i64
565            ),
566            (
567                "rx_packets_delta",
568                new_stats
569                    .net_dev_stats
570                    .rx_packets
571                    .saturating_sub(old_stats.net_dev_stats.rx_packets),
572                i64
573            ),
574            (
575                "rx_errs_delta",
576                new_stats
577                    .net_dev_stats
578                    .rx_errs
579                    .saturating_sub(old_stats.net_dev_stats.rx_errs),
580                i64
581            ),
582            (
583                "rx_drops_delta",
584                new_stats
585                    .net_dev_stats
586                    .rx_drops
587                    .saturating_sub(old_stats.net_dev_stats.rx_drops),
588                i64
589            ),
590            (
591                "rx_fifo_delta",
592                new_stats
593                    .net_dev_stats
594                    .rx_fifo
595                    .saturating_sub(old_stats.net_dev_stats.rx_fifo),
596                i64
597            ),
598            (
599                "rx_frame_delta",
600                new_stats
601                    .net_dev_stats
602                    .rx_frame
603                    .saturating_sub(old_stats.net_dev_stats.rx_frame),
604                i64
605            ),
606            (
607                "tx_bytes_delta",
608                new_stats
609                    .net_dev_stats
610                    .tx_bytes
611                    .saturating_sub(old_stats.net_dev_stats.tx_bytes),
612                i64
613            ),
614            (
615                "tx_packets_delta",
616                new_stats
617                    .net_dev_stats
618                    .tx_packets
619                    .saturating_sub(old_stats.net_dev_stats.tx_packets),
620                i64
621            ),
622            (
623                "tx_errs_delta",
624                new_stats
625                    .net_dev_stats
626                    .tx_errs
627                    .saturating_sub(old_stats.net_dev_stats.tx_errs),
628                i64
629            ),
630            (
631                "tx_drops_delta",
632                new_stats
633                    .net_dev_stats
634                    .tx_drops
635                    .saturating_sub(old_stats.net_dev_stats.tx_drops),
636                i64
637            ),
638            (
639                "tx_fifo_delta",
640                new_stats
641                    .net_dev_stats
642                    .tx_fifo
643                    .saturating_sub(old_stats.net_dev_stats.tx_fifo),
644                i64
645            ),
646            (
647                "tx_colls_delta",
648                new_stats
649                    .net_dev_stats
650                    .tx_colls
651                    .saturating_sub(old_stats.net_dev_stats.tx_colls),
652                i64
653            ),
654        );
655    }
656
657    fn calc_percent(numerator: u64, denom: u64) -> f64 {
658        if denom == 0 {
659            0.0
660        } else {
661            (numerator as f64 / denom as f64) * 100.0
662        }
663    }
664
665    fn report_mem_stats() {
666        // get mem info (in kb)
667        if let Ok(info) = sys_info::mem_info() {
668            const KB: u64 = 1_024;
669            datapoint_info!(
670                "memory-stats",
671                ("total", info.total * KB, i64),
672                ("swap_total", info.swap_total * KB, i64),
673                ("buffers_bytes", info.buffers * KB, i64),
674                ("cached_bytes", info.cached * KB, i64),
675                (
676                    "free_percent",
677                    Self::calc_percent(info.free, info.total),
678                    f64
679                ),
680                (
681                    "used_bytes",
682                    info.total.saturating_sub(info.avail) * KB,
683                    i64
684                ),
685                (
686                    "avail_percent",
687                    Self::calc_percent(info.avail, info.total),
688                    f64
689                ),
690                (
691                    "buffers_percent",
692                    Self::calc_percent(info.buffers, info.total),
693                    f64
694                ),
695                (
696                    "cached_percent",
697                    Self::calc_percent(info.cached, info.total),
698                    f64
699                ),
700                (
701                    "swap_free_percent",
702                    Self::calc_percent(info.swap_free, info.swap_total),
703                    f64
704                ),
705            )
706        }
707    }
708
709    fn cpu_info() -> Result<CpuInfo, Error> {
710        let cpu_num = sys_info::cpu_num()?;
711        let cpu_freq_mhz = sys_info::cpu_speed()?;
712        let load_avg = sys_info::loadavg()?;
713        let num_threads = sys_info::proc_total()?;
714
715        Ok(CpuInfo {
716            cpu_num,
717            cpu_freq_mhz,
718            load_avg,
719            num_threads,
720        })
721    }
722
723    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
724    fn report_cpuid_values() {
725        const CPUID_MANUFACTURER_EBX_INTEL: u32 = 0x756e6547;
726        const CPUID_MANUFACTURER_EDX_INTEL: u32 = 0x49656e69;
727        const CPUID_MANUFACTURER_ECX_INTEL: u32 = 0x6c65746e;
728        const CPUID_MANUFACTURER_EBX_AMD: u32 = 0x68747541;
729        const CPUID_MANUFACTURER_EDX_AMD: u32 = 0x69746e65;
730        const CPUID_MANUFACTURER_ECX_AMD: u32 = 0x444d4163;
731
732        unsafe {
733            let cpuid_mfr = __cpuid(0);
734            let cpuid_empty = CpuidResult {
735                eax: 0,
736                ebx: 0,
737                ecx: 0,
738                edx: 0,
739            };
740
741            let max_leaf = match CpuidParamValue::try_from(std::cmp::min(
742                cpuid_mfr.eax,
743                CPUID_PARAM_MAX_SUPPORTED_VALUE,
744            )) {
745                Ok(val) => val,
746                Err(_err) => CpuidParamValue::Manufacturer,
747            };
748
749            let mfr_id = if cpuid_mfr.ebx == CPUID_MANUFACTURER_EBX_INTEL
750                && cpuid_mfr.edx == CPUID_MANUFACTURER_EDX_INTEL
751                && cpuid_mfr.ecx == CPUID_MANUFACTURER_ECX_INTEL
752            {
753                CpuManufacturer::Intel // GenuineIntel
754            } else if cpuid_mfr.ebx == CPUID_MANUFACTURER_EBX_AMD
755                && cpuid_mfr.edx == CPUID_MANUFACTURER_EDX_AMD
756                && cpuid_mfr.ecx == CPUID_MANUFACTURER_ECX_AMD
757            {
758                CpuManufacturer::Amd // AuthenticAMD
759            } else {
760                CpuManufacturer::Other // anything else
761            };
762
763            let cpuid_processor = if CpuidParamValue::Processor <= max_leaf {
764                __cpuid(CpuidParamValue::Processor.into())
765            } else {
766                cpuid_empty
767            };
768            let cpuid_cache = if CpuidParamValue::Cache <= max_leaf {
769                __cpuid(CpuidParamValue::Cache.into())
770            } else {
771                cpuid_empty
772            };
773            let cpuid_topology = if CpuidParamValue::Topology <= max_leaf {
774                __cpuid(CpuidParamValue::Topology.into())
775            } else {
776                cpuid_empty
777            };
778            let cpuid_extended_0 = if CpuidParamValue::Extended <= max_leaf {
779                __cpuid_count(CpuidParamValue::Extended.into(), 0)
780            } else {
781                cpuid_empty
782            };
783            let cpuid_extended_1 = if CpuidParamValue::Extended <= max_leaf {
784                if 1 <= __get_cpuid_max(CpuidParamValue::Extended.into()).1 {
785                    __cpuid_count(CpuidParamValue::Extended.into(), 1)
786                } else {
787                    cpuid_empty
788                }
789            } else {
790                cpuid_empty
791            };
792
793            datapoint_info!(
794                "cpuid-values",
795                ("manufacturer_id", i64::from(mfr_id), i64),
796                ("cpuid_processor_eax", i64::from(cpuid_processor.eax), i64),
797                ("cpuid_processor_ebx", i64::from(cpuid_processor.ebx), i64),
798                ("cpuid_processor_ecx", i64::from(cpuid_processor.ecx), i64),
799                ("cpuid_processor_edx", i64::from(cpuid_processor.edx), i64),
800                ("cpuid_cache_eax", i64::from(cpuid_cache.eax), i64),
801                ("cpuid_cache_ebx", i64::from(cpuid_cache.ebx), i64),
802                ("cpuid_cache_ecx", i64::from(cpuid_cache.ecx), i64),
803                ("cpuid_cache_edx", i64::from(cpuid_cache.edx), i64),
804                ("cpuid_topology_eax", i64::from(cpuid_topology.eax), i64),
805                ("cpuid_topology_ebx", i64::from(cpuid_topology.ebx), i64),
806                ("cpuid_topology_ecx", i64::from(cpuid_topology.ecx), i64),
807                ("cpuid_topology_edx", i64::from(cpuid_topology.edx), i64),
808                ("cpuid_extended_0_ebx", i64::from(cpuid_extended_0.ebx), i64),
809                ("cpuid_extended_0_ecx", i64::from(cpuid_extended_0.ecx), i64),
810                ("cpuid_extended_0_edx", i64::from(cpuid_extended_0.edx), i64),
811                ("cpuid_extended_1_eax", i64::from(cpuid_extended_1.eax), i64),
812            );
813        };
814    }
815
816    fn report_cpu_stats() {
817        if let Ok(info) = Self::cpu_info() {
818            datapoint_info!(
819                "cpu-stats",
820                ("cpu_num", info.cpu_num as i64, i64),
821                ("cpu0_freq_mhz", info.cpu_freq_mhz as i64, i64),
822                ("average_load_one_minute", info.load_avg.one, f64),
823                ("average_load_five_minutes", info.load_avg.five, f64),
824                ("average_load_fifteen_minutes", info.load_avg.fifteen, f64),
825                ("total_num_threads", info.num_threads as i64, i64),
826            )
827        }
828    }
829
830    #[cfg(target_os = "linux")]
831    fn process_disk_stats(disk_stats: &mut Option<DiskStats>) {
832        match read_disk_stats() {
833            Ok(new_stats) => {
834                if let Some(old_stats) = disk_stats {
835                    Self::report_disk_stats(old_stats, &new_stats);
836                }
837                *disk_stats = Some(new_stats);
838            }
839            Err(e) => warn!("read_disk_stats: {}", e),
840        }
841    }
842
843    #[cfg(not(target_os = "linux"))]
844    fn process_disk_stats(_disk_stats: &mut Option<DiskStats>) {}
845
846    #[cfg(target_os = "linux")]
847    fn report_disk_stats(old_stats: &DiskStats, new_stats: &DiskStats) {
848        datapoint_info!(
849            "disk-stats",
850            (
851                "reads_completed",
852                new_stats
853                    .reads_completed
854                    .saturating_sub(old_stats.reads_completed),
855                i64
856            ),
857            (
858                "reads_merged",
859                new_stats
860                    .reads_merged
861                    .saturating_sub(old_stats.reads_merged),
862                i64
863            ),
864            (
865                "sectors_read",
866                new_stats
867                    .sectors_read
868                    .saturating_sub(old_stats.sectors_read),
869                i64
870            ),
871            (
872                "time_reading_ms",
873                new_stats
874                    .time_reading_ms
875                    .saturating_sub(old_stats.time_reading_ms),
876                i64
877            ),
878            (
879                "writes_completed",
880                new_stats
881                    .writes_completed
882                    .saturating_sub(old_stats.writes_completed),
883                i64
884            ),
885            (
886                "writes_merged",
887                new_stats
888                    .writes_merged
889                    .saturating_sub(old_stats.writes_merged),
890                i64
891            ),
892            (
893                "sectors_written",
894                new_stats
895                    .sectors_written
896                    .saturating_sub(old_stats.sectors_written),
897                i64
898            ),
899            (
900                "time_writing_ms",
901                new_stats
902                    .time_writing_ms
903                    .saturating_sub(old_stats.time_writing_ms),
904                i64
905            ),
906            ("io_in_progress", new_stats.io_in_progress, i64),
907            (
908                "time_io_ms",
909                new_stats.time_io_ms.saturating_sub(old_stats.time_io_ms),
910                i64
911            ),
912            (
913                "time_io_weighted_ms",
914                new_stats
915                    .time_io_weighted_ms
916                    .saturating_sub(old_stats.time_io_weighted_ms),
917                i64
918            ),
919            (
920                "discards_completed",
921                new_stats
922                    .discards_completed
923                    .saturating_sub(old_stats.discards_completed),
924                i64
925            ),
926            (
927                "discards_merged",
928                new_stats
929                    .discards_merged
930                    .saturating_sub(old_stats.discards_merged),
931                i64
932            ),
933            (
934                "sectors_discarded",
935                new_stats
936                    .sectors_discarded
937                    .saturating_sub(old_stats.sectors_discarded),
938                i64
939            ),
940            (
941                "time_discarding",
942                new_stats
943                    .time_discarding
944                    .saturating_sub(old_stats.time_discarding),
945                i64
946            ),
947            (
948                "flushes_completed",
949                new_stats
950                    .flushes_completed
951                    .saturating_sub(old_stats.flushes_completed),
952                i64
953            ),
954            (
955                "time_flushing",
956                new_stats
957                    .time_flushing
958                    .saturating_sub(old_stats.time_flushing),
959                i64
960            ),
961            ("num_disks", new_stats.num_disks, i64),
962        )
963    }
964
965    pub fn run(exit: Arc<AtomicBool>, config: SystemMonitorStatsReportConfig) {
966        let mut udp_stats = None;
967        let mut disk_stats = None;
968        let network_limits_timer = AtomicInterval::default();
969        let udp_timer = AtomicInterval::default();
970        let mem_timer = AtomicInterval::default();
971        let cpu_timer = AtomicInterval::default();
972        let cpuid_timer = AtomicInterval::default();
973        let disk_timer = AtomicInterval::default();
974
975        loop {
976            if exit.load(Ordering::Relaxed) {
977                break;
978            }
979            if config.report_os_network_stats {
980                if network_limits_timer.should_update(SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS) {
981                    Self::check_os_network_limits();
982                }
983                if udp_timer.should_update(SAMPLE_INTERVAL_UDP_MS) {
984                    Self::process_net_stats(&mut udp_stats);
985                }
986            }
987            if config.report_os_memory_stats && mem_timer.should_update(SAMPLE_INTERVAL_MEM_MS) {
988                Self::report_mem_stats();
989            }
990            if config.report_os_cpu_stats {
991                if cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
992                    Self::report_cpu_stats();
993                }
994                if cpuid_timer.should_update(SAMPLE_INTERVAL_CPU_ID_MS) {
995                    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
996                    Self::report_cpuid_values();
997                }
998            }
999            if config.report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
1000                Self::process_disk_stats(&mut disk_stats);
1001            }
1002            sleep(SLEEP_INTERVAL);
1003        }
1004    }
1005
1006    pub fn join(self) -> thread::Result<()> {
1007        self.thread_hdl.join()
1008    }
1009}
1010
#[cfg(test)]
mod tests {
    use super::*;

    // `parse_udp_stats` extracts the `Udp:` counters from a mock `/proc/net/snmp`
    // dump and returns an error for input that lacks that layout.
    #[test]
    fn test_parse_udp_stats() {
        // Mock /proc/net/snmp contents: a header line and a value line per protocol.
        const MOCK_SNMP: &[u8] =
b"Ip: Forwarding DefaultTTL InReceives InHdrErrors InAddrErrors ForwDatagrams InUnknownProtos InDiscards InDelivers OutRequests OutDiscards OutNoRoutes ReasmTimeout ReasmReqds ReasmOKs ReasmFails FragOKs FragFails FragCreates
Ip: 1 64 357 0 2 0 0 0 355 315 0 6 0 0 0 0 0 0 0
Icmp: InMsgs InErrors InCsumErrors InDestUnreachs InTimeExcds InParmProbs InSrcQuenchs InRedirects InEchos InEchoReps InTimestamps InTimestampReps InAddrMasks InAddrMaskReps OutMsgs OutErrors OutDestUnreachs OutTimeExcds OutParmProbs OutSrcQuenchs OutRedirects OutEchos OutEchoReps OutTimestamps OutTimestampReps OutAddrMasks OutAddrMaskReps
Icmp: 3 0 0 3 0 0 0 0 0 0 0 0 0 0 7 0 7 0 0 0 0 0 0 0 0 0 0
IcmpMsg: InType3 OutType3
IcmpMsg: 3 7
Tcp: RtoAlgorithm RtoMin RtoMax MaxConn ActiveOpens PassiveOpens AttemptFails EstabResets CurrEstab InSegs OutSegs RetransSegs InErrs OutRsts InCsumErrors
Tcp: 1 200 120000 -1 29 1 0 0 5 318 279 0 0 4 0
Udp: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti
Udp: 27 7 0 30 0 0 0 0
UdpLite: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti
UdpLite: 0 0 0 0 0 0 0 0" as &[u8];
        const UNEXPECTED_DATA: &[u8] = b"unexpected data" as &[u8];

        let mut mock_snmp = MOCK_SNMP;
        let stats = parse_udp_stats(&mut mock_snmp).unwrap();
        // Expected values come from the `Udp: 27 7 0 30 ...` row (NoPorts = 7, OutDatagrams = 30).
        assert_eq!(stats.out_datagrams, 30);
        assert_eq!(stats.no_ports, 7);

        mock_snmp = UNEXPECTED_DATA;
        let stats = parse_udp_stats(&mut mock_snmp);
        assert!(stats.is_err());
    }

    // `parse_net_dev_stats` sums the per-interface counters of a mock `/proc/net/dev`,
    // leaving out the loopback interface, and rejects malformed input.
    #[test]
    fn test_parse_net_dev_stats() {
        // Two header lines followed by one row per interface.
        const MOCK_DEV: &[u8] =
b"Inter-|   Receive                                                |  Transmit
face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
lo: 50     1    0    0    0     0          0         0 100 2    1    0    0     0       0          0
eno1: 100     1    0    0    0     0          0         0 200 3    2    0    0     0       0          0
ens4: 400     4    0    1    0     0          0         0 250 5    0    0    0     0       0          0" as &[u8];
        const UNEXPECTED_DATA: &[u8] = b"un
expected
data" as &[u8];

        let mut mock_dev = MOCK_DEV;
        let stats = parse_net_dev_stats(&mut mock_dev).unwrap();
        // Totals cover eno1 + ens4 only; `lo` is excluded
        // (e.g. rx_bytes = 100 + 400, tx_bytes = 200 + 250).
        assert_eq!(stats.rx_bytes, 500);
        assert_eq!(stats.rx_packets, 5);
        assert_eq!(stats.rx_errs, 0);
        assert_eq!(stats.rx_drops, 1);
        assert_eq!(stats.tx_bytes, 450);
        assert_eq!(stats.tx_packets, 8);
        assert_eq!(stats.tx_errs, 2);
        assert_eq!(stats.tx_drops, 0);

        let mut mock_dev = UNEXPECTED_DATA;
        let stats = parse_net_dev_stats(&mut mock_dev);
        assert!(stats.is_err());
    }

    // `parse_disk_stats` accepts the 11-, 15- and 17-field disk stat line layouts
    // (presumably read from under `/sys/block`). It rejects a 16-field line and
    // non-numeric input.
    #[test]
    fn test_parse_disk_stats() {
        // Legacy 11-field format (kernels before 4.18)
        const MOCK_DISK_11: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738" as &[u8];
        // Matches kernel 4.18+ format
        const MOCK_DISK_15: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0" as &[u8];
        // Matches kernel 5.5+ format
        const MOCK_DISK_17: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0    70715     2922" as &[u8];
        // 16 fields matches none of the supported layouts, so parsing must fail.
        const UNEXPECTED_DATA_1: &[u8] =
b" 2095701   479815 122620302  1904439 43496218 26953623 3935324729 283313376        0  6101780 285220738        0        0        0        0    70715" as &[u8];

        // Multi-line, non-numeric input must also be rejected.
        const UNEXPECTED_DATA_2: &[u8] = b"un
ex
pec
ted
data" as &[u8];

        let mut mock_disk = MOCK_DISK_11;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        // time_io_weighted_ms is the 11th (last) field of the legacy layout.
        assert_eq!(stats.reads_completed, 2095701);
        assert_eq!(stats.time_io_weighted_ms, 285220738);

        let mut mock_disk = MOCK_DISK_15;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        assert_eq!(stats.reads_completed, 2095701);
        assert_eq!(stats.time_discarding, 0);

        let mut mock_disk = MOCK_DISK_17;
        let stats = parse_disk_stats(&mut mock_disk).unwrap();
        // time_flushing is the 17th (last) field of the 5.5+ layout.
        assert_eq!(stats.reads_completed, 2095701);
        assert_eq!(stats.time_flushing, 2922);

        let mut mock_disk = UNEXPECTED_DATA_1;
        let stats = parse_disk_stats(&mut mock_disk);
        assert!(stats.is_err());

        let mut mock_disk = UNEXPECTED_DATA_2;
        let stats = parse_disk_stats(&mut mock_disk);
        assert!(stats.is_err());
    }

    // Values just under the total must stay strictly below 100%. This is also
    // checked at a large scale (1 TiB expressed in KiB, i.e. 2^30), presumably
    // guarding against floating-point rounding inside `calc_percent`.
    #[test]
    fn test_calc_percent() {
        assert!(SystemMonitorService::calc_percent(99, 100) < 100.0);
        let one_tb_as_kb = (1u64 << 40) >> 10;
        assert!(SystemMonitorService::calc_percent(one_tb_as_kb - 1, one_tb_as_kb) < 100.0);
    }
}