jsonrpc_stdio_server/
lib.rs

//! JSON-RPC server that reads requests from stdin and writes responses to stdout.
2//!
3//! ```no_run
4//!
5//! use jsonrpc_stdio_server::ServerBuilder;
6//! use jsonrpc_stdio_server::jsonrpc_core::*;
7//!
8//! #[tokio::main]
9//! async fn main() {
10//!     let mut io = IoHandler::default();
11//!     io.add_sync_method("say_hello", |_params| {
12//!         Ok(Value::String("hello".to_owned()))
13//!     });
14//!
15//!     let server = ServerBuilder::new(io).build();
16//!     server.await;
17//! }
18//! ```
19
20#![deny(missing_docs)]
21
22use std::future::Future;
23use std::sync::Arc;
24
25#[macro_use]
26extern crate log;
27
28pub use jsonrpc_core;
29pub use tokio;
30
31use jsonrpc_core::{MetaIoHandler, Metadata, Middleware};
32use tokio_util::codec::{FramedRead, LinesCodec};
33
34/// Stdio server builder
35pub struct ServerBuilder<M: Metadata = (), T: Middleware<M> = jsonrpc_core::NoopMiddleware> {
36	handler: Arc<MetaIoHandler<M, T>>,
37}
38
39impl<M: Metadata, T: Middleware<M>> ServerBuilder<M, T>
40where
41	M: Default,
42	T::Future: Unpin,
43	T::CallFuture: Unpin,
44{
45	/// Returns a new server instance
46	pub fn new(handler: impl Into<MetaIoHandler<M, T>>) -> Self {
47		ServerBuilder {
48			handler: Arc::new(handler.into()),
49		}
50	}
51
52	/// Returns a server future that needs to be polled in order to make progress.
53	///
54	/// Will block until EOF is read or until an error occurs.
55	/// The server reads from STDIN line-by-line, one request is taken
56	/// per line and each response is written to STDOUT on a new line.
57	pub fn build(&self) -> impl Future<Output = ()> + 'static {
58		let handler = self.handler.clone();
59
60		async move {
61			let stdin = tokio::io::stdin();
62			let mut stdout = tokio::io::stdout();
63
64			let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
65
66			use futures::StreamExt;
67			while let Some(request) = framed_stdin.next().await {
68				match request {
69					Ok(line) => {
70						let res = Self::process(&handler, line).await;
71						let mut sanitized = res.replace('\n', "");
72						sanitized.push('\n');
73						use tokio::io::AsyncWriteExt;
74						if let Err(e) = stdout.write_all(sanitized.as_bytes()).await {
75							log::warn!("Error writing response: {:?}", e);
76						}
77					}
78					Err(e) => {
79						log::warn!("Error reading line: {:?}", e);
80					}
81				}
82			}
83		}
84	}
85
86	/// Process a request asynchronously
87	fn process(io: &Arc<MetaIoHandler<M, T>>, input: String) -> impl Future<Output = String> + Send {
88		use jsonrpc_core::futures::FutureExt;
89		let f = io.handle_request(&input, Default::default());
90		f.map(move |result| match result {
91			Some(res) => res,
92			None => {
93				info!("JSON RPC request produced no response: {:?}", input);
94				String::from("")
95			}
96		})
97	}
98}