hotmic-prometheus 0.1.0

A Prometheus exporter for hotmic.
Documentation
//! Exports metrics by exposing a Prometheus exporter endpoint.
extern crate futures;
extern crate hotmic;
extern crate hyper;

use futures::prelude::*;
use hotmic::{
    snapshot::{Snapshot, TypedMeasurement},
    Controller,
};
use hyper::{
    error::Error as HyperError, server::Server as HyperServer, service::service_fn,
    Body as HyperBody, Response as HyperResponse,
};
use std::error::Error;
use std::fmt;
use std::net::SocketAddr;

/// Exports metrics by exposing a Prometheus exporter endpoint.
///
/// Bundles the hotmic [`Controller`] that snapshots are requested from with the socket address
/// the HTTP endpoint is bound to.
pub struct PrometheusExporter {
    // Handle used to request a fresh snapshot for every incoming request.
    controller: Controller,
    // Address the Hyper server binds to once the exporter is run.
    addr: SocketAddr,
}

impl PrometheusExporter {
    /// Creates a new [`PrometheusExporter`].
    ///
    /// When run/spawned, the exporter will listen at the given socket address, responding to any
    /// requests by taking a snapshot and returning it in the text-based Prometheus exposition
    /// format.
    pub fn new(controller: Controller, addr: SocketAddr) -> Self {
        PrometheusExporter { controller, addr }
    }

    /// Runs the exporter synchronously, blocking the calling thread.
    ///
    /// You should run this in a dedicated thread:
    ///
    /// ```c
    /// let addr = "localhost:9090".parse().expect("failed to parse listen address");
    /// let mut exporter = PrometheusExporter::new(controller, addr);
    /// std::thread::spawn(move || exporter.run());
    /// ```
    pub fn run(self) {
        let server = self.into_future();
        let _ = server.wait();
    }

    /// Converts this exporter into a future that runs the Hyper-based exporter endpoint.
    ///
    /// ```c
    /// let addr = "localhost:9090".parse().expect("failed to parse listen address");
    /// let exporter = PrometheusExporter::new(controller, addr);
    /// let server = exporter.into_future().map_err(|_| ());
    /// tokio::run(server);
    /// ```
    pub fn into_future(self) -> impl Future<Item = (), Error = HyperError> + Send {
        let addr = self.addr;
        let new_exporter_svc = move || {
            let controller = self.controller.clone();

            service_fn(move |_| {
                controller
                    .get_snapshot_async()
                    .map_err(|_| ExporterError::Snapshot)
                    .into_future()
                    .and_then(|rx| rx.map_err(|_| ExporterError::Snapshot))
                    .and_then(process_snapshot)
                    .and_then(|output| Ok(HyperResponse::new(HyperBody::from(output))))
            })
        };

        HyperServer::bind(&addr).serve(new_exporter_svc)
    }
}

/// Errors that can occur while serving a scrape request.
#[derive(Debug)]
enum ExporterError {
    /// The snapshot could not be requested or received.
    Snapshot,
}

impl fmt::Display for ExporterError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match *self {
            ExporterError::Snapshot => "snapshot failed",
        };
        f.write_str(message)
    }
}

impl Error for ExporterError {}

/// Renders a snapshot as text in the Prometheus exposition format.
///
/// Counters and gauges are emitted as a `# TYPE` line followed by a single sample. Both
/// histogram kinds are emitted as `summary` metrics: one sample per quantile, followed by `_sum`
/// and `_count` samples. Timing histograms additionally get a `_nanoseconds` suffix appended to
/// the metric name.
///
/// Metric names are derived from the measurement labels with `sanitize_metric_name`, so labels
/// containing characters that are illegal in Prometheus metric names still produce output that
/// can be scraped.
///
/// This currently never returns `Err`; the `Result` return type lets the function be used
/// directly in an `and_then` chain.
fn process_snapshot(snapshot: Snapshot) -> Result<String, ExporterError> {
    let mut output = String::from("# hotmic-prometheus exporter\n");

    for measurement in snapshot.into_vec() {
        // Blank line between metrics.
        output.push('\n');

        match measurement {
            TypedMeasurement::Counter(label, value) => {
                let name = sanitize_metric_name(&label);
                push_type_line(&mut output, &name, "counter");
                push_sample(&mut output, &name, "", &value.to_string());
            }
            TypedMeasurement::Gauge(label, value) => {
                let name = sanitize_metric_name(&label);
                push_type_line(&mut output, &name, "gauge");
                push_sample(&mut output, &name, "", &value.to_string());
            }
            TypedMeasurement::TimingHistogram(label, summary) => {
                let mut name = sanitize_metric_name(&label);
                name.push_str("_nanoseconds");
                push_type_line(&mut output, &name, "summary");
                for (percentile, value) in summary.measurements() {
                    let selector = quantile_selector(&percentile.as_quantile().to_string());
                    push_sample(&mut output, &name, &selector, &value.to_string());
                }
                push_sample(&mut output, &name, "_sum", &summary.sum().to_string());
                push_sample(&mut output, &name, "_count", &summary.count().to_string());
            }
            TypedMeasurement::ValueHistogram(label, summary) => {
                let name = sanitize_metric_name(&label);
                push_type_line(&mut output, &name, "summary");
                for (percentile, value) in summary.measurements() {
                    let selector = quantile_selector(&percentile.as_quantile().to_string());
                    push_sample(&mut output, &name, &selector, &value.to_string());
                }
                push_sample(&mut output, &name, "_sum", &summary.sum().to_string());
                push_sample(&mut output, &name, "_count", &summary.count().to_string());
            }
        }
    }

    Ok(output)
}

/// Converts an arbitrary label into a valid Prometheus metric name.
///
/// Every character outside `[a-zA-Z0-9_:]` (including `.`) is replaced with `_`, a leading digit
/// is prefixed with `_`, and an empty label becomes `_`.
fn sanitize_metric_name(raw: &str) -> String {
    let mut name = String::with_capacity(raw.len() + 1);

    // Metric names must not start with a digit; prefix instead of dropping information.
    if raw.chars().next().map_or(false, |first| first.is_ascii_digit()) {
        name.push('_');
    }

    for ch in raw.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' || ch == ':' {
            name.push(ch);
        } else {
            name.push('_');
        }
    }

    if name.is_empty() {
        name.push('_');
    }

    name
}

/// Builds the `{quantile="..."}` selector appended to a summary sample's metric name.
fn quantile_selector(quantile: &str) -> String {
    format!("{{quantile=\"{}\"}}", quantile)
}

/// Appends a `# TYPE <name> <kind>` line.
fn push_type_line(output: &mut String, name: &str, kind: &str) {
    output.push_str("# TYPE ");
    output.push_str(name);
    output.push(' ');
    output.push_str(kind);
    output.push('\n');
}

/// Appends a `<name><suffix> <value>` sample line.
fn push_sample(output: &mut String, name: &str, suffix: &str, value: &str) {
    output.push_str(name);
    output.push_str(suffix);
    output.push(' ');
    output.push_str(value);
    output.push('\n');
}