jsonrpc_ws_client/
lib.rs

1//! JSON-RPC websocket client implementation.
2#![deny(missing_docs)]
3use failure::Error;
4use futures::prelude::*;
5use futures::sync::mpsc;
6use jsonrpc_core_client::RpcClient;
7pub use jsonrpc_core_client::{RpcChannel, RpcError};
8use log::info;
9use std::collections::VecDeque;
10use websocket::{ClientBuilder, OwnedMessage};
11
12/// Connect to a JSON-RPC websocket server.
13///
14/// Uses an unbuffered channel to queue outgoing rpc messages.
15pub fn connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, Error>
16where
17	T: From<RpcChannel>,
18{
19	let client = ClientBuilder::new(url)?
20		.async_connect(None)
21		.map(|(client, _)| {
22			let (sink, stream) = client.split();
23			let (sink, stream) = WebsocketClient::new(sink, stream).split();
24			let (sender, receiver) = mpsc::channel(0);
25			let rpc_client = RpcClient::new(sink, stream, receiver).map_err(|error| eprintln!("{:?}", error));
26			tokio::spawn(rpc_client);
27			sender.into()
28		})
29		.map_err(|error| RpcError::Other(error.into()));
30	Ok(client)
31}
32
33struct WebsocketClient<TSink, TStream> {
34	sink: TSink,
35	stream: TStream,
36	queue: VecDeque<OwnedMessage>,
37}
38
39impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
40where
41	TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
42	TStream: Stream<Item = OwnedMessage, Error = TError>,
43	TError: Into<Error>,
44{
45	pub fn new(sink: TSink, stream: TStream) -> Self {
46		Self {
47			sink,
48			stream,
49			queue: VecDeque::new(),
50		}
51	}
52}
53
54impl<TSink, TStream, TError> Sink for WebsocketClient<TSink, TStream>
55where
56	TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
57	TStream: Stream<Item = OwnedMessage, Error = TError>,
58	TError: Into<Error>,
59{
60	type SinkItem = String;
61	type SinkError = RpcError;
62
63	fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
64		self.queue.push_back(OwnedMessage::Text(request));
65		Ok(AsyncSink::Ready)
66	}
67
68	fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
69		loop {
70			match self.queue.pop_front() {
71				Some(request) => match self.sink.start_send(request) {
72					Ok(AsyncSink::Ready) => continue,
73					Ok(AsyncSink::NotReady(request)) => {
74						self.queue.push_front(request);
75						break;
76					}
77					Err(error) => return Err(RpcError::Other(error.into())),
78				},
79				None => break,
80			}
81		}
82		self.sink.poll_complete().map_err(|error| RpcError::Other(error.into()))
83	}
84}
85
86impl<TSink, TStream, TError> Stream for WebsocketClient<TSink, TStream>
87where
88	TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
89	TStream: Stream<Item = OwnedMessage, Error = TError>,
90	TError: Into<Error>,
91{
92	type Item = String;
93	type Error = RpcError;
94
95	fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
96		loop {
97			match self.stream.poll() {
98				Ok(Async::Ready(Some(message))) => match message {
99					OwnedMessage::Text(data) => return Ok(Async::Ready(Some(data))),
100					OwnedMessage::Binary(data) => info!("server sent binary data {:?}", data),
101					OwnedMessage::Ping(p) => self.queue.push_front(OwnedMessage::Pong(p)),
102					OwnedMessage::Pong(_) => {}
103					OwnedMessage::Close(c) => self.queue.push_front(OwnedMessage::Close(c)),
104				},
105				Ok(Async::Ready(None)) => {
106					// TODO try to reconnect (#411).
107					return Ok(Async::Ready(None));
108				}
109				Ok(Async::NotReady) => return Ok(Async::NotReady),
110				Err(error) => return Err(RpcError::Other(error.into())),
111			}
112		}
113	}
114}