1use async_executor::Executor;
2use async_trait::async_trait;
3use futures_util::{
4 io::{AsyncWriteExt, BufReader},
5 AsyncBufReadExt, AsyncReadExt,
6};
7use nanorpc::{JrpcRequest, JrpcResponse, RpcService, RpcTransport};
8use sillad::{dialer::Dialer, listener::Listener};
9pub struct DialerTransport<D: Dialer>(pub D);
10
11#[async_trait]
12impl<D: Dialer> RpcTransport for DialerTransport<D> {
13 type Error = anyhow::Error;
14 async fn call_raw(&self, req: JrpcRequest) -> Result<JrpcResponse, Self::Error> {
15 let mut conn = self.0.dial().await?;
17 conn.write_all(format!("{}\n", serde_json::to_string(&req)?).as_bytes())
18 .await?;
19 let mut conn = BufReader::new(conn);
20 let mut line = String::new();
21 conn.read_line(&mut line).await?;
22 Ok(serde_json::from_str(&line)?)
23 }
24}
25
26pub async fn rpc_serve(
28 mut listener: impl Listener,
29 service: impl RpcService,
30) -> std::io::Result<()> {
31 let lexec = Executor::new();
32 lexec
33 .run(async {
34 loop {
35 let next = listener.accept().await?;
36 lexec
37 .spawn::<anyhow::Result<()>>(async {
38 let (read, mut write) = next.split();
39 let mut read = BufReader::new(read);
40 loop {
41 let mut line = String::new();
42 read.read_line(&mut line).await?;
43 let req: JrpcRequest = serde_json::from_str(&line)?;
44 let resp = service.respond_raw(req).await;
45 write
46 .write_all(
47 format!("{}\n", serde_json::to_string(&resp)?).as_bytes(),
48 )
49 .await?;
50 }
51 })
52 .detach();
53 }
54 })
55 .await
56}