prometheus_exporter/lib.rs
1//! Helper to export prometheus metrics via http.
2//!
3//! Information on how to use the prometheus crate can be found at
4//! [`prometheus`].
5//!
6//! # Basic Example
7//! The most basic usage of this crate is to just start a http server at the
8//! given binding which will export all metrics registered on the global
9//! prometheus registry:
10//!
11//! ```rust
12//! use prometheus_exporter::{
13//! self,
14//! prometheus::register_counter,
15//! };
16//!
17//! let binding = "127.0.0.1:9184".parse().unwrap();
18//! // Will create an exporter and start the http server using the given binding.
19//! // If the webserver can't bind to the given binding it will fail with an error.
20//! prometheus_exporter::start(binding).unwrap();
21//!
22//! // Create a counter using the global prometheus registry and increment it by one.
23//! // Notice that the macro is coming from the reexported prometheus crate instead
24//! // of the original crate. This is important as different versions of the
25//! // prometheus crate have incompatible global registries.
26//! let counter = register_counter!("user_exporter_counter", "help").unwrap();
27//! counter.inc();
28//! ```
29//!
30//! # Wait for request
31//! A probably more useful example in which the exporter waits until a request
32//! comes in and then updates the metrics on the fly:
33//! ```
34//! use prometheus_exporter::{
35//! self,
36//! prometheus::register_counter,
37//! };
38//! # let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
39//! # {
40//! # let barrier = barrier.clone();
41//! # std::thread::spawn(move || {
42//! # println!("client barrier");
43//! # barrier.wait();
44//! # let body = reqwest::blocking::get("http://127.0.0.1:9185").unwrap().text().unwrap();
45//! # println!("body = {:?}", body);
46//! # });
47//! # }
48//!
49//! let binding = "127.0.0.1:9185".parse().unwrap();
50//! let exporter = prometheus_exporter::start(binding).unwrap();
51//!
52//! let counter = register_counter!("example_exporter_counter", "help").unwrap();
53//!
54//! # barrier.wait();
//! // `wait_request` returns a mutex guard, so we need to bind the return
//! // value. The webserver will hold back its response to the request until
//! // the guard has been dropped.
58//! let guard = exporter.wait_request();
59//!
60//! // Updates can safely happen after the wait. This has the advantage
61//! // that metrics are always in sync with the exporter so you won't
62//! // get half updated metrics.
63//! counter.inc();
64//!
65//! // Drop the guard after metrics have been updated.
66//! drop(guard);
67//! ```
68//!
69//! # Update periodically
70//! Another use case is to update the metrics periodically instead of updating
71//! them for each request. This could be useful if the generation of the
72//! metrics is expensive and shouldn't happen all the time.
73//! ```
74//! use prometheus_exporter::{
75//! self,
76//! prometheus::register_counter,
77//! };
78//! # let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
79//! # {
80//! # let barrier = barrier.clone();
81//! # std::thread::spawn(move || {
82//! # println!("client barrier");
83//! # barrier.wait();
84//! # let body = reqwest::blocking::get("http://127.0.0.1:9186").unwrap().text().unwrap();
85//! # println!("body = {:?}", body);
86//! # });
87//! # }
88//!
89//! let binding = "127.0.0.1:9186".parse().unwrap();
90//! let exporter = prometheus_exporter::start(binding).unwrap();
91//!
92//! let counter = register_counter!("example_exporter_counter", "help").unwrap();
93//!
//! // Wait for 100 milliseconds and then update the metrics. `wait_duration`
//! // will return a mutex guard which makes sure that the http server won't
//! // respond while the metrics get updated.
97//! let guard = exporter.wait_duration(std::time::Duration::from_millis(100));
98//! # barrier.wait();
99//! counter.inc();
100//! drop(guard);
101//! ```
102//!
103//! You can find examples under [`/examples`](https://github.com/AlexanderThaller/prometheus_exporter/tree/master/examples).
104//!
105//! # Crate Features
106//! ## `logging`
107//! *Enabled by default*: yes
108//!
109//! Enables startup logging and failed request logging using the
110//! [`log`](https://crates.io/crates/log) crate.
111//!
112//! ## `internal_metrics`
113//! *Enabled by default*: yes
114//!
115//! Enables the registration of internal metrics used by the crate. Will enable
116//! the following metrics:
117//! * `prometheus_exporter_requests_total`: Number of HTTP requests received.
118//! * `prometheus_exporter_response_size_bytes`: The HTTP response sizes in
119//! bytes.
120//! * `prometheus_exporter_request_duration_seconds`: The HTTP request latencies
121//! in seconds.
122//!
123//! This feature will not work in combination with using a custom registry.
124
125#![deny(missing_docs)]
126#![forbid(unsafe_code)]
127#![warn(clippy::pedantic)]
128#![warn(clippy::unwrap_used)]
129#![warn(rust_2018_idioms, unused_lifetimes, missing_debug_implementations)]
130
131#[cfg(test)]
132mod test;
133
// Reexport prometheus so version mismatches don't happen.
135pub use prometheus;
136
137#[cfg(feature = "internal_metrics")]
138use crate::prometheus::{
139 register_histogram,
140 register_int_counter,
141 register_int_gauge,
142 Histogram,
143 IntCounter,
144 IntGauge,
145};
146#[cfg(feature = "internal_metrics")]
147use lazy_static::lazy_static;
148#[cfg(feature = "logging")]
149use log::{
150 error,
151 info,
152};
153
154use crate::prometheus::{
155 Encoder,
156 TextEncoder,
157};
158use std::{
159 net::SocketAddr,
160 sync::{
161 atomic::{
162 AtomicBool,
163 Ordering,
164 },
165 mpsc::{
166 sync_channel,
167 Receiver,
168 SyncSender,
169 },
170 Arc,
171 Barrier,
172 Mutex,
173 MutexGuard,
174 },
175 thread,
176 time::Duration,
177};
178use thiserror::Error;
179use tiny_http::{
180 Header,
181 Request,
182 Response,
183 Server as HTTPServer,
184};
185
// Internal metrics describing the exporter itself. They are registered through
// the `register_*!` macros on first access and only exist when the
// `internal_metrics` feature is enabled.
#[cfg(feature = "internal_metrics")]
lazy_static! {
    /// Incremented once for every request handled on the metrics endpoint.
    static ref HTTP_COUNTER: IntCounter = register_int_counter!(
        "prometheus_exporter_requests_total",
        "Number of HTTP requests received."
    )
    .expect("can not create HTTP_COUNTER metric. this should never fail");

    /// Set to the size in bytes of the most recently encoded metrics body.
    static ref HTTP_BODY_GAUGE: IntGauge = register_int_gauge!(
        "prometheus_exporter_response_size_bytes",
        "The HTTP response sizes in bytes."
    )
    .expect("can not create HTTP_BODY_GAUGE metric. this should never fail");

    /// Timed by the metrics handler from request arrival until the update
    /// lock has been acquired.
    static ref HTTP_REQ_HISTOGRAM: Histogram = register_histogram!(
        "prometheus_exporter_request_duration_seconds",
        "The HTTP request latencies in seconds."
    )
    .expect("can not create HTTP_REQ_HISTOGRAM metric. this should never fail");
}
204
205/// Errors that can occur while building or running an exporter.
206#[derive(Debug, Error)]
207pub enum Error {
208 /// Returned when trying to start the exporter and
209 /// [`tiny_http::Server::http`] fails.
210 #[error("can not start http server: {0}")]
211 ServerStart(Box<dyn std::error::Error + Send + Sync + 'static>),
212 /// Returned when supplying a non-ascii endpoint to
213 /// [`Builder::with_endpoint`].
214 #[error("supplied endpoint is not valid ascii: {0}")]
215 EndpointNotAscii(String),
216}
217
218/// Errors that can occur while handling requests.
219#[derive(Debug, Error)]
220enum HandlerError {
221 /// Returned when the encoding of the metrics by
222 /// [`prometheus::Encoder::encode`] fails.
223 #[error("can not encode metrics: {0}")]
224 EncodeMetrics(prometheus::Error),
225
226 /// Returned when returning encoded metrics via response with
227 /// [`tiny_http::Request::respond`] fails.
228 #[error("can not generate response: {0}")]
229 Response(std::io::Error),
230}
231
232/// Builder to create a new [`crate::Exporter`].
233#[derive(Debug)]
234pub struct Builder {
235 binding: SocketAddr,
236 endpoint: Endpoint,
237 registry: prometheus::Registry,
238}
239
/// Url path on which the metrics are served.
#[derive(Debug)]
struct Endpoint(String);

impl Default for Endpoint {
    /// Uses `/metrics` when no endpoint has been configured.
    fn default() -> Self {
        Endpoint(String::from("/metrics"))
    }
}
247
/// Handle to a running exporter that serves prometheus metrics via http.
///
/// The handle lets the caller synchronise metric updates with the http server
/// thread through [`Exporter::wait_request`] and [`Exporter::wait_duration`].
#[derive(Debug)]
pub struct Exporter {
    /// Receives one barrier per request while a caller is blocked in
    /// `wait_request`.
    request_receiver: Receiver<Arc<Barrier>>,
    /// Tells the server thread whether a caller is currently blocked in
    /// `wait_request`.
    is_waiting: Arc<AtomicBool>,
    /// Held while metrics are being updated; the server takes it before
    /// gathering the metrics.
    update_lock: Arc<Mutex<()>>,
}
255
/// Namespace for the http server side of the exporter: binding the socket,
/// running the request loop and the individual request handlers.
#[derive(Debug)]
struct Server {}
259
260/// Create and start a new exporter which uses the given socket address to
261/// export the metrics.
262/// # Errors
263///
264/// Will return [`enum@Error`] if the http server fails to start for any reason.
265pub fn start(binding: SocketAddr) -> Result<Exporter, Error> {
266 Builder::new(binding).start()
267}
268
269impl Builder {
270 /// Create a new builder with the given binding.
271 #[must_use]
272 pub fn new(binding: SocketAddr) -> Builder {
273 Self {
274 binding,
275 endpoint: Endpoint::default(),
276 registry: prometheus::default_registry().clone(),
277 }
278 }
279
280 /// Sets the endpoint that the metrics will be served on. If the endpoint is
281 /// not set with this method then the default `/metrics` will be used.
282 /// # Errors
283 ///
284 /// Will return [`enum@Error`] if the supplied string slice is not valid
285 /// ascii.
286 pub fn with_endpoint(&mut self, endpoint: &str) -> Result<(), Error> {
287 if !endpoint.is_ascii() {
288 return Err(Error::EndpointNotAscii(endpoint.to_string()));
289 }
290 let mut clean_endpoint = String::from('/');
291 clean_endpoint.push_str(endpoint.trim_matches('/'));
292 self.endpoint = Endpoint(clean_endpoint);
293 Ok(())
294 }
295
296 /// Sets the registry the metrics will be gathered from. If the registry is
297 /// not set, the default registry provided by the prometheus crate will be
298 /// used. If a custom registry is used, the metrics provided by the
299 /// `internal_metrics` feature are not available.
300 pub fn with_registry(&mut self, registry: prometheus::Registry) {
301 self.registry = registry;
302 }
303
304 /// Create and start new exporter based on the information from the builder.
305 /// # Errors
306 ///
307 /// Will return [`enum@Error`] if the http server fails to start for any
308 /// reason.
309 pub fn start(self) -> Result<Exporter, Error> {
310 let (request_sender, request_receiver) = sync_channel(0);
311 let is_waiting = Arc::new(AtomicBool::new(false));
312 let update_lock = Arc::new(Mutex::new(()));
313
314 let exporter = Exporter {
315 request_receiver,
316 is_waiting: Arc::clone(&is_waiting),
317 update_lock: Arc::clone(&update_lock),
318 };
319
320 Server::start(
321 self.binding,
322 self.endpoint.0,
323 request_sender,
324 is_waiting,
325 update_lock,
326 self.registry,
327 )?;
328
329 Ok(exporter)
330 }
331}
332
333impl Exporter {
334 /// Return new builder which will create a exporter once built.
335 #[must_use]
336 pub fn builder(binding: SocketAddr) -> Builder {
337 Builder::new(binding)
338 }
339
340 /// Wait until a new request comes in. Returns a mutex guard to make the
341 /// http server wait until the metrics have been updated.
342 #[must_use = "not using the guard will result in the exporter returning the prometheus data \
343 immediately over http"]
344 pub fn wait_request(&self) -> MutexGuard<'_, ()> {
345 self.is_waiting.store(true, Ordering::SeqCst);
346
347 let update_waitgroup = self
348 .request_receiver
349 .recv()
350 .expect("can not receive from request_receiver channel. this should never happen");
351
352 self.is_waiting.store(false, Ordering::SeqCst);
353
354 let guard = self
355 .update_lock
356 .lock()
357 .expect("poisioned mutex. should never happen");
358
359 update_waitgroup.wait();
360
361 guard
362 }
363
364 /// Wait for given duration. Returns a mutex guard to make the http
365 /// server wait until the metrics have been updated.
366 #[must_use = "not using the guard will result in the exporter returning the prometheus data \
367 immediately over http"]
368 pub fn wait_duration(&self, duration: Duration) -> MutexGuard<'_, ()> {
369 thread::sleep(duration);
370
371 self.update_lock
372 .lock()
373 .expect("poisioned mutex. should never happen")
374 }
375}
376
377impl Server {
378 fn start(
379 binding: SocketAddr,
380 endpoint: String,
381 request_sender: SyncSender<Arc<Barrier>>,
382 is_waiting: Arc<AtomicBool>,
383 update_lock: Arc<Mutex<()>>,
384 registry: prometheus::Registry,
385 ) -> Result<(), Error> {
386 let server = HTTPServer::http(&binding).map_err(Error::ServerStart)?;
387
388 thread::spawn(move || {
389 #[cfg(feature = "logging")]
390 info!("exporting metrics to http://{}{}", binding, endpoint);
391
392 let encoder = TextEncoder::new();
393
394 for request in server.incoming_requests() {
395 if let Err(err) = if request.url() == endpoint {
396 Self::handler_metrics(
397 request,
398 &encoder,
399 &request_sender,
400 &is_waiting,
401 &update_lock,
402 ®istry,
403 )
404 } else {
405 Self::handler_redirect(request, &endpoint)
406 } {
407 #[cfg(feature = "logging")]
408 error!("{}", err);
409
410 // Just so there are no complains about unused err variable when logging
411 // feature is disabled
412 drop(err);
413 }
414 }
415 });
416
417 Ok(())
418 }
419
420 fn handler_metrics(
421 request: Request,
422 encoder: &TextEncoder,
423 request_sender: &SyncSender<Arc<Barrier>>,
424 is_waiting: &Arc<AtomicBool>,
425 update_lock: &Arc<Mutex<()>>,
426 registry: &prometheus::Registry,
427 ) -> Result<(), HandlerError> {
428 #[cfg(feature = "internal_metrics")]
429 HTTP_COUNTER.inc();
430
431 #[cfg(feature = "internal_metrics")]
432 let timer = HTTP_REQ_HISTOGRAM.start_timer();
433
434 if is_waiting.load(Ordering::SeqCst) {
435 let barrier = Arc::new(Barrier::new(2));
436
437 request_sender
438 .send(Arc::clone(&barrier))
439 .expect("can not send to request_sender. this should never happen");
440
441 barrier.wait();
442 }
443
444 let _lock = update_lock
445 .lock()
446 .expect("poisioned mutex. should never happen");
447
448 #[cfg(feature = "internal_metrics")]
449 drop(timer);
450
451 Self::process_request(request, encoder, registry)
452 }
453
454 fn process_request(
455 request: Request,
456 encoder: &TextEncoder,
457 registry: &prometheus::Registry,
458 ) -> Result<(), HandlerError> {
459 let metric_families = registry.gather();
460 let mut buffer = vec![];
461
462 encoder
463 .encode(&metric_families, &mut buffer)
464 .map_err(HandlerError::EncodeMetrics)?;
465
466 #[cfg(feature = "internal_metrics")]
467 HTTP_BODY_GAUGE.set(buffer.len() as i64);
468
469 let response = Response::from_data(buffer);
470 request.respond(response).map_err(HandlerError::Response)?;
471
472 Ok(())
473 }
474
475 fn handler_redirect(request: Request, endpoint: &str) -> Result<(), HandlerError> {
476 let response = Response::from_string(format!("try {} for metrics\n", endpoint))
477 .with_status_code(301)
478 .with_header(Header {
479 field: "Location"
480 .parse()
481 .expect("can not parse location header field. this should never fail"),
482 value: ascii::AsciiString::from_ascii(endpoint)
483 .expect("can not parse header value. this should never fail"),
484 });
485
486 request.respond(response).map_err(HandlerError::Response)?;
487
488 Ok(())
489 }
490}