tetsy_jsonrpc_client_transports/
lib.rs

1//! JSON-RPC client implementation.
2
3#![deny(missing_docs)]
4
5use failure::{format_err, Fail};
6use futures::sync::{mpsc, oneshot};
7use futures::{future, prelude::*};
8use tetsy_jsonrpc_core::{Error, Params};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use serde_json::Value;
12use std::marker::PhantomData;
13
14pub mod transports;
15
16#[cfg(test)]
17mod logger;
18
19/// The errors returned by the client.
20#[derive(Debug, Fail)]
21pub enum RpcError {
22	/// An error returned by the server.
23	#[fail(display = "Server returned rpc error {}", _0)]
24	JsonRpcError(Error),
25	/// Failure to parse server response.
26	#[fail(display = "Failed to parse server response as {}: {}", _0, _1)]
27	ParseError(String, failure::Error),
28	/// Request timed out.
29	#[fail(display = "Request timed out")]
30	Timeout,
31	/// Not rpc specific errors.
32	#[fail(display = "{}", _0)]
33	Other(failure::Error),
34}
35
36impl From<Error> for RpcError {
37	fn from(error: Error) -> Self {
38		RpcError::JsonRpcError(error)
39	}
40}
41
42/// An RPC call message.
43struct CallMessage {
44	/// The RPC method name.
45	method: String,
46	/// The RPC method parameters.
47	params: Params,
48	/// The oneshot channel to send the result of the rpc
49	/// call to.
50	sender: oneshot::Sender<Result<Value, RpcError>>,
51}
52
53/// An RPC notification.
54struct NotifyMessage {
55	/// The RPC method name.
56	method: String,
57	/// The RPC method paramters.
58	params: Params,
59}
60
61/// An RPC subscription.
62struct Subscription {
63	/// The subscribe method name.
64	subscribe: String,
65	/// The subscribe method parameters.
66	subscribe_params: Params,
67	/// The name of the notification.
68	notification: String,
69	/// The unsubscribe method name.
70	unsubscribe: String,
71}
72
73/// An RPC subscribe message.
74struct SubscribeMessage {
75	/// The subscription to subscribe to.
76	subscription: Subscription,
77	/// The channel to send notifications to.
78	sender: mpsc::Sender<Result<Value, RpcError>>,
79}
80
81/// A message sent to the `RpcClient`.
82enum RpcMessage {
83	/// Make an RPC call.
84	Call(CallMessage),
85	/// Send a notification.
86	Notify(NotifyMessage),
87	/// Subscribe to a notification.
88	Subscribe(SubscribeMessage),
89}
90
91impl From<CallMessage> for RpcMessage {
92	fn from(msg: CallMessage) -> Self {
93		RpcMessage::Call(msg)
94	}
95}
96
97impl From<NotifyMessage> for RpcMessage {
98	fn from(msg: NotifyMessage) -> Self {
99		RpcMessage::Notify(msg)
100	}
101}
102
103impl From<SubscribeMessage> for RpcMessage {
104	fn from(msg: SubscribeMessage) -> Self {
105		RpcMessage::Subscribe(msg)
106	}
107}
108
109/// A channel to a `RpcClient`.
110#[derive(Clone)]
111pub struct RpcChannel(mpsc::Sender<RpcMessage>);
112
113impl RpcChannel {
114	fn send(
115		&self,
116		msg: RpcMessage,
117	) -> impl Future<Item = mpsc::Sender<RpcMessage>, Error = mpsc::SendError<RpcMessage>> {
118		self.0.to_owned().send(msg)
119	}
120}
121
122impl From<mpsc::Sender<RpcMessage>> for RpcChannel {
123	fn from(sender: mpsc::Sender<RpcMessage>) -> Self {
124		RpcChannel(sender)
125	}
126}
127
128/// The future returned by the rpc call.
129pub struct RpcFuture {
130	recv: oneshot::Receiver<Result<Value, RpcError>>,
131}
132
133impl RpcFuture {
134	/// Creates a new `RpcFuture`.
135	pub fn new(recv: oneshot::Receiver<Result<Value, RpcError>>) -> Self {
136		RpcFuture { recv }
137	}
138}
139
140impl Future for RpcFuture {
141	type Item = Value;
142	type Error = RpcError;
143
144	fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
145		// TODO should timeout (#410)
146		match self.recv.poll() {
147			Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
148			Ok(Async::Ready(Err(error))) => Err(error),
149			Ok(Async::NotReady) => Ok(Async::NotReady),
150			Err(error) => Err(RpcError::Other(error.into())),
151		}
152	}
153}
154
155/// The stream returned by a subscribe.
156pub struct SubscriptionStream {
157	recv: mpsc::Receiver<Result<Value, RpcError>>,
158}
159
160impl SubscriptionStream {
161	/// Crates a new `SubscriptionStream`.
162	pub fn new(recv: mpsc::Receiver<Result<Value, RpcError>>) -> Self {
163		SubscriptionStream { recv }
164	}
165}
166
167impl Stream for SubscriptionStream {
168	type Item = Value;
169	type Error = RpcError;
170
171	fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
172		match self.recv.poll() {
173			Ok(Async::Ready(Some(Ok(value)))) => Ok(Async::Ready(Some(value))),
174			Ok(Async::Ready(Some(Err(error)))) => Err(error),
175			Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
176			Ok(Async::NotReady) => Ok(Async::NotReady),
177			Err(()) => Err(RpcError::Other(format_err!("mpsc channel returned an error."))),
178		}
179	}
180}
181
182/// A typed subscription stream.
183pub struct TypedSubscriptionStream<T> {
184	_marker: PhantomData<T>,
185	returns: &'static str,
186	stream: SubscriptionStream,
187}
188
189impl<T> TypedSubscriptionStream<T> {
190	/// Creates a new `TypedSubscriptionStream`.
191	pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
192		TypedSubscriptionStream {
193			_marker: PhantomData,
194			returns,
195			stream,
196		}
197	}
198}
199
200impl<T: DeserializeOwned + 'static> Stream for TypedSubscriptionStream<T> {
201	type Item = T;
202	type Error = RpcError;
203
204	fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
205		let result = match self.stream.poll()? {
206			Async::Ready(Some(value)) => serde_json::from_value::<T>(value)
207				.map(|result| Async::Ready(Some(result)))
208				.map_err(|error| RpcError::ParseError(self.returns.into(), error.into()))?,
209			Async::Ready(None) => Async::Ready(None),
210			Async::NotReady => Async::NotReady,
211		};
212		Ok(result)
213	}
214}
215
216/// Client for raw JSON RPC requests
217#[derive(Clone)]
218pub struct RawClient(RpcChannel);
219
220impl From<RpcChannel> for RawClient {
221	fn from(channel: RpcChannel) -> Self {
222		RawClient(channel)
223	}
224}
225
226impl RawClient {
227	/// Call RPC method with raw JSON.
228	pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> {
229		let (sender, receiver) = oneshot::channel();
230		let msg = CallMessage {
231			method: method.into(),
232			params,
233			sender,
234		};
235		self.0
236			.send(msg.into())
237			.map_err(|error| RpcError::Other(error.into()))
238			.and_then(|_| RpcFuture::new(receiver))
239	}
240
241	/// Send RPC notification with raw JSON.
242	pub fn notify(&self, method: &str, params: Params) -> impl Future<Item = (), Error = RpcError> {
243		let msg = NotifyMessage {
244			method: method.into(),
245			params,
246		};
247		self.0
248			.send(msg.into())
249			.map(|_| ())
250			.map_err(|error| RpcError::Other(error.into()))
251	}
252
253	/// Subscribe to topic with raw JSON.
254	pub fn subscribe(
255		&self,
256		subscribe: &str,
257		subscribe_params: Params,
258		notification: &str,
259		unsubscribe: &str,
260	) -> impl Future<Item = SubscriptionStream, Error = RpcError> {
261		let (sender, receiver) = mpsc::channel(0);
262		let msg = SubscribeMessage {
263			subscription: Subscription {
264				subscribe: subscribe.into(),
265				subscribe_params,
266				notification: notification.into(),
267				unsubscribe: unsubscribe.into(),
268			},
269			sender,
270		};
271		self.0
272			.send(msg.into())
273			.map_err(|error| RpcError::Other(error.into()))
274			.map(|_| SubscriptionStream::new(receiver))
275	}
276}
277
278/// Client for typed JSON RPC requests
279#[derive(Clone)]
280pub struct TypedClient(RawClient);
281
282impl From<RpcChannel> for TypedClient {
283	fn from(channel: RpcChannel) -> Self {
284		TypedClient(channel.into())
285	}
286}
287
288impl TypedClient {
289	/// Create a new `TypedClient`.
290	pub fn new(raw_cli: RawClient) -> Self {
291		TypedClient(raw_cli)
292	}
293
294	/// Call RPC with serialization of request and deserialization of response.
295	pub fn call_method<T: Serialize, R: DeserializeOwned + 'static>(
296		&self,
297		method: &str,
298		returns: &'static str,
299		args: T,
300	) -> impl Future<Item = R, Error = RpcError> {
301		let args =
302			serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
303		let params = match args {
304			Value::Array(vec) => Params::Array(vec),
305			Value::Null => Params::None,
306			Value::Object(map) => Params::Map(map),
307			_ => {
308				return future::Either::A(future::err(RpcError::Other(format_err!(
309					"RPC params should serialize to a JSON array, JSON object or null"
310				))))
311			}
312		};
313
314		future::Either::B(self.0.call_method(method, params).and_then(move |value: Value| {
315			log::debug!("response: {:?}", value);
316			let result =
317				serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns.into(), error.into()));
318			future::done(result)
319		}))
320	}
321
322	/// Call RPC with serialization of request only.
323	pub fn notify<T: Serialize>(&self, method: &str, args: T) -> impl Future<Item = (), Error = RpcError> {
324		let args =
325			serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
326		let params = match args {
327			Value::Array(vec) => Params::Array(vec),
328			Value::Null => Params::None,
329			_ => {
330				return future::Either::A(future::err(RpcError::Other(format_err!(
331					"RPC params should serialize to a JSON array, or null"
332				))))
333			}
334		};
335
336		future::Either::B(self.0.notify(method, params))
337	}
338
339	/// Subscribe with serialization of request and deserialization of response.
340	pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
341		&self,
342		subscribe: &str,
343		subscribe_params: T,
344		topic: &str,
345		unsubscribe: &str,
346		returns: &'static str,
347	) -> impl Future<Item = TypedSubscriptionStream<R>, Error = RpcError> {
348		let args = serde_json::to_value(subscribe_params)
349			.expect("Only types with infallible serialisation can be used for JSON-RPC");
350
351		let params = match args {
352			Value::Array(vec) => Params::Array(vec),
353			Value::Null => Params::None,
354			_ => {
355				return future::Either::A(future::err(RpcError::Other(format_err!(
356					"RPC params should serialize to a JSON array, or null"
357				))))
358			}
359		};
360
361		let typed_stream = self
362			.0
363			.subscribe(subscribe, params, topic, unsubscribe)
364			.map(move |stream| TypedSubscriptionStream::new(stream, returns));
365		future::Either::B(typed_stream)
366	}
367}
368
#[cfg(test)]
mod tests {
	use super::*;
	use crate::transports::local;
	use crate::{RpcChannel, RpcError, TypedClient};
	use tetsy_jsonrpc_core::{self as core, IoHandler};
	use tetsy_jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
	use std::sync::atomic::{AtomicBool, Ordering};
	use std::sync::Arc;

	/// Minimal typed client used to exercise calls and notifications.
	#[derive(Clone)]
	struct AddClient(TypedClient);

	impl From<RpcChannel> for AddClient {
		fn from(channel: RpcChannel) -> Self {
			AddClient(channel.into())
		}
	}

	impl AddClient {
		/// Calls the `add` method with the two operands.
		fn add(&self, a: u64, b: u64) -> impl Future<Item = u64, Error = RpcError> {
			self.0.call_method("add", "u64", (a, b))
		}

		/// Sends the `completed` notification.
		fn completed(&self, success: bool) -> impl Future<Item = (), Error = RpcError> {
			self.0.notify("completed", (success,))
		}
	}

	#[test]
	fn test_client_terminates() {
		crate::logger::init_log();
		let mut handler = IoHandler::new();
		handler.add_method("add", |params: Params| {
			let (a, b) = params.parse::<(u64, u64)>()?;
			let res = a + b;
			Ok(tetsy_jsonrpc_core::to_value(res).unwrap())
		});

		let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
		let fut = client
			.clone()
			.add(3, 4)
			.and_then(move |res| client.add(res, 5))
			.join(rpc_client)
			.map(|(res, ())| {
				assert_eq!(res, 12);
			})
			// Panic with the error itself instead of asserting on a constant.
			.map_err(|err| panic!("rpc call failed: {:?}", err));
		tokio::run(fut);
	}

	#[test]
	fn should_send_notification() {
		crate::logger::init_log();
		let mut handler = IoHandler::new();
		handler.add_notification("completed", |params: Params| {
			let (success,) = params.parse::<(bool,)>().expect("expected to receive one boolean");
			assert!(success);
		});

		let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
		let fut = client
			.clone()
			.completed(true)
			.map(move |()| drop(client))
			.join(rpc_client)
			.map(|_| ())
			// Panic with the error itself instead of asserting on a constant.
			.map_err(|err| panic!("rpc notification failed: {:?}", err));
		tokio::run(fut);
	}

	#[test]
	fn should_handle_subscription() {
		crate::logger::init_log();
		// given
		let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
		let called = Arc::new(AtomicBool::new(false));
		let called2 = called.clone();
		handler.add_subscription(
			"hello",
			("subscribe_hello", |params, _meta, subscriber: Subscriber| {
				assert_eq!(params, core::Params::None);
				let sink = subscriber
					.assign_id(SubscriptionId::Number(5))
					.expect("assigned subscription id");
				// Publish a few notifications from a background thread.
				std::thread::spawn(move || {
					for i in 0..3 {
						std::thread::sleep(std::time::Duration::from_millis(100));
						let value = serde_json::json!({
							"subscription": 5,
							"result": vec![i],
						});
						sink.notify(serde_json::from_value(value).unwrap())
							.wait()
							.expect("sent notification");
					}
				});
			}),
			("unsubscribe_hello", move |id, _meta| {
				// Should be called because session is dropped.
				called2.store(true, Ordering::SeqCst);
				assert_eq!(id, SubscriptionId::Number(5));
				future::ok(core::Value::Bool(true))
			}),
		);

		// when
		let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
		let received = Arc::new(std::sync::Mutex::new(vec![]));
		let r2 = received.clone();
		let fut = client
			.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")
			.and_then(|stream| {
				stream
					.into_future()
					.map(move |(result, _)| {
						// The client is dropped after the first item is received.
						drop(client);
						r2.lock().unwrap().push(result.unwrap());
					})
					.map_err(|_| {
						panic!("Expected message not received.");
					})
			})
			.join(rpc_client)
			.map(|(res, _)| {
				log::info!("ok {:?}", res);
			})
			.map_err(|err| {
				log::error!("err {:?}", err);
			});
		tokio::run(fut);

		// then
		assert!(called.load(Ordering::SeqCst));
		assert!(
			!received.lock().unwrap().is_empty(),
			"Expected at least one received item."
		);
	}
}