1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#![deny(missing_docs)]
use futures::stream::StreamExt;
use json_rpc2::{futures::Server, Request, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio_util::codec::{FramedRead, LinesCodec};
/// Boxed error trait object that is `Send + Sync`, so it can cross task/thread boundaries.
type Error = Box<dyn std::error::Error + Send + Sync>;
/// `Result` alias whose error type is fixed to the boxed [`Error`] above.
type Result<T> = std::result::Result<T, Error>;
/// A message exchanged over the line-delimited transport: either a JSON-RPC
/// request or a JSON-RPC response.
///
/// Variants are (de)serialized under the lowercase names `request` and
/// `response`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Message {
    /// Wraps a JSON-RPC [`Request`].
    Request(Request),
    /// Wraps a JSON-RPC [`Response`].
    Response(Response),
}
/// Serializable value holding a single string identifier.
///
/// NOTE(review): nothing in the visible code uses this type; presumably it
/// identifies a peer or connection — confirm against callers.
#[derive(Debug, Serialize, Deserialize)]
pub struct Identity {
/// The identifier string.
pub id: String,
}
/// Build a [`Message::Request`] for `method` that expects a reply.
///
/// The underlying request is created with `Request::new_reply`, passing
/// `params` through unchanged.
pub fn call(method: &str, params: Option<Value>) -> Message {
    let request = Request::new_reply(method, params);
    Message::Request(request)
}
/// Build a [`Message::Request`] for `method` as a notification.
///
/// The underlying request is created with `Request::new_notification`,
/// passing `params` through unchanged.
pub fn notify(method: &str, params: Option<Value>) -> Message {
    let notification = Request::new_notification(method, params);
    Message::Request(notification)
}
/// Serialize `msg` as JSON and send it to `writer` as one newline-terminated
/// line, then flush the writer.
///
/// # Errors
///
/// Returns an error if `msg` cannot be serialized, or if writing to or
/// flushing `writer` fails.
pub async fn write<W>(writer: &mut W, msg: &Message) -> Result<()>
where
    W: AsyncWrite + Unpin,
{
    // Build the whole frame (payload + line terminator) up front so it is
    // handed to the writer in a single call.
    let mut frame = serde_json::to_vec(msg).map_err(Box::new)?;
    frame.push(b'\n');

    // `write_all` keeps writing until every byte has been accepted. A bare
    // `write` may return after a short write, which would silently truncate
    // the JSON payload or drop the newline and corrupt the line framing.
    writer.write_all(&frame).await?;
    writer.flush().await?;
    Ok(())
}
/// Run a line-delimited JSON-RPC loop over `reader` / `writer`.
///
/// Each line read from `reader` is decoded as a [`Message`]:
///
/// * For a [`Message::Request`], `request` is called with the request, the
///   request is dispatched to `server` with `state`, and if that produces a
///   response, `response` is called with it and it is written to `writer`.
/// * For a [`Message::Response`], the response is handed to `answer`; if
///   `answer` returns a message, that message is written to `writer`.
///
/// Returns `Ok(())` once the line stream ends.
///
/// # Errors
///
/// The loop stops and the error is returned if a line cannot be read, a line
/// is not a valid JSON [`Message`], `answer` fails, or writing to `writer`
/// fails.
pub async fn serve<S, R, W, I, O, A>(
    server: Server<'_, S>,
    state: &S,
    reader: ReadHalf<R>,
    mut writer: WriteHalf<W>,
    request: I,
    response: O,
    answer: A,
) -> Result<()>
where
    R: AsyncRead,
    W: AsyncWrite,
    S: Send + Sync,
    I: Fn(&Request),
    O: Fn(&Response),
    A: Fn(Response) -> Result<Option<Message>>,
{
    let mut frames = FramedRead::new(reader, LinesCodec::new());
    loop {
        // End of stream terminates the loop; a codec error aborts it.
        let raw = match frames.next().await {
            Some(item) => item.map_err(Box::new)?,
            None => break,
        };
        let incoming: Message = serde_json::from_str(&raw).map_err(Box::new)?;

        // Work out what (if anything) has to be sent back for this line.
        let outgoing = match incoming {
            Message::Request(mut req) => {
                request(&req);
                server.serve(&mut req, state).await.map(|res| {
                    response(&res);
                    Message::Response(res)
                })
            }
            Message::Response(reply) => answer(reply)?,
        };

        if let Some(msg) = outgoing {
            write(&mut writer, &msg).await?;
        }
    }
    Ok(())
}