jsonrpc_stdio_server/
lib.rs1#![deny(missing_docs)]
21
22use std::future::Future;
23use std::sync::Arc;
24
25#[macro_use]
26extern crate log;
27
28pub use jsonrpc_core;
29pub use tokio;
30
31use jsonrpc_core::{MetaIoHandler, Metadata, Middleware};
32use tokio_util::codec::{FramedRead, LinesCodec};
33
34pub struct ServerBuilder<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
36 handler: Arc<MetaIoHandler<M, T>>,
37}
38
39impl<M: Metadata, T: Middleware<M>> ServerBuilder<M, T>
40where
41 M: Default,
42 T::Future: Unpin,
43 T::CallFuture: Unpin,
44{
45 pub fn new(handler: impl Into<MetaIoHandler<M, T>>) -> Self {
47 ServerBuilder {
48 handler: Arc::new(handler.into()),
49 }
50 }
51
52 pub fn build(&self) -> impl Future<Output = ()> + 'static {
58 let handler = self.handler.clone();
59
60 async move {
61 let stdin = tokio::io::stdin();
62 let mut stdout = tokio::io::stdout();
63
64 let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
65
66 use futures::StreamExt;
67 while let Some(request) = framed_stdin.next().await {
68 match request {
69 Ok(line) => {
70 let res = Self::process(&handler, line).await;
71 let mut sanitized = res.replace('\n', "");
72 sanitized.push('\n');
73 use tokio::io::AsyncWriteExt;
74 if let Err(e) = stdout.write_all(sanitized.as_bytes()).await {
75 log::warn!("Error writing response: {:?}", e);
76 }
77 }
78 Err(e) => {
79 log::warn!("Error reading line: {:?}", e);
80 }
81 }
82 }
83 }
84 }
85
86 fn process(io: &Arc<MetaIoHandler<M, T>>, input: String) -> impl Future<Output = String> + Send {
88 use jsonrpc_core::futures::FutureExt;
89 let f = io.handle_request(&input, Default::default());
90 f.map(move |result| match result {
91 Some(res) => res,
92 None => {
93 info!("JSON RPC request produced no response: {:?}", input);
94 String::from("")
95 }
96 })
97 }
98}