nanorpc_sillad/
lib.rs

1use async_executor::Executor;
2use async_trait::async_trait;
3use futures_util::{
4    io::{AsyncWriteExt, BufReader},
5    AsyncBufReadExt, AsyncReadExt,
6};
7use nanorpc::{JrpcRequest, JrpcResponse, RpcService, RpcTransport};
8use sillad::{dialer::Dialer, listener::Listener};
9pub struct DialerTransport<D: Dialer>(pub D);
10
11#[async_trait]
12impl<D: Dialer> RpcTransport for DialerTransport<D> {
13    type Error = anyhow::Error;
14    async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
15        // TODO some form of connection pooling. right now there's no pooling
16        let mut conn = self.0.dial().await?;
17        conn.write_all(format!("{}\n", serde_json::to_string(&req)?).as_bytes())
18            .await?;
19        let mut conn = BufReader::new(conn);
20        let mut line = String::new();
21        conn.read_line(&mut line).await?;
22        Ok(serde_json::from_str(&line)?)
23    }
24}
25
26/// Runs a given nanorpc service using the given sillad listener
27pub async fn rpc_serve(
28    mut listener: impl Listener,
29    service: impl RpcService,
30) -> std::io::Result<()> {
31    let lexec = Executor::new();
32    lexec
33        .run(async {
34            loop {
35                let next = listener.accept().await?;
36                lexec
37                    .spawn::<anyhow::Result<()>>(async {
38                        let (read, mut write) = next.split();
39                        let mut read = BufReader::new(read);
40                        loop {
41                            let mut line = String::new();
42                            read.read_line(&mut line).await?;
43                            let req: JrpcRequest = serde_json::from_str(&line)?;
44                            let resp = service.respond_raw(req).await;
45                            write
46                                .write_all(
47                                    format!("{}\n", serde_json::to_string(&resp)?).as_bytes(),
48                                )
49                                .await?;
50                        }
51                    })
52                    .detach();
53            }
54        })
55        .await
56}