folk_plugin_http/
server.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use axum::Router;
5use axum::body::Body;
6use axum::extract::State;
7use axum::http::{Request, Response};
8use axum::routing::any;
9use folk_api::Executor;
10use tokio::net::TcpListener;
11use tokio::sync::watch;
12use tracing::error;
13
14use crate::config::HttpConfig;
15use crate::payload::{decode_response, encode_request};
16
17pub struct HttpServer {
18 config: HttpConfig,
19 executor: Arc<dyn Executor>,
20}
21
22impl HttpServer {
23 pub fn new(config: HttpConfig, executor: Arc<dyn Executor>) -> Self {
24 Self { config, executor }
25 }
26
27 pub async fn run(self, mut shutdown: watch::Receiver<bool>) -> Result<()> {
28 let executor = self.executor.clone();
29
30 let app = Router::new()
31 .route("/{*path}", any(handle))
32 .route("/", any(handle))
33 .with_state(executor);
34
35 let listener = TcpListener::bind(self.config.listen).await?;
36
37 axum::serve(listener, app)
38 .with_graceful_shutdown(async move {
39 loop {
40 if shutdown.changed().await.is_err() || *shutdown.borrow() {
41 break;
42 }
43 }
44 })
45 .await?;
46
47 Ok(())
48 }
49}
50
51async fn handle(State(executor): State<Arc<dyn Executor>>, req: Request<Body>) -> Response<Body> {
52 let payload = match encode_request(req).await {
53 Ok(p) => p,
54 Err(e) => {
55 error!(error = ?e, "encode request");
56 return Response::builder()
57 .status(500)
58 .body(Body::from("encode error"))
59 .unwrap();
60 }
61 };
62
63 let response_value = match executor.execute_value("http.handle", payload).await {
64 Ok(v) => v,
65 Err(e) => {
66 error!(error = ?e, "dispatch to worker");
67 return Response::builder()
68 .status(502)
69 .body(Body::from("worker error"))
70 .unwrap();
71 }
72 };
73
74 match decode_response(response_value) {
75 Ok(r) => r,
76 Err(e) => {
77 error!(error = ?e, "decode response");
78 Response::builder()
79 .status(500)
80 .body(Body::from("decode error"))
81 .unwrap()
82 }
83 }
84}