use crate::exporters::utils::get_hostname;
use crate::exporters::*;
use crate::sensors::Sensor;
use chrono::Utc;
use riemann_client::proto::Attribute;
use riemann_client::proto::Event;
use riemann_client::Client;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Default value of the `address` exporter option (see `get_options`).
const DEFAULT_IP_ADDRESS: &str = "localhost";
/// Default value of the `port` exporter option, kept as a string because
/// option values are textual; it is parsed into a `u16` in `RiemannClient::new`.
const DEFAULT_PORT: &str = "5555";
/// Small wrapper owning the connection used to push events to a Riemann server.
struct RiemannClient {
    /// Connected Riemann client, established in [`RiemannClient::new`].
    client: Client,
}

impl RiemannClient {
    /// Connects to the Riemann server reachable at `address` on `port`.
    ///
    /// # Panics
    ///
    /// Panics if `port` cannot be parsed as a `u16`, or if the connection
    /// to the server cannot be established.
    fn new(address: &str, port: &str) -> RiemannClient {
        let host = address.to_owned();
        let port_number: u16 = port.parse().expect("Fail parsing port number");
        let endpoint = (host, port_number);
        RiemannClient {
            client: Client::connect(&endpoint).expect("Fail to connect to Riemann server"),
        }
    }

    /// Converts `metric` into a Riemann event and sends it.
    ///
    /// The event time is the current wall-clock time in whole seconds since
    /// the Unix epoch. Attributes are only attached when the metric has some.
    /// Textual metric values have `,` replaced by `.` and newlines removed,
    /// then are sent as a double when they contain a `.`, as a signed
    /// integer otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the system clock is before the Unix epoch, if an unsigned
    /// value does not fit in an `i64`, if a textual value cannot be parsed,
    /// or if sending the event fails.
    fn send_metric(&mut self, metric: &Metric) {
        let mut event = Event::new();

        // One Riemann attribute per (key, value) pair carried by the metric.
        let attributes: Vec<Attribute> = metric
            .attributes
            .iter()
            .map(|(name, content)| {
                let mut attribute = Attribute::new();
                attribute.set_key(name.clone());
                attribute.set_value(content.clone());
                attribute
            })
            .collect();

        let seconds_since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;
        event.set_time(seconds_since_epoch);
        event.set_ttl(metric.ttl);
        event.set_host(metric.hostname.to_string());
        event.set_service(metric.name.to_string());
        event.set_state(metric.state.to_string());
        event.set_tags(protobuf::RepeatedField::from_vec(metric.tags.clone()));
        if !attributes.is_empty() {
            event.set_attributes(protobuf::RepeatedField::from_vec(attributes));
        }
        event.set_description(metric.description.to_string());

        match &metric.metric_value {
            MetricValueType::FloatDouble(number) => event.set_metric_d(*number),
            MetricValueType::IntUnsigned(number) => {
                let signed = i64::try_from(*number)
                    .expect("Metric cannot be converted to signed integer.");
                event.set_metric_sint64(signed);
            }
            MetricValueType::Text(raw) => {
                // Normalise the decimal separator and drop newlines before parsing.
                let cleaned = raw.replace(',', ".").replace('\n', "");
                if cleaned.contains('.') {
                    let parsed: f64 = cleaned.parse().expect("Cannot parse metric value.");
                    event.set_metric_d(parsed);
                } else {
                    let parsed: i64 = cleaned.parse().expect("Cannot parse metric value.");
                    event.set_metric_sint64(parsed);
                }
            }
        }

        self.client
            .event(event)
            .expect("Fail to send metric to Riemann");
    }
}
/// Exporter that sends the metrics read from a sensor to a Riemann server.
pub struct RiemannExporter {
    /// Sensor the topology is obtained from when the exporter runs.
    sensor: Box<dyn Sensor>,
}

impl RiemannExporter {
    /// Builds an exporter that will read its data from `sensor`.
    pub fn new(sensor: Box<dyn Sensor>) -> Self {
        Self { sensor }
    }
}
impl Exporter for RiemannExporter {
    /// Runs the exporter's measure loop; this function never returns.
    ///
    /// On each iteration the topology is cleaned and refreshed, the self,
    /// host and socket metrics are generated, one power-consumption metric is
    /// built per alive process for which a value is available, and everything
    /// is sent to Riemann. The loop then sleeps for `dispatch_duration`
    /// seconds.
    ///
    /// # Panics
    ///
    /// Panics if the `dispatch_duration`, `address` or `port` options are
    /// absent, if `dispatch_duration` is not a valid `u64`, if the sensor
    /// yields no topology, or on any failure inside `RiemannClient`
    /// (connection, value conversion, sending).
    fn run(&mut self, parameters: ArgMatches) {
        // Single definition of the timestamp layout used by every log line.
        const TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H:%M:%S";

        let dispatch_duration: u64 = parameters
            .value_of("dispatch_duration")
            .unwrap()
            .parse()
            .expect("Wrong dispatch_duration value, should be a number of seconds");
        let hostname = get_hostname();
        let mut rclient = RiemannClient::new(
            parameters.value_of("address").unwrap(),
            parameters.value_of("port").unwrap(),
        );
        // The flag cannot change while running, so look it up once.
        let qemu_enabled = parameters.is_present("qemu");

        info!(
            "{}: Starting Riemann exporter",
            Utc::now().format(TIMESTAMP_FORMAT)
        );
        println!("Press CTRL-C to stop scaphandre");
        println!("Measurement step is: {}s", dispatch_duration);

        let mut topology = self.sensor.get_topology().unwrap();
        loop {
            info!(
                "{}: Beginning of measure loop",
                Utc::now().format(TIMESTAMP_FORMAT)
            );
            topology
                .proc_tracker
                .clean_terminated_process_records_vectors();

            info!("{}: Refresh topology", Utc::now().format(TIMESTAMP_FORMAT));
            topology.refresh();

            info!("{}: Refresh data", Utc::now().format(TIMESTAMP_FORMAT));
            let mut metric_generator = MetricGenerator::new(&topology, &hostname);
            metric_generator.gen_self_metrics();
            metric_generator.gen_host_metrics();
            metric_generator.gen_socket_metrics();

            // Per-process metrics are built here rather than by the generator.
            let mut data = vec![];
            let processes_tracker = &metric_generator.topology.proc_tracker;
            for pid in processes_tracker.get_alive_pids() {
                let exe = processes_tracker.get_process_name(pid);
                let cmdline = processes_tracker.get_process_cmdline(pid);

                let mut attributes = HashMap::new();
                attributes.insert("pid".to_string(), pid.to_string());
                attributes.insert("exe".to_string(), exe.clone());
                if let Some(cmdline_str) = cmdline {
                    // Double quotes are backslash-escaped in the attribute value.
                    attributes.insert("cmdline".to_string(), cmdline_str.replace("\"", "\\\""));
                    if qemu_enabled {
                        if let Some(vmname) = utils::filter_qemu_cmdline(&cmdline_str) {
                            attributes.insert("vmname".to_string(), vmname);
                        }
                    }
                }

                let metric_name = format!(
                    "scaph_process_power_consumption_microwatts_{}_{}",
                    pid, exe
                );
                if let Some(power) = topology.get_process_power_consumption_microwatts(pid) {
                    data.push(Metric {
                        name: metric_name,
                        metric_type: String::from("gauge"),
                        ttl: 60.0,
                        // Reuse the hostname resolved at startup instead of
                        // resolving it again for every process.
                        hostname: hostname.clone(),
                        state: String::from("ok"),
                        tags: vec!["scaphandre".to_string()],
                        attributes,
                        description: String::from("Power consumption due to the process, measured on at the topology level, in microwatts"),
                        metric_value: MetricValueType::Text(power.to_string()),
                    });
                }
            }

            info!("{}: Send data", Utc::now().format(TIMESTAMP_FORMAT));
            for metric in metric_generator.get_metrics() {
                rclient.send_metric(metric);
            }
            for metric in data {
                rclient.send_metric(&metric);
            }

            thread::sleep(Duration::from_secs(dispatch_duration));
        }
    }

    /// Returns the command-line options understood by this exporter, keyed by
    /// option name.
    ///
    /// * `address` (`-a`, `--address`): server address, defaults to
    ///   `DEFAULT_IP_ADDRESS`.
    /// * `port` (`-p`, `--port`): server port, defaults to `DEFAULT_PORT`.
    /// * `dispatch_duration` (`-d`, `--dispatch`): seconds between two
    ///   dispatches, defaults to `5`.
    /// * `qemu` (`-q`, `--qemu`): flag without value.
    fn get_options() -> HashMap<String, ExporterOption> {
        let mut options = HashMap::new();
        options.insert(
            String::from("address"),
            ExporterOption {
                default_value: Some(String::from(DEFAULT_IP_ADDRESS)),
                help: String::from("Riemann ipv6 or ipv4 address"),
                long: String::from("address"),
                short: String::from("a"),
                required: false,
                takes_value: true,
            },
        );
        options.insert(
            String::from("port"),
            ExporterOption {
                default_value: Some(String::from(DEFAULT_PORT)),
                help: String::from("Riemann TCP port number"),
                long: String::from("port"),
                short: String::from("p"),
                required: false,
                takes_value: true,
            },
        );
        options.insert(
            String::from("dispatch_duration"),
            ExporterOption {
                default_value: Some(String::from("5")),
                help: String::from("Duration between metrics dispatch"),
                long: String::from("dispatch"),
                short: String::from("d"),
                required: false,
                takes_value: true,
            },
        );
        options.insert(
            String::from("qemu"),
            ExporterOption {
                default_value: None,
                help: String::from("Instruct that scaphandre is running on an hypervisor"),
                long: String::from("qemu"),
                short: String::from("q"),
                required: false,
                takes_value: false,
            },
        );
        options
    }
}