bitconch_jsonrpc_core/
io.rs

1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
/// A boxed, `Send` future resolving to an optional `Response` before it is serialized.
///
/// Its error type is `()`. `IoHandler` names this alias as the middleware future type
/// in the return types of its `handle_request` and `handle_rpc_request` methods.
pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
/// The future returned by `MetaIoHandler::handle_request`: an optional serialized response.
///
/// The inner `Either` is an already-resolved response (used when the request string
/// cannot be parsed) or the `FutureRpcResult` of a parsed request. The outcome is then
/// mapped through a plain function pointer that turns the `Response` into a `String`.
/// `F` is the future type produced by the middleware.
pub type FutureResult<F> = future::Map<
	future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F>>,
	fn(Option<Response>) -> Option<String>,
>;
21
/// The future returned by `MetaIoHandler::handle_call` for a single call.
///
/// The boxed side wraps the invocation of a registered method; the `FutureResult` side
/// carries an outcome that is known immediately (errors, notifications, invalid calls).
pub type FutureOutput = future::Either<
	Box<Future<Item=Option<Output>, Error=()> + Send>,
	future::FutureResult<Option<Output>, ()>,
>;
27
/// The future returned by `MetaIoHandler::handle_rpc_request`: an optional `Response`
/// for an RPC `Request`.
///
/// `F` is the middleware's own future type (`S::Future` in `MetaIoHandler`). The other
/// side is produced by the handler itself and distinguishes single calls from batches.
pub type FutureRpcResult<F> = future::Either<
	F,
	future::Either<
		// Single call: its output converted into a response.
		future::Map<
			FutureOutput,
			fn(Option<Output>) -> Option<Response>,
		>,
		// Batch: all call outputs joined, then folded into one response.
		future::Map<
			future::JoinAll<Vec<FutureOutput>>,
			fn(Vec<Option<Output>>) -> Option<Response>,
		>,
	>,
>;
42
/// `IoHandler` json-rpc protocol compatibility.
///
/// The type is a plain `Copy` enum; equality is derived so that callers can compare
/// modes directly (for example in assertions or configuration checks).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compatibility {
	/// Compatible only with JSON-RPC 1.x
	V1,
	/// Compatible only with JSON-RPC 2.0
	V2,
	/// Compatible with both
	Both,
}

impl Default for Compatibility {
	/// Handlers are JSON-RPC 2.0 only unless configured otherwise.
	fn default() -> Self {
		Compatibility::V2
	}
}
59
60impl Compatibility {
61	fn is_version_valid(&self, version: Option<Version>) -> bool {
62		match (*self, version) {
63			(Compatibility::V1, None) |
64			(Compatibility::V2, Some(Version::V2)) |
65			(Compatibility::Both, _) => true,
66			_ => false,
67		}
68	}
69
70	fn default_version(&self) -> Option<Version> {
71		match *self {
72			Compatibility::V1 => None,
73			Compatibility::V2 | Compatibility::Both => Some(Version::V2),
74		}
75	}
76}
77
78/// Request handler
79///
80/// By default compatible only with jsonrpc v2
81#[derive(Debug)]
82pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
83	middleware: S,
84	compatibility: Compatibility,
85	methods: HashMap<String, RemoteProcedure<T>>,
86}
87
88impl<T: Metadata> Default for MetaIoHandler<T> {
89	fn default() -> Self {
90		MetaIoHandler::with_compatibility(Default::default())
91	}
92}
93
94impl<T: Metadata> MetaIoHandler<T> {
95	/// Creates new `MetaIoHandler` compatible with specified protocol version.
96	pub fn with_compatibility(compatibility: Compatibility) -> Self {
97		MetaIoHandler {
98			compatibility: compatibility,
99			middleware: Default::default(),
100			methods: Default::default(),
101		}
102	}
103}
104
105
106impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
107	/// Creates new `MetaIoHandler`
108	pub fn new(compatibility: Compatibility, middleware: S) -> Self {
109		MetaIoHandler {
110			compatibility: compatibility,
111			middleware: middleware,
112			methods: Default::default(),
113		}
114	}
115
116	/// Creates new `MetaIoHandler` with specified middleware.
117	pub fn with_middleware(middleware: S) -> Self {
118		MetaIoHandler {
119			compatibility: Default::default(),
120			middleware: middleware,
121			methods: Default::default(),
122		}
123	}
124
125	/// Adds an alias to a method.
126	pub fn add_alias(&mut self, alias: &str, other: &str) {
127		self.methods.insert(
128			alias.into(),
129			RemoteProcedure::Alias(other.into()),
130		);
131	}
132
133	/// Adds new supported asynchronous method
134	pub fn add_method<F>(&mut self, name: &str, method: F) where
135		F: RpcMethodSimple,
136	{
137		self.add_method_with_meta(name, move |params, _meta| {
138			method.call(params)
139		})
140	}
141
142	/// Adds new supported notification
143	pub fn add_notification<F>(&mut self, name: &str, notification: F) where
144		F: RpcNotificationSimple,
145	{
146		self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
147	}
148
149	/// Adds new supported asynchronous method with metadata support.
150	pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
151		F: RpcMethod<T>,
152	{
153		self.methods.insert(
154			name.into(),
155			RemoteProcedure::Method(Arc::new(method)),
156		);
157	}
158
159	/// Adds new supported notification with metadata support.
160	pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
161		F: RpcNotification<T>,
162	{
163		self.methods.insert(
164			name.into(),
165			RemoteProcedure::Notification(Arc::new(notification)),
166		);
167	}
168
169	/// Extend this `MetaIoHandler` with methods defined elsewhere.
170	pub fn extend_with<F>(&mut self, methods: F) where
171		F: Into<HashMap<String, RemoteProcedure<T>>>
172	{
173		self.methods.extend(methods.into())
174	}
175
176	/// Handle given request synchronously - will block until response is available.
177	/// If you have any asynchronous methods in your RPC it is much wiser to use
178	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
179	pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
180		self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
181	}
182
183	/// Handle given request asynchronously.
184	pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future> {
185		use self::future::Either::{A, B};
186		fn as_string(response: Option<Response>) -> Option<String> {
187			let res = response.map(write_response);
188			debug!(target: "rpc", "Response: {}.", match res {
189				Some(ref res) => res,
190				None => "None",
191			});
192			res
193		}
194
195		trace!(target: "rpc", "Request: {}.", request);
196		let request = read_request(request);
197		let result = match request {
198			Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
199			Ok(request) => B(self.handle_rpc_request(request, meta)),
200		};
201
202		result.map(as_string)
203	}
204
205	/// Handle deserialized RPC request.
206	pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future> {
207		use self::future::Either::{A, B};
208
209		fn output_as_response(output: Option<Output>) -> Option<Response> {
210			output.map(Response::Single)
211		}
212
213		fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
214			let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
215			if outs.is_empty() {
216				None
217			} else {
218				Some(Response::Batch(outs))
219			}
220		}
221
222		self.middleware.on_request(request, meta, |request, meta| match request {
223			Request::Single(call) => {
224				A(self.handle_call(call, meta).map(output_as_response as fn(Option<Output>) ->
225												   Option<Response>))
226			},
227			Request::Batch(calls) => {
228				let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
229				B(futures::future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) ->
230																				Option<Response>))
231			},
232		})
233	}
234
235	/// Handle single call asynchronously.
236	pub fn handle_call(&self, call: Call, meta: T) -> FutureOutput {
237		use self::future::Either::{A, B};
238
239		match call {
240			Call::MethodCall(method) => {
241				let params = method.params;
242				let id = method.id;
243				let jsonrpc = method.jsonrpc;
244				let valid_version = self.compatibility.is_version_valid(jsonrpc);
245
246				let call_method = |method: &Arc<RpcMethod<T>>| {
247					let method = method.clone();
248					futures::lazy(move || method.call(params, meta))
249				};
250
251				let result = match (valid_version, self.methods.get(&method.method)) {
252					(false, _) => Err(Error::invalid_version()),
253					(true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
254					(true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
255						Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
256						_ => Err(Error::method_not_found()),
257					},
258					(true, _) => Err(Error::method_not_found()),
259				};
260
261				match result {
262					Ok(result) => A(Box::new(
263						result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
264					)),
265					Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
266				}
267			},
268			Call::Notification(notification) => {
269				let params = notification.params;
270				let jsonrpc = notification.jsonrpc;
271				if !self.compatibility.is_version_valid(jsonrpc) {
272					return B(futures::finished(None));
273				}
274
275				match self.methods.get(&notification.method) {
276					Some(&RemoteProcedure::Notification(ref notification)) => {
277						notification.execute(params, meta);
278					},
279					Some(&RemoteProcedure::Alias(ref alias)) => {
280						if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
281							notification.execute(params, meta);
282						}
283					},
284					_ => {},
285				}
286
287				B(futures::finished(None))
288			},
289			Call::Invalid { id } => {
290				B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
291			},
292		}
293	}
294}
295
296/// Simplified `IoHandler` with no `Metadata` associated with each request.
297#[derive(Debug, Default)]
298pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
299
300// Type inference helper
301impl IoHandler {
302	/// Creates new `IoHandler` without any metadata.
303	pub fn new() -> Self {
304		IoHandler::default()
305	}
306
307	/// Creates new `IoHandler` without any metadata compatible with specified protocol version.
308	pub fn with_compatibility(compatibility: Compatibility) -> Self {
309		IoHandler(MetaIoHandler::with_compatibility(compatibility))
310	}
311}
312
313impl<M: Metadata + Default> IoHandler<M> {
314	/// Handle given string request asynchronously.
315	pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse> {
316		self.0.handle_request(request, M::default())
317	}
318
319	/// Handle deserialized RPC request asynchronously.
320	pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse> {
321		self.0.handle_rpc_request(request, M::default())
322	}
323
324	/// Handle single Call asynchronously.
325	pub fn handle_call(&self, call: Call) -> FutureOutput {
326		self.0.handle_call(call, M::default())
327	}
328
329	/// Handle given request synchronously - will block until response is available.
330	/// If you have any asynchronous methods in your RPC it is much wiser to use
331	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
332	pub fn handle_request_sync(&self, request: &str) -> Option<String> {
333		self.0.handle_request_sync(request, M::default())
334	}
335}
336
337impl<M: Metadata> Deref for IoHandler<M> {
338	type Target = MetaIoHandler<M>;
339
340	fn deref(&self) -> &Self::Target {
341		&self.0
342	}
343}
344
345impl<M: Metadata> DerefMut for IoHandler<M> {
346	fn deref_mut(&mut self) -> &mut Self::Target {
347		&mut self.0
348	}
349}
350
351impl From<IoHandler> for MetaIoHandler<()> {
352	fn from(io: IoHandler) -> Self {
353		io.0
354	}
355}
356
357fn read_request(request_str: &str) -> Result<Request, Error> {
358	serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
359}
360
361fn write_response(response: Response) -> String {
362	// this should never fail
363	serde_json::to_string(&response).unwrap()
364}
365
#[cfg(test)]
mod tests {
	use futures;
	use types::{Value};
	use super::{IoHandler, Compatibility};

	/// A 2.0 method call reaches the registered handler and its result is returned.
	#[test]
	fn test_io_handler() {
		let mut io = IoHandler::new();
		io.add_method("say_hello", |_| Ok(Value::String(String::from("hello"))));

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
		);
		let expected = String::from(r#"{"jsonrpc":"2.0","result":"hello","id":1}"#);

		assert_eq!(actual, Some(expected));
	}

	/// With `Compatibility::Both`, a request without a version marker is answered in kind.
	#[test]
	fn test_io_handler_1dot0() {
		let mut io = IoHandler::with_compatibility(Compatibility::Both);
		io.add_method("say_hello", |_| Ok(Value::String(String::from("hello"))));

		let actual = io.handle_request_sync(
			r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#
		);
		let expected = String::from(r#"{"result":"hello","id":1}"#);

		assert_eq!(actual, Some(expected));
	}

	/// A handler may return a future instead of a plain `Result`.
	#[test]
	fn test_async_io_handler() {
		let mut io = IoHandler::new();
		io.add_method("say_hello", |_| futures::future::ok(Value::String(String::from("hello"))));

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
		);
		let expected = String::from(r#"{"jsonrpc":"2.0","result":"hello","id":1}"#);

		assert_eq!(actual, Some(expected));
	}

	/// A notification runs its handler and produces no response.
	#[test]
	fn test_notification() {
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};

		let mut io = IoHandler::new();

		let called = Arc::new(AtomicBool::new(false));
		let flag = called.clone();
		io.add_notification("say_hello", move |_| {
			flag.store(true, Ordering::SeqCst);
		});

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#
		);

		assert_eq!(actual, None);
		assert!(called.load(Ordering::SeqCst));
	}

	/// Calling an unregistered method yields the "Method not found" error.
	#[test]
	fn test_method_not_found() {
		let io = IoHandler::new();

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
		);
		let expected = String::from(
			r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#
		);

		assert_eq!(actual, Some(expected));
	}

	/// A method can be invoked through an alias.
	#[test]
	fn test_method_alias() {
		let mut io = IoHandler::new();
		io.add_method("say_hello", |_| Ok(Value::String(String::from("hello"))));
		io.add_alias("say_hello_alias", "say_hello");

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#
		);
		let expected = String::from(r#"{"jsonrpc":"2.0","result":"hello","id":1}"#);

		assert_eq!(actual, Some(expected));
	}

	/// A notification can be triggered through an alias.
	#[test]
	fn test_notification_alias() {
		use std::sync::Arc;
		use std::sync::atomic::{AtomicBool, Ordering};

		let mut io = IoHandler::new();

		let called = Arc::new(AtomicBool::new(false));
		let flag = called.clone();
		io.add_notification("say_hello", move |_| {
			flag.store(true, Ordering::SeqCst);
		});
		io.add_alias("say_hello_alias", "say_hello");

		let actual = io.handle_request_sync(
			r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#
		);

		assert_eq!(actual, None);
		assert!(called.load(Ordering::SeqCst));
	}

	/// The handler must be usable across threads; this is a compile-time check.
	#[test]
	fn test_send_sync() {
		fn is_send_sync<T: Send + Sync>(_obj: T) -> bool {
			true
		}

		let io = IoHandler::new();

		assert!(is_send_sync(io))
	}
}