1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//! WebSocket stream connection.

use chrono::prelude::*;
use futures::future::{self, Future, Loop};
use futures::prelude::*;
use websocket::{ClientBuilder, OwnedMessage, WebSocketError};
use websocket::async::TcpStream;
use websocket::client::async::{Client as WebSocketClient, TlsStream};

use event::{Event, EventExt};
use stream::{Command, Events};
use util::{boxfuture, FutureExt};

use super::errors::*;
use super::{Handle, Message, Protocol};

/// Connects to WebSocket stream.
pub fn connect<P: Protocol>(handle: Handle) -> BoxFuture<()> {
    // Connect to WebSocket server
    ClientBuilder::new(P::address())
        .unwrap() // panics on address parse
        .async_connect_secure(None, &handle.reactor)
        .map(|(duplex, _)| duplex)
        .map_err(|e| e.into())
        .and_then(move |stream| {
            // Start connection stream processing loop.
            future::loop_fn((stream, Some(handle)), move |(stream, handle)| {
                // Check for subscribe commands.
                let handle = handle.unwrap();
                if let Ok(cmd) = handle.commands.try_recv() {
                    // Serialize and send commands to connection.
                    return send_cmd::<P>(cmd, stream, handle);
                }
                // Read from stream and process messages.
                // Break loop if connection was closed.
                stream
                    .into_future()
                    .or_else(|(err, stream)| close_with_err(stream, err))
                    .and_then(move |(body, stream)| match body {
                        Some(msg) => handle_message::<P>(msg, stream, handle),
                        None => break_loop(), // closed stream
                    })
                    .into_box()
            })
        })
        .into_box()
}

/// TLS-encrypted asynchronous WebSocket stream client.
type Client = WebSocketClient<TlsStream<TcpStream>>;

/// WebSocket Stream loop state.
type LoopState = (Client, Option<Handle>);

/// WebSocket Stream loop future.
type LoopFuture = BoxFuture<Loop<(), LoopState>>;

/// Returns future that breaks WebSocket stream loop.
fn break_loop() -> LoopFuture {
    boxfuture::ok(Loop::Break(()))
}

/// Returns future that continues WebSocket stream loop.
fn continue_loop(stream: Client, handle: Handle) -> LoopFuture {
    boxfuture::ok(Loop::Continue((stream, Some(handle))))
}

/// Parse raw websocket message.
/// It can be `Close`, `Text` or a wrong message.
fn handle_message<P: Protocol>(body: OwnedMessage, stream: Client, handle: Handle) -> LoopFuture {
    match body {
        OwnedMessage::Text(body) => handle_body::<P>(body, stream, handle),
        OwnedMessage::Close(_) => break_loop(),
        _ => {
            // we will investigate further on any
            error!("unexpected websocket message");
            break_loop()
        }
    }
}

/// Serializes an sends command to WebSocket connection.
fn send_cmd<P: Protocol>(cmd: Command, stream: Client, handle: Handle) -> LoopFuture {
    send_msg(OwnedMessage::Text(P::serialize(cmd)), stream, handle)
}

/// Closes a stream after logging an error.
fn close_with_err(
    stream: Client,
    err: WebSocketError,
) -> BoxFuture<(Option<OwnedMessage>, Client)> {
    error!("Could not receive message: {:?}", err);
    stream
        .send(OwnedMessage::Close(None))
        .map(|stream| (None, stream))
        .map_err(|e| e.into())
        .into_box()
}

/// Parses body of a `Text` WebSocket message.
/// Processes message and sends events if any.
fn handle_body<P: Protocol>(body: String, stream: Client, handle: Handle) -> LoopFuture {
    trace!("parsing msg: {}", &body);
    match P::parse(&body) {
        Ok(Some(msg)) => process_message::<P>(msg, stream, handle),
        Ok(None) => continue_loop(stream, handle), // empty message
        Err(err) => {
            error!("websocket market {:?} parse error: {}", P::market(), err);
            continue_loop(stream, handle)
        }
    }
}

/// Processes message and sends events if any.
/// Registers currency pair channel on orderbook.
fn process_message<P: Protocol>(msg: Message, stream: Client, handle: Handle) -> LoopFuture {
    // Check if event is order book
    if msg.events.has_order_book() {
        // Register new channel and send events
        return handle_orderbook::<P>(msg, stream, handle);
    }
    send_and_continue::<P>(msg, stream, handle)
}

/// Registers currency pair channel id in `Handle`.
/// Sends parsed events to handle sender.
fn handle_orderbook<P: Protocol>(msg: Message, stream: Client, handle: Handle) -> LoopFuture {
    // unwrap orderbook from msg
    let pair = match msg.events[0] {
        // NOTE: we unwrap here because if `None` it's wrong impl.
        Event::OrderBook(ref book) => book.pair.clone().unwrap(),
        _ => panic!("code was mangled?"),
    };
    // create new handle with registered channel
    let handle = handle.with_channel(msg.chan_id, pair);
    send_and_continue::<P>(msg, stream, handle)
}

/// Sends message to handle sender.
fn send_and_continue<P: Protocol>(msg: Message, stream: Client, handle: Handle) -> LoopFuture {
    // get event currency pair from handle
    let pair = match handle.get_pair(&msg.chan_id) {
        Some(pair) => Some(pair),
        None => {
            // log an error if not heart beat
            if msg.chan_id != 0 {
                error!(
                    "message on unknown channel, market: {:?} chan: {:?}",
                    P::market(),
                    msg.chan_id
                );
                return continue_loop(stream, handle);
            }
            None
        }
    };
    // create stream events struct
    let event = Events {
        market: Some(P::market()),
        pair: pair,
        events: msg.events,
        timestamp: UTC::now().timestamp(),
    };
    // send event for a receiver
    handle.sender.unbounded_send(event).unwrap();
    // continue loop with registered pair
    continue_loop(stream, handle)
}

/// Sends message to a WebSocket stream and continues loop.
fn send_msg(msg: OwnedMessage, stream: Client, handle: Handle) -> LoopFuture {
    stream
        .send(msg)
        .map(|stream| Loop::Continue((stream, Some(handle))))
        .map_err(|e| e.into())
        .into_box()
}