use crate::current_system_time_since_epoch;
use crate::exporters::*;
use crate::sensors::{Sensor, Topology};
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use chrono::Utc;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Mutex;
use std::time::Duration;
use utils::get_hostname;
/// Default value of the `address` option: the IPv6 unspecified address (`::`).
const DEFAULT_IP_ADDRESS: &str = "::";
/// Exporter that serves the metrics over HTTP for a Prometheus server to collect.
pub struct PrometheusExporter {
/// Sensor queried for the topology that is handed to the HTTP server when the exporter runs.
sensor: Box<dyn Sensor>,
}
impl PrometheusExporter {
    /// Builds an exporter that reads its topology from the given `sensor`.
    pub fn new(sensor: Box<dyn Sensor>) -> Self {
        Self { sensor }
    }
}
impl Exporter for PrometheusExporter {
fn run(&mut self, parameters: ArgMatches) {
info!(
"{}: Starting Prometheus exporter",
Utc::now().format("%Y-%m-%dT%H:%M:%S")
);
println!("Press CTRL-C to stop scaphandre");
match runner(
(*self.sensor.get_topology()).unwrap(),
parameters.value_of("address").unwrap().to_string(),
parameters.value_of("port").unwrap().to_string(),
parameters.value_of("suffix").unwrap().to_string(),
parameters.is_present("qemu"),
get_hostname(),
) {
Ok(()) => warn!("Prometheus exporter shut down gracefully."),
Err(error) => panic!("Something failed in the prometheus exporter: {}", error),
}
}
fn get_options() -> HashMap<String, ExporterOption> {
let mut options = HashMap::new();
options.insert(
String::from("address"),
ExporterOption {
default_value: Some(String::from(DEFAULT_IP_ADDRESS)),
help: String::from("ipv6 or ipv4 address to expose the service to"),
long: String::from("address"),
short: String::from("a"),
required: false,
takes_value: true,
},
);
options.insert(
String::from("port"),
ExporterOption {
default_value: Some(String::from("8080")),
help: String::from("TCP port number to expose the service"),
long: String::from("port"),
short: String::from("p"),
required: false,
takes_value: true,
},
);
options.insert(
String::from("suffix"),
ExporterOption {
default_value: Some(String::from("metrics")),
help: String::from("url suffix to access metrics"),
long: String::from("suffix"),
short: String::from("s"),
required: false,
takes_value: true,
},
);
options.insert(
String::from("qemu"),
ExporterOption {
default_value: None,
help: String::from("Instruct that scaphandre is running on an hypervisor"),
long: String::from("qemu"),
short: String::from("q"),
required: false,
takes_value: false,
},
);
options
}
}
/// State registered as application data on the HTTP server and read by the metrics handler.
struct PowerMetrics {
/// Topology the metrics are generated from; the handler refreshes it in place.
topology: Mutex<Topology>,
/// Timestamp of the previous metrics request, as returned by
/// `current_system_time_since_epoch`; starts at zero.
last_request: Mutex<Duration>,
/// Value of the `qemu` flag, forwarded to the metric generator.
qemu: bool,
/// Hostname handed to the metric generator.
hostname: String,
}
/// Runs the HTTP server that exposes the metrics and waits for it to stop.
///
/// `GET` requests on `suffix` are answered by `show_metrics`; every other
/// request falls through to `landing_page`. The server uses a single worker.
///
/// # Panics
///
/// Panics if `address` is not a valid IPv4/IPv6 address or if `port` is not a
/// valid TCP port number (0-65535).
///
/// # Errors
///
/// Returns the I/O error raised while binding the socket or running the server.
#[actix_web::main]
async fn runner(
    topology: Topology,
    address: String,
    port: String,
    suffix: String,
    qemu: bool,
    hostname: String,
) -> std::io::Result<()> {
    let ip = match address.parse::<IpAddr>() {
        Ok(ip) => ip,
        Err(error) => panic!("{} is not a valid ip address: {}", address, error),
    };
    // TCP ports are 16 bits wide: parsing as `u16` rejects out-of-range values
    // here instead of letting them slip through to a late bind failure.
    let port = match port.parse::<u16>() {
        Ok(port) => port,
        Err(error) => panic!("Not a valid TCP port number: {}", error),
    };

    HttpServer::new(move || {
        App::new()
            .data(PowerMetrics {
                topology: Mutex::new(topology.clone()),
                last_request: Mutex::new(Duration::new(0, 0)),
                qemu,
                hostname: hostname.clone(),
            })
            .service(web::resource(&suffix).route(web::get().to(show_metrics)))
            .default_service(web::route().to(landing_page))
    })
    .workers(1)
    // Bind with the already parsed values: a textual "addr:port" is not a valid
    // socket address literal for IPv6 and would go through name resolution.
    .bind((ip, port))?
    .run()
    .await
}
/// Renders one sample line of the Prometheus text exposition format.
///
/// The result is `key value\n` when there are no labels, and
/// `key{name="value",...} value\n` otherwise. Labels are emitted in the
/// iteration order of the map, which is unspecified. Label values are written
/// verbatim (no escaping is applied here).
///
/// An empty label map is treated like `None`.
fn format_metric(key: &str, value: &str, labels: Option<&HashMap<String, String>>) -> String {
    let mut result = key.to_string();
    // Skipping empty maps avoids stripping the opening brace when there is no
    // trailing separator to remove, which used to yield `key} value`.
    if let Some(labels) = labels.filter(|labels| !labels.is_empty()) {
        let pairs: Vec<String> = labels
            .iter()
            .map(|(name, content)| format!("{}=\"{}\"", name, content))
            .collect();
        result.push('{');
        result.push_str(&pairs.join(","));
        result.push('}');
    }
    result.push(' ');
    result.push_str(value);
    result.push('\n');
    result
}
/// Appends one metric entry to `body` and hands the buffer back.
///
/// The entry is made of a `# HELP` line, a `# TYPE` line and then
/// `metric_line`, which is appended as is.
fn push_metric(
    mut body: String,
    help: String,
    metric_type: String,
    metric_name: String,
    metric_line: String,
) -> String {
    // "# HELP <name> <help>"
    body.push_str("# HELP ");
    body.push_str(&metric_name);
    body.push(' ');
    body.push_str(&help);

    // "# TYPE <name> <type>", on its own line
    body.push_str("\n# TYPE ");
    body.push_str(&metric_name);
    body.push(' ');
    body.push_str(&metric_type);
    body.push('\n');

    body.push_str(&metric_line);
    body
}
/// Handler for the metrics endpoint: answers with every metric rendered in the
/// Prometheus text exposition format.
///
/// The topology is refreshed first when more than 5 seconds separate this
/// request from the previous one.
///
/// # Panics
///
/// Panics if one of the mutexes held in `data` is poisoned.
async fn show_metrics(data: web::Data<PowerMetrics>) -> impl Responder {
    let now = current_system_time_since_epoch();
    let mut last_request = data.last_request.lock().unwrap();
    let mut topology = data.topology.lock().unwrap();

    // `checked_sub` rather than `-`: subtracting durations panics on underflow,
    // which would happen (and poison the mutex) if the system clock stepped
    // backwards. A clock going backwards is treated as stale data.
    let stale = now
        .checked_sub(*last_request)
        .map_or(true, |elapsed| elapsed > Duration::from_secs(5));
    if stale {
        info!(
            "{}: Refresh topology",
            Utc::now().format("%Y-%m-%dT%H:%M:%S")
        );
        topology
            .proc_tracker
            .clean_terminated_process_records_vectors();
        topology.refresh();
    }
    // NOTE(review): the timestamp moves on every request, so a client polling
    // more often than every 5 seconds never triggers a refresh - confirm that
    // this is the intended throttling.
    *last_request = now;

    let mut metric_generator = MetricGenerator::new(&*topology, &data.hostname);
    info!("{}: Refresh data", Utc::now().format("%Y-%m-%dT%H:%M:%S"));
    let mut body = String::new();
    metric_generator.gen_all_metrics(data.qemu);
    for msg in metric_generator.get_metrics() {
        // Metrics without attributes are rendered without a label block.
        let attributes: Option<&HashMap<String, String>> = if msg.attributes.is_empty() {
            None
        } else {
            Some(&msg.attributes)
        };
        let value = match msg.metric_value {
            MetricValueType::FloatDouble(value) => value.to_string(),
            MetricValueType::IntUnsigned(value) => value.to_string(),
            MetricValueType::Text(ref value) => value.to_string(),
        };
        body = push_metric(
            body,
            msg.description.clone(),
            msg.metric_type.clone(),
            msg.name.clone(),
            format_metric(&msg.name, &value, attributes),
        );
    }
    HttpResponse::Ok()
        // Advertise the text exposition format explicitly instead of leaving
        // the response without a content type.
        .content_type("text/plain; version=0.0.4; charset=utf-8")
        .body(body)
}
/// Default handler: answers with a small HTML page pointing to the project
/// and to the metrics endpoint.
async fn landing_page() -> impl Responder {
    // NOTE(review): the link targets "/metrics" although the endpoint path is
    // configurable through the `suffix` option - confirm whether it should follow it.
    let body = String::from(
        "<a href=\"https://github.com/hubblo-org/scaphandre/\">Scaphandre's</a> prometheus exporter here. Metrics available on <a href=\"/metrics\">/metrics</a>"
    );
    HttpResponse::Ok()
        // The body is HTML: declare it so clients do not have to guess.
        .content_type("text/html; charset=utf-8")
        .body(body)
}