use futures_util::future;
use hyper::{Body, Request, Response};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tracing::{debug, info_span, Instrument};
/// Result type returned by this module's fallible operations; an alias for `hyper::Result`.
pub type Result<T> = hyper::Result<T>;
/// Error type used by this module; an alias for `hyper::Error`.
pub type Error = hyper::Error;
// Arguments that configure the admin server; convertible into a `Builder` via `into_builder`.
//
// When the `clap` feature is enabled this type also derives `clap::Parser`.
// NOTE(review): plain `//` comments are used here on purpose. clap's derive is understood to
// turn `///` doc comments into help text, which would change runtime output — confirm before
// converting these to doc comments.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub struct AdminArgs {
// Socket address for the admin server; as a long command-line option it defaults to
// `0.0.0.0:8080`.
#[cfg_attr(feature = "clap", clap(long, default_value = "0.0.0.0:8080"))]
pub admin_addr: SocketAddr,
}
/// Configuration for an admin server that has not yet been bound to a socket.
///
/// Holds the address to bind and the readiness flag that will be shared with the server.
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Clone, Debug)]
pub struct Builder {
// Address passed to `Builder::new`.
addr: SocketAddr,
// Readiness flag; created as `false` by `Builder::new`.
ready: Readiness,
}
/// An admin server whose listener has been bound (see `Builder::bind`) but which has not
/// yet been spawned.
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Debug)]
pub struct Bound {
// Address associated with the listener; carried over into `Server` on `spawn`.
addr: SocketAddr,
// Readiness flag consulted by the `/ready` handler once the server is spawned.
ready: Readiness,
// Configured hyper server builder wrapping the bound TCP listener.
server: hyper::server::Builder<hyper::server::conn::AddrIncoming>,
}
/// Handle to a shared boolean readiness flag.
///
/// The flag lives in an `Arc<AtomicBool>`, so every clone of a `Readiness` reads and
/// writes the same underlying value.
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
#[derive(Clone, Debug)]
pub struct Readiness(Arc<AtomicBool>);
/// Handle to an admin server that has been spawned onto a Tokio task.
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "admin")))]
pub struct Server {
// Address reported by `local_addr`.
addr: SocketAddr,
// Readiness flag shared with the `/ready` handler.
ready: Readiness,
// Join handle for the task driving the hyper server; resolves to the server's result.
task: tokio::task::JoinHandle<Result<()>>,
}
impl Default for AdminArgs {
    /// Returns arguments whose admin address is the unspecified IPv4 address
    /// (`0.0.0.0`) on port 8080.
    fn default() -> Self {
        let admin_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 8080);
        Self { admin_addr }
    }
}
impl AdminArgs {
    /// Consumes the arguments and returns a [`Builder`] configured with `admin_addr`.
    pub fn into_builder(self) -> Builder {
        let Self { admin_addr } = self;
        Builder::new(admin_addr)
    }
}
impl Builder {
    /// Creates a builder for an admin server that will bind to `addr`.
    ///
    /// The readiness flag starts out as `false`.
    pub fn new(addr: SocketAddr) -> Self {
        Self {
            addr,
            ready: Readiness(Arc::new(false.into())),
        }
    }

    /// Returns a handle to the readiness flag that the server will share.
    pub fn readiness(&self) -> Readiness {
        self.ready.clone()
    }

    /// Sets the readiness flag to `true`.
    pub fn set_ready(&self) {
        self.ready.set(true);
    }

    /// Binds the admin server's listener to the configured address.
    ///
    /// The returned [`Bound`] records the address the listener actually bound to, so
    /// configuring port `0` yields the OS-assigned port rather than `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener cannot be bound to the address.
    pub fn bind(self) -> Result<Bound> {
        let Self { addr, ready } = self;

        let incoming = hyper::server::conn::AddrIncoming::bind(&addr)?;
        // Ask the listener for its address instead of trusting the configured one: when
        // the configured port is 0 the operating system picks the real port at bind time.
        let addr = incoming.local_addr();

        let server = hyper::server::Server::builder(incoming)
            // Keep a connection open when the client shuts down its write side while
            // still waiting for the response.
            .http1_half_close(true)
            // Drop connections that do not deliver complete request headers promptly.
            .http1_header_read_timeout(Duration::from_secs(2))
            // Cap the per-connection buffer at 8 KiB.
            .http1_max_buf_size(8 * 1024);

        Ok(Bound {
            addr,
            ready,
            server,
        })
    }
}
impl Bound {
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}
pub fn set_ready(&self) {
self.ready.set(true);
}
pub fn spawn(self) -> Server {
let ready = self.ready.clone();
let server = self
.server
.serve(hyper::service::make_service_fn(move |_conn| {
let ready = ready.clone();
future::ok::<_, hyper::Error>(hyper::service::service_fn(
move |req: hyper::Request<hyper::Body>| match req.uri().path() {
"/live" => future::ok(handle_live(req)),
"/ready" => future::ok(handle_ready(&ready, req)),
_ => future::ok::<_, hyper::Error>(
hyper::Response::builder()
.status(hyper::StatusCode::NOT_FOUND)
.body(hyper::Body::default())
.unwrap(),
),
},
))
}));
let task = tokio::spawn(
async move {
debug!("Serving");
server.await
}
.instrument(info_span!("admin", port = %self.addr.port())),
);
Server {
task,
addr: self.addr,
ready: self.ready,
}
}
}
impl Readiness {
pub fn get(&self) -> bool {
self.0.load(Ordering::Acquire)
}
pub fn set(&self, ready: bool) {
self.0.store(ready, Ordering::Release);
}
}
impl Server {
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
pub fn readiness(&self) -> Readiness {
self.ready.clone()
}
pub fn into_join_handle(self) -> tokio::task::JoinHandle<Result<()>> {
self.task
}
}
/// Builds the response for the `/live` endpoint.
///
/// `GET` and `HEAD` requests receive `200 OK` with a `text/plain` body of `alive\n`.
/// Any other method receives an empty `405 Method Not Allowed` response that lists the
/// supported methods in its `Allow` header.
fn handle_live(req: Request<Body>) -> Response<Body> {
    let rsp = match *req.method() {
        hyper::Method::GET | hyper::Method::HEAD => Response::builder()
            .status(hyper::StatusCode::OK)
            .header(hyper::header::CONTENT_TYPE, "text/plain")
            .body(Body::from("alive\n")),
        _ => Response::builder()
            .status(hyper::StatusCode::METHOD_NOT_ALLOWED)
            // A 405 response is required to tell the client which methods are supported.
            .header(hyper::header::ALLOW, "GET, HEAD")
            .body(Body::default()),
    };
    rsp.expect("response built from static status and headers must be valid")
}
/// Builds the response for the `/ready` endpoint.
///
/// For `GET` and `HEAD` requests the readiness flag is read once: when it is set the
/// response is `200 OK` with body `ready\n`, otherwise `500 Internal Server Error` with
/// body `not ready\n` (both `text/plain`). Any other method receives an empty
/// `405 Method Not Allowed` response that lists the supported methods in its `Allow`
/// header.
fn handle_ready(ready: &Readiness, req: Request<Body>) -> Response<Body> {
    let rsp = match *req.method() {
        hyper::Method::GET | hyper::Method::HEAD => {
            let (status, body) = if ready.get() {
                (hyper::StatusCode::OK, "ready\n")
            } else {
                (hyper::StatusCode::INTERNAL_SERVER_ERROR, "not ready\n")
            };
            Response::builder()
                .status(status)
                .header(hyper::header::CONTENT_TYPE, "text/plain")
                .body(Body::from(body))
        }
        _ => Response::builder()
            .status(hyper::StatusCode::METHOD_NOT_ALLOWED)
            // A 405 response is required to tell the client which methods are supported.
            .header(hyper::header::ALLOW, "GET, HEAD")
            .body(Body::default()),
    };
    rsp.expect("response built from static status and headers must be valid")
}