1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//! Contains the structure that provides a `futures::Stream` over the websocket feed of the Coinbase API.

extern crate url;

use std::time::{SystemTime, UNIX_EPOCH};
use self::url::Url;
use futures::{Future, Sink, Stream};
use serde_json;
use tokio_tungstenite::connect_async;
use hyper::Method;

use {private::Private, ASync};
use super::tokio_tungstenite::tungstenite::Message as TMessage;
use error::WSError;
use structs::wsfeed::*;

/// Entry point for the websocket feed: its associated functions open a
/// connection, send a subscription and return a stream of feed messages.
pub struct WSFeed;

/// Turns a raw websocket text frame into a feed `Message`.
///
/// The frame body is parsed as JSON. When parsing fails, the error and the
/// original text are returned wrapped in `Message::InternalError` so the
/// failure is visible to the consumer instead of being dropped.
///
/// # Panics
///
/// Panics on any non-text frame. The stream built in `WSFeed::new_with_sub`
/// filters frames with `is_text()` before mapping them through this function.
fn convert_msg(msg: TMessage) -> Message {
    let text = match msg {
        TMessage::Text(text) => text,
        _ => unreachable!(), // non-text frames are filtered out in the stream
    };

    let parsed = serde_json::from_str(&text);
    match parsed {
        Ok(message) => message,
        Err(e) => Message::InternalError(WSError::Serde {
            error: e,
            data: text,
        }),
    }
}

impl WSFeed {
    // Constructor for simple subcription with product_ids and channels
    pub fn new(
        uri: &str,
        product_ids: &[&str],
        channels: &[ChannelType],
    ) -> impl Stream<Item = Message, Error = WSError> {
        let subscribe = Subscribe {
            _type: SubscribeCmd::Subscribe,
            product_ids: product_ids.into_iter().map(|x| x.to_string()).collect(),
            channels: channels
                .to_vec()
                .into_iter()
                .map(|x| Channel::Name(x))
                .collect::<Vec<_>>(),
            auth: None
        };

        Self::new_with_sub(uri, subscribe)
    }

    // Constructor for extended subcription via Subscribe structure
    pub fn new_with_sub(
        uri: &str,
        subsribe: Subscribe,
    ) -> impl Stream<Item = Message, Error = WSError> {
        let url = Url::parse(uri).unwrap();

        connect_async(url)
            .map_err(WSError::Connect)
            .and_then(move |(ws_stream, _)| {
                debug!("WebSocket handshake has been successfully completed");
                let (sink, stream) = ws_stream.split();

                let subsribe = serde_json::to_string(&subsribe).unwrap();

                sink.send(TMessage::Text(subsribe))
                    .map_err(WSError::Send)
                    .and_then(|_| {
                        debug!("subsription sent");
                        let stream = stream
                            .filter(|msg| msg.is_text())
                            .map_err(WSError::Read)
                            .map(convert_msg);
                        Ok(stream)
                    })
            }).flatten_stream()
    }

    // Constructor for simple subcription with product_ids and channels with auth
    pub fn new_with_auth(
        uri: &str,
        product_ids: &[&str],
        channels: &[ChannelType],
        key: &str, secret: &str, passphrase: &str
    ) -> impl Stream<Item = Message, Error = WSError> {

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("leap-second")
            .as_secs();

        let signature = Private::<ASync>::sign(secret, timestamp, Method::GET, "/users/self/verify", "");

        let auth = Auth {
            signature,
            key: key.to_string(),
            passphrase: passphrase.to_string(),
            timestamp: timestamp.to_string()
        };

        let subscribe = Subscribe {
            _type: SubscribeCmd::Subscribe,
            product_ids: product_ids.into_iter().map(|x| x.to_string()).collect(),
            channels: channels
                .to_vec()
                .into_iter()
                .map(|x| Channel::Name(x))
                .collect::<Vec<_>>(),
            auth: Some(auth)
        };

        Self::new_with_sub(uri, subscribe)
    }
}