prometheus-hyper 0.1.2

A small Tokio/Hyper server for exposing Prometheus metrics.
Documentation
//! # Example
//! ```
//! use prometheus::{IntCounter, Opts, Registry};
//! use prometheus_hyper::{RegistryFn, Server};
//! use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration};
//! use tokio::sync::Notify;
//!
//! pub struct CustomMetrics {
//!     pub foo: IntCounter,
//! }
//!
//! impl CustomMetrics {
//!     pub fn new() -> Result<(Self, RegistryFn), Box<dyn Error>> {
//!         let foo = IntCounter::with_opts(Opts::new("foo", "description"))?;
//!         let foo_clone = foo.clone();
//!         let f = |r: &Registry| r.register(Box::new(foo_clone));
//!         Ok((Self { foo }, Box::new(f)))
//!     }
//! }
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() -> std::result::Result<(), hyper::Error> {
//!     let registry = Arc::new(Registry::new());
//!     let shutdown = Arc::new(Notify::new());
//!     let shutdown_clone = Arc::clone(&shutdown);
//!     let (metrics, f) = CustomMetrics::new().expect("failed prometheus");
//!     f(&registry).expect("problem registering");
//!
//!     // Startup Server
//!     let jh = tokio::spawn(async move {
//!         Server::run(
//!             Arc::clone(&registry),
//!             SocketAddr::from(([0; 4], 8080)),
//!             shutdown_clone.notified(),
//!         )
//!         .await
//!     });
//!
//!     // Change Metrics
//!     metrics.foo.inc();
//!
//!     // Shutdown
//!     tokio::time::sleep(Duration::from_secs(5)).await;
//!     shutdown.notify_one();
//!     jh.await.unwrap()
//! }
//! ```
use hyper::{
    header,
    service::{make_service_fn, service_fn},
    Body, Response, StatusCode,
};
use prometheus::{Encoder, Registry, TextEncoder};
use std::{future::Future, net::SocketAddr, sync::Arc};
use tracing::{info, trace};

/// Helper fn to register metrics
///
/// A boxed, call-once closure that is handed a [`Registry`] and returns
/// `Ok(())` or the [`prometheus::Error`] produced while registering.
///
/// [`Registry`]: prometheus::Registry
/// [`prometheus::Error`]: prometheus::Error
pub type RegistryFn = Box<dyn FnOnce(&Registry) -> Result<(), prometheus::Error>>;

/// Metrics Server based on [`tokio`] and [`hyper`]
///
/// The type has no fields and carries no state; it only namespaces the
/// associated function [`Server::run`].
///
/// [`tokio`]: tokio
/// [`hyper`]: hyper
pub struct Server {}

impl Server {
    /// Create and run the metrics Server
    ///
    /// # Arguments
    /// * `registry` - provide the [`Registry`] you are also registering your
    ///   metric types to.
    /// * `addr` - `host:ip` to tcp listen on.
    /// * `shutdown` - a [`Future`], once this completes the server will start
    ///   to shut down. You can use a [`signal`] or [`Notify`] for clean
    ///   shutdown or [`pending`] to newer shutdown.
    /// # Result
    /// * [`hyper::Error`] returns the result if the shutdown was successful.
    ///
    /// # Examples
    /// ```
    /// use prometheus::Registry;
    /// use prometheus_hyper::Server;
    /// use std::{net::SocketAddr, sync::Arc};
    /// # #[tokio::main(flavor = "current_thread")]
    /// # async fn main() {
    ///
    /// let registry = Arc::new(Registry::new());
    ///
    /// // Start Server endlessly
    /// tokio::spawn(async move {
    ///     Server::run(
    ///         Arc::clone(&registry),
    ///         SocketAddr::from(([0; 4], 8080)),
    ///         futures_util::future::pending(),
    ///     )
    ///     .await
    /// });
    /// # }
    /// ```
    /// [`Registry`]: prometheus::Registry
    /// [`Future`]: std::future::Future
    /// [`pending`]: https://docs.rs/futures-util/latest/futures_util/future/fn.pending.html
    /// [`hyper::Error`]: hyper::Error
    /// [`signal`]: tokio::signal
    /// [`Notify`]: tokio::sync::Notify
    /// [`tokio`]: tokio
    /// [`hyper`]: hyper
    pub async fn run<S, F>(registry: Arc<Registry>, addr: S, shutdown: F) -> Result<(), hyper::Error>
    where
        S: Into<SocketAddr>,
        F: Future<Output = ()>,
    {
        let addr = addr.into();

        info!("starting hyper server to serve metrics");

        let service = make_service_fn(move |_| {
            let registry_clone = registry.clone();
            let encoder = TextEncoder::new();

            async move {
                Ok::<_, hyper::Error>(service_fn(move |req| {
                    let (code, body) = if req.uri() == "/metrics" {
                        trace!("request");
                        let mf = registry_clone.gather();
                        let mut buffer = vec![];

                        encoder.encode(&mf, &mut buffer).expect("write to vec cannot fail");
                        (StatusCode::OK, Body::from(buffer))
                    } else {
                        trace!("wrong uri, return 404");
                        (StatusCode::NOT_FOUND, Body::from("404 not found"))
                    };
                    let response = Response::builder()
                        .status(code)
                        .header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
                        .body(body)
                        .unwrap();
                    async move { Ok::<_, hyper::Error>(response) }
                }))
            }
        });

        let server = hyper::Server::bind(&addr).serve(service);
        let err = server.with_graceful_shutdown(shutdown).await;
        match &err {
            Ok(()) => info!("stopping prometheus hyper server successful"),
            Err(e) => info!(?e, "error while shutting down"),
        }
        err
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use hyper::{client::Client, Request};
    use tokio::sync::Notify;

    /// Handle of a spawned server task, resolving to the result of
    /// [`Server::run`].
    type ServerTask = tokio::task::JoinHandle<Result<(), hyper::Error>>;

    /// Spawns a server with an empty registry listening on `0.0.0.0:port`.
    ///
    /// Returns the [`Notify`] that triggers the shutdown together with the
    /// handle of the spawned task.
    fn spawn_server(port: u16) -> (Arc<Notify>, ServerTask) {
        let trigger = Arc::new(Notify::new());
        let metrics = Arc::new(Registry::new());

        let listener_trigger = Arc::clone(&trigger);
        let task = tokio::spawn(async move {
            let addr = SocketAddr::from(([0; 4], port));
            Server::run(Arc::clone(&metrics), addr, listener_trigger.notified()).await
        });

        (trigger, task)
    }

    /// Signals the shutdown and asserts that the server task finished without
    /// a join error and without a server error.
    async fn stop_server(trigger: Arc<Notify>, task: ServerTask) {
        trigger.notify_one();
        task.await.expect("tokio error").expect("prometheus_hyper server error");
    }

    /// Starts a server on `port`, sends a `GET` to `uri`, checks the response
    /// status against `expected` and finally shuts the server down again.
    async fn assert_get_status(port: u16, uri: &'static str, expected: StatusCode) {
        let (trigger, task) = spawn_server(port);

        let http = Client::new();
        let get = Request::builder()
            .method("GET")
            .uri(uri)
            .body(Body::empty())
            .expect("request builder");

        let answer = http.request(get).await.expect("couldn't reach server");
        assert_eq!(answer.status(), expected);

        stop_server(trigger, task).await;
    }

    #[tokio::test]
    async fn test_create() {
        let (trigger, task) = spawn_server(6001);
        stop_server(trigger, task).await;
    }

    #[tokio::test]
    async fn test_sample() {
        assert_get_status(6002, "http://localhost:6002/metrics", StatusCode::OK).await;
    }

    #[tokio::test]
    async fn test_wrong_endpoint_sample() {
        assert_get_status(6003, "http://localhost:6003/foobar", StatusCode::NOT_FOUND).await;
    }
}