1#![deny(missing_docs)]
3use futures::stream::StreamExt;
4use json_rpc2::{futures::Server, Request, Response};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
8use tokio_util::codec::{FramedRead, LinesCodec};
9
10type Error = Box<dyn std::error::Error + Send + Sync>;
12
13type Result<T> = std::result::Result<T, Error>;
15
16#[derive(Debug, Serialize, Deserialize)]
20pub enum Message {
21 #[serde(rename = "request")]
23 Request(Request),
24 #[serde(rename = "response")]
26 Response(Response),
27}
28
29#[derive(Debug, Serialize, Deserialize)]
31pub struct Identity {
32 pub id: String,
34}
35
36pub fn call(method: &str, params: Option<Value>) -> Message {
38 Message::Request(Request::new_reply(method, params))
39}
40
41pub fn notify(method: &str, params: Option<Value>) -> Message {
43 Message::Request(Request::new_notification(method, params))
44}
45
46pub async fn write<W>(
48 writer: &mut W,
49 msg: &Message,
50) -> Result<()>
51 where W: AsyncWrite + Unpin {
52 writer
53 .write(
54 serde_json::to_vec(msg)
55 .map_err(Box::new)?
56 .as_slice(),
57 )
58 .await?;
59 writer.write(b"\n").await?;
60 writer.flush().await?;
61 Ok(())
62}
63
64pub async fn serve<S, R, W, I, O, A>(
71 server: Server<'_, S>,
72 state: &S,
73 reader: ReadHalf<R>,
74 mut writer: WriteHalf<W>,
75 request: I,
76 response: O,
77 answer: A,
78) -> Result<()>
79where
80 R: AsyncRead,
81 W: AsyncWrite,
82 S: Send + Sync,
83 I: Fn(&Request),
84 O: Fn(&Response),
85 A: Fn(Response) -> Result<Option<Message>>,
86{
87 let mut lines = FramedRead::new(reader, LinesCodec::new());
88 while let Some(line) = lines.next().await {
89 let line = line.map_err(Box::new)?;
90 match serde_json::from_str::<Message>(&line).map_err(Box::new)? {
91 Message::Request(mut req) => {
92 (request)(&req);
93 let res = server.serve(&mut req, state).await;
94 if let Some(res) = res {
95 (response)(&res);
96 let msg = Message::Response(res);
97 write(&mut writer, &msg).await?;
98 }
99 }
100 Message::Response(reply) => {
101 let msg = (answer)(reply)?;
102 if let Some(msg) = msg {
103 write(&mut writer, &msg).await?;
104 }
105 }
106 }
107 }
108 Ok(())
109}