use crate::exporters::*;
use crate::sensors::Sensor;
use clap::Arg;
use serde::{Deserialize, Serialize};
use std::fs;
use std::fs::File;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
pub struct JSONExporter {
sensor: Box<dyn Sensor>,
reports: Vec<Report>,
}
impl Exporter for JSONExporter {
fn run(&mut self, parameters: ArgMatches) {
self.runner(parameters);
}
fn get_options() -> Vec<clap::Arg<'static, 'static>> {
let mut options = Vec::new();
let arg = Arg::with_name("timeout")
.help("Maximum time spent measuring, in seconds.")
.long("timeout")
.short("t")
.required(false)
.takes_value(true);
options.push(arg);
let arg = Arg::with_name("step_duration")
.default_value("2")
.help("Set measurement step duration in second.")
.long("step")
.short("s")
.required(false)
.takes_value(true);
options.push(arg);
let arg = Arg::with_name("step_duration_nano")
.default_value("0")
.help("Set measurement step duration in nano second.")
.long("step_nano")
.short("n")
.required(false)
.takes_value(true);
options.push(arg);
let arg = Arg::with_name("file_path")
.default_value("")
.help("Destination file for the report.")
.long("file")
.short("f")
.required(false)
.takes_value(true);
options.push(arg);
let arg = Arg::with_name("max_top_consumers")
.default_value("10")
.help("Maximum number of processes to watch.")
.long("max-top-consumers")
.short("m")
.required(false)
.takes_value(true);
options.push(arg);
let arg = Arg::with_name("containers")
.help("Monitor and apply labels for processes running as containers")
.long("containers")
.required(false)
.takes_value(false);
options.push(arg);
options
}
}
#[derive(Serialize, Deserialize)]
struct Domain {
name: String,
consumption: f32,
timestamp: f64,
}
#[derive(Serialize, Deserialize)]
struct Socket {
id: u16,
consumption: f32,
domains: Vec<Domain>,
timestamp: f64,
}
#[derive(Serialize, Deserialize)]
struct Consumer {
exe: PathBuf,
pid: i32,
consumption: f32,
timestamp: f64,
container: Option<Container>,
}
#[derive(Serialize, Deserialize)]
struct Container {
id: String,
runtime: String,
scheduler: String,
}
#[derive(Serialize, Deserialize)]
struct Host {
consumption: f32,
timestamp: f64,
}
#[derive(Serialize, Deserialize)]
struct Report {
host: Host,
consumers: Vec<Consumer>,
sockets: Vec<Socket>,
}
impl JSONExporter {
pub fn new(sensor: Box<dyn Sensor>) -> JSONExporter {
JSONExporter {
sensor,
reports: Vec::new(),
}
}
pub fn runner(&mut self, parameters: ArgMatches) {
let topology = self.sensor.get_topology().unwrap();
let mut metric_generator = MetricGenerator::new(
topology,
utils::get_hostname(),
parameters.is_present("qemu"),
parameters.is_present("containers"),
);
let step_duration: u64 = parameters
.value_of("step_duration")
.unwrap()
.parse()
.expect("Wrong step_duration value, should be a number of seconds");
let step_duration_nano: u32 = parameters
.value_of("step_duration_nano")
.unwrap()
.parse()
.expect("Wrong step_duration_nano value, should be a number of nano seconds");
info!("Measurement step is: {}s", step_duration);
if let Some(timeout) = parameters.value_of("timeout") {
let now = Instant::now();
let timeout_secs: u64 = timeout.parse().unwrap();
while now.elapsed().as_secs() <= timeout_secs {
self.iterate(¶meters, &mut metric_generator);
thread::sleep(Duration::new(step_duration, step_duration_nano));
}
} else {
loop {
self.iterate(¶meters, &mut metric_generator);
thread::sleep(Duration::new(step_duration, step_duration_nano));
}
}
}
fn iterate(&mut self, parameters: &ArgMatches, metric_generator: &mut MetricGenerator) {
metric_generator.topology.refresh();
self.retrieve_metrics(parameters, metric_generator);
}
fn retrieve_metrics(
&mut self,
parameters: &ArgMatches,
metric_generator: &mut MetricGenerator,
) {
metric_generator.gen_all_metrics();
let metrics = metric_generator.pop_metrics();
let mut metrics_iter = metrics.iter();
let mut host_report: Option<Host> = None;
if let Some(host_metric) = metrics_iter.find(|x| x.name == "scaph_host_power_microwatts") {
let host_power_string = format!("{}", host_metric.metric_value);
let host_power_f32 = host_power_string.parse::<f32>().unwrap();
if host_power_f32 > 0.0 {
host_report = Some(Host {
consumption: host_power_f32,
timestamp: host_metric.timestamp.as_secs_f64(),
});
}
} else {
info!("didn't find host metric");
};
let consumers = metric_generator.topology.proc_tracker.get_top_consumers(
parameters
.value_of("max_top_consumers")
.unwrap_or("10")
.parse::<u16>()
.unwrap(),
);
let top_consumers = consumers
.iter()
.filter_map(|(process, _value)| {
metrics
.iter()
.find(|x| {
x.name == "scaph_process_power_consumption_microwatts"
&& process.pid
== x.attributes.get("pid").unwrap().parse::<i32>().unwrap()
})
.map(|metric| Consumer {
exe: PathBuf::from(metric.attributes.get("exe").unwrap()),
pid: process.pid,
consumption: format!("{}", metric.metric_value).parse::<f32>().unwrap(),
timestamp: metric.timestamp.as_secs_f64(),
container: match parameters.is_present("containers") {
true => metric.attributes.get("container_id").map(|container_id| {
Container {
id: String::from(container_id),
runtime: String::from(
metric
.attributes
.get("container_runtime")
.unwrap_or(&String::from("unknown")),
),
scheduler: String::from(
metric
.attributes
.get("container_scheduler")
.unwrap_or(&String::from("unknown")),
),
}
}),
false => None,
},
})
})
.collect::<Vec<_>>();
let all_sockets = metric_generator
.topology
.get_sockets_passive()
.iter()
.filter_map(|socket| {
if let Some(metric) = metrics_iter.find(|x| {
if x.name == "scaph_socket_power_microwatts" {
socket.id
== x.attributes
.get("socket_id")
.unwrap()
.parse::<u16>()
.unwrap()
} else {
info!("socket not found ! ");
false
}
}) {
let socket_power = format!("{}", metric.metric_value).parse::<f32>().unwrap();
let domains = metrics
.iter()
.filter(|x| {
x.name == "scaph_domain_power_microwatts"
&& x.attributes
.get("socket_id")
.unwrap()
.parse::<u16>()
.unwrap()
== socket.id
})
.map(|d| Domain {
name: d.name.clone(),
consumption: format!("{}", d.metric_value).parse::<f32>().unwrap(),
timestamp: d.timestamp.as_secs_f64(),
})
.collect::<Vec<_>>();
Some(Socket {
id: socket.id,
consumption: socket_power,
domains,
timestamp: metric.timestamp.as_secs_f64(),
})
} else {
None
}
})
.collect::<Vec<_>>();
match host_report {
Some(host) => {
let report = Report {
host,
consumers: top_consumers,
sockets: all_sockets,
};
let file_path = parameters.value_of("file_path").unwrap();
if file_path.is_empty() {
let json: String =
serde_json::to_string(&report).expect("Unable to parse report");
println!("{}", &json);
} else {
self.reports.push(report);
let json: String =
serde_json::to_string(&self.reports).expect("Unable to parse report");
let _ = File::create(file_path);
fs::write(file_path, json).expect("Unable to write file");
}
}
None => {
info!("No data yet, didn't write report.");
}
}
}
}
#[cfg(test)]
mod tests {
}