use std::convert::Infallible;
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use folk_api::Executor;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tracing::{debug, error};
use crate::config::HttpConfig;
use crate::payload::{decode_response, encode_request};
pub struct HttpServer {
config: HttpConfig,
executor: Arc<dyn Executor>,
}
impl HttpServer {
pub fn new(config: HttpConfig, executor: Arc<dyn Executor>) -> Self {
Self { config, executor }
}
pub async fn run(self, mut shutdown: watch::Receiver<bool>) -> Result<()> {
let listener = TcpListener::bind(self.config.listen).await?;
let executor = self.executor.clone();
loop {
tokio::select! {
accept = listener.accept() => {
let (stream, _peer) = accept?;
let exec = executor.clone();
tokio::spawn(async move {
let io = TokioIo::new(stream);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service_fn(move |req| {
let exec = exec.clone();
async move { handle(exec, req).await }
}))
.await
{
debug!(error = ?e, "http connection error");
}
});
}
_ = shutdown.changed() => {
if *shutdown.borrow() { break; }
}
}
}
Ok(())
}
}
async fn handle(
executor: Arc<dyn Executor>,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
let payload = match encode_request(req).await {
Ok(p) => p,
Err(e) => {
error!(error = ?e, "encode request");
return Ok(Response::builder()
.status(500)
.body(Full::new(Bytes::from("encode error")))
.unwrap());
}
};
let response_bytes = match executor.execute_method("http.handle", payload).await {
Ok(b) => b,
Err(e) => {
error!(error = ?e, "dispatch to worker");
return Ok(Response::builder()
.status(502)
.body(Full::new(Bytes::from("worker error")))
.unwrap());
}
};
match decode_response(response_bytes) {
Ok(r) => Ok(r),
Err(e) => {
error!(error = ?e, "decode response");
Ok(Response::builder()
.status(500)
.body(Full::new(Bytes::from("decode error")))
.unwrap())
}
}
}