use super::utils::get_hostname;
use crate::current_system_time_since_epoch;
use crate::exporters::{Exporter, MetricGenerator, MetricValueType};
use crate::sensors::{Sensor, Topology};
use chrono::Utc;
use clap::{Arg, ArgMatches};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::fmt::Write as _;
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::{Arc, Mutex},
time::Duration,
};
/// Default value of the `--address` option: `::`, the IPv6 unspecified address.
// NOTE(review): whether binding `::` also accepts IPv4 connections depends on
// the platform's dual-stack settings — confirm on the targeted systems.
const DEFAULT_IP_ADDRESS: &str = "::";
/// Exporter that exposes metrics over HTTP so that they can be scraped by
/// Prometheus.
pub struct PrometheusExporter {
/// Sensor whose topology is handed to the HTTP server when the exporter runs.
sensor: Box<dyn Sensor>,
}
impl PrometheusExporter {
    /// Builds a Prometheus exporter that reads its topology from `sensor`.
    pub fn new(sensor: Box<dyn Sensor>) -> Self {
        Self { sensor }
    }
}
impl Exporter for PrometheusExporter {
    /// Logs the start-up banner, then starts the HTTP server with the
    /// sensor's topology and the values read from the command line.
    ///
    /// # Panics
    ///
    /// Panics if the sensor yields no topology, or if one of the `address`,
    /// `port` or `suffix` arguments has no value.
    fn run(&mut self, parameters: ArgMatches) {
        info!(
            "{}: Starting Prometheus exporter",
            Utc::now().format("%Y-%m-%dT%H:%M:%S")
        );
        println!("Press CTRL-C to stop scaphandre");

        // Reads a valued argument as an owned string.
        let text_of = |name: &str| parameters.value_of(name).unwrap().to_string();

        // Bound in the same order the server entry point expects them.
        let topology = (*self.sensor.get_topology()).unwrap();
        let address = text_of("address");
        let port = text_of("port");
        let suffix = text_of("suffix");
        let qemu = parameters.is_present("qemu");
        let watch_containers = parameters.is_present("containers");
        let hostname = get_hostname();

        runner(
            topology,
            address,
            port,
            suffix,
            qemu,
            watch_containers,
            hostname,
        );
    }

    /// Returns the command line options understood by this exporter.
    fn get_options() -> Vec<clap::Arg<'static, 'static>> {
        vec![
            // Listening address.
            Arg::with_name("address")
                .default_value(DEFAULT_IP_ADDRESS)
                .help("ipv6 or ipv4 address to expose the service to")
                .long("address")
                .short("a")
                .required(false)
                .takes_value(true),
            // Listening TCP port.
            Arg::with_name("port")
                .default_value("8080")
                .help("TCP port number to expose the service")
                .long("port")
                .short("p")
                .required(false)
                .takes_value(true),
            // Path under which the metrics are served.
            Arg::with_name("suffix")
                .default_value("metrics")
                .help("url suffix to access metrics")
                .long("suffix")
                .short("s")
                .required(false)
                .takes_value(true),
            // Flag: label Qemu/KVM processes.
            Arg::with_name("qemu")
                .help("Apply labels to metrics of processes looking like a Qemu/KVM virtual machine")
                .long("qemu")
                .short("q")
                .required(false)
                .takes_value(false),
            // Flag: watch containers.
            Arg::with_name("containers")
                .help("Monitor and apply labels for processes running as containers")
                .long("containers")
                .required(false)
                .takes_value(false),
            // Kubernetes API server location.
            Arg::with_name("kubernetes_host")
                .help("FQDN of the kubernetes API server")
                .long("kubernetes-host")
                .required(false)
                .takes_value(true),
            Arg::with_name("kubernetes_scheme")
                .help("Protocol used to access kubernetes API server")
                .long("kubernetes-scheme")
                .default_value("http")
                .required(false)
                .takes_value(true),
            Arg::with_name("kubernetes_port")
                .help("Kubernetes API server port number")
                .long("kubernetes-port")
                .default_value("6443")
                .required(false)
                .takes_value(true),
        ]
    }
}
/// State shared by all HTTP request handlers.
struct PowerMetrics {
/// Time of the latest metrics request, expressed as the value returned by
/// `current_system_time_since_epoch`; initialised to zero.
last_request: Mutex<Duration>,
/// Generator that owns the topology and produces the metrics served on each
/// metrics request.
metric_generator: Mutex<MetricGenerator>,
}
/// Starts the HTTP server and serves requests until the server stops.
///
/// `address` and `port` are parsed into the socket address to bind to,
/// `suffix` is the URL path under which metrics are served, and the remaining
/// arguments are forwarded to the metric generator.
///
/// # Panics
///
/// Panics if `address` is not a valid IP address or if `port` is not a valid
/// TCP port number.
#[tokio::main]
async fn runner(
    topology: Topology,
    address: String,
    port: String,
    suffix: String,
    qemu: bool,
    watch_containers: bool,
    hostname: String,
) {
    // Validate the address first, then the port.
    let ip = match address.parse::<IpAddr>() {
        Ok(ip) => ip,
        Err(_) => panic!("{} is not a valid ip address", address),
    };
    let port_number = match port.parse::<u16>() {
        Ok(number) => number,
        Err(_) => panic!("{} is not a valid TCP port number", port),
    };
    let listen_on = SocketAddr::new(ip, port_number);

    // State shared between all connections.
    let shared_state = Arc::new(PowerMetrics {
        last_request: Mutex::new(Duration::new(0, 0)),
        metric_generator: Mutex::new(MetricGenerator::new(
            topology,
            hostname.clone(),
            qemu,
            watch_containers,
        )),
    });

    // One service per connection, each holding its own handle on the shared
    // state and its own copy of the metrics path.
    let service_factory = make_service_fn(move |_| {
        let state = shared_state.clone();
        let path_suffix = suffix.clone();
        async {
            Ok::<_, Infallible>(service_fn(move |request| {
                show_metrics(request, state.clone(), path_suffix.clone())
            }))
        }
    });

    let server = Server::bind(&listen_on).serve(service_factory);

    // The sender is only fired once the server future has completed.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let serving = server.with_graceful_shutdown(async {
        shutdown_rx.await.ok();
    });

    if let Err(e) = serving.await {
        error!("server error: {}", e);
    }
    let _ = shutdown_tx.send(());
}
/// Formats one sample line of the Prometheus text exposition format.
///
/// The result is `key{label="value",...} value` followed by a line feed. The
/// label block is omitted when `labels` is `None` or an empty map. Labels are
/// emitted in the iteration order of the map.
///
/// Label values are sanitised so that they cannot break the exposition
/// format: a double quote is replaced by `_`, while a backslash and a line
/// feed are escaped as `\\` and `\n`.
fn format_metric(key: &str, value: &str, labels: Option<&HashMap<String, String>>) -> String {
    let mut result = key.to_string();
    // An empty map is handled like `None`; otherwise stripping the trailing
    // comma below would eat the opening brace and yield `key} value`.
    if let Some(labels) = labels.filter(|map| !map.is_empty()) {
        result.push('{');
        for (name, raw) in labels {
            result.push_str(name);
            result.push_str("=\"");
            for c in raw.chars() {
                match c {
                    '"' => result.push('_'),
                    '\\' => result.push_str("\\\\"),
                    '\n' => result.push_str("\\n"),
                    other => result.push(other),
                }
            }
            result.push_str("\",");
        }
        // Drop the comma written after the last label.
        result.pop();
        result.push('}');
    }
    let _ = writeln!(result, " {value}");
    result
}
/// Appends one metric to `body` and returns the extended text.
///
/// When `add_help` is true, a `# HELP` line (built from `metric_name` and
/// `help`) and a `# TYPE` line (built from `metric_name` and `metric_type`)
/// are written before `metric_line`. `metric_line` is appended verbatim.
fn push_metric(
    mut body: String,
    help: String,
    metric_type: String,
    metric_name: String,
    metric_line: String,
    add_help: bool,
) -> String {
    if add_help {
        body.push_str("# HELP ");
        body.push_str(&metric_name);
        body.push(' ');
        body.push_str(&help);
        body.push_str("\n# TYPE ");
        body.push_str(&metric_name);
        body.push(' ');
        body.push_str(&metric_type);
        body.push('\n');
    }
    body += &metric_line;
    body
}
/// Handles one HTTP request.
///
/// When the request path is `/{suffix}`, the metrics are regenerated and
/// returned in the Prometheus text format; the topology is refreshed first if
/// more than two seconds have elapsed since the previous metrics request.
/// Any other path gets a short HTML page linking to the metrics endpoint.
///
/// # Panics
///
/// Panics if one of the mutexes of `context` is poisoned.
async fn show_metrics(
    req: Request<Body>,
    context: Arc<PowerMetrics>,
    suffix: String,
) -> Result<Response<Body>, Infallible> {
    trace!("{}", req.uri());
    let mut body = String::new();
    if req.uri().path() == format!("/{}", &suffix) {
        trace!("in metrics !");
        let now = current_system_time_since_epoch();
        let mut last_request = context.last_request.lock().unwrap();
        let mut metric_generator = context.metric_generator.lock().unwrap();
        // `saturating_sub` instead of `-`: if the system clock was stepped
        // backwards, `now` is smaller than the stored value and a plain
        // subtraction would panic while both locks are held, poisoning them
        // for every later request.
        if now.saturating_sub(*last_request) > Duration::from_secs(2) {
            info!(
                "{}: Refresh topology",
                Utc::now().format("%Y-%m-%dT%H:%M:%S")
            );
            metric_generator
                .topology
                .proc_tracker
                .clean_terminated_process_records_vectors();
            metric_generator.topology.refresh();
        }
        *last_request = now;
        info!("{}: Refresh data", Utc::now().format("%Y-%m-%dT%H:%M:%S"));
        metric_generator.gen_all_metrics();

        // Names whose HELP/TYPE header has already been written. A hash set
        // keeps the per-metric lookup constant-time.
        let mut metrics_pushed = std::collections::HashSet::<String>::new();
        for msg in metric_generator.pop_metrics() {
            let attributes: Option<&HashMap<String, String>> = if msg.attributes.is_empty() {
                None
            } else {
                Some(&msg.attributes)
            };
            let value = match msg.metric_value {
                MetricValueType::FloatDouble(value) => value.to_string(),
                MetricValueType::IntUnsigned(value) => value.to_string(),
                MetricValueType::Text(ref value) => value.to_string(),
            };
            // The header is emitted only with the first sample of a metric.
            let should_i_add_help = !metrics_pushed.contains(&msg.name);
            if should_i_add_help {
                metrics_pushed.insert(msg.name.clone());
            }
            body = push_metric(
                body,
                msg.description.clone(),
                msg.metric_type.clone(),
                msg.name.clone(),
                format_metric(&msg.name, &value, attributes),
                should_i_add_help,
            );
        }
    } else {
        let _ = write!(body, "<a href=\"https://github.com/hubblo-org/scaphandre/\">Scaphandre's</a> prometheus exporter here. Metrics available on <a href=\"/{suffix}\">/{suffix}</a>");
    }
    Ok(Response::new(body.into()))
}