folk_plugin_http/
server.rs1use std::convert::Infallible;
2use std::sync::Arc;
3
4use anyhow::Result;
5use bytes::Bytes;
6use folk_api::Executor;
7use http_body_util::Full;
8use hyper::body::Incoming;
9use hyper::server::conn::http1;
10use hyper::service::service_fn;
11use hyper::{Request, Response};
12use hyper_util::rt::TokioIo;
13use tokio::net::TcpListener;
14use tokio::sync::watch;
15use tracing::{debug, error};
16
17use crate::config::HttpConfig;
18use crate::payload::{decode_response, encode_request};
19
20pub struct HttpServer {
21 config: HttpConfig,
22 executor: Arc<dyn Executor>,
23}
24
25impl HttpServer {
26 pub fn new(config: HttpConfig, executor: Arc<dyn Executor>) -> Self {
27 Self { config, executor }
28 }
29
30 pub async fn run(self, mut shutdown: watch::Receiver<bool>) -> Result<()> {
31 let listener = TcpListener::bind(self.config.listen).await?;
32 let executor = self.executor.clone();
33
34 loop {
35 tokio::select! {
36 accept = listener.accept() => {
37 let (stream, _peer) = accept?;
38 let exec = executor.clone();
39 tokio::spawn(async move {
40 let io = TokioIo::new(stream);
41 if let Err(e) = http1::Builder::new()
42 .serve_connection(io, service_fn(move |req| {
43 let exec = exec.clone();
44 async move { handle(exec, req).await }
45 }))
46 .await
47 {
48 debug!(error = ?e, "http connection error");
49 }
50 });
51 }
52 _ = shutdown.changed() => {
53 if *shutdown.borrow() { break; }
54 }
55 }
56 }
57 Ok(())
58 }
59}
60
61async fn handle(
62 executor: Arc<dyn Executor>,
63 req: Request<Incoming>,
64) -> Result<Response<Full<Bytes>>, Infallible> {
65 let payload = match encode_request(req).await {
66 Ok(p) => p,
67 Err(e) => {
68 error!(error = ?e, "encode request");
69 return Ok(Response::builder()
70 .status(500)
71 .body(Full::new(Bytes::from("encode error")))
72 .unwrap());
73 }
74 };
75
76 let response_bytes = match executor.execute(payload).await {
77 Ok(b) => b,
78 Err(e) => {
79 error!(error = ?e, "dispatch to worker");
80 return Ok(Response::builder()
81 .status(502)
82 .body(Full::new(Bytes::from("worker error")))
83 .unwrap());
84 }
85 };
86
87 match decode_response(response_bytes) {
88 Ok(r) => Ok(r),
89 Err(e) => {
90 error!(error = ?e, "decode response");
91 Ok(Response::builder()
92 .status(500)
93 .body(Full::new(Bytes::from("decode error")))
94 .unwrap())
95 }
96 }
97}