cxmr-ws-client-binance 0.0.1

Binance WebSocket feed client.
Documentation
use std::iter::FromIterator;

use chrono::offset::Utc;
use hashbrown::HashMap;
use serde_json::Value;

use cxmr_currency::CurrencyPair;
use cxmr_exchanges::Market;
use cxmr_exchanges::{Exchange, Order, OrderSide, Trade};
use cxmr_feeds::{Event, Events, OrderBook, OrderRate};
use cxmr_ws_client::{Command, Error, Parser, Protocol, Subscription};

/// Binance Public feed WebSocket protocol.
pub struct BinancePublicProtocol;

// Implementation of Binance Public feed WebSocket protocol.
impl Protocol<Events> for BinancePublicProtocol {
    /// Protocol parser.
    type Parser = BinancePublicParser;

    /// Creates protocol parser and stream address.
    fn subscription(sub: &Subscription) -> Option<(Command, Self::Parser)> {
        // let parser = BinancePrivateParser
        match sub {
            Subscription::Pairs(pairs) => {
                let address = subscription_address(pairs);
                Some((Command::Connect(address), BinancePublicParser::new(pairs)))
            }
            Subscription::User(_) => None,
        }
    }
}

/// Binance Public feed WebSocket protocol parser.
pub struct BinancePublicParser {
    subscribed: HashMap<String, Market>,
}

impl BinancePublicParser {
    fn new(pairs: &Vec<CurrencyPair>) -> Self {
        let subscribed = HashMap::from_iter(pairs.iter().map(|pair| {
            let id = format!("{}{}", pair.quote(), pair.base()).to_lowercase();
            let market = Market::new(Exchange::Binance, pair.clone());
            (id, market)
        }));
        BinancePublicParser { subscribed }
    }
}

// Implementation of Binance WebSocket protocol parser.
impl Parser<Events> for BinancePublicParser {
    /// Parses received WebSocket message.
    fn parse(&mut self, text: &str) -> Result<Option<Events>, Error> {
        let msg = ::serde_json::from_str::<Value>(text).map_err(|e| Error::Client(e.into()))?;
        let mut split = msg.get("stream")?.as_str()?.split('@');
        let chan_id = split.next()?;
        let channel = split.next()?;
        let data = msg.get("data")?;
        match channel {
            "depth20" => {
                // let _seq = data.get("lastUpdateId")?.as_u64()?;
                let asks = data
                    .get("asks")?
                    .as_array()?
                    .iter()
                    .map(|order| {
                        Ok(OrderRate {
                            rate: order.get(0)?.as_str()?.parse()?,
                            amount: order.get(1)?.as_str()?.parse()?,
                        })
                    })
                    .collect::<Result<Vec<OrderRate>, Error>>()?;
                let bids = data
                    .get("bids")?
                    .as_array()?
                    .iter()
                    .map(|order| {
                        Ok(OrderRate {
                            rate: order.get(0)?.as_str()?.parse()?,
                            amount: order.get(1)?.as_str()?.parse()?,
                        })
                    })
                    .collect::<Result<Vec<OrderRate>, Error>>()?;
                Ok(Some(Events {
                    market: self.subscribed.get(chan_id)?.clone(),
                    events: vec![Event::OrderBook(OrderBook {
                        pair: None,
                        asks: asks,
                        bids: bids,
                    })],
                    timestamp: Some(Utc::now().timestamp_millis() as u64),
                }))
            }
            "trade" => {
                let timestamp = data.get("E")?.as_u64()?;
                let rate = data.get("p")?.as_str()?.parse::<f64>()?;
                let amount = data.get("q")?.as_str()?.parse::<f64>()?;
                let trade_id = data.get("t")?.as_i64()?;
                let side = if data.get("m")?.as_bool()? {
                    OrderSide::Bid
                } else {
                    OrderSide::Ask
                };
                Ok(Some(Events {
                    market: self.subscribed.get(chan_id)?.clone(),
                    events: vec![Event::Trade(Trade {
                        id: Some(trade_id),
                        order: Order {
                            side: side,
                            rate: rate,
                            amount: amount,
                        },
                        timestamp: timestamp,
                    })],
                    timestamp: Some(Utc::now().timestamp_millis() as u64),
                }))
            }
            _ => Ok(None),
        }
    }
}

fn subscription_address(pairs: &Vec<CurrencyPair>) -> String {
    format!(
        "wss://stream.binance.com:9443/stream?streams={}",
        pairs
            .into_iter()
            .map(|pair| {
                let pair = format!("{}{}", pair.quote(), pair.base()).to_lowercase();
                format!("{}@trade/{}@depth20", pair, pair)
            })
            .collect::<Vec<String>>()
            .join("/")
    )
}