nanorpc_http/
server.rs

1use std::{convert::Infallible, net::SocketAddr};
2
3use async_compat::CompatExt;
4use bytes::Bytes;
5use http_body_util::{BodyExt, Full};
6use hyper::{body::Incoming, service::service_fn, Request, Response};
7use nanorpc::{JrpcRequest, RpcService};
8use smol::net::TcpListener;
9
10/// An HTTP-based nanorpc server.
11pub struct HttpRpcServer {
12    listener: TcpListener,
13}
14
15impl HttpRpcServer {
16    /// Creates a new HTTP server listening at the given address. Routing is currently not supported.
17    pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
18        let listener = TcpListener::bind(addr).await?;
19        Ok(Self { listener })
20    }
21
22    /// Runs the server, processing requests with the given [RpcService], blocking indefinitely until a fatal failure happens. This must be run for the server to make any progress!
23    pub async fn run(&self, service: impl RpcService) -> std::io::Result<()> {
24        let exec = smol::Executor::new();
25        exec.run(async {
26            loop {
27                let (next, _) = self.listener.accept().await?;
28                exec.spawn(async {
29                    let connection = hyper::server::conn::http1::Builder::new()
30                        .keep_alive(true)
31                        .serve_connection(
32                            next.compat(),
33                            service_fn(|req: Request<Incoming>| async {
34                                let response = async {
35                                    let body = req.into_body().collect().await?.to_bytes();
36                                    let jrpc_req: JrpcRequest = serde_json::from_slice(&body)?;
37                                    let jrpc_response = service.respond_raw(jrpc_req).await;
38                                    anyhow::Ok(Response::new(
39                                        Full::<Bytes>::new(
40                                            serde_json::to_vec(&jrpc_response)?.into(),
41                                        )
42                                        .boxed(),
43                                    ))
44                                };
45
46                                match response.await {
47                                    Ok(resp) => Ok::<_, Infallible>(resp),
48                                    Err(err) => Ok(Response::builder()
49                                        .status(500)
50                                        .body(err.to_string().boxed())
51                                        .unwrap()),
52                                }
53                            }),
54                        );
55                    let _ = connection.await;
56                })
57                .detach();
58            }
59        })
60        .await
61    }
62}