1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#![deny(missing_docs)]
use failure::Error;
use futures::prelude::*;
use futures::sync::mpsc;
use jsonrpc_core_client::RpcClient;
pub use jsonrpc_core_client::{RpcChannel, RpcError};
use log::info;
use std::collections::VecDeque;
use websocket::{ClientBuilder, OwnedMessage};
pub fn connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, Error>
where
T: From<RpcChannel>,
{
let client = ClientBuilder::new(url)?
.async_connect(None)
.map(|(client, _)| {
let (sink, stream) = client.split();
let (sink, stream) = WebsocketClient::new(sink, stream).split();
let (sender, receiver) = mpsc::channel(0);
let rpc_client = RpcClient::new(sink, stream, receiver).map_err(|error| eprintln!("{:?}", error));
tokio::spawn(rpc_client);
sender.into()
})
.map_err(|error| RpcError::Other(error.into()));
Ok(client)
}
struct WebsocketClient<TSink, TStream> {
sink: TSink,
stream: TStream,
queue: VecDeque<OwnedMessage>,
}
impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
{
pub fn new(sink: TSink, stream: TStream) -> Self {
Self {
sink,
stream,
queue: VecDeque::new(),
}
}
}
impl<TSink, TStream, TError> Sink for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
{
type SinkItem = String;
type SinkError = RpcError;
fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
self.queue.push_back(OwnedMessage::Text(request));
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
loop {
match self.queue.pop_front() {
Some(request) => match self.sink.start_send(request) {
Ok(AsyncSink::Ready) => continue,
Ok(AsyncSink::NotReady(request)) => {
self.queue.push_front(request);
break;
}
Err(error) => return Err(RpcError::Other(error.into())),
},
None => break,
}
}
self.sink.poll_complete().map_err(|error| RpcError::Other(error.into()))
}
}
impl<TSink, TStream, TError> Stream for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
{
type Item = String;
type Error = RpcError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
loop {
match self.stream.poll() {
Ok(Async::Ready(Some(message))) => match message {
OwnedMessage::Text(data) => return Ok(Async::Ready(Some(data))),
OwnedMessage::Binary(data) => info!("server sent binary data {:?}", data),
OwnedMessage::Ping(p) => self.queue.push_front(OwnedMessage::Pong(p)),
OwnedMessage::Pong(_) => {}
OwnedMessage::Close(c) => self.queue.push_front(OwnedMessage::Close(c)),
},
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(None));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => return Err(RpcError::Other(error.into())),
}
}
}
}