solana_jsonrpc_core/
io.rs

1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
13/// A type representing middleware or RPC response before serialization.
14pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
16/// A type representing middleware or RPC call output.
17pub type FutureOutput = Box<Future<Item=Option<Output>, Error=()> + Send>;
18
19/// A type representing future string response.
20pub type FutureResult<F, G> = future::Map<
21	future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F, G>>,
22	fn(Option<Response>) -> Option<String>,
23>;
24
25/// A type representing a result of a single method call.
26pub type FutureRpcOutput<F> = future::Either<
27	F,
28	future::Either<
29		FutureOutput,
30		future::FutureResult<Option<Output>, ()>,
31	>,
32>;
33
34/// A type representing an optional `Response` for RPC `Request`.
35pub type FutureRpcResult<F, G> = future::Either<
36	F,
37	future::Either<
38		future::Map<
39			FutureRpcOutput<G>,
40			fn(Option<Output>) -> Option<Response>,
41		>,
42		future::Map<
43			future::JoinAll<Vec<FutureRpcOutput<G>>>,
44			fn(Vec<Option<Output>>) -> Option<Response>,
45		>,
46	>,
47>;
48
49/// `IoHandler` json-rpc protocol compatibility
50#[derive(Debug, Clone, Copy)]
51pub enum Compatibility {
52	/// Compatible only with JSON-RPC 1.x
53	V1,
54	/// Compatible only with JSON-RPC 2.0
55	V2,
56	/// Compatible with both
57	Both,
58}
59
60impl Default for Compatibility {
61	fn default() -> Self {
62		Compatibility::V2
63	}
64}
65
66impl Compatibility {
67	fn is_version_valid(&self, version: Option<Version>) -> bool {
68		match (*self, version) {
69			(Compatibility::V1, None) |
70			(Compatibility::V2, Some(Version::V2)) |
71			(Compatibility::Both, _) => true,
72			_ => false,
73		}
74	}
75
76	fn default_version(&self) -> Option<Version> {
77		match *self {
78			Compatibility::V1 => None,
79			Compatibility::V2 | Compatibility::Both => Some(Version::V2),
80		}
81	}
82}
83
84/// Request handler
85///
86/// By default compatible only with jsonrpc v2
87#[derive(Debug)]
88pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
89	middleware: S,
90	compatibility: Compatibility,
91	methods: HashMap<String, RemoteProcedure<T>>,
92}
93
94impl<T: Metadata> Default for MetaIoHandler<T> {
95	fn default() -> Self {
96		MetaIoHandler::with_compatibility(Default::default())
97	}
98}
99
100impl<T: Metadata> MetaIoHandler<T> {
101	/// Creates new `MetaIoHandler` compatible with specified protocol version.
102	pub fn with_compatibility(compatibility: Compatibility) -> Self {
103		MetaIoHandler {
104			compatibility: compatibility,
105			middleware: Default::default(),
106			methods: Default::default(),
107		}
108	}
109}
110
111
112impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
113	/// Creates new `MetaIoHandler`
114	pub fn new(compatibility: Compatibility, middleware: S) -> Self {
115		MetaIoHandler {
116			compatibility: compatibility,
117			middleware: middleware,
118			methods: Default::default(),
119		}
120	}
121
122	/// Creates new `MetaIoHandler` with specified middleware.
123	pub fn with_middleware(middleware: S) -> Self {
124		MetaIoHandler {
125			compatibility: Default::default(),
126			middleware: middleware,
127			methods: Default::default(),
128		}
129	}
130
131	/// Adds an alias to a method.
132	pub fn add_alias(&mut self, alias: &str, other: &str) {
133		self.methods.insert(
134			alias.into(),
135			RemoteProcedure::Alias(other.into()),
136		);
137	}
138
139	/// Adds new supported asynchronous method
140	pub fn add_method<F>(&mut self, name: &str, method: F) where
141		F: RpcMethodSimple,
142	{
143		self.add_method_with_meta(name, move |params, _meta| {
144			method.call(params)
145		})
146	}
147
148	/// Adds new supported notification
149	pub fn add_notification<F>(&mut self, name: &str, notification: F) where
150		F: RpcNotificationSimple,
151	{
152		self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
153	}
154
155	/// Adds new supported asynchronous method with metadata support.
156	pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
157		F: RpcMethod<T>,
158	{
159		self.methods.insert(
160			name.into(),
161			RemoteProcedure::Method(Arc::new(method)),
162		);
163	}
164
165	/// Adds new supported notification with metadata support.
166	pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
167		F: RpcNotification<T>,
168	{
169		self.methods.insert(
170			name.into(),
171			RemoteProcedure::Notification(Arc::new(notification)),
172		);
173	}
174
175	/// Extend this `MetaIoHandler` with methods defined elsewhere.
176	pub fn extend_with<F>(&mut self, methods: F) where
177		F: Into<HashMap<String, RemoteProcedure<T>>>
178	{
179		self.methods.extend(methods.into())
180	}
181
182	/// Handle given request synchronously - will block until response is available.
183	/// If you have any asynchronous methods in your RPC it is much wiser to use
184	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
185	pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
186		self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
187	}
188
189	/// Handle given request asynchronously.
190	pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future, S::CallFuture> {
191		use self::future::Either::{A, B};
192		fn as_string(response: Option<Response>) -> Option<String> {
193			let res = response.map(write_response);
194			debug!(target: "rpc", "Response: {}.", match res {
195				Some(ref res) => res,
196				None => "None",
197			});
198			res
199		}
200
201		trace!(target: "rpc", "Request: {}.", request);
202		let request = read_request(request);
203		let result = match request {
204			Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
205			Ok(request) => B(self.handle_rpc_request(request, meta)),
206		};
207
208		result.map(as_string)
209	}
210
211	/// Handle deserialized RPC request.
212	pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future, S::CallFuture> {
213		use self::future::Either::{A, B};
214
215		fn output_as_response(output: Option<Output>) -> Option<Response> {
216			output.map(Response::Single)
217		}
218
219		fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
220			let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
221			if outs.is_empty() {
222				None
223			} else {
224				Some(Response::Batch(outs))
225			}
226		}
227
228		self.middleware.on_request(request, meta, |request, meta| match request {
229			Request::Single(call) => {
230				A(self.handle_call(call, meta).map(output_as_response as fn(Option<Output>) ->
231												   Option<Response>))
232			},
233			Request::Batch(calls) => {
234				let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
235				B(futures::future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) ->
236																				Option<Response>))
237			},
238		})
239	}
240
241	/// Handle single call asynchronously.
242	pub fn handle_call(&self, call: Call, meta: T) -> FutureRpcOutput<S::CallFuture> {
243		use self::future::Either::{A, B};
244
245		self.middleware.on_call(call, meta, |call, meta| match call {
246			Call::MethodCall(method) => {
247				let params = method.params;
248				let id = method.id;
249				let jsonrpc = method.jsonrpc;
250				let valid_version = self.compatibility.is_version_valid(jsonrpc);
251
252				let call_method = |method: &Arc<RpcMethod<T>>| {
253					let method = method.clone();
254					futures::lazy(move || method.call(params, meta))
255				};
256
257				let result = match (valid_version, self.methods.get(&method.method)) {
258					(false, _) => Err(Error::invalid_version()),
259					(true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
260					(true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
261						Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
262						_ => Err(Error::method_not_found()),
263					},
264					(true, _) => Err(Error::method_not_found()),
265				};
266
267				match result {
268					Ok(result) => A(Box::new(
269						result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
270					) as _),
271					Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
272				}
273			},
274			Call::Notification(notification) => {
275				let params = notification.params;
276				let jsonrpc = notification.jsonrpc;
277				if !self.compatibility.is_version_valid(jsonrpc) {
278					return B(futures::finished(None));
279				}
280
281				match self.methods.get(&notification.method) {
282					Some(&RemoteProcedure::Notification(ref notification)) => {
283						notification.execute(params, meta);
284					},
285					Some(&RemoteProcedure::Alias(ref alias)) => {
286						if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
287							notification.execute(params, meta);
288						}
289					},
290					_ => {},
291				}
292
293				B(futures::finished(None))
294			},
295			Call::Invalid { id } => {
296				B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
297			},
298		})
299	}
300}
301
302/// Simplified `IoHandler` with no `Metadata` associated with each request.
303#[derive(Debug, Default)]
304pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
305
306// Type inference helper
307impl IoHandler {
308	/// Creates new `IoHandler` without any metadata.
309	pub fn new() -> Self {
310		IoHandler::default()
311	}
312
313	/// Creates new `IoHandler` without any metadata compatible with specified protocol version.
314	pub fn with_compatibility(compatibility: Compatibility) -> Self {
315		IoHandler(MetaIoHandler::with_compatibility(compatibility))
316	}
317}
318
319impl<M: Metadata + Default> IoHandler<M> {
320	/// Handle given string request asynchronously.
321	pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse, FutureOutput> {
322		self.0.handle_request(request, M::default())
323	}
324
325	/// Handle deserialized RPC request asynchronously.
326	pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse, FutureOutput> {
327		self.0.handle_rpc_request(request, M::default())
328	}
329
330	/// Handle single Call asynchronously.
331	pub fn handle_call(&self, call: Call) -> FutureRpcOutput<FutureOutput> {
332		self.0.handle_call(call, M::default())
333	}
334
335	/// Handle given request synchronously - will block until response is available.
336	/// If you have any asynchronous methods in your RPC it is much wiser to use
337	/// `handle_request` instead and deal with asynchronous requests in a non-blocking fashion.
338	pub fn handle_request_sync(&self, request: &str) -> Option<String> {
339		self.0.handle_request_sync(request, M::default())
340	}
341}
342
343impl<M: Metadata> Deref for IoHandler<M> {
344	type Target = MetaIoHandler<M>;
345
346	fn deref(&self) -> &Self::Target {
347		&self.0
348	}
349}
350
351impl<M: Metadata> DerefMut for IoHandler<M> {
352	fn deref_mut(&mut self) -> &mut Self::Target {
353		&mut self.0
354	}
355}
356
357impl From<IoHandler> for MetaIoHandler<()> {
358	fn from(io: IoHandler) -> Self {
359		io.0
360	}
361}
362
363fn read_request(request_str: &str) -> Result<Request, Error> {
364	serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
365}
366
367fn write_response(response: Response) -> String {
368	// this should never fail
369	serde_json::to_string(&response).unwrap()
370}
371
372#[cfg(test)]
373mod tests {
374	use futures;
375	use types::{Value};
376	use super::{IoHandler, Compatibility};
377
378	#[test]
379	fn test_io_handler() {
380		let mut io = IoHandler::new();
381
382		io.add_method("say_hello", |_| {
383			Ok(Value::String("hello".to_string()))
384		});
385
386		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
387		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
388
389		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
390	}
391
392	#[test]
393	fn test_io_handler_1dot0() {
394		let mut io = IoHandler::with_compatibility(Compatibility::Both);
395
396		io.add_method("say_hello", |_| {
397			Ok(Value::String("hello".to_string()))
398		});
399
400		let request = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
401		let response = r#"{"result":"hello","id":1}"#;
402
403		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
404	}
405
406	#[test]
407	fn test_async_io_handler() {
408		let mut io = IoHandler::new();
409
410		io.add_method("say_hello", |_| {
411			futures::finished(Value::String("hello".to_string()))
412		});
413
414		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
415		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
416
417		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
418	}
419
420	#[test]
421	fn test_notification() {
422		use std::sync::Arc;
423		use std::sync::atomic;
424
425		let mut io = IoHandler::new();
426
427		let called = Arc::new(atomic::AtomicBool::new(false));
428		let c = called.clone();
429		io.add_notification("say_hello", move |_| {
430			c.store(true, atomic::Ordering::SeqCst);
431		});
432		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;
433
434		assert_eq!(io.handle_request_sync(request), None);
435		assert_eq!(called.load(atomic::Ordering::SeqCst), true);
436	}
437
438	#[test]
439	fn test_method_not_found() {
440		let io = IoHandler::new();
441
442		let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
443		let response = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;
444
445		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
446	}
447
448	#[test]
449	fn test_method_alias() {
450		let mut io = IoHandler::new();
451		io.add_method("say_hello", |_| {
452			Ok(Value::String("hello".to_string()))
453		});
454		io.add_alias("say_hello_alias", "say_hello");
455
456
457		let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
458		let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
459
460		assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
461	}
462
463	#[test]
464	fn test_notification_alias() {
465		use std::sync::Arc;
466		use std::sync::atomic;
467
468		let mut io = IoHandler::new();
469
470		let called = Arc::new(atomic::AtomicBool::new(false));
471		let c = called.clone();
472		io.add_notification("say_hello", move |_| {
473			c.store(true, atomic::Ordering::SeqCst);
474		});
475		io.add_alias("say_hello_alias", "say_hello");
476
477		let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;
478		assert_eq!(io.handle_request_sync(request), None);
479		assert_eq!(called.load(atomic::Ordering::SeqCst), true);
480	}
481
482	#[test]
483	fn test_send_sync() {
484		fn is_send_sync<T>(_obj: T) -> bool where
485			T: Send + Sync
486		{
487			true
488		}
489
490		let io = IoHandler::new();
491
492		assert!(is_send_sync(io))
493	}
494}