below_model/
collector.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use regex::Regex;
21use slog::error;
22
23use super::*;
24use crate::collector_plugin;
25
/// Knobs that control which data sources `collect_sample` reads and how.
pub struct CollectorOptions {
    /// Directory handed to `cgroupfs::CgroupReader::new` as the root of the cgroup walk.
    pub cgroup_root: PathBuf,
    /// Shared pid map that `collect_sample` drains on every call and merges into the
    /// process data read from procfs.
    // NOTE(review): presumably filled by an exit-event tracker elsewhere — confirm.
    pub exit_data: Arc<Mutex<procfs::PidMap>>,
    /// When false, the per-cgroup io stat is not read and is reported as `None`.
    pub collect_io_stat: bool,
    /// When true, disk stats are not read and the default (empty) value is stored.
    pub disable_disk_stat: bool,
    /// When true, a btrfs sample is taken through `btrfs::BtrfsReader`.
    pub enable_btrfs_stats: bool,
    /// When true, ethtool stats are read through `ethtool::EthtoolReader`.
    pub enable_ethtool_stats: bool,
    /// When true, KSM stats are read through `procfs::KsmReader`.
    pub enable_ksm_stats: bool,
    /// When true, resctrl data is read through `resctrlfs::ResctrlReader`.
    pub enable_resctrl_stats: bool,
    // NOTE(review): not read anywhere in the visible part of this file; the `tc` field of
    // a sample is driven by `tc_stats_receiver` — confirm where this flag is consumed.
    pub enable_tc_stats: bool,
    /// First argument passed to `btrfs::BtrfsReader::new`.
    pub btrfs_samples: u64,
    /// Second argument passed to `btrfs::BtrfsReader::new`.
    pub btrfs_min_pct: f64,
    /// Child cgroups whose name matches this regex are left out of the sample.
    pub cgroup_re: Option<Regex>,
    /// When set, each sample takes whatever GPU data the plugin currently has available.
    pub gpu_stats_receiver:
        Option<collector_plugin::Consumer<crate::gpu_stats_collector_plugin::SampleType>>,
    /// When set, each sample takes whatever tc data the plugin currently has available.
    pub tc_stats_receiver:
        Option<collector_plugin::Consumer<crate::tc_collector_plugin::SampleType>>,
}
44
45impl Default for CollectorOptions {
46    fn default() -> Self {
47        Self {
48            cgroup_root: Path::new(cgroupfs::DEFAULT_CG_ROOT).to_path_buf(),
49            exit_data: Default::default(),
50            collect_io_stat: true,
51            disable_disk_stat: false,
52            enable_btrfs_stats: false,
53            enable_ethtool_stats: false,
54            enable_ksm_stats: false,
55            enable_resctrl_stats: false,
56            enable_tc_stats: false,
57            btrfs_samples: btrfs::DEFAULT_SAMPLES,
58            btrfs_min_pct: btrfs::DEFAULT_MIN_PCT,
59            cgroup_re: None,
60            gpu_stats_receiver: None,
61            tc_stats_receiver: None,
62        }
63    }
64}
65
/// Collects data samples and maintains the latest data
pub struct Collector {
    /// Logger passed to `collect_sample` for reporting non-fatal collection errors.
    logger: slog::Logger,
    /// procfs reader reused across collections.
    proc_reader: procfs::ProcReader,
    /// The previously collected sample together with the instant its collection started;
    /// `None` until the first call to `collect_and_update_model` succeeds.
    prev_sample: Option<(Sample, Instant)>,
    /// Configuration applied to every collection.
    collector_options: CollectorOptions,
}
73
74impl Collector {
75    pub fn new(logger: slog::Logger, collector_options: CollectorOptions) -> Self {
76        Self {
77            logger,
78            proc_reader: procfs::ProcReader::new(),
79            prev_sample: None,
80            collector_options,
81        }
82    }
83
84    pub fn collect_sample(&mut self) -> Result<Sample> {
85        collect_sample(&self.logger, &mut self.proc_reader, &self.collector_options)
86    }
87
88    /// Collect a new `Sample`, returning an updated Model
89    pub fn collect_and_update_model(&mut self) -> Result<Model> {
90        let now = Instant::now();
91        let sample = self.collect_sample()?;
92        let model = Model::new(
93            SystemTime::now(),
94            &sample,
95            self.prev_sample
96                .as_ref()
97                .map(|(s, i)| (s, now.duration_since(*i))),
98        );
99        self.prev_sample = Some((sample, now));
100        Ok(model)
101    }
102}
103
/// Add two optional values, treating a missing operand as "nothing to add".
///
/// Returns `None` only when both operands are `None`; when exactly one is present it is
/// returned unchanged.
pub fn opt_add<T: std::ops::Add<T, Output = T>>(a: Option<T>, b: Option<T>) -> Option<T> {
    match a {
        Some(lhs) => Some(match b {
            Some(rhs) => lhs + rhs,
            None => lhs,
        }),
        None => b,
    }
}
112
/// Multiply two optional values.
///
/// Returns `None` as soon as either operand is missing; otherwise `Some(a * b)`.
pub fn opt_multiply<S: Sized + std::ops::Mul<T, Output = S>, T: Sized>(
    a: Option<S>,
    b: Option<T>,
) -> Option<S> {
    Some(a? * b?)
}
119
120pub fn get_hostname() -> Result<String> {
121    if let Ok(h) = hostname::get() {
122        if let Ok(s) = h.into_string() {
123            return Ok(s);
124        }
125    }
126    Err(anyhow!("Could not get hostname"))
127}
128
/// Return the contents of `/etc/centos-release` with surrounding whitespace removed.
///
/// # Errors
///
/// Fails with the context "Fail to get centos release" when the file cannot be read.
#[cfg(fbcode_build)]
pub fn get_os_release() -> Result<String> {
    let raw =
        std::fs::read_to_string("/etc/centos-release").context("Fail to get centos release")?;
    // `trim` strips newlines along with all other leading/trailing whitespace.
    Ok(raw.trim().to_owned())
}
135
136use os_info as _; // So RUSTFIXDEPS doesn't complain.
137#[cfg(not(fbcode_build))]
138pub fn get_os_release() -> Result<String> {
139    let info = os_info::get();
140    Ok(format!(
141        "{} {} {}",
142        info.os_type(),
143        info.version(),
144        info.bitness()
145    ))
146}
147
148fn merge_procfs_and_exit_data(
149    mut procfs_data: procfs::PidMap,
150    exit_data: procfs::PidMap,
151) -> procfs::PidMap {
152    exit_data
153        .iter()
154        // If `procfs_data` already has the pid, then we use the procfs data because the time delta
155        // between the two collection points is negligible and procfs collected data is more
156        // complete.
157        .for_each(|entry| {
158            if !procfs_data.contains_key(entry.0) {
159                procfs_data.insert(*entry.0, entry.1.clone());
160            }
161        });
162
163    procfs_data
164}
165
166/// This function will test if all field of DiskStat are zero, if so we will need to skip
167/// this sample inside collector.
168fn is_all_zero_disk_stats(disk_stats: &procfs::DiskStat) -> bool {
169    disk_stats.read_completed == Some(0)
170        && disk_stats.write_completed == Some(0)
171        && disk_stats.discard_completed == Some(0)
172        && disk_stats.read_merged == Some(0)
173        && disk_stats.read_sectors == Some(0)
174        && disk_stats.time_spend_read_ms == Some(0)
175        && disk_stats.write_merged == Some(0)
176        && disk_stats.write_sectors == Some(0)
177        && disk_stats.time_spend_write_ms == Some(0)
178        && disk_stats.discard_merged == Some(0)
179        && disk_stats.discard_sectors == Some(0)
180        && disk_stats.time_spend_discard_ms == Some(0)
181}
182
183fn collect_sample(
184    logger: &slog::Logger,
185    reader: &mut procfs::ProcReader,
186    options: &CollectorOptions,
187) -> Result<Sample> {
188    let btrfs_reader =
189        btrfs::BtrfsReader::new(options.btrfs_samples, options.btrfs_min_pct, logger.clone());
190    let ethtool_reader = ethtool::EthtoolReader::new();
191    let ksm_reader = procfs::KsmReader::new();
192
193    // Take mutex, then take all values out of shared map and replace with default map
194    //
195    // NB: unconditionally drain the exit buffer otherwise we can leak the entries
196    let exit_pidmap = std::mem::take(
197        &mut *options
198            .exit_data
199            .lock()
200            .expect("tried to acquire poisoned lock"),
201    );
202
203    Ok(Sample {
204        cgroup: collect_cgroup_sample(
205            &cgroupfs::CgroupReader::new(options.cgroup_root.to_owned())?,
206            options.collect_io_stat,
207            logger,
208            &options.cgroup_re,
209        )?,
210        processes: merge_procfs_and_exit_data(reader.read_all_pids()?, exit_pidmap),
211        netstats: match procfs::NetReader::new(logger.clone()).and_then(|v| v.read_netstat()) {
212            Ok(ns) => ns,
213            Err(e) => {
214                error!(logger, "{:#}", e);
215                Default::default()
216            }
217        },
218        system: SystemSample {
219            stat: reader.read_stat()?,
220            meminfo: reader.read_meminfo()?,
221            vmstat: reader.read_vmstat()?,
222            slabinfo_vec: reader.read_slabinfo().unwrap_or_default(),
223            ksm: if !options.enable_ksm_stats {
224                None
225            } else {
226                Some(ksm_reader.read_ksm())
227            },
228            hostname: get_hostname()?,
229            kernel_version: match reader.read_kernel_version() {
230                Ok(k) => Some(k),
231                Err(e) => {
232                    error!(logger, "{:#}", e);
233                    None
234                }
235            },
236            os_release: match get_os_release() {
237                Ok(o) => Some(o),
238                Err(e) => {
239                    error!(logger, "{:#}", e);
240                    None
241                }
242            },
243            disks: if options.disable_disk_stat {
244                Default::default()
245            } else {
246                match reader.read_disk_stats_and_fsinfo() {
247                    Ok(disks) => disks
248                        .into_iter()
249                        .filter(|(disk_name, disk_stat)| {
250                            if disk_name.starts_with("ram") || disk_name.starts_with("loop") {
251                                return false;
252                            }
253                            !is_all_zero_disk_stats(disk_stat)
254                        })
255                        .collect(),
256                    Err(e) => {
257                        error!(logger, "{:#}", e);
258                        Default::default()
259                    }
260                }
261            },
262            btrfs: if !options.enable_btrfs_stats {
263                Default::default()
264            } else {
265                match btrfs_reader.sample() {
266                    Ok(btrfs) => Some(btrfs),
267                    Err(e) => {
268                        error!(logger, "{:#}", e);
269                        Default::default()
270                    }
271                }
272            },
273        },
274        gpus: {
275            if let Some(gpu_stats_receiver) = &options.gpu_stats_receiver {
276                // It is possible to receive no sample if the
277                // collector has not updated since the previous take
278                // or the collector encountered a recoverable error
279                // (e.g. timeout). The behavior for now is to store an
280                // empty map. Alternatively we could store the latest
281                // sample and read that, but then we have to decide how
282                // stale the data can be.
283                Some(
284                    gpu_stats_receiver
285                        .try_take()
286                        .context("GPU stats collector had an error")?
287                        .unwrap_or_default(),
288                )
289            } else {
290                None
291            }
292        },
293        ethtool: if !options.enable_ethtool_stats {
294            Default::default()
295        } else {
296            match ethtool_reader.read_stats::<ethtool::Ethtool>() {
297                Ok(ethtool_stats) => Some(ethtool_stats),
298                Err(e) => {
299                    error!(logger, "{:#}", e);
300                    Default::default()
301                }
302            }
303        },
304        resctrl: if !options.enable_resctrl_stats {
305            None
306        } else {
307            match resctrlfs::ResctrlReader::root() {
308                Ok(resctrl_reader) => match resctrl_reader.read_all() {
309                    Ok(resctrl) => Some(resctrl),
310                    Err(e) => {
311                        error!(logger, "{:#}", e);
312                        None
313                    }
314                },
315                Err(_e) => {
316                    // ResctrlReader only fails to initialize if resctrlfs is
317                    // not mounted. In this case we ignore.
318                    None
319                }
320            }
321        },
322        tc: if let Some(tc_stats_receiver) = &options.tc_stats_receiver {
323            Some(
324                tc_stats_receiver
325                    .try_take()
326                    .context("TC stats collector had an error")?
327                    .unwrap_or_default(),
328            )
329        } else {
330            None
331        },
332    })
333}
334
335/// cgroupfs can give us a NotFound error if the cgroup doesn't have
336/// the relevant stat file (e.g. if it is the root cgroup). We
337/// translate that into `None` so that other errors are propagated,
338/// but omitted data is allowed.
339///
340/// This method just does that translation for us.
341fn wrap<S: Sized>(
342    v: std::result::Result<S, cgroupfs::Error>,
343) -> std::result::Result<Option<S>, cgroupfs::Error> {
344    if let Err(cgroupfs::Error::IoError(_, ref e)) = v {
345        if e.kind() == std::io::ErrorKind::NotFound {
346            return Ok(None);
347        }
348        if e.kind() == std::io::ErrorKind::Other {
349            if let Some(errno) = e.raw_os_error() {
350                if errno == /* ENODEV */ 19 {
351                    // If the cgroup is removed after a control file is opened,
352                    // ENODEV is returned. Ignore it.
353                    return Ok(None);
354                }
355            }
356        }
357    }
358    v.map(Some)
359}
360
361/// As above, but in addition, io.stat can have broken formatting due
362/// to a kernel bug which will not output more than one page. In such
363/// cases we should not fail all data collection, but just omit the io
364/// data.
365fn io_stat_wrap<S: Sized>(
366    v: std::result::Result<S, cgroupfs::Error>,
367) -> std::result::Result<Option<S>, cgroupfs::Error> {
368    match wrap(v) {
369        Err(cgroupfs::Error::InvalidFileFormat(_)) => Ok(None),
370        Err(cgroupfs::Error::UnexpectedLine(_, _)) => Ok(None),
371        wrapped => wrapped,
372    }
373}
374
375/// Pressure metrics may not be supported, in which case cgroupfs will
376/// return a specific error. We don't fail all data collection, just
377/// omit pressure metrics.
378fn pressure_wrap<S: Sized>(
379    v: std::result::Result<S, cgroupfs::Error>,
380) -> std::result::Result<Option<S>, cgroupfs::Error> {
381    match wrap(v) {
382        Err(cgroupfs::Error::PressureNotSupported(_)) => Ok(None),
383        wrapped => wrapped,
384    }
385}
386
387fn collect_cgroup_sample(
388    reader: &cgroupfs::CgroupReader,
389    collect_io_stat: bool,
390    logger: &slog::Logger,
391    cgroup_re: &Option<Regex>,
392) -> Result<CgroupSample> {
393    let io_stat = if collect_io_stat {
394        io_stat_wrap(reader.read_io_stat())?
395    } else {
396        None
397    };
398    Ok(CgroupSample {
399        cpu_stat: wrap(reader.read_cpu_stat())?.map(Into::into),
400        io_stat,
401        tids_current: wrap(reader.read_pids_current())?,
402        tids_max: wrap(reader.read_pids_max())?,
403        memory_current: wrap(reader.read_memory_current().map(|v| v as i64))?,
404        memory_stat: wrap(reader.read_memory_stat())?.map(Into::into),
405        pressure: pressure_wrap(reader.read_pressure())?.map(Into::into),
406        // We transpose at the end here to convert the
407        // Option<Result<BTreeMap... into Result<Option<BTreeMap and
408        // then bail any errors with `?` - leaving us with the
409        // Option<BTreeMap...
410        //
411        // The only case this can be None is if the cgroup no longer
412        // exists - this is consistent with the above members
413        children: wrap(reader.child_cgroup_iter())
414            .context("Failed to get iterator over cgroup children")?
415            .map(|child_iter| {
416                child_iter
417                    .filter(|child| {
418                        if let Some(cgroup_re) = cgroup_re.as_ref() {
419                            !cgroup_re.is_match(&child.name().to_string_lossy())
420                        } else {
421                            true
422                        }
423                    })
424                    .map(|child| {
425                        collect_cgroup_sample(&child, collect_io_stat, logger, cgroup_re).map(
426                            |child_sample| {
427                                (
428                                    child
429                                        .name()
430                                        .file_name()
431                                        .expect("Unexpected .. in cgroup path")
432                                        .to_string_lossy()
433                                        .to_string(),
434                                    child_sample,
435                                )
436                            },
437                        )
438                    })
439                    .collect::<Result<BTreeMap<String, CgroupSample>>>()
440            })
441            .transpose()?,
442        memory_swap_current: wrap(reader.read_memory_swap_current().map(|v| v as i64))?,
443        memory_zswap_current: None, // Use the one from memory.stat
444        memory_min: wrap(reader.read_memory_min())?,
445        memory_low: wrap(reader.read_memory_low())?,
446        memory_high: wrap(reader.read_memory_high())?,
447        memory_max: wrap(reader.read_memory_max())?,
448        memory_swap_max: wrap(reader.read_memory_swap_max())?,
449        memory_zswap_max: wrap(reader.read_memory_zswap_max())?,
450        memory_events: wrap(reader.read_memory_events())?.map(Into::into),
451        memory_events_local: wrap(reader.read_memory_events_local())?.map(Into::into),
452        inode_number: match reader.read_inode_number() {
453            Ok(st_ino) => Some(st_ino as i64),
454            Err(e) => {
455                error!(logger, "Fail to collect inode number: {:#}", e);
456                None
457            }
458        },
459        cgroup_stat: wrap(reader.read_cgroup_stat())?.map(Into::into),
460        memory_numa_stat: wrap(reader.read_memory_numa_stat())?.map(Into::into),
461        cpuset_cpus: wrap(reader.read_cpuset_cpus())?,
462        cpuset_cpus_effective: wrap(reader.read_cpuset_cpus_effective())?,
463        cpuset_mems: wrap(reader.read_cpuset_mems())?,
464        cpuset_mems_effective: wrap(reader.read_cpuset_mems_effective())?,
465        cpu_weight: wrap(reader.read_cpu_weight())?,
466        cpu_max: wrap(reader.read_cpu_max())?,
467        cgroup_controllers: wrap(reader.read_cgroup_controllers())?,
468        cgroup_subtree_control: wrap(reader.read_cgroup_subtree_control())?,
469    })
470}
471
/// Express the growth of a microsecond counter as a percentage of the elapsed time.
///
/// Takes the earlier value, the later value (both `Option`s) and the elapsed duration.
/// Evaluates to `Some((later - earlier) * 100 / elapsed_micros)` as `f64`, or `None`
/// when either value is missing or the counter went backwards.
macro_rules! usec_pct {
    ($a_opt:expr, $b_opt:expr, $delta:expr) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => {
                Some((b - a) as f64 * 100.0 / $delta.as_micros() as f64)
            }
            _ => None,
        }
    }};
}
483
/// Turn the growth of a counter into a per-second rate.
///
/// Forms:
/// * `(earlier_opt, later_opt, elapsed)` - `Option` inputs, evaluates to `Option<f64>`.
/// * `(earlier, later, elapsed, Type)` - plain identifiers, rate rounded up and cast to
///   `Type`.
/// * `(earlier_opt, later_opt, elapsed, Type)` - `Option` inputs, rate rounded up and
///   cast to `Type`.
///
/// Every form evaluates to `None` when the counter went backwards; the `Option` forms
/// also evaluate to `None` when either value is missing.
macro_rules! count_per_sec {
    ($a_opt:expr, $b_opt:expr, $delta:expr) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => Some((b - a) as f64 / $delta.as_secs_f64()),
            _ => None,
        }
    }};
    ($a:ident, $b:ident, $delta:expr, $target_type:ty) => {{
        if $a <= $b {
            Some((($b - $a) as f64 / $delta.as_secs_f64()).ceil() as $target_type)
        } else {
            None
        }
    }};
    ($a_opt:expr, $b_opt:expr, $delta:expr, $target_type:ty) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => {
                Some(((b - a) as f64 / $delta.as_secs_f64()).ceil() as $target_type)
            }
            _ => None,
        }
    }};
}
511
/// Compute the per-second rate of the optional field `$key` between the previous sample
/// and `$sample`.
///
/// `$last` is an `Option` of `(previous_sample, elapsed)`. The field is cast to `u64` on
/// both samples, the rate is computed with `count_per_sec!`, and the result is cast back
/// to `u64`. Evaluates to `None` when there is no previous sample or when
/// `count_per_sec!` yields `None`.
// Not invoked in the visible part of this file; the attribute silences the unused lint.
#[allow(unused)]
macro_rules! get_option_rate {
    ($key:ident, $sample:ident, $last:ident) => {
        $last
            .and_then(|(l, d)| {
                count_per_sec!(l.$key.map(|s| s as u64), $sample.$key.map(|s| s as u64), d)
            })
            .map(|s| s as u64)
    };
}
522}