1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
extern crate url;
use std::time::{SystemTime, UNIX_EPOCH};
use self::url::Url;
use futures::{Future, Sink, Stream};
use serde_json;
use tokio_tungstenite::connect_async;
use hyper::Method;
use {private::Private, ASync};
use super::tokio_tungstenite::tungstenite::Message as TMessage;
use error::WSError;
use structs::wsfeed::*;
/// Entry point for the WebSocket market-data feed.
///
/// This is a stateless unit type: it only groups the associated constructor
/// functions (`new`, `new_with_sub`, `new_with_auth`), each of which returns a
/// stream of decoded feed `Message`s rather than a `WSFeed` value.
pub struct WSFeed;
/// Decodes one text WebSocket frame into a feed `Message`.
///
/// The frame's text is parsed as JSON. When parsing fails, the failure is not
/// raised as a stream error; instead it is reported in-band as
/// `Message::InternalError` carrying a `WSError::Serde` with both the serde
/// error and the raw text that could not be decoded.
///
/// # Panics
///
/// Hits `unreachable!()` for any frame that is not `TMessage::Text`. The
/// caller in this file filters the stream with `is_text()` before mapping
/// through this function.
fn convert_msg(msg: TMessage) -> Message {
    // Step 1: pull the payload out of the frame.
    let text = match msg {
        TMessage::Text(text) => text,
        _ => unreachable!(),
    };

    // Step 2: decode it, keeping the raw payload around for diagnostics.
    match serde_json::from_str(&text) {
        Ok(parsed) => parsed,
        Err(e) => Message::InternalError(WSError::Serde {
            error: e,
            data: text,
        }),
    }
}
impl WSFeed {
pub fn new(
uri: &str,
product_ids: &[&str],
channels: &[ChannelType],
) -> impl Stream<Item = Message, Error = WSError> {
let subscribe = Subscribe {
_type: SubscribeCmd::Subscribe,
product_ids: product_ids.into_iter().map(|x| x.to_string()).collect(),
channels: channels
.to_vec()
.into_iter()
.map(|x| Channel::Name(x))
.collect::<Vec<_>>(),
auth: None
};
Self::new_with_sub(uri, subscribe)
}
pub fn new_with_sub(
uri: &str,
subsribe: Subscribe,
) -> impl Stream<Item = Message, Error = WSError> {
let url = Url::parse(uri).unwrap();
connect_async(url)
.map_err(WSError::Connect)
.and_then(move |(ws_stream, _)| {
debug!("WebSocket handshake has been successfully completed");
let (sink, stream) = ws_stream.split();
let subsribe = serde_json::to_string(&subsribe).unwrap();
sink.send(TMessage::Text(subsribe))
.map_err(WSError::Send)
.and_then(|_| {
debug!("subsription sent");
let stream = stream
.filter(|msg| msg.is_text())
.map_err(WSError::Read)
.map(convert_msg);
Ok(stream)
})
}).flatten_stream()
}
pub fn new_with_auth(
uri: &str,
product_ids: &[&str],
channels: &[ChannelType],
key: &str, secret: &str, passphrase: &str
) -> impl Stream<Item = Message, Error = WSError> {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("leap-second")
.as_secs();
let signature = Private::<ASync>::sign(secret, timestamp, Method::GET, "/users/self/verify", "");
let auth = Auth {
signature,
key: key.to_string(),
passphrase: passphrase.to_string(),
timestamp: timestamp.to_string()
};
let subscribe = Subscribe {
_type: SubscribeCmd::Subscribe,
product_ids: product_ids.into_iter().map(|x| x.to_string()).collect(),
channels: channels
.to_vec()
.into_iter()
.map(|x| Channel::Name(x))
.collect::<Vec<_>>(),
auth: Some(auth)
};
Self::new_with_sub(uri, subscribe)
}
}