Skip to main content

folk_plugin_http/
server.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3
4use anyhow::Result;
5use bytes::Bytes;
6use folk_api::Executor;
7use http_body_util::Full;
8use hyper::body::Incoming;
9use hyper::server::conn::http1;
10use hyper::service::service_fn;
11use hyper::{Request, Response};
12use hyper_util::rt::TokioIo;
13use tokio::net::TcpListener;
14use tokio::sync::watch;
15use tracing::{debug, error};
16
17use crate::config::HttpConfig;
18use crate::payload::{decode_response, encode_request};
19
20pub struct HttpServer {
21    config: HttpConfig,
22    executor: Arc<dyn Executor>,
23}
24
25impl HttpServer {
26    pub fn new(config: HttpConfig, executor: Arc<dyn Executor>) -> Self {
27        Self { config, executor }
28    }
29
30    pub async fn run(self, mut shutdown: watch::Receiver<bool>) -> Result<()> {
31        let listener = TcpListener::bind(self.config.listen).await?;
32        let executor = self.executor.clone();
33
34        loop {
35            tokio::select! {
36                accept = listener.accept() => {
37                    let (stream, _peer) = accept?;
38                    let exec = executor.clone();
39                    tokio::spawn(async move {
40                        let io = TokioIo::new(stream);
41                        if let Err(e) = http1::Builder::new()
42                            .serve_connection(io, service_fn(move |req| {
43                                let exec = exec.clone();
44                                async move { handle(exec, req).await }
45                            }))
46                            .await
47                        {
48                            debug!(error = ?e, "http connection error");
49                        }
50                    });
51                }
52                _ = shutdown.changed() => {
53                    if *shutdown.borrow() { break; }
54                }
55            }
56        }
57        Ok(())
58    }
59}
60
61async fn handle(
62    executor: Arc<dyn Executor>,
63    req: Request<Incoming>,
64) -> Result<Response<Full<Bytes>>, Infallible> {
65    let payload = match encode_request(req).await {
66        Ok(p) => p,
67        Err(e) => {
68            error!(error = ?e, "encode request");
69            return Ok(Response::builder()
70                .status(500)
71                .body(Full::new(Bytes::from("encode error")))
72                .unwrap());
73        }
74    };
75
76    let response_bytes = match executor.execute(payload).await {
77        Ok(b) => b,
78        Err(e) => {
79            error!(error = ?e, "dispatch to worker");
80            return Ok(Response::builder()
81                .status(502)
82                .body(Full::new(Bytes::from("worker error")))
83                .unwrap());
84        }
85    };
86
87    match decode_response(response_bytes) {
88        Ok(r) => Ok(r),
89        Err(e) => {
90            error!(error = ?e, "decode response");
91            Ok(Response::builder()
92                .status(500)
93                .body(Full::new(Bytes::from("decode error")))
94                .unwrap())
95        }
96    }
97}