prometheus_exporter/
lib.rs

1//! Helper to export prometheus metrics via http.
2//!
3//! Information on how to use the prometheus crate can be found at
4//! [`prometheus`].
5//!
6//! # Basic Example
7//! The most basic usage of this crate is to just start a http server at the
8//! given binding which will export all metrics registered on the global
9//! prometheus registry:
10//!
11//! ```rust
12//! use prometheus_exporter::{
13//!     self,
14//!     prometheus::register_counter,
15//! };
16//!
17//! let binding = "127.0.0.1:9184".parse().unwrap();
18//! // Will create an exporter and start the http server using the given binding.
19//! // If the webserver can't bind to the given binding it will fail with an error.
20//! prometheus_exporter::start(binding).unwrap();
21//!
22//! // Create a counter using the global prometheus registry and increment it by one.
23//! // Notice that the macro is coming from the reexported prometheus crate instead
24//! // of the original crate. This is important as different versions of the
25//! // prometheus crate have incompatible global registries.
26//! let counter = register_counter!("user_exporter_counter", "help").unwrap();
27//! counter.inc();
28//! ```
29//!
30//! # Wait for request
31//! A probably more useful example in which the exporter waits until a request
32//! comes in and then updates the metrics on the fly:
33//! ```
34//! use prometheus_exporter::{
35//!     self,
36//!     prometheus::register_counter,
37//! };
38//! # let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
39//! # {
40//! #   let barrier = barrier.clone();
41//! #   std::thread::spawn(move || {
42//! #     println!("client barrier");
43//! #     barrier.wait();
44//! #     let body = reqwest::blocking::get("http://127.0.0.1:9185").unwrap().text().unwrap();
45//! #     println!("body = {:?}", body);
46//! #   });
47//! # }
48//!
49//! let binding = "127.0.0.1:9185".parse().unwrap();
50//! let exporter = prometheus_exporter::start(binding).unwrap();
51//!
52//! let counter = register_counter!("example_exporter_counter", "help").unwrap();
53//!
54//! # barrier.wait();
//! // `wait_request` returns a mutex guard, so we need to bind the return
//! // value. The webserver will hold off responding to the request until
//! // the guard has been dropped.
58//! let guard = exporter.wait_request();
59//!
60//! // Updates can safely happen after the wait. This has the advantage
61//! // that metrics are always in sync with the exporter so you won't
62//! // get half updated metrics.
63//! counter.inc();
64//!
65//! // Drop the guard after metrics have been updated.
66//! drop(guard);
67//! ```
68//!
69//! # Update periodically
70//! Another use case is to update the metrics periodically instead of updating
71//! them for each request. This could be useful if the generation of the
72//! metrics is expensive and shouldn't happen all the time.
73//! ```
74//! use prometheus_exporter::{
75//!     self,
76//!     prometheus::register_counter,
77//! };
78//! # let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
79//! # {
80//! #   let barrier = barrier.clone();
81//! #   std::thread::spawn(move || {
82//! #     println!("client barrier");
83//! #     barrier.wait();
84//! #     let body = reqwest::blocking::get("http://127.0.0.1:9186").unwrap().text().unwrap();
85//! #     println!("body = {:?}", body);
86//! #   });
87//! # }
88//!
89//! let binding = "127.0.0.1:9186".parse().unwrap();
90//! let exporter = prometheus_exporter::start(binding).unwrap();
91//!
92//! let counter = register_counter!("example_exporter_counter", "help").unwrap();
93//!
//! // Wait for 100 milliseconds and then update the metrics. `wait_duration`
//! // will return a mutex guard which makes sure that the http server won't
//! // respond while the metrics get updated.
97//! let guard = exporter.wait_duration(std::time::Duration::from_millis(100));
98//! # barrier.wait();
99//! counter.inc();
100//! drop(guard);
101//! ```
102//!
103//! You can find examples under [`/examples`](https://github.com/AlexanderThaller/prometheus_exporter/tree/master/examples).
104//!
105//! # Crate Features
106//! ## `logging`
107//! *Enabled by default*: yes
108//!
109//! Enables startup logging and failed request logging using the
110//! [`log`](https://crates.io/crates/log) crate.
111//!
112//! ## `internal_metrics`
113//! *Enabled by default*: yes
114//!
115//! Enables the registration of internal metrics used by the crate. Will enable
116//! the following metrics:
117//! * `prometheus_exporter_requests_total`: Number of HTTP requests received.
118//! * `prometheus_exporter_response_size_bytes`: The HTTP response sizes in
119//!   bytes.
120//! * `prometheus_exporter_request_duration_seconds`: The HTTP request latencies
121//!   in seconds.
122//!
123//! This feature will not work in combination with using a custom registry.
124
125#![deny(missing_docs)]
126#![forbid(unsafe_code)]
127#![warn(clippy::pedantic)]
128#![warn(clippy::unwrap_used)]
129#![warn(rust_2018_idioms, unused_lifetimes, missing_debug_implementations)]
130
131#[cfg(test)]
132mod test;
133
// Reexport prometheus so version mismatches don't happen.
135pub use prometheus;
136
137#[cfg(feature = "internal_metrics")]
138use crate::prometheus::{
139    register_histogram,
140    register_int_counter,
141    register_int_gauge,
142    Histogram,
143    IntCounter,
144    IntGauge,
145};
146#[cfg(feature = "internal_metrics")]
147use lazy_static::lazy_static;
148#[cfg(feature = "logging")]
149use log::{
150    error,
151    info,
152};
153
154use crate::prometheus::{
155    Encoder,
156    TextEncoder,
157};
158use std::{
159    net::SocketAddr,
160    sync::{
161        atomic::{
162            AtomicBool,
163            Ordering,
164        },
165        mpsc::{
166            sync_channel,
167            Receiver,
168            SyncSender,
169        },
170        Arc,
171        Barrier,
172        Mutex,
173        MutexGuard,
174    },
175    thread,
176    time::Duration,
177};
178use thiserror::Error;
179use tiny_http::{
180    Header,
181    Request,
182    Response,
183    Server as HTTPServer,
184};
185
#[cfg(feature = "internal_metrics")]
lazy_static! {
    /// Incremented once for every request handled by the metrics endpoint.
    static ref HTTP_COUNTER: IntCounter = register_int_counter!(
        "prometheus_exporter_requests_total",
        "Number of HTTP requests received."
    )
    .expect("can not create HTTP_COUNTER metric. this should never fail");

    /// Set to the length in bytes of the most recently encoded metrics body.
    static ref HTTP_BODY_GAUGE: IntGauge = register_int_gauge!(
        "prometheus_exporter_response_size_bytes",
        "The HTTP response sizes in bytes."
    )
    .expect("can not create HTTP_BODY_GAUGE metric. this should never fail");

    /// Timer source for metrics requests. The timer is started when a metrics
    /// request arrives and dropped once the update lock has been acquired.
    static ref HTTP_REQ_HISTOGRAM: Histogram = register_histogram!(
        "prometheus_exporter_request_duration_seconds",
        "The HTTP request latencies in seconds."
    )
    .expect("can not create HTTP_REQ_HISTOGRAM metric. this should never fail");
}
204
205/// Errors that can occur while building or running an exporter.
206#[derive(Debug, Error)]
207pub enum Error {
208    /// Returned when trying to start the exporter and
209    /// [`tiny_http::Server::http`] fails.
210    #[error("can not start http server: {0}")]
211    ServerStart(Box<dyn std::error::Error + Send + Sync + 'static>),
212    /// Returned when supplying a non-ascii endpoint to
213    /// [`Builder::with_endpoint`].
214    #[error("supplied endpoint is not valid ascii: {0}")]
215    EndpointNotAscii(String),
216}
217
218/// Errors that can occur while handling requests.
219#[derive(Debug, Error)]
220enum HandlerError {
221    /// Returned when the encoding of the metrics by
222    /// [`prometheus::Encoder::encode`] fails.
223    #[error("can not encode metrics: {0}")]
224    EncodeMetrics(prometheus::Error),
225
226    /// Returned when returning encoded metrics via response with
227    /// [`tiny_http::Request::respond`] fails.
228    #[error("can not generate response: {0}")]
229    Response(std::io::Error),
230}
231
232/// Builder to create a new [`crate::Exporter`].
233#[derive(Debug)]
234pub struct Builder {
235    binding: SocketAddr,
236    endpoint: Endpoint,
237    registry: prometheus::Registry,
238}
239
/// Path under which the metrics are served.
#[derive(Debug)]
struct Endpoint(String);

impl Default for Endpoint {
    /// The default endpoint is `/metrics`.
    fn default() -> Self {
        Endpoint(String::from("/metrics"))
    }
}
247
/// Handle to a running exporter which serves prometheus metrics via http.
///
/// The handle is used to synchronise metric updates with the http server.
#[derive(Debug)]
pub struct Exporter {
    /// Receiving end of the channel on which the server hands over one barrier
    /// per metrics request.
    request_receiver: Receiver<Arc<Barrier>>,
    /// Flag shared with the server; set while `wait_request` is waiting for a
    /// request.
    is_waiting: Arc<AtomicBool>,
    /// Lock shared with the server; the guards handed out by the wait methods
    /// borrow from it.
    update_lock: Arc<Mutex<()>>,
}
255
/// Namespace for the http server side of the exporter: starting the server
/// thread and answering the incoming requests. Carries no state.
#[derive(Debug)]
struct Server;
259
260/// Create and start a new exporter which uses the given socket address to
261/// export the metrics.
262/// # Errors
263///
264/// Will return [`enum@Error`] if the http server fails to start for any reason.
265pub fn start(binding: SocketAddr) -> Result<Exporter, Error> {
266    Builder::new(binding).start()
267}
268
269impl Builder {
270    /// Create a new builder with the given binding.
271    #[must_use]
272    pub fn new(binding: SocketAddr) -> Builder {
273        Self {
274            binding,
275            endpoint: Endpoint::default(),
276            registry: prometheus::default_registry().clone(),
277        }
278    }
279
280    /// Sets the endpoint that the metrics will be served on. If the endpoint is
281    /// not set with this method then the default `/metrics` will be used.
282    /// # Errors
283    ///
284    /// Will return [`enum@Error`] if the supplied string slice is not valid
285    /// ascii.
286    pub fn with_endpoint(&mut self, endpoint: &str) -> Result<(), Error> {
287        if !endpoint.is_ascii() {
288            return Err(Error::EndpointNotAscii(endpoint.to_string()));
289        }
290        let mut clean_endpoint = String::from('/');
291        clean_endpoint.push_str(endpoint.trim_matches('/'));
292        self.endpoint = Endpoint(clean_endpoint);
293        Ok(())
294    }
295
296    /// Sets the registry the metrics will be gathered from. If the registry is
297    /// not set, the default registry provided by the prometheus crate will be
298    /// used. If a custom registry is used, the metrics provided by the
299    /// `internal_metrics` feature are not available.
300    pub fn with_registry(&mut self, registry: prometheus::Registry) {
301        self.registry = registry;
302    }
303
304    /// Create and start new exporter based on the information from the builder.
305    /// # Errors
306    ///
307    /// Will return [`enum@Error`] if the http server fails to start for any
308    /// reason.
309    pub fn start(self) -> Result<Exporter, Error> {
310        let (request_sender, request_receiver) = sync_channel(0);
311        let is_waiting = Arc::new(AtomicBool::new(false));
312        let update_lock = Arc::new(Mutex::new(()));
313
314        let exporter = Exporter {
315            request_receiver,
316            is_waiting: Arc::clone(&is_waiting),
317            update_lock: Arc::clone(&update_lock),
318        };
319
320        Server::start(
321            self.binding,
322            self.endpoint.0,
323            request_sender,
324            is_waiting,
325            update_lock,
326            self.registry,
327        )?;
328
329        Ok(exporter)
330    }
331}
332
333impl Exporter {
334    /// Return new builder which will create a exporter once built.
335    #[must_use]
336    pub fn builder(binding: SocketAddr) -> Builder {
337        Builder::new(binding)
338    }
339
340    /// Wait until a new request comes in. Returns a mutex guard to make the
341    /// http server wait until the metrics have been updated.
342    #[must_use = "not using the guard will result in the exporter returning the prometheus data \
343                  immediately over http"]
344    pub fn wait_request(&self) -> MutexGuard<'_, ()> {
345        self.is_waiting.store(true, Ordering::SeqCst);
346
347        let update_waitgroup = self
348            .request_receiver
349            .recv()
350            .expect("can not receive from request_receiver channel. this should never happen");
351
352        self.is_waiting.store(false, Ordering::SeqCst);
353
354        let guard = self
355            .update_lock
356            .lock()
357            .expect("poisioned mutex. should never happen");
358
359        update_waitgroup.wait();
360
361        guard
362    }
363
364    /// Wait for given duration. Returns a mutex guard to make the http
365    /// server wait until the metrics have been updated.
366    #[must_use = "not using the guard will result in the exporter returning the prometheus data \
367                  immediately over http"]
368    pub fn wait_duration(&self, duration: Duration) -> MutexGuard<'_, ()> {
369        thread::sleep(duration);
370
371        self.update_lock
372            .lock()
373            .expect("poisioned mutex. should never happen")
374    }
375}
376
377impl Server {
378    fn start(
379        binding: SocketAddr,
380        endpoint: String,
381        request_sender: SyncSender<Arc<Barrier>>,
382        is_waiting: Arc<AtomicBool>,
383        update_lock: Arc<Mutex<()>>,
384        registry: prometheus::Registry,
385    ) -> Result<(), Error> {
386        let server = HTTPServer::http(&binding).map_err(Error::ServerStart)?;
387
388        thread::spawn(move || {
389            #[cfg(feature = "logging")]
390            info!("exporting metrics to http://{}{}", binding, endpoint);
391
392            let encoder = TextEncoder::new();
393
394            for request in server.incoming_requests() {
395                if let Err(err) = if request.url() == endpoint {
396                    Self::handler_metrics(
397                        request,
398                        &encoder,
399                        &request_sender,
400                        &is_waiting,
401                        &update_lock,
402                        &registry,
403                    )
404                } else {
405                    Self::handler_redirect(request, &endpoint)
406                } {
407                    #[cfg(feature = "logging")]
408                    error!("{}", err);
409
410                    // Just so there are no complains about unused err variable when logging
411                    // feature is disabled
412                    drop(err);
413                }
414            }
415        });
416
417        Ok(())
418    }
419
420    fn handler_metrics(
421        request: Request,
422        encoder: &TextEncoder,
423        request_sender: &SyncSender<Arc<Barrier>>,
424        is_waiting: &Arc<AtomicBool>,
425        update_lock: &Arc<Mutex<()>>,
426        registry: &prometheus::Registry,
427    ) -> Result<(), HandlerError> {
428        #[cfg(feature = "internal_metrics")]
429        HTTP_COUNTER.inc();
430
431        #[cfg(feature = "internal_metrics")]
432        let timer = HTTP_REQ_HISTOGRAM.start_timer();
433
434        if is_waiting.load(Ordering::SeqCst) {
435            let barrier = Arc::new(Barrier::new(2));
436
437            request_sender
438                .send(Arc::clone(&barrier))
439                .expect("can not send to request_sender. this should never happen");
440
441            barrier.wait();
442        }
443
444        let _lock = update_lock
445            .lock()
446            .expect("poisioned mutex. should never happen");
447
448        #[cfg(feature = "internal_metrics")]
449        drop(timer);
450
451        Self::process_request(request, encoder, registry)
452    }
453
454    fn process_request(
455        request: Request,
456        encoder: &TextEncoder,
457        registry: &prometheus::Registry,
458    ) -> Result<(), HandlerError> {
459        let metric_families = registry.gather();
460        let mut buffer = vec![];
461
462        encoder
463            .encode(&metric_families, &mut buffer)
464            .map_err(HandlerError::EncodeMetrics)?;
465
466        #[cfg(feature = "internal_metrics")]
467        HTTP_BODY_GAUGE.set(buffer.len() as i64);
468
469        let response = Response::from_data(buffer);
470        request.respond(response).map_err(HandlerError::Response)?;
471
472        Ok(())
473    }
474
475    fn handler_redirect(request: Request, endpoint: &str) -> Result<(), HandlerError> {
476        let response = Response::from_string(format!("try {} for metrics\n", endpoint))
477            .with_status_code(301)
478            .with_header(Header {
479                field: "Location"
480                    .parse()
481                    .expect("can not parse location header field. this should never fail"),
482                value: ascii::AsciiString::from_ascii(endpoint)
483                    .expect("can not parse header value. this should never fail"),
484            });
485
486        request.respond(response).map_err(HandlerError::Response)?;
487
488        Ok(())
489    }
490}