use procfs::process::Process;
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone)]
pub struct ProcessTracker {
pub procs: Vec<Vec<ProcessRecord>>,
pub max_records_per_process: u16,
}
impl ProcessTracker {
pub fn new(max_records_per_process: u16) -> ProcessTracker {
ProcessTracker {
procs: vec![],
max_records_per_process,
}
}
pub fn add_process_record(&mut self, process: Process) -> Result<String, String> {
let iterator = self.procs.iter_mut();
let pid = process.pid;
let mut filtered = iterator.filter(|x| !x.is_empty() && x[0].process.pid == pid);
let result = filtered.next();
let process_record = ProcessRecord::new(process);
if let Some(vector) = result {
if !vector.is_empty()
&& process_record.process.stat.comm != vector.get(0).unwrap().process.stat.comm
{
*vector = vec![];
}
vector.insert(0, process_record); ProcessTracker::clean_old_process_records(vector, self.max_records_per_process);
} else {
self.procs.push(vec![process_record]); }
Ok(String::from("Successfully added record to process."))
}
fn clean_old_process_records(records: &mut Vec<ProcessRecord>, max_records_per_process: u16) {
if records.len() > max_records_per_process as usize {
let diff = records.len() - max_records_per_process as usize;
for _ in 0..diff {
records.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
let res = records.pop().unwrap().timestamp;
trace!(
"Cleaning old ProcessRecords in vector for PID {}",
records[0].process.pid
);
trace!("Deleting record with timestamp: {:?}", res);
}
}
}
pub fn find_records(&self, pid: i32) -> Option<&Vec<ProcessRecord>> {
let mut refer = None;
for v in &self.procs {
if !v.is_empty() && v[0].process.pid == pid {
if refer.is_some() {
warn!("ISSUE: PID {} spread in proc tracker", pid);
}
refer = Some(v);
}
}
refer
}
pub fn get_diff_utime(&self, pid: i32) -> Option<u64> {
let records = self.find_records(pid).unwrap();
if records.len() > 1 {
return Some(records[0].process.stat.utime - records[1].process.stat.utime);
}
None
}
pub fn get_diff_stime(&self, pid: i32) -> Option<u64> {
let records = self.find_records(pid).unwrap();
if records.len() > 1 {
return Some(records[0].process.stat.stime - records[1].process.stat.stime);
}
None
}
pub fn get_alive_processes(&self) -> Vec<&Vec<ProcessRecord>> {
let mut res = vec![];
for p in self.procs.iter() {
if !p.is_empty() {
let status = p[0].process.status();
if let Ok(status_val) = status {
if !&status_val.state.contains('T') {
res.push(p);
}
}
}
}
res
}
pub fn get_alive_pids(&self) -> Vec<i32> {
self.get_alive_processes()
.iter()
.filter(|x| !x.is_empty())
.map(|x| x[0].process.pid)
.collect()
}
pub fn get_all_pids(&self) -> Vec<i32> {
self.procs
.iter()
.filter(|x| !x.is_empty())
.map(|x| x[0].process.pid)
.collect()
}
pub fn get_process_name(&self, pid: i32) -> String {
let mut result = self
.procs
.iter()
.filter(|x| !x.is_empty() && x.get(0).unwrap().process.pid == pid);
let process = result.next().unwrap();
if result.next().is_some() {
panic!("Found two vectors of processes with the same id, maintainers should fix this.");
}
process.get(0).unwrap().process.stat.comm.clone()
}
pub fn get_process_cmdline(&self, pid: i32) -> Option<String> {
let mut result = self
.procs
.iter()
.filter(|x| !x.is_empty() && x.get(0).unwrap().process.pid == pid);
let process = result.next().unwrap();
if let Some(vec) = process.get(0) {
if let Ok(mut cmdline_vec) = vec.process.cmdline() {
let mut cmdline = String::from("");
while !cmdline_vec.is_empty() {
if !cmdline_vec.is_empty() {
cmdline.push_str(&cmdline_vec.remove(0));
}
}
return Some(cmdline);
}
}
None
}
pub fn get_top_consumers(&self, top: u16) -> Vec<(Process, u64)> {
let mut consumers: Vec<(Process, u64)> = vec![];
for p in &self.procs {
if p.len() > 1 {
let last_time = p.first().unwrap().total_time_jiffies();
let previous_time = p.get(1).unwrap().total_time_jiffies();
let mut diff = 0;
if previous_time <= last_time {
diff = last_time - previous_time;
}
let higher: Vec<&(Process, u64)> = consumers
.iter()
.filter(|x| ProcessRecord::new(x.0.to_owned()).total_time_jiffies() > diff)
.collect();
if higher.len() < top as usize {
consumers.push((p.last().unwrap().process.clone(), diff));
consumers.sort_by(|x, y| y.1.cmp(&x.1));
if consumers.len() > top as usize {
consumers.pop();
}
}
}
}
consumers
}
pub fn clean_terminated_process_records_vectors(&mut self) {
let mut d_unint_sleep = 0;
let mut r_running = 0;
let mut s_int_sleep = 0;
let mut t_stopped = 0;
let mut z_defunct_zombie = 0;
let mut w_no_resident_high_prio = 0;
let mut n_low_prio = 0;
let mut l_pages_locked = 0;
let mut i_idle = 0;
let mut unknown = 0;
for v in &mut self.procs {
if !v.is_empty() {
if let Some(first) = v.first() {
if let Ok(status) = first.process.status() {
if status.state.contains('T') {
while !v.is_empty() {
v.pop();
}
t_stopped += 1;
} else if status.state.contains('D') {
d_unint_sleep += 1;
} else if status.state.contains('R') {
r_running += 1;
} else if status.state.contains('S') {
s_int_sleep += 1;
} else if status.state.contains('Z') {
z_defunct_zombie += 1;
} else if status.state.contains('W') {
w_no_resident_high_prio += 1;
} else if status.state.contains('N') {
n_low_prio += 1;
} else if status.state.contains('L') {
l_pages_locked += 1;
} else if status.state.contains('I') {
i_idle += 1;
} else {
unknown += 1;
debug!("unkown state: {} name: {}", status.state, status.name);
}
} else {
while !v.is_empty() {
v.pop();
}
}
}
}
}
debug!(
"d:{} r:{} s:{} t:{} z:{} w:{} n:{} l:{} i:{} u:{}",
d_unint_sleep,
r_running,
s_int_sleep,
t_stopped,
z_defunct_zombie,
w_no_resident_high_prio,
n_low_prio,
l_pages_locked,
i_idle,
unknown
);
self.drop_empty_process_records_vectors();
}
fn drop_empty_process_records_vectors(&mut self) {
let procs = &mut self.procs;
if !procs.is_empty() {
for i in 0..(procs.len() - 1) {
if let Some(v) = procs.get(i) {
if v.is_empty() {
procs.remove(i);
}
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct ProcessRecord {
pub process: Process,
pub timestamp: Duration,
}
impl ProcessRecord {
pub fn new(process: Process) -> ProcessRecord {
ProcessRecord {
process,
timestamp: current_system_time_since_epoch(),
}
}
pub fn total_time_jiffies(&self) -> u64 {
let stime = self.process.stat.stime;
let utime = self.process.stat.utime;
let cutime = self.process.stat.cutime as u64;
let cstime = self.process.stat.cstime as u64;
let guest_time = self.process.stat.guest_time.unwrap_or_default();
let cguest_time = self.process.stat.cguest_time.unwrap_or_default() as u64;
let delayacct_blkio_ticks = self.process.stat.delayacct_blkio_ticks.unwrap_or_default();
let itrealvalue = self.process.stat.itrealvalue as u64;
trace!(
"ProcessRecord: stime {} utime {} cutime {} cstime {} guest_time {} cguest_time {} delayacct_blkio_ticks {} itrealvalue {}",
stime, utime, cutime, cstime, guest_time, cguest_time, delayacct_blkio_ticks, itrealvalue
);
stime + utime + guest_time + cguest_time + delayacct_blkio_ticks + itrealvalue
}
}
pub fn current_system_time_since_epoch() -> Duration {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn process_records_added() {
let proc = Process::myself().unwrap();
let mut tracker = ProcessTracker::new(3);
for _ in 0..3 {
assert_eq!(tracker.add_process_record(proc.clone()).is_ok(), true);
}
assert_eq!(tracker.procs.len(), 1);
assert_eq!(tracker.procs[0].len(), 3);
}
#[test]
fn process_records_cleaned() {
let proc = Process::myself().unwrap();
let mut tracker = ProcessTracker::new(3);
for _ in 0..5 {
assert_eq!(tracker.add_process_record(proc.clone()).is_ok(), true);
}
assert_eq!(tracker.procs.len(), 1);
assert_eq!(tracker.procs[0].len(), 3);
for _ in 0..15 {
assert_eq!(tracker.add_process_record(proc.clone()).is_ok(), true);
}
assert_eq!(tracker.procs.len(), 1);
assert_eq!(tracker.procs[0].len(), 3);
}
}