use counter::Counter;
use desc::Desc;
use errors::{Error, Result};
use gauge::Gauge;
use libc;
pub use libc::pid_t;
use metrics::{Collector, Opts};
use procinfo::pid as pid_info;
use proto;
use std::fs;
use std::io::Read;
use std::sync::Mutex;
/// Number of metric families produced by `ProcessCollector`; `collect` uses it
/// to pre-size its output vector and the tests assert against it.
/// (The spelling of the identifier is kept because other code in this file
/// refers to it by this name.)
const MERTICS_NUMBER: usize = 6;
/// A collector that reports metrics about a single process, read from that
/// process's entries under `/proc`.
pub struct ProcessCollector {
// Id of the process whose `/proc/<pid>/...` files are read.
pid: pid_t,
// Descriptors of all metrics below, cloned once in `new`.
descs: Vec<Desc>,
// `process_cpu_seconds_total`. Kept behind a mutex because `collect` reads
// the current value and then increments it while holding the lock.
cpu_total: Mutex<Counter>,
// `process_open_fds`.
open_fds: Gauge,
// `process_max_fds`.
max_fds: Gauge,
// `process_virtual_memory_bytes`.
vsize: Gauge,
// `process_resident_memory_bytes`.
rss: Gauge,
// `process_start_time_seconds`.
start_time: Gauge,
}
impl ProcessCollector {
pub fn new<S: Into<String>>(pid: pid_t, namespace: S) -> ProcessCollector {
let namespace = namespace.into();
let mut descs = Vec::new();
let cpu_total = Counter::with_opts(
Opts::new(
"process_cpu_seconds_total",
"Total user and system CPU time spent in \
seconds.",
).namespace(namespace.clone()),
).unwrap();
descs.extend(cpu_total.desc().into_iter().cloned());
let open_fds = Gauge::with_opts(
Opts::new("process_open_fds", "Number of open file descriptors.")
.namespace(namespace.clone()),
).unwrap();
descs.extend(open_fds.desc().into_iter().cloned());
let max_fds = Gauge::with_opts(
Opts::new(
"process_max_fds",
"Maximum number of open file descriptors.",
).namespace(namespace.clone()),
).unwrap();
descs.extend(max_fds.desc().into_iter().cloned());
let vsize = Gauge::with_opts(
Opts::new(
"process_virtual_memory_bytes",
"Virtual memory size in bytes.",
).namespace(namespace.clone()),
).unwrap();
descs.extend(vsize.desc().into_iter().cloned());
let rss = Gauge::with_opts(
Opts::new(
"process_resident_memory_bytes",
"Resident memory size in bytes.",
).namespace(namespace.clone()),
).unwrap();
descs.extend(rss.desc().into_iter().cloned());
let start_time = Gauge::with_opts(
Opts::new(
"process_start_time_seconds",
"Start time of the process since unix epoch \
in seconds.",
).namespace(namespace.clone()),
).unwrap();
descs.extend(start_time.desc().into_iter().cloned());
ProcessCollector {
pid: pid,
descs: descs,
cpu_total: Mutex::new(cpu_total),
open_fds: open_fds,
max_fds: max_fds,
vsize: vsize,
rss: rss,
start_time: start_time,
}
}
pub fn for_self() -> ProcessCollector {
let pid = unsafe { libc::getpid() };
ProcessCollector::new(pid, "")
}
}
impl Collector for ProcessCollector {
    /// Returns references to the descriptors gathered when the collector was built.
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect::<Vec<_>>()
    }

    /// Refreshes every metric from `/proc` and returns the resulting families.
    ///
    /// Each reading is best-effort: when a source cannot be read or parsed the
    /// corresponding metric is simply not updated and keeps its current value.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let pid = self.pid;

        if let Ok(count) = open_fds(pid) {
            self.open_fds.set(count as f64);
        }

        if let Ok(limit) = max_fds(pid) {
            self.max_fds.set(limit);
        }

        // `statm` values are scaled by the page size before being stored.
        if let Ok(mem) = pid_info::statm(pid) {
            self.vsize.set(mem.size as f64 * *PAGESIZE);
            self.rss.set(mem.resident as f64 * *PAGESIZE);
        }

        let stat = pid_info::stat(pid);
        let boot_time = *BOOT_TIME;

        // Start time = boot time + (start ticks / ticks per second); needs both inputs.
        if let (Ok(info), Some(boot)) = (stat.as_ref(), boot_time) {
            self.start_time
                .set(info.start_time as f64 / *CLK_TCK + boot);
        }

        let cpu_families = {
            // The lock is held across the read and the increment below.
            let counter = self.cpu_total.lock().unwrap();
            if let Ok(info) = stat {
                let cpu_seconds = (info.utime + info.stime) as f64 / *CLK_TCK;
                // Only a positive difference from the stored value is added.
                let increase = cpu_seconds - counter.get();
                if increase > 0.0 {
                    counter.inc_by(increase).unwrap();
                }
            }
            counter.collect()
        };

        let mut families = Vec::with_capacity(MERTICS_NUMBER);
        families.extend(cpu_families);
        families.extend(self.open_fds.collect());
        families.extend(self.max_fds.collect());
        families.extend(self.vsize.collect());
        families.extend(self.rss.collect());
        families.extend(self.start_time.collect());
        families
    }
}
/// Counts the entries of `/proc/<pid>/fd` that are not directories.
///
/// Returns an error if the directory cannot be listed, or if an entry or its
/// file type cannot be read.
fn open_fds(pid: pid_t) -> Result<usize> {
    let mut count = 0;
    for entry in fs::read_dir(format!("/proc/{}/fd", pid))? {
        let kind = entry?.file_type()?;
        if !kind.is_dir() {
            count += 1;
        }
    }
    Ok(count)
}
fn find_statistic(all: &str, pat: &str) -> Result<f64> {
if let Some(idx) = all.find(pat) {
let mut iter = (all[idx + pat.len()..]).split_whitespace();
if let Some(v) = iter.next() {
return v.parse().map_err(|e| {
Error::Msg(format!("read statistic {} failed: {}", pat, e))
});
}
}
Err(Error::Msg(format!("read statistic {} failed", pat)))
}
/// Label searched for in `/proc/<pid>/limits`; the first token after it is
/// parsed as the open-files limit.
const MAXFD_PATTERN: &str = "Max open files";
/// Reads `/proc/<pid>/limits` and returns the first value listed after the
/// `Max open files` label.
///
/// Returns an error if the file cannot be opened or read, or if the value is
/// missing or not numeric.
fn max_fds(pid: pid_t) -> Result<f64> {
    let mut limits = String::new();
    let path = format!("/proc/{}/limits", pid);
    let mut file = fs::File::open(&path)?;
    file.read_to_string(&mut limits)?;
    find_statistic(&limits, MAXFD_PATTERN)
}
lazy_static! {
    // Value of `sysconf(_SC_CLK_TCK)` as a float; used to turn the tick
    // counts read from `/proc/<pid>/stat` into seconds.
    static ref CLK_TCK: f64 = {
        // SAFETY: `sysconf` is called with a constant selector and no pointers.
        let ticks = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
        ticks as f64
    };
    // Value of `sysconf(_SC_PAGESIZE)` as a float; used to scale the
    // `statm` readings.
    static ref PAGESIZE: f64 = {
        // SAFETY: `sysconf` is called with a constant selector and no pointers.
        let bytes = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        bytes as f64
    };
}
/// Label searched for in `/proc/stat`; the first token after it is parsed as
/// the boot time.
const BOOT_TIME_PATTERN: &str = "btime";
lazy_static! {
    // Boot time parsed from the `btime` entry of `/proc/stat`, or `None` when
    // the file cannot be read or the entry cannot be parsed.
    static ref BOOT_TIME: Option<f64> = {
        let mut contents = String::new();
        let read = fs::File::open("/proc/stat")
            .and_then(|mut file| file.read_to_string(&mut contents));
        match read {
            Ok(_) => find_statistic(&contents, BOOT_TIME_PATTERN).ok(),
            Err(_) => None,
        }
    };
}
#[cfg(test)]
mod tests {
    use super::*;
    use metrics::Collector;
    use registry;
    use std::f64::EPSILON;

    /// A collector for the current process exposes the expected number of
    /// descriptors and metric families and can be registered.
    #[test]
    fn test_process_collector() {
        let pc = ProcessCollector::for_self();
        {
            let descs = pc.desc();
            assert_eq!(descs.len(), super::MERTICS_NUMBER);
            let mfs = pc.collect();
            assert_eq!(mfs.len(), super::MERTICS_NUMBER);
        }
        let r = registry::Registry::new();
        let res = r.register(Box::new(pc));
        assert!(res.is_ok());
    }

    // Sample text in the `Key: value` layout, used as parser input.
    const STATUS_LITERAL: &str = r#"Name: compiz
State: S (sleeping)
Tgid: 3124
Ngid: 0
Pid: 3124
PPid: 3038
TracerPid: 0
Uid: 1000 1000 1000 1000
Gid: 1000 1000 1000 1000
FDSize: 64
Groups: 4 24 27 30 46 108 124 126 999 1000
NStgid: 3124
NSpid: 3124
NSpgid: 3038
NSsid: 3038
VmPeak: 1452388 kB
VmSize: 1362696 kB
VmLck: 0 kB
VmPin: 0 kB
VmHWM: 134316 kB
VmRSS: 112884 kB
VmData: 780020 kB
VmStk: 152 kB
VmExe: 12 kB
VmLib: 77504 kB
VmPTE: 1116 kB
VmPMD: 16 kB
VmSwap: 0 kB
HugetlbPages: 0 kB
Threads: 8
SigQ: 0/31457
SigPnd: 0000000000000000
ShdPnd: 0000000000000000
SigBlk: 0000000000000000
SigIgn: 0000000000001000
SigCgt: 0000000180014003
CapInh: 0000000000000000
CapPrm: 0000000000000000
CapEff: 0000000000000000
CapBnd: 0000003fffffffff
CapAmb: 0000000000000000
Seccomp: 0
Cpus_allowed: f
Cpus_allowed_list: 0-3
Mems_allowed: 00000000,00000001
Mems_allowed_list: 0
voluntary_ctxt_switches: 1713183
nonvoluntary_ctxt_switches: 68606
"#;
    const VM_SIZE_PATTERN: &str = "VmSize:";
    const VM_RSS_PATTERN: &str = "VmRSS:";
    const VM_RSS: f64 = 112884.0;
    const VM_SIZE: f64 = 1362696.0;

    // Sample limits table, used as parser input for `MAXFD_PATTERN`.
    const LIMITS_LITERAL: &str = r#"
Limit Soft Limit Hard Limit Units
Max cpu time unlimited unlimited seconds
Max file size unlimited unlimited bytes
Max data size unlimited unlimited bytes
Max stack size 8388608 unlimited bytes
Max core file size 0 unlimited bytes
Max resident set unlimited unlimited bytes
Max processes 31454 31454 processes
Max open files 1024 4096 files
Max locked memory 65536 65536 bytes
Max address space unlimited unlimited bytes
Max file locks unlimited unlimited locks
Max pending signals 31454 31454 signals
Max msgqueue size 819200 819200 bytes
Max nice priority 0 0
Max realtime priority 0 0
Max realtime timeout unlimited unlimited us "#;
    const MAXFD: f64 = 1024.0;

    /// `find_statistic` returns the first number following each pattern.
    #[test]
    fn test_find_statistic() {
        // The absolute difference is compared: without `.abs()` any parsed
        // value smaller than the expected one would pass the check.
        let rss = super::find_statistic(STATUS_LITERAL, VM_RSS_PATTERN);
        assert!(rss.is_ok());
        assert!((rss.unwrap() - VM_RSS).abs() < EPSILON);

        let size = super::find_statistic(STATUS_LITERAL, VM_SIZE_PATTERN);
        assert!(size.is_ok());
        assert!((size.unwrap() - VM_SIZE).abs() < EPSILON);

        let maxfd = super::find_statistic(LIMITS_LITERAL, super::MAXFD_PATTERN).unwrap();
        assert!((maxfd - MAXFD).abs() < EPSILON);

        // A pattern that does not occur in the input is reported as an error.
        assert!(super::find_statistic(STATUS_LITERAL, "NoSuchKey:").is_err());
    }
}