1use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use regex::Regex;
21use slog::error;
22
23use super::*;
24use crate::collector_plugin;
25
/// Configuration for a [`Collector`]: where to read cgroups from and which optional
/// statistics to gather on each sample.
pub struct CollectorOptions {
    /// Root of the cgroup hierarchy; `collect_sample` builds a `cgroupfs::CgroupReader` on it.
    pub cgroup_root: PathBuf,
    /// Shared process map that `collect_sample` drains (via `std::mem::take`) on every
    /// sample and merges into the sampled processes; PIDs already returned by the `/proc`
    /// read take precedence. NOTE(review): presumably filled by an exit-tracking
    /// component elsewhere — confirm against the writer.
    pub exit_data: Arc<Mutex<procfs::PidMap>>,
    /// Read `io.stat` for every cgroup (when false, each cgroup's `io_stat` is `None`).
    pub collect_io_stat: bool,
    /// Skip disk statistics entirely; the sample's `disks` field is left at its default.
    pub disable_disk_stat: bool,
    /// Opt-in: sample btrfs statistics.
    pub enable_btrfs_stats: bool,
    /// Opt-in: read ethtool statistics.
    pub enable_ethtool_stats: bool,
    /// Opt-in: read KSM statistics.
    pub enable_ksm_stats: bool,
    /// Opt-in: read resctrl statistics.
    pub enable_resctrl_stats: bool,
    /// NOTE(review): not read by `collect_sample` in this file; TC stats are collected
    /// whenever `tc_stats_receiver` is set. Confirm whether any caller relies on this flag.
    pub enable_tc_stats: bool,
    /// Forwarded as the first argument of `btrfs::BtrfsReader::new`.
    pub btrfs_samples: u64,
    /// Forwarded as the second argument of `btrfs::BtrfsReader::new`.
    pub btrfs_min_pct: f64,
    /// Child cgroups whose `name()` matches this regex are skipped, together with their
    /// whole subtree.
    pub cgroup_re: Option<Regex>,
    /// When set, each sample fills `gpus` from this consumer's `try_take()` (defaulted when
    /// it yields nothing); an error from the GPU collector fails the whole sample.
    pub gpu_stats_receiver:
        Option<collector_plugin::Consumer<crate::gpu_stats_collector_plugin::SampleType>>,
    /// When set, each sample fills `tc` from this consumer's `try_take()` (defaulted when
    /// it yields nothing); an error from the TC collector fails the whole sample.
    pub tc_stats_receiver:
        Option<collector_plugin::Consumer<crate::tc_collector_plugin::SampleType>>,
}
44
45impl Default for CollectorOptions {
46 fn default() -> Self {
47 Self {
48 cgroup_root: Path::new(cgroupfs::DEFAULT_CG_ROOT).to_path_buf(),
49 exit_data: Default::default(),
50 collect_io_stat: true,
51 disable_disk_stat: false,
52 enable_btrfs_stats: false,
53 enable_ethtool_stats: false,
54 enable_ksm_stats: false,
55 enable_resctrl_stats: false,
56 enable_tc_stats: false,
57 btrfs_samples: btrfs::DEFAULT_SAMPLES,
58 btrfs_min_pct: btrfs::DEFAULT_MIN_PCT,
59 cgroup_re: None,
60 gpu_stats_receiver: None,
61 tc_stats_receiver: None,
62 }
63 }
64}
65
/// Stateful sampler: reads system, process and cgroup statistics according to its
/// [`CollectorOptions`] and turns consecutive samples into [`Model`]s.
pub struct Collector {
    logger: slog::Logger,
    /// Reused for every sample taken by this collector.
    proc_reader: procfs::ProcReader,
    /// The last sample together with the `Instant` at which its collection started;
    /// `None` until the first `collect_and_update_model` call.
    prev_sample: Option<(Sample, Instant)>,
    collector_options: CollectorOptions,
}
73
74impl Collector {
75 pub fn new(logger: slog::Logger, collector_options: CollectorOptions) -> Self {
76 Self {
77 logger,
78 proc_reader: procfs::ProcReader::new(),
79 prev_sample: None,
80 collector_options,
81 }
82 }
83
84 pub fn collect_sample(&mut self) -> Result<Sample> {
85 collect_sample(&self.logger, &mut self.proc_reader, &self.collector_options)
86 }
87
88 pub fn collect_and_update_model(&mut self) -> Result<Model> {
90 let now = Instant::now();
91 let sample = self.collect_sample()?;
92 let model = Model::new(
93 SystemTime::now(),
94 &sample,
95 self.prev_sample
96 .as_ref()
97 .map(|(s, i)| (s, now.duration_since(*i))),
98 );
99 self.prev_sample = Some((sample, now));
100 Ok(model)
101 }
102}
103
/// Adds two optional values, treating a missing operand as "nothing to add".
///
/// Returns `None` only when both inputs are `None`; if exactly one is present it is
/// returned unchanged.
pub fn opt_add<T: std::ops::Add<T, Output = T>>(a: Option<T>, b: Option<T>) -> Option<T> {
    match a {
        None => b,
        Some(lhs) => Some(match b {
            Some(rhs) => lhs + rhs,
            None => lhs,
        }),
    }
}
112
/// Multiplies two optional values; the result is `Some` only when both operands are.
pub fn opt_multiply<S: Sized + std::ops::Mul<T, Output = S>, T: Sized>(
    a: Option<S>,
    b: Option<T>,
) -> Option<S> {
    match (a, b) {
        (Some(lhs), Some(rhs)) => Some(lhs * rhs),
        _ => None,
    }
}
119
120pub fn get_hostname() -> Result<String> {
121 if let Ok(h) = hostname::get() {
122 if let Ok(s) = h.into_string() {
123 return Ok(s);
124 }
125 }
126 Err(anyhow!("Could not get hostname"))
127}
128
/// Returns the contents of `/etc/centos-release` with surrounding whitespace removed.
///
/// # Errors
/// Fails (with context "Fail to get centos release") if the file cannot be read.
#[cfg(fbcode_build)]
pub fn get_os_release() -> Result<String> {
    let release = std::fs::read_to_string("/etc/centos-release")
        .context("Fail to get centos release")?;
    // `trim` also strips trailing newlines, so no separate '\n' trimming is needed.
    Ok(release.trim().to_owned())
}
135
136use os_info as _; #[cfg(not(fbcode_build))]
138pub fn get_os_release() -> Result<String> {
139 let info = os_info::get();
140 Ok(format!(
141 "{} {} {}",
142 info.os_type(),
143 info.version(),
144 info.bitness()
145 ))
146}
147
148fn merge_procfs_and_exit_data(
149 mut procfs_data: procfs::PidMap,
150 exit_data: procfs::PidMap,
151) -> procfs::PidMap {
152 exit_data
153 .iter()
154 .for_each(|entry| {
158 if !procfs_data.contains_key(entry.0) {
159 procfs_data.insert(*entry.0, entry.1.clone());
160 }
161 });
162
163 procfs_data
164}
165
166fn is_all_zero_disk_stats(disk_stats: &procfs::DiskStat) -> bool {
169 disk_stats.read_completed == Some(0)
170 && disk_stats.write_completed == Some(0)
171 && disk_stats.discard_completed == Some(0)
172 && disk_stats.read_merged == Some(0)
173 && disk_stats.read_sectors == Some(0)
174 && disk_stats.time_spend_read_ms == Some(0)
175 && disk_stats.write_merged == Some(0)
176 && disk_stats.write_sectors == Some(0)
177 && disk_stats.time_spend_write_ms == Some(0)
178 && disk_stats.discard_merged == Some(0)
179 && disk_stats.discard_sectors == Some(0)
180 && disk_stats.time_spend_discard_ms == Some(0)
181}
182
/// Takes one raw [`Sample`] (cgroups, processes, network, system, GPU, ethtool, resctrl
/// and TC statistics) according to `options`.
///
/// # Errors
/// Returns an error if the cgroup reader cannot be created or the cgroup tree cannot be
/// read, or if the process list, `stat`, `meminfo`, `vmstat` or the hostname cannot be
/// read. It also fails if an attached GPU or TC collector reports an error. Failures of
/// netstat, kernel version, OS release, disks, btrfs, ethtool and resctrl are logged and
/// the field falls back to an empty/`None` value. Two failures are swallowed without
/// logging: slabinfo errors and a failure to open the resctrl root.
///
/// # Panics
/// Panics if the `exit_data` mutex is poisoned.
fn collect_sample(
    logger: &slog::Logger,
    reader: &mut procfs::ProcReader,
    options: &CollectorOptions,
) -> Result<Sample> {
    let btrfs_reader =
        btrfs::BtrfsReader::new(options.btrfs_samples, options.btrfs_min_pct, logger.clone());
    let ethtool_reader = ethtool::EthtoolReader::new();
    let ksm_reader = procfs::KsmReader::new();

    // Drain the shared exit-data map, leaving an empty map in its place, so the entries
    // present now are merged into this sample only.
    let exit_pidmap = std::mem::take(
        &mut *options
            .exit_data
            .lock()
            .expect("tried to acquire poisoned lock"),
    );

    // Fields are evaluated top to bottom, so a `?` failure skips every later read.
    Ok(Sample {
        cgroup: collect_cgroup_sample(
            &cgroupfs::CgroupReader::new(options.cgroup_root.to_owned())?,
            options.collect_io_stat,
            logger,
            &options.cgroup_re,
        )?,
        // Entries read from /proc take precedence over exit records for the same PID.
        processes: merge_procfs_and_exit_data(reader.read_all_pids()?, exit_pidmap),
        netstats: match procfs::NetReader::new(logger.clone()).and_then(|v| v.read_netstat()) {
            Ok(ns) => ns,
            Err(e) => {
                error!(logger, "{:#}", e);
                Default::default()
            }
        },
        system: SystemSample {
            stat: reader.read_stat()?,
            meminfo: reader.read_meminfo()?,
            vmstat: reader.read_vmstat()?,
            // Best effort: any slabinfo error silently yields the default (not logged).
            slabinfo_vec: reader.read_slabinfo().unwrap_or_default(),
            ksm: if !options.enable_ksm_stats {
                None
            } else {
                Some(ksm_reader.read_ksm())
            },
            hostname: get_hostname()?,
            kernel_version: match reader.read_kernel_version() {
                Ok(k) => Some(k),
                Err(e) => {
                    error!(logger, "{:#}", e);
                    None
                }
            },
            os_release: match get_os_release() {
                Ok(o) => Some(o),
                Err(e) => {
                    error!(logger, "{:#}", e);
                    None
                }
            },
            disks: if options.disable_disk_stat {
                Default::default()
            } else {
                match reader.read_disk_stats_and_fsinfo() {
                    // Drop ram*/loop* devices and devices whose counters are all zero.
                    Ok(disks) => disks
                        .into_iter()
                        .filter(|(disk_name, disk_stat)| {
                            if disk_name.starts_with("ram") || disk_name.starts_with("loop") {
                                return false;
                            }
                            !is_all_zero_disk_stats(disk_stat)
                        })
                        .collect(),
                    Err(e) => {
                        error!(logger, "{:#}", e);
                        Default::default()
                    }
                }
            },
            btrfs: if !options.enable_btrfs_stats {
                Default::default()
            } else {
                match btrfs_reader.sample() {
                    Ok(btrfs) => Some(btrfs),
                    Err(e) => {
                        error!(logger, "{:#}", e);
                        Default::default()
                    }
                }
            },
        },
        gpus: {
            // Only populated when a GPU collector is attached. Its error fails the sample;
            // presumably `try_take` yields nothing when no new data is ready, in which
            // case the default is used.
            if let Some(gpu_stats_receiver) = &options.gpu_stats_receiver {
                Some(
                    gpu_stats_receiver
                        .try_take()
                        .context("GPU stats collector had an error")?
                        .unwrap_or_default(),
                )
            } else {
                None
            }
        },
        ethtool: if !options.enable_ethtool_stats {
            Default::default()
        } else {
            match ethtool_reader.read_stats::<ethtool::Ethtool>() {
                Ok(ethtool_stats) => Some(ethtool_stats),
                Err(e) => {
                    error!(logger, "{:#}", e);
                    Default::default()
                }
            }
        },
        resctrl: if !options.enable_resctrl_stats {
            None
        } else {
            match resctrlfs::ResctrlReader::root() {
                Ok(resctrl_reader) => match resctrl_reader.read_all() {
                    Ok(resctrl) => Some(resctrl),
                    Err(e) => {
                        error!(logger, "{:#}", e);
                        None
                    }
                },
                // Failure to open the resctrl root is ignored without logging.
                // NOTE(review): presumably because resctrl may simply not be mounted — confirm.
                Err(_e) => {
                    None
                }
            }
        },
        // Same contract as `gpus`: only populated when a TC collector is attached.
        tc: if let Some(tc_stats_receiver) = &options.tc_stats_receiver {
            Some(
                tc_stats_receiver
                    .try_take()
                    .context("TC stats collector had an error")?
                    .unwrap_or_default(),
            )
        } else {
            None
        },
    })
}
334
335fn wrap<S: Sized>(
342 v: std::result::Result<S, cgroupfs::Error>,
343) -> std::result::Result<Option<S>, cgroupfs::Error> {
344 if let Err(cgroupfs::Error::IoError(_, ref e)) = v {
345 if e.kind() == std::io::ErrorKind::NotFound {
346 return Ok(None);
347 }
348 if e.kind() == std::io::ErrorKind::Other {
349 if let Some(errno) = e.raw_os_error() {
350 if errno == 19 {
351 return Ok(None);
354 }
355 }
356 }
357 }
358 v.map(Some)
359}
360
361fn io_stat_wrap<S: Sized>(
366 v: std::result::Result<S, cgroupfs::Error>,
367) -> std::result::Result<Option<S>, cgroupfs::Error> {
368 match wrap(v) {
369 Err(cgroupfs::Error::InvalidFileFormat(_)) => Ok(None),
370 Err(cgroupfs::Error::UnexpectedLine(_, _)) => Ok(None),
371 wrapped => wrapped,
372 }
373}
374
375fn pressure_wrap<S: Sized>(
379 v: std::result::Result<S, cgroupfs::Error>,
380) -> std::result::Result<Option<S>, cgroupfs::Error> {
381 match wrap(v) {
382 Err(cgroupfs::Error::PressureNotSupported(_)) => Ok(None),
383 wrapped => wrapped,
384 }
385}
386
387fn collect_cgroup_sample(
388 reader: &cgroupfs::CgroupReader,
389 collect_io_stat: bool,
390 logger: &slog::Logger,
391 cgroup_re: &Option<Regex>,
392) -> Result<CgroupSample> {
393 let io_stat = if collect_io_stat {
394 io_stat_wrap(reader.read_io_stat())?
395 } else {
396 None
397 };
398 Ok(CgroupSample {
399 cpu_stat: wrap(reader.read_cpu_stat())?.map(Into::into),
400 io_stat,
401 tids_current: wrap(reader.read_pids_current())?,
402 tids_max: wrap(reader.read_pids_max())?,
403 memory_current: wrap(reader.read_memory_current().map(|v| v as i64))?,
404 memory_stat: wrap(reader.read_memory_stat())?.map(Into::into),
405 pressure: pressure_wrap(reader.read_pressure())?.map(Into::into),
406 children: wrap(reader.child_cgroup_iter())
414 .context("Failed to get iterator over cgroup children")?
415 .map(|child_iter| {
416 child_iter
417 .filter(|child| {
418 if let Some(cgroup_re) = cgroup_re.as_ref() {
419 !cgroup_re.is_match(&child.name().to_string_lossy())
420 } else {
421 true
422 }
423 })
424 .map(|child| {
425 collect_cgroup_sample(&child, collect_io_stat, logger, cgroup_re).map(
426 |child_sample| {
427 (
428 child
429 .name()
430 .file_name()
431 .expect("Unexpected .. in cgroup path")
432 .to_string_lossy()
433 .to_string(),
434 child_sample,
435 )
436 },
437 )
438 })
439 .collect::<Result<BTreeMap<String, CgroupSample>>>()
440 })
441 .transpose()?,
442 memory_swap_current: wrap(reader.read_memory_swap_current().map(|v| v as i64))?,
443 memory_zswap_current: None, memory_min: wrap(reader.read_memory_min())?,
445 memory_low: wrap(reader.read_memory_low())?,
446 memory_high: wrap(reader.read_memory_high())?,
447 memory_max: wrap(reader.read_memory_max())?,
448 memory_swap_max: wrap(reader.read_memory_swap_max())?,
449 memory_zswap_max: wrap(reader.read_memory_zswap_max())?,
450 memory_events: wrap(reader.read_memory_events())?.map(Into::into),
451 memory_events_local: wrap(reader.read_memory_events_local())?.map(Into::into),
452 inode_number: match reader.read_inode_number() {
453 Ok(st_ino) => Some(st_ino as i64),
454 Err(e) => {
455 error!(logger, "Fail to collect inode number: {:#}", e);
456 None
457 }
458 },
459 cgroup_stat: wrap(reader.read_cgroup_stat())?.map(Into::into),
460 memory_numa_stat: wrap(reader.read_memory_numa_stat())?.map(Into::into),
461 cpuset_cpus: wrap(reader.read_cpuset_cpus())?,
462 cpuset_cpus_effective: wrap(reader.read_cpuset_cpus_effective())?,
463 cpuset_mems: wrap(reader.read_cpuset_mems())?,
464 cpuset_mems_effective: wrap(reader.read_cpuset_mems_effective())?,
465 cpu_weight: wrap(reader.read_cpu_weight())?,
466 cpu_max: wrap(reader.read_cpu_max())?,
467 cgroup_controllers: wrap(reader.read_cgroup_controllers())?,
468 cgroup_subtree_control: wrap(reader.read_cgroup_subtree_control())?,
469 })
470}
471
/// Percentage of `delta` covered by the growth of a microsecond counter from `a_opt` to
/// `b_opt`: `(b - a) * 100 / delta.as_micros()` as `f64`.
///
/// Yields `None` when either sample is `None` or when the counter decreased (`a > b`).
macro_rules! usec_pct {
    ($a_opt:expr, $b_opt:expr, $delta:expr) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => {
                Some((b - a) as f64 * 100.0 / $delta.as_micros() as f64)
            }
            _ => None,
        }
    }};
}
483
/// Per-second rate of a counter between two samples taken `delta` apart (`delta` must
/// provide `as_secs_f64()`).
///
/// * `count_per_sec!(a_opt, b_opt, delta)` yields `Option<f64>`.
/// * `count_per_sec!(a, b, delta, T)` takes plain identifiers `a` and `b` (not `Option`s).
///   It yields `Option<T>`, with the rate rounded up via `ceil`.
/// * `count_per_sec!(a_opt, b_opt, delta, T)` is the `Option` counterpart of the
///   previous form, with the same rounding.
///
/// Every form yields `None` when the counter decreased (`a > b`). The `Option` forms also
/// yield `None` when either sample is `None`.
macro_rules! count_per_sec {
    ($a_opt:expr, $b_opt:expr, $delta:expr) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => Some((b - a) as f64 / $delta.as_secs_f64()),
            _ => None,
        }
    }};
    ($a:ident, $b:ident, $delta:expr, $target_type:ty) => {{
        if $a <= $b {
            Some((($b - $a) as f64 / $delta.as_secs_f64()).ceil() as $target_type)
        } else {
            None
        }
    }};
    ($a_opt:expr, $b_opt:expr, $delta:expr, $target_type:ty) => {{
        match ($a_opt, $b_opt) {
            (Some(a), Some(b)) if a <= b => {
                Some(((b - a) as f64 / $delta.as_secs_f64()).ceil() as $target_type)
            }
            _ => None,
        }
    }};
}
511
// NOTE(review): `allow(unused)` suggests this macro may have no call sites in some
// builds — confirm it is still needed.
#[allow(unused)]
/// Per-second rate of the optional numeric field `$key` between the previous sample in
/// `$last` (an `Option<(previous_sample, elapsed)>`) and `$sample`, as `Option<u64>`.
///
/// Both field values are cast to `u64` before the rate is computed. The resulting
/// rate is truncated (not rounded up) to `u64`. The result is `None` when there is no
/// previous sample, when either value is missing, or when the counter decreased.
macro_rules! get_option_rate {
    ($key:ident, $sample:ident, $last:ident) => {
        $last
            .and_then(|(l, d)| {
                count_per_sec!(l.$key.map(|s| s as u64), $sample.$key.map(|s| s as u64), d)
            })
            .map(|s| s as u64)
    };
}
522}