extern crate futures;
extern crate hotmic;
extern crate hyper;
use futures::prelude::*;
use hotmic::{
snapshot::{Snapshot, TypedMeasurement},
Controller,
};
use hyper::{
error::Error as HyperError, server::Server as HyperServer, service::service_fn,
Body as HyperBody, Response as HyperResponse,
};
use std::error::Error;
use std::fmt;
use std::net::SocketAddr;
/// Serves hotmic metric snapshots over HTTP, rendered as Prometheus-style text.
///
/// Build one with `PrometheusExporter::new`, then either block on it with
/// `run` or obtain the server future with `into_future`.
pub struct PrometheusExporter {
/// Handle used to request snapshots; it is cloned for every service instance.
controller: Controller,
/// Socket address the HTTP server is bound to.
addr: SocketAddr,
}
impl PrometheusExporter {
    /// Creates an exporter that reads snapshots through `controller` and
    /// listens on `addr` once it is run.
    pub fn new(controller: Controller, addr: SocketAddr) -> Self {
        Self { controller, addr }
    }

    /// Runs the exporter on the calling thread, blocking until the server
    /// future finishes.
    ///
    /// The outcome of the server future is intentionally discarded.
    pub fn run(self) {
        let _ = self.into_future().wait();
    }

    /// Converts the exporter into a future that drives the HTTP server.
    ///
    /// Every incoming request, whatever its content, is answered with the
    /// text produced by `process_snapshot` for a freshly requested snapshot.
    /// Failing to request or receive the snapshot yields
    /// `ExporterError::Snapshot` for that request.
    pub fn into_future(self) -> impl Future<Item = (), Error = HyperError> + Send {
        let PrometheusExporter { controller, addr } = self;

        // Invoked by hyper to build a service; each service owns its own
        // clone of the controller handle.
        let make_service = move || {
            let controller = controller.clone();
            service_fn(move |_| {
                let snapshot_request = controller
                    .get_snapshot_async()
                    .map_err(|_| ExporterError::Snapshot);

                snapshot_request
                    .into_future()
                    .and_then(|receiver| receiver.map_err(|_| ExporterError::Snapshot))
                    .and_then(process_snapshot)
                    .map(|body| HyperResponse::new(HyperBody::from(body)))
            })
        };

        HyperServer::bind(&addr).serve(make_service)
    }
}
/// Errors that can occur while producing an exporter response.
#[derive(Debug)]
enum ExporterError {
/// The snapshot could not be requested from, or received from, the controller.
Snapshot,
}
// Empty impl: the default methods of `std::error::Error` are used as-is.
impl Error for ExporterError {}
// Human-readable description of each `ExporterError` variant.
impl fmt::Display for ExporterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick the message first so the formatter is written to exactly once.
        let message = match *self {
            ExporterError::Snapshot => "snapshot failed",
        };
        f.write_str(message)
    }
}
fn process_snapshot(snapshot: Snapshot) -> Result<String, ExporterError> {
let mut output = String::from("# hotmic-prometheus exporter\n");
for measurement in snapshot.into_vec() {
output.push_str("\n");
match measurement {
TypedMeasurement::Counter(label, value) => {
let label = label.replace('.', "_");
output.push_str("# TYPE ");
output.push_str(label.as_str());
output.push_str(" counter\n");
output.push_str(label.as_str());
output.push_str(" ");
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
TypedMeasurement::Gauge(label, value) => {
let label = label.replace('.', "_");
output.push_str("# TYPE ");
output.push_str(label.as_str());
output.push_str(" gauge\n");
output.push_str(label.as_str());
output.push_str(" ");
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
TypedMeasurement::TimingHistogram(label, summary) => {
let label = label.replace('.', "_");
output.push_str("# TYPE ");
output.push_str(label.as_str());
output.push_str("_nanoseconds summary\n");
for (percentile, value) in summary.measurements() {
output.push_str(label.as_str());
output.push_str("_nanoseconds{quantile=\"");
output.push_str(percentile.as_quantile().to_string().as_str());
output.push_str("\"} ");
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
output.push_str(label.as_str());
output.push_str("_nanoseconds_sum ");
output.push_str(summary.sum().to_string().as_str());
output.push_str("\n");
output.push_str(label.as_str());
output.push_str("_nanoseconds_count ");
output.push_str(summary.count().to_string().as_str());
output.push_str("\n");
}
TypedMeasurement::ValueHistogram(label, summary) => {
let label = label.replace('.', "_");
output.push_str("# TYPE ");
output.push_str(label.as_str());
output.push_str(" summary\n");
for (percentile, value) in summary.measurements() {
output.push_str(label.as_str());
output.push_str("{quantile=\"");
output.push_str(percentile.as_quantile().to_string().as_str());
output.push_str("\"} ");
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
output.push_str(label.as_str());
output.push_str("_sum ");
output.push_str(summary.sum().to_string().as_str());
output.push_str("\n");
output.push_str(label.as_str());
output.push_str("_count ");
output.push_str(summary.count().to_string().as_str());
output.push_str("\n");
}
}
}
Ok(output)
}