rs_jsonrpc_core/
io.rs

1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Params, Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
13/// A type representing middleware or RPC response before serialization.
14pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
16/// A type representing future string response.
17pub type FutureResult<F> = future::Map<
18	future::Either<future::FutureResult<Option<Response>, ()>, F>,
19	fn(Option<Response>) -> Option<String>,
20>;
21
22/// A type representing a result of a single method call.
23pub type FutureOutput = future::Either<
24	Box<Future<Item=Option<Output>, Error=()> + Send>,
25	future::FutureResult<Option<Output>, ()>,
26>;
27
/// JSON-RPC protocol versions an `IoHandler` is willing to accept.
///
/// The type is a plain field-less enum, so besides being `Copy` it can also be
/// compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compatibility {
	/// Compatible only with JSON-RPC 1.x
	V1,
	/// Compatible only with JSON-RPC 2.0
	V2,
	/// Compatible with both
	Both,
}
38
39impl Default for Compatibility {
40	fn default() -> Self {
41		Compatibility::V2
42	}
43}
44
45impl Compatibility {
46	fn is_version_valid(&self, version: Option<Version>) -> bool {
47		match (*self, version) {
48			(Compatibility::V1, None) |
49			(Compatibility::V2, Some(Version::V2)) |
50			(Compatibility::Both, _) => true,
51			_ => false,
52		}
53	}
54
55	fn default_version(&self) -> Option<Version> {
56		match *self {
57			Compatibility::V1 => None,
58			Compatibility::V2 | Compatibility::Both => Some(Version::V2),
59		}
60	}
61}
62
63/// Request handler
64///
65/// By default compatible only with jsonrpc v2
66#[derive(Debug)]
67pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
68	middleware: S,
69	compatibility: Compatibility,
70	methods: HashMap<String, RemoteProcedure<T>>,
71}
72
73impl<T: Metadata> Default for MetaIoHandler<T> {
74	fn default() -> Self {
75		MetaIoHandler::with_compatibility(Default::default())
76	}
77}
78
79impl<T: Metadata> MetaIoHandler<T> {
80	/// Creates new `MetaIoHandler` compatible with specified protocol version.
81	pub fn with_compatibility(compatibility: Compatibility) -> Self {
82		MetaIoHandler {
83			compatibility: compatibility,
84			middleware: Default::default(),
85			methods: Default::default(),
86		}
87	}
88}
89
90
91impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
92	/// Creates new `MetaIoHandler`
93	pub fn new(compatibility: Compatibility, middleware: S) -> Self {
94		MetaIoHandler {
95			compatibility: compatibility,
96			middleware: middleware,
97			methods: Default::default(),
98		}
99	}
100
101	/// Creates new `MetaIoHandler` with specified middleware.
102	pub fn with_middleware(middleware: S) -> Self {
103		MetaIoHandler {
104			compatibility: Default::default(),
105			middleware: middleware,
106			methods: Default::default(),
107		}
108	}
109
110	/// Adds an alias to a method.
111	pub fn add_alias(&mut self, alias: &str, other: &str) {
112		self.methods.insert(
113			alias.into(),
114			RemoteProcedure::Alias(other.into()),
115		);
116	}
117
118	/// Adds new supported asynchronous method
119	pub fn add_method<F>(&mut self, name: &str, method: F) where
120		F: RpcMethodSimple,
121	{
122		self.add_method_with_meta(name, move |params, _meta| {
123			method.call(params)
124		})
125	}
126
127	/// Adds new supported notification
128	pub fn add_notification<F>(&mut self, name: &str, notification: F) where
129		F: RpcNotificationSimple,
130	{
131		self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
132	}
133
134	/// Adds new supported asynchronous method with metadata support.
135	pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
136		F: RpcMethod<T>,
137	{
138		self.methods.insert(
139			name.into(),
140			RemoteProcedure::Method(Arc::new(method)),
141		);
142	}
143
144	/// Adds new supported notification with metadata support.
145	pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
146		F: RpcNotification<T>,
147	{
148		self.methods.insert(
149			name.into(),
150			RemoteProcedure::Notification(Arc::new(notification)),
151		);
152	}
153
154	/// Extend this `MetaIoHandler` with methods defined elsewhere.
155	pub fn extend_with<F>(&mut self, methods: F) where
156		F: Into<HashMap<String, RemoteProcedure<T>>>
157	{
158		self.methods.extend(methods.into())
159	}
160
161	/// Handle given request synchronously - will block until response is available.
162	/// If you have any asynchronous methods in your RPC it is much wiser to use
163	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
164	pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
165		self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
166	}
167
168	/// Handle given request asynchronously.
169	pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future> {
170		use self::future::Either::{A, B};
171		fn as_string(response: Option<Response>) -> Option<String> {
172			let res = response.map(write_response);
173			debug!(target: "rpc", "Response: {}.", match res {
174				Some(ref res) => res,
175				None => "None",
176			});
177			res
178		}
179
180		trace!(target: "rpc", "Request: {}.", request);
181		let request = read_request(request);
182		let result = match request {
183			Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
184			Ok(request) => B(self.handle_rpc_request(request, meta)),
185		};
186
187		result.map(as_string)
188	}
189
190	/// Handle deserialized RPC request.
191	pub fn handle_rpc_request(&self, request: Request, meta: T) -> S::Future {
192		use self::future::Either::{A, B};
193
194		self.middleware.on_request(request, meta, |request, meta| match request {
195			Request::Single(call) => {
196				A(self.handle_call(call, meta).map(|output| output.map(Response::Single)))
197			},
198			Request::Batch(calls) => {
199				let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
200				B(futures::future::join_all(futures).map(|outs| {
201					let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
202					if outs.is_empty() {
203						None
204					} else {
205						Some(Response::Batch(outs))
206					}
207				}))
208			},
209		})
210	}
211
212	/// Handle single call asynchronously.
213	pub fn handle_call(&self, call: Call, meta: T) -> FutureOutput {
214		use self::future::Either::{A, B};
215
216		match call {
217			Call::MethodCall(method) => {
218				let params = method.params.unwrap_or(Params::None);
219				let id = method.id;
220				let jsonrpc = method.jsonrpc;
221				let valid_version = self.compatibility.is_version_valid(jsonrpc);
222
223				let call_method = |method: &Arc<RpcMethod<T>>| {
224					let method = method.clone();
225					futures::lazy(move || method.call(params, meta))
226				};
227
228				let result = match (valid_version, self.methods.get(&method.method)) {
229					(false, _) => Err(Error::invalid_version()),
230					(true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
231					(true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
232						Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
233						_ => Err(Error::method_not_found()),
234					},
235					(true, _) => Err(Error::method_not_found()),
236				};
237
238				match result {
239					Ok(result) => A(Box::new(
240						result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
241					)),
242					Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
243				}
244			},
245			Call::Notification(notification) => {
246				let params = notification.params.unwrap_or(Params::None);
247				let jsonrpc = notification.jsonrpc;
248				if !self.compatibility.is_version_valid(jsonrpc) {
249					return B(futures::finished(None));
250				}
251
252				match self.methods.get(&notification.method) {
253					Some(&RemoteProcedure::Notification(ref notification)) => {
254						notification.execute(params, meta);
255					},
256					Some(&RemoteProcedure::Alias(ref alias)) => {
257						if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
258							notification.execute(params, meta);
259						}
260					},
261					_ => {},
262				}
263
264				B(futures::finished(None))
265			},
266			Call::Invalid(id) => {
267				B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
268			},
269		}
270	}
271}
272
273/// Simplified `IoHandler` with no `Metadata` associated with each request.
274#[derive(Debug, Default)]
275pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
276
277// Type inference helper
278impl IoHandler {
279	/// Creates new `IoHandler` without any metadata.
280	pub fn new() -> Self {
281		IoHandler::default()
282	}
283
284	/// Creates new `IoHandler` without any metadata compatible with specified protocol version.
285	pub fn with_compatibility(compatibility: Compatibility) -> Self {
286		IoHandler(MetaIoHandler::with_compatibility(compatibility))
287	}
288}
289
290impl<M: Metadata> IoHandler<M> {
291	/// Handle given string request asynchronously.
292	pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse> {
293		self.0.handle_request(request, M::default())
294	}
295
296	/// Handle deserialized RPC request asynchronously.
297	pub fn handle_rpc_request(&self, request: Request) -> FutureResponse {
298		self.0.handle_rpc_request(request, M::default())
299	}
300
301	/// Handle single Call asynchronously.
302	pub fn handle_call(&self, call: Call) -> FutureOutput {
303		self.0.handle_call(call, M::default())
304	}
305
306	/// Handle given request synchronously - will block until response is available.
307	/// If you have any asynchronous methods in your RPC it is much wiser to use
308	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
309	pub fn handle_request_sync(&self, request: &str) -> Option<String> {
310		self.0.handle_request_sync(request, M::default())
311	}
312}
313
314impl<M: Metadata> Deref for IoHandler<M> {
315	type Target = MetaIoHandler<M>;
316
317	fn deref(&self) -> &Self::Target {
318		&self.0
319	}
320}
321
322impl<M: Metadata> DerefMut for IoHandler<M> {
323	fn deref_mut(&mut self) -> &mut Self::Target {
324		&mut self.0
325	}
326}
327
328impl From<IoHandler> for MetaIoHandler<()> {
329	fn from(io: IoHandler) -> Self {
330		io.0
331	}
332}
333
334fn read_request(request_str: &str) -> Result<Request, Error> {
335	serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
336}
337
338fn write_response(response: Response) -> String {
339	// this should never fail
340	serde_json::to_string(&response).unwrap()
341}
342
#[cfg(test)]
mod tests {
	use futures;
	use types::{Value};
	use super::{IoHandler, Compatibility};

	// A synchronous method answers a JSON-RPC 2.0 call.
	#[test]
	fn test_io_handler() {
		let mut handler = IoHandler::new();

		handler.add_method("say_hello", |_| {
			Ok(Value::String(String::from("hello")))
		});

		let req = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
		let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

		assert_eq!(handler.handle_request_sync(req), Some(expected.to_owned()));
	}

	// A request without a version is answered without one when both versions are accepted.
	#[test]
	fn test_io_handler_1dot0() {
		let mut handler = IoHandler::with_compatibility(Compatibility::Both);

		handler.add_method("say_hello", |_| {
			Ok(Value::String(String::from("hello")))
		});

		let req = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
		let expected = r#"{"result":"hello","id":1}"#;

		assert_eq!(handler.handle_request_sync(req), Some(expected.to_owned()));
	}

	// A method returning a future is awaited by the synchronous entry point.
	#[test]
	fn test_async_io_handler() {
		let mut handler = IoHandler::new();

		handler.add_method("say_hello", |_| {
			futures::finished(Value::String(String::from("hello")))
		});

		let req = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
		let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

		assert_eq!(handler.handle_request_sync(req), Some(expected.to_owned()));
	}

	// A notification runs its handler and produces no response.
	#[test]
	fn test_notification() {
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};

		let mut handler = IoHandler::new();

		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		handler.add_notification("say_hello", move |_| {
			flag.store(true, Ordering::SeqCst);
		});
		let req = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;

		assert_eq!(handler.handle_request_sync(req), None);
		assert!(was_called.load(Ordering::SeqCst));
	}

	// Calling an unregistered method yields the "Method not found" error.
	#[test]
	fn test_method_not_found() {
		let handler = IoHandler::new();

		let req = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
		let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;

		assert_eq!(handler.handle_request_sync(req), Some(expected.to_owned()));
	}

	// A method can be reached through an alias.
	#[test]
	fn test_method_alias() {
		let mut handler = IoHandler::new();
		handler.add_method("say_hello", |_| {
			Ok(Value::String(String::from("hello")))
		});
		handler.add_alias("say_hello_alias", "say_hello");

		let req = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
		let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

		assert_eq!(handler.handle_request_sync(req), Some(expected.to_owned()));
	}

	// A notification can be reached through an alias.
	#[test]
	fn test_notification_alias() {
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};

		let mut handler = IoHandler::new();

		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		handler.add_notification("say_hello", move |_| {
			flag.store(true, Ordering::SeqCst);
		});
		handler.add_alias("say_hello_alias", "say_hello");

		let req = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;
		assert_eq!(handler.handle_request_sync(req), None);
		assert!(was_called.load(Ordering::SeqCst));
	}

	// Compile-time check that the handler may be shared between threads.
	#[test]
	fn test_send_sync() {
		fn is_send_sync<T>(_obj: T) -> bool where
			T: Send + Sync
		{
			true
		}

		let handler = IoHandler::new();

		assert!(is_send_sync(handler))
	}
}