use std::fmt;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::net::TcpListener as StdListener;
use std::sync::{Arc, Mutex, Weak};
use arc_swap::ArcSwap;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use daemonbase::error::ExitError;
use futures_util::stream::{Stream, StreamExt};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use http_body_util::combinators::BoxBody;
use hyper::{Method, StatusCode};
use hyper::body::{Body, Frame};
use hyper::http::response::Builder;
use hyper::service::service_fn;
use hyper_util::rt::{TokioExecutor, TokioIo};
use log::{debug, error};
use serde::Deserialize;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use crate::metrics;
use crate::utils::http::format_http_date;
/// Configuration of the built-in HTTP server.
///
/// Deserialized via serde; cloning copies the list of listen addresses.
#[derive(Clone, Deserialize)]
pub struct Server {
/// Socket addresses to listen on, read from the `http-listen` key.
#[serde(rename = "http-listen")]
listen: Vec<SocketAddr>,
}
impl Server {
pub fn run(
&self,
metrics: metrics::Collection,
resources: Resources,
runtime: &Runtime,
) -> Result<(), ExitError> {
let mut listeners = Vec::new();
for addr in &self.listen {
let listener = match StdListener::bind(addr) {
Ok(listener) => listener,
Err(err) => {
error!("Fatal: error listening on {}: {}", addr, err);
return Err(ExitError::default());
}
};
if let Err(err) = listener.set_nonblocking(true) {
error!(
"Fatal: failed to set listener {} to non-blocking: {}.",
addr, err
);
return Err(ExitError::default());
}
debug!("HTTP server listening on {}", addr);
listeners.push((listener, addr));
}
for (listener, addr) in listeners {
runtime.spawn(
Self::single_listener(
listener, *addr, metrics.clone(), resources.clone()
)
);
}
Ok(())
}
async fn single_listener(
listener: StdListener,
addr: SocketAddr,
metrics: metrics::Collection,
resources: Resources,
) {
let listener = match TcpListener::from_std(listener) {
Ok(listener) => listener,
Err(err) => {
error!("Error on HTTP listener: {}", err);
return
}
};
loop {
let stream = match listener.accept().await {
Ok((stream, _addr)) => stream,
Err(err) => {
error!("Fatal error in HTTP server {}: {}", addr, err);
break;
}
};
let metrics = metrics.clone();
let resources = resources.clone();
tokio::task::spawn(async move {
let _ = hyper_util::server::conn::auto::Builder::new(
TokioExecutor::new()
).serve_connection(
TokioIo::new(stream),
service_fn(move |req| {
let metrics = metrics.clone();
let resources = resources.clone();
async move {
Self::handle_request(
req, &metrics, &resources
).await
}
})
).await;
});
}
}
async fn handle_request(
req: Request,
metrics: &metrics::Collection,
resources: &Resources,
) -> Result<Response, Infallible> {
if *req.method() != Method::GET {
return Ok(Self::method_not_allowed())
}
Ok(match req.uri().path() {
"/metrics" => Self::metrics(metrics),
"/status" => Self::status(metrics),
_ => {
match resources.process_request(&req) {
Some(response) => response,
None => Self::not_found()
}
}
})
}
fn metrics(metrics: &metrics::Collection) -> Response {
ResponseBuilder::ok()
.content_type(ContentType::PROMETHEUS)
.body(metrics.assemble(metrics::OutputFormat::Prometheus))
}
fn status(metrics: &metrics::Collection) -> Response {
ResponseBuilder::ok()
.content_type(ContentType::TEXT)
.body(
metrics.assemble(metrics::OutputFormat::Plain)
)
}
fn method_not_allowed() -> Response {
ResponseBuilder::method_not_allowed()
.content_type(ContentType::TEXT)
.body("Method Not Allowed")
}
fn not_found() -> Response {
ResponseBuilder::not_found()
.content_type(ContentType::TEXT)
.body("Not Found")
}
}
/// A shared collection of request processors consulted by the HTTP server.
///
/// Both fields sit behind `Arc`s, so clones refer to the same collection.
#[derive(Clone, Default)]
pub struct Resources {
/// The current list of registered processors; replaced wholesale by
/// `register` and read by `process_request`.
sources: Arc<ArcSwap<Vec<RegisteredResource>>>,
/// Mutex held by `register` while it rebuilds and stores the list.
register: Arc<Mutex<()>>,
}
impl Resources {
pub fn register(&self, process: Weak<dyn ProcessRequest>) {
let lock = self.register.lock().unwrap();
let old_sources = self.sources.load();
let mut new_sources = Vec::new();
for item in old_sources.iter() {
if item.process.strong_count() > 0 {
new_sources.push(item.clone())
}
}
new_sources.push(
RegisteredResource { process }
);
self.sources.store(new_sources.into());
drop(lock);
}
pub fn process_request(
&self, request: &Request,
) -> Option<Response> {
let sources = self.sources.load();
for item in sources.iter() {
if let Some(process) = item.process.upgrade() {
if let Some(response) = process.process_request(request) {
return Some(response)
}
}
}
None
}
}
impl fmt::Debug for Resources {
    /// Formats the collection as the number of currently stored
    /// processors rather than listing them.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let sources = self.sources.load();
        write!(f, "Resource({} processors)", sources.len())
    }
}
/// A single entry in the `Resources` list.
#[derive(Clone)]
struct RegisteredResource {
/// Weak handle to the processor; entries with no remaining strong
/// references are left out the next time the list is rebuilt.
process: Weak<dyn ProcessRequest>,
}
/// A handler that may answer an HTTP request.
///
/// Implementors must be `Send + Sync`.
pub trait ProcessRequest: Send + Sync {
/// Returns `Some(response)` to answer `request`, or `None` to decline
/// it so that the caller can try something else.
fn process_request(
&self, request: &Request
) -> Option<Response>;
}
impl<T: ProcessRequest> ProcessRequest for Arc<T> {
    /// Forwards the request to the processor held inside the `Arc`.
    fn process_request(&self, request: &Request) -> Option<Response> {
        let inner: &T = self;
        inner.process_request(request)
    }
}
impl<F> ProcessRequest for F
where
    F: Fn(&Request) -> Option<Response> + Sync + Send,
{
    /// Calls the closure or function with the request and returns its
    /// result unchanged.
    fn process_request(&self, request: &Request) -> Option<Response> {
        self(request)
    }
}
/// The request type handled by this module: a hyper request with an
/// incoming body.
pub type Request = hyper::Request<hyper::body::Incoming>;
/// The response type produced by this module: a hyper response whose body
/// is a boxed body yielding `Bytes` and whose error type is `Infallible`.
pub type Response = hyper::Response<BoxBody<Bytes, Infallible>>;
/// A builder for [`Response`] values.
///
/// Wraps the HTTP response builder and adds shortcuts for the status
/// codes, headers and body kinds used by this module.
#[derive(Debug)]
pub struct ResponseBuilder {
/// The underlying HTTP response builder carrying status and headers.
builder: Builder,
}
impl ResponseBuilder {
pub fn new(status: StatusCode) -> Self {
ResponseBuilder { builder: Builder::new().status(status) }
}
pub fn ok() -> Self {
Self::new(StatusCode::OK)
}
pub fn service_unavailable() -> Self {
Self::new(StatusCode::SERVICE_UNAVAILABLE)
}
pub fn bad_request() -> Self {
Self::new(StatusCode::BAD_REQUEST)
}
pub fn not_found() -> Self {
Self::new(StatusCode::NOT_FOUND)
}
pub fn not_modified() -> Self {
Self::new(StatusCode::NOT_MODIFIED)
}
pub fn method_not_allowed() -> Self {
Self::new(StatusCode::METHOD_NOT_ALLOWED)
}
pub fn moved_permanently() -> Self {
Self::new(StatusCode::MOVED_PERMANENTLY)
}
pub fn content_type(self, content_type: ContentType) -> Self {
ResponseBuilder {
builder: self.builder.header("Content-Type", content_type.0)
}
}
pub fn etag(self, etag: &str) -> Self {
ResponseBuilder {
builder: self.builder.header("ETag", etag)
}
}
pub fn last_modified(self, last_modified: DateTime<Utc>) -> Self {
ResponseBuilder {
builder: self.builder.header(
"Last-Modified",
format_http_date(last_modified)
)
}
}
#[allow(dead_code)]
pub fn location(self, location: &str) -> Self {
ResponseBuilder {
builder: self.builder.header(
"Location",
location
)
}
}
fn finalize<B>(self, body: B) -> Response
where
B: Body<Data = Bytes, Error = Infallible> + Send + Sync + 'static
{
self.builder.body(
body.boxed()
).expect("broken HTTP response builder")
}
pub fn body(self, body: impl Into<Bytes>) -> Response {
self.finalize(Full::new(body.into()))
}
pub fn empty(self) -> Response {
self.finalize(Empty::new())
}
pub fn stream<S>(self, body: S) -> Response
where
S: Stream<Item = Bytes> + Send + Sync + 'static
{
self.finalize(
StreamBody::new(body.map(|item| {
Ok(Frame::data(item))
}))
)
}
}
/// The value of a `Content-Type` header, stored as static bytes.
#[derive(Clone, Debug)]
pub struct ContentType(&'static [u8]);

impl ContentType {
    /// CSV text in UTF-8 with a header row.
    pub const CSV: Self = Self(b"text/csv;charset=utf-8;header=present");

    /// JSON.
    pub const JSON: Self = Self(b"application/json");

    /// Plain text in UTF-8.
    pub const TEXT: Self = Self(b"text/plain;charset=utf-8");

    /// Plain text, version 0.0.4, as used for the Prometheus output.
    pub const PROMETHEUS: Self = Self(b"text/plain; version=0.0.4");

    /// Creates a content type from a caller-supplied static value.
    pub fn external(value: &'static [u8]) -> Self {
        Self(value)
    }
}