1#![deny(missing_docs)]
4
5use failure::{format_err, Fail};
6use futures::sync::{mpsc, oneshot};
7use futures::{future, prelude::*};
8use tetsy_jsonrpc_core::{Error, Params};
9use serde::de::DeserializeOwned;
10use serde::Serialize;
11use serde_json::Value;
12use std::marker::PhantomData;
13
14pub mod transports;
15
16#[cfg(test)]
17mod logger;
18
19#[derive(Debug, Fail)]
21pub enum RpcError {
22 #[fail(display = "Server returned rpc error {}", _0)]
24 JsonRpcError(Error),
25 #[fail(display = "Failed to parse server response as {}: {}", _0, _1)]
27 ParseError(String, failure::Error),
28 #[fail(display = "Request timed out")]
30 Timeout,
31 #[fail(display = "{}", _0)]
33 Other(failure::Error),
34}
35
36impl From<Error> for RpcError {
37 fn from(error: Error) -> Self {
38 RpcError::JsonRpcError(error)
39 }
40}
41
42struct CallMessage {
44 method: String,
46 params: Params,
48 sender: oneshot::Sender<Result<Value, RpcError>>,
51}
52
53struct NotifyMessage {
55 method: String,
57 params: Params,
59}
60
61struct Subscription {
63 subscribe: String,
65 subscribe_params: Params,
67 notification: String,
69 unsubscribe: String,
71}
72
73struct SubscribeMessage {
75 subscription: Subscription,
77 sender: mpsc::Sender<Result<Value, RpcError>>,
79}
80
81enum RpcMessage {
83 Call(CallMessage),
85 Notify(NotifyMessage),
87 Subscribe(SubscribeMessage),
89}
90
91impl From<CallMessage> for RpcMessage {
92 fn from(msg: CallMessage) -> Self {
93 RpcMessage::Call(msg)
94 }
95}
96
97impl From<NotifyMessage> for RpcMessage {
98 fn from(msg: NotifyMessage) -> Self {
99 RpcMessage::Notify(msg)
100 }
101}
102
103impl From<SubscribeMessage> for RpcMessage {
104 fn from(msg: SubscribeMessage) -> Self {
105 RpcMessage::Subscribe(msg)
106 }
107}
108
109#[derive(Clone)]
111pub struct RpcChannel(mpsc::Sender<RpcMessage>);
112
113impl RpcChannel {
114 fn send(
115 &self,
116 msg: RpcMessage,
117 ) -> impl Future<Item = mpsc::Sender<RpcMessage>, Error = mpsc::SendError<RpcMessage>> {
118 self.0.to_owned().send(msg)
119 }
120}
121
122impl From<mpsc::Sender<RpcMessage>> for RpcChannel {
123 fn from(sender: mpsc::Sender<RpcMessage>) -> Self {
124 RpcChannel(sender)
125 }
126}
127
128pub struct RpcFuture {
130 recv: oneshot::Receiver<Result<Value, RpcError>>,
131}
132
133impl RpcFuture {
134 pub fn new(recv: oneshot::Receiver<Result<Value, RpcError>>) -> Self {
136 RpcFuture { recv }
137 }
138}
139
140impl Future for RpcFuture {
141 type Item = Value;
142 type Error = RpcError;
143
144 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
145 match self.recv.poll() {
147 Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
148 Ok(Async::Ready(Err(error))) => Err(error),
149 Ok(Async::NotReady) => Ok(Async::NotReady),
150 Err(error) => Err(RpcError::Other(error.into())),
151 }
152 }
153}
154
155pub struct SubscriptionStream {
157 recv: mpsc::Receiver<Result<Value, RpcError>>,
158}
159
160impl SubscriptionStream {
161 pub fn new(recv: mpsc::Receiver<Result<Value, RpcError>>) -> Self {
163 SubscriptionStream { recv }
164 }
165}
166
167impl Stream for SubscriptionStream {
168 type Item = Value;
169 type Error = RpcError;
170
171 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
172 match self.recv.poll() {
173 Ok(Async::Ready(Some(Ok(value)))) => Ok(Async::Ready(Some(value))),
174 Ok(Async::Ready(Some(Err(error)))) => Err(error),
175 Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
176 Ok(Async::NotReady) => Ok(Async::NotReady),
177 Err(()) => Err(RpcError::Other(format_err!("mpsc channel returned an error."))),
178 }
179 }
180}
181
182pub struct TypedSubscriptionStream<T> {
184 _marker: PhantomData<T>,
185 returns: &'static str,
186 stream: SubscriptionStream,
187}
188
189impl<T> TypedSubscriptionStream<T> {
190 pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
192 TypedSubscriptionStream {
193 _marker: PhantomData,
194 returns,
195 stream,
196 }
197 }
198}
199
200impl<T: DeserializeOwned + 'static> Stream for TypedSubscriptionStream<T> {
201 type Item = T;
202 type Error = RpcError;
203
204 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
205 let result = match self.stream.poll()? {
206 Async::Ready(Some(value)) => serde_json::from_value::<T>(value)
207 .map(|result| Async::Ready(Some(result)))
208 .map_err(|error| RpcError::ParseError(self.returns.into(), error.into()))?,
209 Async::Ready(None) => Async::Ready(None),
210 Async::NotReady => Async::NotReady,
211 };
212 Ok(result)
213 }
214}
215
216#[derive(Clone)]
218pub struct RawClient(RpcChannel);
219
220impl From<RpcChannel> for RawClient {
221 fn from(channel: RpcChannel) -> Self {
222 RawClient(channel)
223 }
224}
225
226impl RawClient {
227 pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> {
229 let (sender, receiver) = oneshot::channel();
230 let msg = CallMessage {
231 method: method.into(),
232 params,
233 sender,
234 };
235 self.0
236 .send(msg.into())
237 .map_err(|error| RpcError::Other(error.into()))
238 .and_then(|_| RpcFuture::new(receiver))
239 }
240
241 pub fn notify(&self, method: &str, params: Params) -> impl Future<Item = (), Error = RpcError> {
243 let msg = NotifyMessage {
244 method: method.into(),
245 params,
246 };
247 self.0
248 .send(msg.into())
249 .map(|_| ())
250 .map_err(|error| RpcError::Other(error.into()))
251 }
252
253 pub fn subscribe(
255 &self,
256 subscribe: &str,
257 subscribe_params: Params,
258 notification: &str,
259 unsubscribe: &str,
260 ) -> impl Future<Item = SubscriptionStream, Error = RpcError> {
261 let (sender, receiver) = mpsc::channel(0);
262 let msg = SubscribeMessage {
263 subscription: Subscription {
264 subscribe: subscribe.into(),
265 subscribe_params,
266 notification: notification.into(),
267 unsubscribe: unsubscribe.into(),
268 },
269 sender,
270 };
271 self.0
272 .send(msg.into())
273 .map_err(|error| RpcError::Other(error.into()))
274 .map(|_| SubscriptionStream::new(receiver))
275 }
276}
277
278#[derive(Clone)]
280pub struct TypedClient(RawClient);
281
282impl From<RpcChannel> for TypedClient {
283 fn from(channel: RpcChannel) -> Self {
284 TypedClient(channel.into())
285 }
286}
287
288impl TypedClient {
289 pub fn new(raw_cli: RawClient) -> Self {
291 TypedClient(raw_cli)
292 }
293
294 pub fn call_method<T: Serialize, R: DeserializeOwned + 'static>(
296 &self,
297 method: &str,
298 returns: &'static str,
299 args: T,
300 ) -> impl Future<Item = R, Error = RpcError> {
301 let args =
302 serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
303 let params = match args {
304 Value::Array(vec) => Params::Array(vec),
305 Value::Null => Params::None,
306 Value::Object(map) => Params::Map(map),
307 _ => {
308 return future::Either::A(future::err(RpcError::Other(format_err!(
309 "RPC params should serialize to a JSON array, JSON object or null"
310 ))))
311 }
312 };
313
314 future::Either::B(self.0.call_method(method, params).and_then(move |value: Value| {
315 log::debug!("response: {:?}", value);
316 let result =
317 serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns.into(), error.into()));
318 future::done(result)
319 }))
320 }
321
322 pub fn notify<T: Serialize>(&self, method: &str, args: T) -> impl Future<Item = (), Error = RpcError> {
324 let args =
325 serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
326 let params = match args {
327 Value::Array(vec) => Params::Array(vec),
328 Value::Null => Params::None,
329 _ => {
330 return future::Either::A(future::err(RpcError::Other(format_err!(
331 "RPC params should serialize to a JSON array, or null"
332 ))))
333 }
334 };
335
336 future::Either::B(self.0.notify(method, params))
337 }
338
339 pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
341 &self,
342 subscribe: &str,
343 subscribe_params: T,
344 topic: &str,
345 unsubscribe: &str,
346 returns: &'static str,
347 ) -> impl Future<Item = TypedSubscriptionStream<R>, Error = RpcError> {
348 let args = serde_json::to_value(subscribe_params)
349 .expect("Only types with infallible serialisation can be used for JSON-RPC");
350
351 let params = match args {
352 Value::Array(vec) => Params::Array(vec),
353 Value::Null => Params::None,
354 _ => {
355 return future::Either::A(future::err(RpcError::Other(format_err!(
356 "RPC params should serialize to a JSON array, or null"
357 ))))
358 }
359 };
360
361 let typed_stream = self
362 .0
363 .subscribe(subscribe, params, topic, unsubscribe)
364 .map(move |stream| TypedSubscriptionStream::new(stream, returns));
365 future::Either::B(typed_stream)
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::transports::local;
373 use crate::{RpcChannel, RpcError, TypedClient};
374 use tetsy_jsonrpc_core::{self as core, IoHandler};
375 use tetsy_jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
376 use std::sync::atomic::{AtomicBool, Ordering};
377 use std::sync::Arc;
378
379 #[derive(Clone)]
380 struct AddClient(TypedClient);
381
382 impl From<RpcChannel> for AddClient {
383 fn from(channel: RpcChannel) -> Self {
384 AddClient(channel.into())
385 }
386 }
387
388 impl AddClient {
389 fn add(&self, a: u64, b: u64) -> impl Future<Item = u64, Error = RpcError> {
390 self.0.call_method("add", "u64", (a, b))
391 }
392
393 fn completed(&self, success: bool) -> impl Future<Item = (), Error = RpcError> {
394 self.0.notify("completed", (success,))
395 }
396 }
397
398 #[test]
399 fn test_client_terminates() {
400 crate::logger::init_log();
401 let mut handler = IoHandler::new();
402 handler.add_method("add", |params: Params| {
403 let (a, b) = params.parse::<(u64, u64)>()?;
404 let res = a + b;
405 Ok(tetsy_jsonrpc_core::to_value(res).unwrap())
406 });
407
408 let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
409 let fut = client
410 .clone()
411 .add(3, 4)
412 .and_then(move |res| client.add(res, 5))
413 .join(rpc_client)
414 .map(|(res, ())| {
415 assert_eq!(res, 12);
416 })
417 .map_err(|err| {
418 eprintln!("{:?}", err);
419 assert!(false);
420 });
421 tokio::run(fut);
422 }
423
424 #[test]
425 fn should_send_notification() {
426 crate::logger::init_log();
427 let mut handler = IoHandler::new();
428 handler.add_notification("completed", |params: Params| {
429 let (success,) = params.parse::<(bool,)>().expect("expected to receive one boolean");
430 assert_eq!(success, true);
431 });
432
433 let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
434 let fut = client
435 .clone()
436 .completed(true)
437 .map(move |()| drop(client))
438 .join(rpc_client)
439 .map(|_| ())
440 .map_err(|err| {
441 eprintln!("{:?}", err);
442 assert!(false);
443 });
444 tokio::run(fut);
445 }
446
447 #[test]
448 fn should_handle_subscription() {
449 crate::logger::init_log();
450 let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
452 let called = Arc::new(AtomicBool::new(false));
453 let called2 = called.clone();
454 handler.add_subscription(
455 "hello",
456 ("subscribe_hello", |params, _meta, subscriber: Subscriber| {
457 assert_eq!(params, core::Params::None);
458 let sink = subscriber
459 .assign_id(SubscriptionId::Number(5))
460 .expect("assigned subscription id");
461 std::thread::spawn(move || {
462 for i in 0..3 {
463 std::thread::sleep(std::time::Duration::from_millis(100));
464 let value = serde_json::json!({
465 "subscription": 5,
466 "result": vec![i],
467 });
468 sink.notify(serde_json::from_value(value).unwrap())
469 .wait()
470 .expect("sent notification");
471 }
472 });
473 }),
474 ("unsubscribe_hello", move |id, _meta| {
475 called2.store(true, Ordering::SeqCst);
477 assert_eq!(id, SubscriptionId::Number(5));
478 future::ok(core::Value::Bool(true))
479 }),
480 );
481
482 let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
484 let received = Arc::new(std::sync::Mutex::new(vec![]));
485 let r2 = received.clone();
486 let fut = client
487 .subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")
488 .and_then(|stream| {
489 stream
490 .into_future()
491 .map(move |(result, _)| {
492 drop(client);
493 r2.lock().unwrap().push(result.unwrap());
494 })
495 .map_err(|_| {
496 panic!("Expected message not received.");
497 })
498 })
499 .join(rpc_client)
500 .map(|(res, _)| {
501 log::info!("ok {:?}", res);
502 })
503 .map_err(|err| {
504 log::error!("err {:?}", err);
505 });
506 tokio::run(fut);
507 assert_eq!(called.load(Ordering::SeqCst), true);
508 assert!(
509 !received.lock().unwrap().is_empty(),
510 "Expected at least one received item."
511 );
512 }
513}