1use std::{convert::Infallible, net::SocketAddr};
2
3use async_compat::CompatExt;
4use bytes::Bytes;
5use http_body_util::{BodyExt, Full};
6use hyper::{body::Incoming, service::service_fn, Request, Response};
7use nanorpc::{JrpcRequest, RpcService};
8use smol::net::TcpListener;
9
10pub struct HttpRpcServer {
12 listener: TcpListener,
13}
14
15impl HttpRpcServer {
16 pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
18 let listener = TcpListener::bind(addr).await?;
19 Ok(Self { listener })
20 }
21
22 pub async fn run(&self, service: impl RpcService) -> std::io::Result<()> {
24 let exec = smol::Executor::new();
25 exec.run(async {
26 loop {
27 let (next, _) = self.listener.accept().await?;
28 exec.spawn(async {
29 let connection = hyper::server::conn::http1::Builder::new()
30 .keep_alive(true)
31 .serve_connection(
32 next.compat(),
33 service_fn(|req: Request<Incoming>| async {
34 let response = async {
35 let body = req.into_body().collect().await?.to_bytes();
36 let jrpc_req: JrpcRequest = serde_json::from_slice(&body)?;
37 let jrpc_response = service.respond_raw(jrpc_req).await;
38 anyhow::Ok(Response::new(
39 Full::<Bytes>::new(
40 serde_json::to_vec(&jrpc_response)?.into(),
41 )
42 .boxed(),
43 ))
44 };
45
46 match response.await {
47 Ok(resp) => Ok::<_, Infallible>(resp),
48 Err(err) => Ok(Response::builder()
49 .status(500)
50 .body(err.to_string().boxed())
51 .unwrap()),
52 }
53 }),
54 );
55 let _ = connection.await;
56 })
57 .detach();
58 }
59 })
60 .await
61 }
62}