//! folk-plugin-http 0.1.1
//!
//! HTTP plugin for Folk — accepts connections via hyper and dispatches to PHP workers.
use std::convert::Infallible;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use folk_api::Executor;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tracing::{debug, error};

use crate::config::HttpConfig;
use crate::payload::{decode_response, encode_request};

/// HTTP front end that forwards every incoming request to an [`Executor`].
///
/// Build one with [`HttpServer::new`] and drive it with [`HttpServer::run`].
pub struct HttpServer {
    /// Server settings; `listen` is the address passed to `TcpListener::bind`.
    config: HttpConfig,
    /// Shared handle through which each request is dispatched via
    /// `execute_method`.
    executor: Arc<dyn Executor>,
}

impl HttpServer {
    /// Creates a server that will listen on `config.listen` and hand every
    /// request to `executor`.
    ///
    /// No socket is opened here; binding happens in [`HttpServer::run`].
    pub fn new(config: HttpConfig, executor: Arc<dyn Executor>) -> Self {
        Self { config, executor }
    }

    /// Binds the listener and serves HTTP/1 connections until shutdown.
    ///
    /// Every accepted connection is served on its own spawned task. Those
    /// tasks are detached: they are not awaited when the accept loop exits.
    ///
    /// The loop stops when `shutdown` holds `true`, or when the sending half
    /// of the channel has been dropped (nobody is left to request a shutdown,
    /// so the server stops rather than spinning on a closed channel).
    ///
    /// # Errors
    ///
    /// Returns an error if the listen address cannot be bound or if accepting
    /// a connection fails.
    pub async fn run(self, mut shutdown: watch::Receiver<bool>) -> Result<()> {
        let listener = TcpListener::bind(self.config.listen).await?;
        // `self` is consumed, so the executor can be moved out instead of cloned.
        let executor = self.executor;

        // A flag that was raised before this receiver started watching would
        // never be reported by `changed()`, so honour it up front.
        if *shutdown.borrow() {
            return Ok(());
        }

        loop {
            tokio::select! {
                accept = listener.accept() => {
                    let (stream, _peer) = accept?;
                    let exec = Arc::clone(&executor);
                    tokio::spawn(async move {
                        let io = TokioIo::new(stream);
                        let service = service_fn(move |req| {
                            // One handle per request: the service closure may be
                            // called many times on a keep-alive connection.
                            let exec = Arc::clone(&exec);
                            async move { handle(exec, req).await }
                        });
                        if let Err(e) = http1::Builder::new()
                            .serve_connection(io, service)
                            .await
                        {
                            debug!(error = ?e, "http connection error");
                        }
                    });
                }
                changed = shutdown.changed() => {
                    // `Err` means the sender is gone. `changed()` would then
                    // resolve immediately on every iteration, turning this
                    // loop into a busy spin, so treat it as a shutdown request.
                    if changed.is_err() || *shutdown.borrow() {
                        break;
                    }
                }
            }
        }
        Ok(())
    }
}

async fn handle(
    executor: Arc<dyn Executor>,
    req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
    let payload = match encode_request(req).await {
        Ok(p) => p,
        Err(e) => {
            error!(error = ?e, "encode request");
            return Ok(Response::builder()
                .status(500)
                .body(Full::new(Bytes::from("encode error")))
                .unwrap());
        }
    };

    let response_bytes = match executor.execute_method("http.handle", payload).await {
        Ok(b) => b,
        Err(e) => {
            error!(error = ?e, "dispatch to worker");
            return Ok(Response::builder()
                .status(502)
                .body(Full::new(Bytes::from("worker error")))
                .unwrap());
        }
    };

    match decode_response(response_bytes) {
        Ok(r) => Ok(r),
        Err(e) => {
            error!(error = ?e, "decode response");
            Ok(Response::builder()
                .status(500)
                .body(Full::new(Bytes::from("decode error")))
                .unwrap())
        }
    }
}