1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use std::{convert::Infallible, net::SocketAddr};
use async_compat::CompatExt;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response};
use nanorpc::{JrpcRequest, RpcService};
use smol::net::TcpListener;
/// An RPC server that answers JSON-encoded requests over HTTP.
///
/// Created with `bind`; connections are only accepted and served while `run`
/// is being awaited.
pub struct HttpRpcServer {
// Listening TCP socket that `run` accepts incoming connections from.
listener: TcpListener,
}
impl HttpRpcServer {
    /// Binds a TCP listener on `addr` and returns a server wrapping it.
    ///
    /// No connection is accepted until [`HttpRpcServer::run`] is awaited.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error from `TcpListener::bind` when binding fails.
    pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
        let listener = TcpListener::bind(addr).await?;
        Ok(Self { listener })
    }

    /// Accepts connections in a loop and answers every HTTP/1 request with `service`.
    ///
    /// Each request body is collected in full, decoded as a [`JrpcRequest`], and
    /// handed to `service.respond_raw`; the result is serialized to JSON and sent
    /// back with `Content-Type: application/json`. If reading the body, decoding
    /// the request, or encoding the response fails, the reply is a 500 whose
    /// plain-text body is the error message.
    ///
    /// Every accepted connection is served in its own detached task, spawned on
    /// an executor that is created inside this call and driven by this future.
    /// Errors on an individual connection are deliberately discarded so that one
    /// misbehaving peer cannot stop the accept loop.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `accept` as soon as accepting a connection
    /// fails. The loop never exits successfully.
    pub async fn run(&self, service: impl RpcService) -> std::io::Result<()> {
        let exec = smol::Executor::new();
        exec.run(async {
            loop {
                let (next, _) = self.listener.accept().await?;
                exec.spawn(async {
                    let connection = hyper::server::conn::http1::Builder::new()
                        .keep_alive(true)
                        .serve_connection(
                            next.compat(),
                            service_fn(|req: Request<Incoming>| async {
                                // Fallible part of request handling; any error here
                                // is turned into a 500 response below.
                                let response = async {
                                    let body = req.into_body().collect().await?.to_bytes();
                                    let jrpc_req: JrpcRequest = serde_json::from_slice(&body)?;
                                    let jrpc_response = service.respond_raw(jrpc_req).await;
                                    let payload = serde_json::to_vec(&jrpc_response)?;
                                    // Label the body as JSON so clients and
                                    // intermediaries do not have to guess the media type.
                                    let reply = Response::builder()
                                        .header(hyper::header::CONTENT_TYPE, "application/json")
                                        .body(Full::<Bytes>::new(payload.into()).boxed())?;
                                    anyhow::Ok(reply)
                                };
                                match response.await {
                                    Ok(resp) => Ok::<_, Infallible>(resp),
                                    // The error text is sent verbatim, so it is
                                    // labelled as plain text rather than JSON.
                                    Err(err) => Ok(Response::builder()
                                        .status(500)
                                        .header(
                                            hyper::header::CONTENT_TYPE,
                                            "text/plain; charset=utf-8",
                                        )
                                        .body(err.to_string().boxed())
                                        .expect("static status code and header are always valid")),
                                }
                            }),
                        );
                    // Best effort: a failed connection only affects that peer.
                    let _ = connection.await;
                })
                .detach();
            }
        })
        .await
    }
}