1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::{Level3OrderBook, WSClient};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use super::ws_client_internal::{MiscMessage, WSClientInternal};
use super::{Candlestick, OrderBook, OrderBookSnapshot, Ticker, Trade, BBO};
use log::*;
use serde_json::Value;

/// Exchange identifier; used in this file's log and panic messages and passed to `define_client!`.
pub(super) const EXCHANGE_NAME: &str = "bitstamp";

/// WebSocket endpoint URL passed to `define_client!` for this client.
const WEBSOCKET_URL: &str = "wss://ws.bitstamp.net";

/// The WebSocket client for the Bitstamp Spot market.
///
/// Bitstamp only has a Spot market.
///
///   * WebSocket API doc: <https://www.bitstamp.net/websocket/v2/>
///   * Trading at: <https://www.bitstamp.net/market/tradeview/>
pub struct BitstampWSClient<'a> {
    // Underlying `WSClientInternal` wrapped by this exchange-specific type.
    client: WSClientInternal<'a>,
}

/// Builds the JSON command that subscribes to or unsubscribes from `channel`.
///
/// * `channel` - a raw channel name such as `live_trades_btcusd`, or a string
///   starting with `{`, which is returned unchanged.
/// * `subscribe` - `true` produces a `bts:subscribe` event, `false` a
///   `bts:unsubscribe` event.
///
/// The channel name is inserted into the JSON text verbatim (no escaping).
fn channel_to_command(channel: &str, subscribe: bool) -> String {
    // Anything that already starts with '{' is passed through untouched.
    if channel.starts_with('{') {
        return channel.to_string();
    }

    let action = match subscribe {
        true => "subscribe",
        false => "unsubscribe",
    };
    format!(
        r#"{{"event":"bts:{}","data":{{"channel":"{}"}}}}"#,
        action, channel
    )
}

/// Converts every entry of `channels` into its command string via
/// `channel_to_command`, preserving the input order.
///
/// `subscribe` is forwarded unchanged to each conversion.
fn channels_to_commands(channels: &[String], subscribe: bool) -> Vec<String> {
    let mut commands = Vec::with_capacity(channels.len());
    for channel in channels {
        commands.push(channel_to_command(channel, subscribe));
    }
    commands
}

/// Classifies a raw text message received from the Bitstamp WebSocket.
///
/// Returns:
/// * `MiscMessage::Misc` when `msg` cannot be parsed as a JSON object, when
///   it has no string `event` field, or when it is a subscription /
///   unsubscription acknowledgement.
/// * `MiscMessage::Reconnect` for `bts:request_reconnect`.
/// * `MiscMessage::Normal` for every other event.
///
/// # Panics
///
/// Panics when the server sends a `bts:error` event.
fn on_misc_msg(msg: &str) -> MiscMessage {
    let obj = match serde_json::from_str::<HashMap<String, Value>>(msg) {
        Ok(obj) => obj,
        Err(_) => {
            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
            return MiscMessage::Misc;
        }
    };

    // The message comes from the network, so a missing or non-string `event`
    // must not bring the client down; log it and ignore the message instead.
    let event = match obj.get("event").and_then(|v| v.as_str()) {
        Some(event) => event,
        None => {
            warn!(
                "Received {} without a string event field from {}",
                msg, EXCHANGE_NAME
            );
            return MiscMessage::Misc;
        }
    };

    match event {
        "bts:subscription_succeeded" | "bts:unsubscription_succeeded" => {
            debug!("Received {} from {}", msg, EXCHANGE_NAME);
            MiscMessage::Misc
        }
        "bts:error" => {
            // Log before panicking so the message also reaches the logger.
            error!("Received {} from {}", msg, EXCHANGE_NAME);
            panic!("Received {} from {}", msg, EXCHANGE_NAME);
        }
        "bts:request_reconnect" => {
            warn!("Received {} from {}", msg, EXCHANGE_NAME);
            MiscMessage::Reconnect
        }
        _ => MiscMessage::Normal,
    }
}

/// Joins a channel prefix and a trading pair with an underscore,
/// e.g. `live_trades` + `btcusd` -> `live_trades_btcusd`.
fn to_raw_channel(channel: &str, pair: &str) -> String {
    [channel, pair].join("_")
}

// Subscription traits implemented through the `impl_trait!` macro, which is
// defined outside this file. Each invocation passes: the trait, the client
// type, the subscribe method name, a Bitstamp channel prefix, and
// `to_raw_channel` (joins a prefix and a pair as `<prefix>_<pair>`).
// `#[rustfmt::skip]` keeps every invocation on a single line.

// Trades use the `live_trades` channel prefix.
#[rustfmt::skip]
impl_trait!(Trade, BitstampWSClient, subscribe_trade, "live_trades", to_raw_channel);
// Order book updates use the `diff_order_book` channel prefix.
#[rustfmt::skip]
impl_trait!(OrderBook, BitstampWSClient, subscribe_orderbook, "diff_order_book", to_raw_channel);
// Order book snapshots use the `order_book` channel prefix.
#[rustfmt::skip]
impl_trait!(OrderBookSnapshot, BitstampWSClient, subscribe_orderbook_snapshot, "order_book", to_raw_channel);
// Level 3 order book data uses the `live_orders` channel prefix.
#[rustfmt::skip]
impl_trait!(Level3OrderBook, BitstampWSClient, subscribe_l3_orderbook, "live_orders", to_raw_channel);

/// `Ticker` is implemented only to satisfy the trait; no ticker channel is
/// available, so the method always panics.
impl Ticker for BitstampWSClient<'_> {
    /// Unsupported on Bitstamp.
    ///
    /// # Panics
    ///
    /// Always panics, regardless of `_pairs`.
    fn subscribe_ticker(&self, _pairs: &[String]) {
        panic!("Bitstamp WebSocket does NOT have ticker channel");
    }
}

/// `BBO` is implemented only to satisfy the trait; no BBO channel is
/// available, so the method always panics.
impl BBO for BitstampWSClient<'_> {
    /// Unsupported on Bitstamp.
    ///
    /// # Panics
    ///
    /// Always panics, regardless of `_pairs`.
    fn subscribe_bbo(&self, _pairs: &[String]) {
        panic!("Bitstamp WebSocket does NOT have BBO channel");
    }
}

/// `Candlestick` is implemented only to satisfy the trait; no candlestick
/// channel is available, so the method always panics.
impl Candlestick for BitstampWSClient<'_> {
    /// Unsupported on Bitstamp.
    ///
    /// # Panics
    ///
    /// Always panics, regardless of `_pairs` and `_interval`.
    fn subscribe_candlestick(&self, _pairs: &[String], _interval: u32) {
        panic!("Bitstamp does NOT have candlestick channel");
    }
}

// Wires `BitstampWSClient` up through the `define_client!` macro, which is
// defined outside this file (its expansion is not visible here).
// Arguments: the client type, the exchange name, the WebSocket URL, the
// function turning channels into commands, and the message classifier.
// NOTE(review): the meaning of the trailing `None` is not visible from this
// file; confirm against the macro definition.
define_client!(
    BitstampWSClient,
    EXCHANGE_NAME,
    WEBSOCKET_URL,
    channels_to_commands,
    on_misc_msg,
    None
);

#[cfg(test)]
mod tests {
    use super::channel_to_command;

    /// Checks the exact JSON produced for subscribing to and unsubscribing
    /// from a plain channel name.
    #[test]
    fn test_channel_to_command() {
        let cases = [
            (
                true,
                r#"{"event":"bts:subscribe","data":{"channel":"live_trades_btcusd"}}"#,
            ),
            (
                false,
                r#"{"event":"bts:unsubscribe","data":{"channel":"live_trades_btcusd"}}"#,
            ),
        ];

        for (subscribe, expected) in cases.iter() {
            assert_eq!(
                *expected,
                channel_to_command("live_trades_btcusd", *subscribe)
            );
        }
    }
}