//! JSON-RPC server over standard input/output.
//!
//! Requests are read from stdin one line at a time, dispatched to a
//! `jsonrpc_core` handler, and each response is written to stdout as a
//! single line.
#![deny(missing_docs)]
use std::future::Future;
use std::sync::Arc;
#[macro_use]
extern crate log;
pub use jsonrpc_core;
pub use tokio;
use jsonrpc_core::{MetaIoHandler, Metadata, Middleware};
use tokio_util::codec::{FramedRead, LinesCodec};
/// Builder for a JSON-RPC server that talks over stdin/stdout.
///
/// `M` is the request metadata type and `T` the middleware applied by the
/// wrapped [`MetaIoHandler`]; they default to `()` and
/// [`jsonrpc_core::NoopMiddleware`] respectively.
pub struct ServerBuilder<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
    /// Shared request handler; cloned (by reference count) into every future
    /// returned from `build`.
    handler: Arc<MetaIoHandler<M, T>>,
}
impl<M: Metadata, T: Middleware<M>> ServerBuilder<M, T>
where
M: Default,
T::Future: Unpin,
T::CallFuture: Unpin,
{
pub fn new(handler: impl Into<MetaIoHandler<M, T>>) -> Self {
ServerBuilder {
handler: Arc::new(handler.into()),
}
}
pub fn build(&self) -> impl Future<Output = ()> + 'static {
let handler = self.handler.clone();
async move {
let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
use futures::StreamExt;
while let Some(request) = framed_stdin.next().await {
match request {
Ok(line) => {
let res = Self::process(&handler, line).await;
let mut sanitized = res.replace('\n', "");
sanitized.push('\n');
use tokio::io::AsyncWriteExt;
if let Err(e) = stdout.write_all(sanitized.as_bytes()).await {
log::warn!("Error writing response: {:?}", e);
}
}
Err(e) => {
log::warn!("Error reading line: {:?}", e);
}
}
}
}
}
fn process(io: &Arc<MetaIoHandler<M, T>>, input: String) -> impl Future<Output = String> + Send {
use jsonrpc_core::futures::FutureExt;
let f = io.handle_request(&input, Default::default());
f.map(move |result| match result {
Some(res) => res,
None => {
info!("JSON RPC request produced no response: {:?}", input);
String::from("")
}
})
}
}