hotmic_prometheus/
lib.rs

1//! Exports metrics by exposing a Prometheus exporter endpoint.
2extern crate futures;
3extern crate hotmic;
4extern crate hyper;
5
6use futures::prelude::*;
7use hotmic::{
8    snapshot::{Snapshot, TypedMeasurement},
9    Controller,
10};
11use hyper::{
12    error::Error as HyperError, server::Server as HyperServer, service::service_fn,
13    Body as HyperBody, Response as HyperResponse,
14};
15use std::error::Error;
16use std::fmt;
17use std::net::SocketAddr;
18
/// Exports metrics by exposing a Prometheus exporter endpoint.
///
/// Pairs the [`Controller`] that snapshots are requested from with the socket address the
/// HTTP server binds to. Build one with [`PrometheusExporter::new`], then drive it with
/// either [`PrometheusExporter::run`] or [`PrometheusExporter::into_future`].
pub struct PrometheusExporter {
    /// Handle used to request a metrics snapshot whenever a request is served.
    controller: Controller,
    /// Socket address the Hyper server is bound to.
    addr: SocketAddr,
}
24
25impl PrometheusExporter {
26    /// Creates a new [`PrometheusExporter`].
27    ///
28    /// When run/spawned, the exporter will listen at the given socket address, responding to any
29    /// requests by taking a snapshot and returning it in the text-based Prometheus exposition
30    /// format.
31    pub fn new(controller: Controller, addr: SocketAddr) -> Self {
32        PrometheusExporter { controller, addr }
33    }
34
35    /// Runs the exporter synchronously, blocking the calling thread.
36    ///
37    /// You should run this in a dedicated thread:
38    ///
39    /// ```c
40    /// let addr = "localhost:9090".parse().expect("failed to parse listen address");
41    /// let mut exporter = PrometheusExporter::new(controller, addr);
42    /// std::thread::spawn(move || exporter.run());
43    /// ```
44    pub fn run(self) {
45        let server = self.into_future();
46        let _ = server.wait();
47    }
48
49    /// Converts this exporter into a future that runs the Hyper-based exporter endpoint.
50    ///
51    /// ```c
52    /// let addr = "localhost:9090".parse().expect("failed to parse listen address");
53    /// let exporter = PrometheusExporter::new(controller, addr);
54    /// let server = exporter.into_future().map_err(|_| ());
55    /// tokio::run(server);
56    /// ```
57    pub fn into_future(self) -> impl Future<Item = (), Error = HyperError> + Send {
58        let addr = self.addr;
59        let new_exporter_svc = move || {
60            let controller = self.controller.clone();
61
62            service_fn(move |_| {
63                controller
64                    .get_snapshot_async()
65                    .map_err(|_| ExporterError::Snapshot)
66                    .into_future()
67                    .and_then(|rx| rx.map_err(|_| ExporterError::Snapshot))
68                    .and_then(process_snapshot)
69                    .and_then(|output| Ok(HyperResponse::new(HyperBody::from(output))))
70            })
71        };
72
73        HyperServer::bind(&addr).serve(new_exporter_svc)
74    }
75}
76
/// Errors that can occur while answering an exporter request.
#[derive(Debug)]
enum ExporterError {
    /// The metrics snapshot could not be requested or received.
    Snapshot,
}

// No underlying cause is wrapped, so the default `Error` methods are sufficient.
impl Error for ExporterError {}

impl fmt::Display for ExporterError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let message = match *self {
            ExporterError::Snapshot => "snapshot failed",
        };

        formatter.write_str(message)
    }
}
91
92fn process_snapshot(snapshot: Snapshot) -> Result<String, ExporterError> {
93    let mut output = String::from("# hotmic-prometheus exporter\n");
94
95    for measurement in snapshot.into_vec() {
96        output.push_str("\n");
97
98        match measurement {
99            TypedMeasurement::Counter(label, value) => {
100                let label = label.replace('.', "_");
101                output.push_str("# TYPE ");
102                output.push_str(label.as_str());
103                output.push_str(" counter\n");
104                output.push_str(label.as_str());
105                output.push_str(" ");
106                output.push_str(value.to_string().as_str());
107                output.push_str("\n");
108            }
109            TypedMeasurement::Gauge(label, value) => {
110                let label = label.replace('.', "_");
111                output.push_str("# TYPE ");
112                output.push_str(label.as_str());
113                output.push_str(" gauge\n");
114                output.push_str(label.as_str());
115                output.push_str(" ");
116                output.push_str(value.to_string().as_str());
117                output.push_str("\n");
118            }
119            TypedMeasurement::TimingHistogram(label, summary) => {
120                let label = label.replace('.', "_");
121                output.push_str("# TYPE ");
122                output.push_str(label.as_str());
123                output.push_str("_nanoseconds summary\n");
124                for (percentile, value) in summary.measurements() {
125                    output.push_str(label.as_str());
126                    output.push_str("_nanoseconds{quantile=\"");
127                    output.push_str(percentile.as_quantile().to_string().as_str());
128                    output.push_str("\"} ");
129                    output.push_str(value.to_string().as_str());
130                    output.push_str("\n");
131                }
132                output.push_str(label.as_str());
133                output.push_str("_nanoseconds_sum ");
134                output.push_str(summary.sum().to_string().as_str());
135                output.push_str("\n");
136                output.push_str(label.as_str());
137                output.push_str("_nanoseconds_count ");
138                output.push_str(summary.count().to_string().as_str());
139                output.push_str("\n");
140            }
141            TypedMeasurement::ValueHistogram(label, summary) => {
142                let label = label.replace('.', "_");
143                output.push_str("# TYPE ");
144                output.push_str(label.as_str());
145                output.push_str(" summary\n");
146                for (percentile, value) in summary.measurements() {
147                    output.push_str(label.as_str());
148                    output.push_str("{quantile=\"");
149                    output.push_str(percentile.as_quantile().to_string().as_str());
150                    output.push_str("\"} ");
151                    output.push_str(value.to_string().as_str());
152                    output.push_str("\n");
153                }
154                output.push_str(label.as_str());
155                output.push_str("_sum ");
156                output.push_str(summary.sum().to_string().as_str());
157                output.push_str("\n");
158                output.push_str(label.as_str());
159                output.push_str("_count ");
160                output.push_str(summary.count().to_string().as_str());
161                output.push_str("\n");
162            }
163        }
164    }
165
166    Ok(output)
167}