1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
use std::{convert::Infallible, net::SocketAddr};
use async_compat::CompatExt;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response};
use nanorpc::{JrpcRequest, RpcService};
use smol::net::TcpListener;
/// An HTTP-based nanorpc server.
///
/// Holds the bound TCP listener. Create one with [`HttpRpcServer::bind`] and drive it with
/// [`HttpRpcServer::run`]; nothing is accepted or served until `run` is polled.
#[derive(Debug)]
pub struct HttpRpcServer {
    /// Listening socket from which incoming connections are accepted.
    listener: TcpListener,
}
impl HttpRpcServer {
    /// Creates a new HTTP server listening at the given address. Routing is currently not supported.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported when binding the TCP listener to `addr` fails.
    pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
        let listener = TcpListener::bind(addr).await?;
        Ok(Self { listener })
    }

    /// Runs the server, processing requests with the given [`RpcService`], blocking indefinitely
    /// until a fatal failure happens. This must be run for the server to make any progress!
    ///
    /// Every accepted connection is served as HTTP/1 (keep-alive enabled) on its own detached
    /// task of an executor local to this call. For each request the whole body is collected,
    /// deserialized as a [`JrpcRequest`] and handed to `service`; the serialized result is sent
    /// back with `Content-Type: application/json`. If reading, deserializing, or serializing
    /// fails, the client receives a `500` response whose body is the error's text.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the first failed `accept` on the listener. Errors on an
    /// individual connection are discarded and do not stop the server.
    pub async fn run(&self, service: impl RpcService) -> std::io::Result<()> {
        // Connection tasks borrow `service`, so they run on an executor scoped to this call
        // rather than on a global one that would demand `'static` futures.
        let exec = smol::Executor::new();
        exec.run(async {
            loop {
                // An accept failure is treated as fatal and ends `run`.
                let (next, _) = self.listener.accept().await?;
                exec.spawn(async {
                    let connection = hyper::server::conn::http1::Builder::new()
                        .keep_alive(true)
                        .serve_connection(
                            // Adapt the smol stream to the I/O traits hyper is given here.
                            next.compat(),
                            service_fn(|req: Request<Incoming>| async {
                                let response = async {
                                    let body = req.into_body().collect().await?.to_bytes();
                                    let jrpc_req: JrpcRequest = serde_json::from_slice(&body)?;
                                    let jrpc_response = service.respond_raw(jrpc_req).await;
                                    let payload = serde_json::to_vec(&jrpc_response)?;
                                    // Label the payload so clients and intermediaries can
                                    // tell it is JSON without sniffing the body.
                                    let resp = Response::builder()
                                        .header(hyper::header::CONTENT_TYPE, "application/json")
                                        .body(Full::<Bytes>::new(payload.into()).boxed())?;
                                    anyhow::Ok(resp)
                                };
                                match response.await {
                                    Ok(resp) => Ok::<_, Infallible>(resp),
                                    Err(err) => {
                                        // Built without the fallible builder so that no
                                        // `unwrap` is needed on the error path.
                                        let mut resp = Response::new(err.to_string().boxed());
                                        *resp.status_mut() =
                                            hyper::StatusCode::INTERNAL_SERVER_ERROR;
                                        resp.headers_mut().insert(
                                            hyper::header::CONTENT_TYPE,
                                            hyper::header::HeaderValue::from_static(
                                                "text/plain; charset=utf-8",
                                            ),
                                        );
                                        Ok(resp)
                                    }
                                }
                            }),
                        );
                    // The outcome is discarded: a failure on one connection must not affect
                    // the accept loop or other connections.
                    let _ = connection.await;
                })
                .detach();
            }
        })
        .await
    }
}