1#![deny(missing_docs)]
3use failure::Error;
4use futures::prelude::*;
5use futures::sync::mpsc;
6use jsonrpc_core_client::RpcClient;
7pub use jsonrpc_core_client::{RpcChannel, RpcError};
8use log::info;
9use std::collections::VecDeque;
10use websocket::{ClientBuilder, OwnedMessage};
11
12pub fn connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, Error>
16where
17 T: From<RpcChannel>,
18{
19 let client = ClientBuilder::new(url)?
20 .async_connect(None)
21 .map(|(client, _)| {
22 let (sink, stream) = client.split();
23 let (sink, stream) = WebsocketClient::new(sink, stream).split();
24 let (sender, receiver) = mpsc::channel(0);
25 let rpc_client = RpcClient::new(sink, stream, receiver).map_err(|error| eprintln!("{:?}", error));
26 tokio::spawn(rpc_client);
27 sender.into()
28 })
29 .map_err(|error| RpcError::Other(error.into()));
30 Ok(client)
31}
32
33struct WebsocketClient<TSink, TStream> {
34 sink: TSink,
35 stream: TStream,
36 queue: VecDeque<OwnedMessage>,
37}
38
39impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
40where
41 TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
42 TStream: Stream<Item = OwnedMessage, Error = TError>,
43 TError: Into<Error>,
44{
45 pub fn new(sink: TSink, stream: TStream) -> Self {
46 Self {
47 sink,
48 stream,
49 queue: VecDeque::new(),
50 }
51 }
52}
53
54impl<TSink, TStream, TError> Sink for WebsocketClient<TSink, TStream>
55where
56 TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
57 TStream: Stream<Item = OwnedMessage, Error = TError>,
58 TError: Into<Error>,
59{
60 type SinkItem = String;
61 type SinkError = RpcError;
62
63 fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
64 self.queue.push_back(OwnedMessage::Text(request));
65 Ok(AsyncSink::Ready)
66 }
67
68 fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
69 loop {
70 match self.queue.pop_front() {
71 Some(request) => match self.sink.start_send(request) {
72 Ok(AsyncSink::Ready) => continue,
73 Ok(AsyncSink::NotReady(request)) => {
74 self.queue.push_front(request);
75 break;
76 }
77 Err(error) => return Err(RpcError::Other(error.into())),
78 },
79 None => break,
80 }
81 }
82 self.sink.poll_complete().map_err(|error| RpcError::Other(error.into()))
83 }
84}
85
86impl<TSink, TStream, TError> Stream for WebsocketClient<TSink, TStream>
87where
88 TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
89 TStream: Stream<Item = OwnedMessage, Error = TError>,
90 TError: Into<Error>,
91{
92 type Item = String;
93 type Error = RpcError;
94
95 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
96 loop {
97 match self.stream.poll() {
98 Ok(Async::Ready(Some(message))) => match message {
99 OwnedMessage::Text(data) => return Ok(Async::Ready(Some(data))),
100 OwnedMessage::Binary(data) => info!("server sent binary data {:?}", data),
101 OwnedMessage::Ping(p) => self.queue.push_front(OwnedMessage::Pong(p)),
102 OwnedMessage::Pong(_) => {}
103 OwnedMessage::Close(c) => self.queue.push_front(OwnedMessage::Close(c)),
104 },
105 Ok(Async::Ready(None)) => {
106 return Ok(Async::Ready(None));
108 }
109 Ok(Async::NotReady) => return Ok(Async::NotReady),
110 Err(error) => return Err(RpcError::Other(error.into())),
111 }
112 }
113 }
114}