use hyper::{
header,
service::{make_service_fn, service_fn},
Body, Response, StatusCode,
};
use prometheus::{Encoder, Registry, TextEncoder};
use std::{future::Future, net::SocketAddr, sync::Arc};
use tracing::{info, trace};
pub type RegistryFn = Box<dyn FnOnce(&Registry) -> Result<(), prometheus::Error>>;
pub struct Server {}
impl Server {
pub async fn run<S, F>(registry: Arc<Registry>, addr: S, shutdown: F) -> Result<(), hyper::Error>
where
S: Into<SocketAddr>,
F: Future<Output = ()>,
{
let addr = addr.into();
info!("starting hyper server to serve metrics");
let service = make_service_fn(move |_| {
let registry_clone = registry.clone();
let encoder = TextEncoder::new();
async move {
Ok::<_, hyper::Error>(service_fn(move |req| {
let (code, body) = if req.uri() == "/metrics" {
trace!("request");
let mf = registry_clone.gather();
let mut buffer = vec![];
encoder.encode(&mf, &mut buffer).expect("write to vec cannot fail");
(StatusCode::OK, Body::from(buffer))
} else {
trace!("wrong uri, return 404");
(StatusCode::NOT_FOUND, Body::from("404 not found"))
};
let response = Response::builder()
.status(code)
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
.body(body)
.unwrap();
async move { Ok::<_, hyper::Error>(response) }
}))
}
});
let server = hyper::Server::bind(&addr).serve(service);
let err = server.with_graceful_shutdown(shutdown).await;
match &err {
Ok(()) => info!("stopping prometheus hyper server successful"),
Err(e) => info!(?e, "error while shutting down"),
}
err
}
}
#[cfg(test)]
mod tests {
use super::*;
use hyper::{client::Client, Request};
use tokio::sync::Notify;
#[tokio::test]
async fn test_create() {
let shutdown = Arc::new(Notify::new());
let registry = Arc::new(Registry::new());
let shutdown_clone = Arc::clone(&shutdown);
let r = tokio::spawn(async move {
Server::run(
Arc::clone(®istry),
SocketAddr::from(([0; 4], 6001)),
shutdown_clone.notified(),
)
.await
});
shutdown.notify_one();
r.await.expect("tokio error").expect("prometheus_hyper server error");
}
#[tokio::test]
async fn test_sample() {
let shutdown = Arc::new(Notify::new());
let registry = Arc::new(Registry::new());
let shutdown_clone = Arc::clone(&shutdown);
let r = tokio::spawn(async move {
Server::run(
Arc::clone(®istry),
SocketAddr::from(([0; 4], 6002)),
shutdown_clone.notified(),
)
.await
});
let client = Client::new();
let req = Request::builder()
.method("GET")
.uri("http://localhost:6002/metrics")
.body(Body::empty())
.expect("request builder");
let res = client.request(req).await.expect("couldn't reach server");
assert_eq!(res.status(), StatusCode::OK);
shutdown.notify_one();
r.await.expect("tokio error").expect("prometheus_hyper server error");
}
#[tokio::test]
async fn test_wrong_endpoint_sample() {
let shutdown = Arc::new(Notify::new());
let registry = Arc::new(Registry::new());
let shutdown_clone = Arc::clone(&shutdown);
let r = tokio::spawn(async move {
Server::run(
Arc::clone(®istry),
SocketAddr::from(([0; 4], 6003)),
shutdown_clone.notified(),
)
.await
});
let client = Client::new();
let req = Request::builder()
.method("GET")
.uri("http://localhost:6003/foobar")
.body(Body::empty())
.expect("request builder");
let res = client.request(req).await.expect("couldn't reach server");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
shutdown.notify_one();
r.await.expect("tokio error").expect("prometheus_hyper server error");
}
}