use std::iter::FromIterator;
use chrono::offset::Utc;
use hashbrown::HashMap;
use serde_json::Value;
use cxmr_currency::CurrencyPair;
use cxmr_exchanges::Market;
use cxmr_exchanges::{Exchange, Order, OrderSide, Trade};
use cxmr_feeds::{Event, Events, OrderBook, OrderRate};
use cxmr_ws_client::{Command, Error, Parser, Protocol, Subscription};
/// Marker type that implements `Protocol<Events>` for Binance public
/// websocket streams; it carries no state of its own.
pub struct BinancePublicProtocol;
impl Protocol<Events> for BinancePublicProtocol {
    type Parser = BinancePublicParser;

    /// Translates a subscription request into a connect command and the
    /// parser that will decode the resulting stream.
    ///
    /// Only `Subscription::Pairs` is supported; `Subscription::User` yields
    /// `None`.
    fn subscription(sub: &Subscription) -> Option<(Command, Self::Parser)> {
        // Bail out early for subscriptions the public endpoint cannot serve.
        let pairs = match sub {
            Subscription::User(_) => return None,
            Subscription::Pairs(pairs) => pairs,
        };
        let connect = Command::Connect(subscription_address(pairs));
        let parser = BinancePublicParser::new(pairs);
        Some((connect, parser))
    }
}
/// Parser for messages received on the Binance public combined stream.
pub struct BinancePublicParser {
    /// Subscribed markets keyed by stream id: the lower-cased concatenation
    /// of the pair's `quote()` and `base()`, i.e. the part of the `stream`
    /// name that precedes `@`.
    subscribed: HashMap<String, Market>,
}
impl BinancePublicParser {
    /// Creates a parser that recognises messages for the given pairs.
    ///
    /// Every pair is registered under the lower-cased concatenation of
    /// `pair.quote()` and `pair.base()`, which is the same id that
    /// `subscription_address` embeds in the stream names, so incoming
    /// messages can be mapped back to their `Market`.
    ///
    /// Takes a slice rather than `&Vec<_>`; existing callers holding a
    /// `&Vec<CurrencyPair>` keep working through deref coercion.
    fn new(pairs: &[CurrencyPair]) -> Self {
        let subscribed = HashMap::from_iter(pairs.iter().map(|pair| {
            let id = format!("{}{}", pair.quote(), pair.base()).to_lowercase();
            let market = Market::new(Exchange::Binance, pair.clone());
            (id, market)
        }));
        BinancePublicParser { subscribed }
    }
}
impl Parser<Events> for BinancePublicParser {
fn parse(&mut self, text: &str) -> Result<Option<Events>, Error> {
let msg = ::serde_json::from_str::<Value>(text).map_err(|e| Error::Client(e.into()))?;
let mut split = msg.get("stream")?.as_str()?.split('@');
let chan_id = split.next()?;
let channel = split.next()?;
let data = msg.get("data")?;
match channel {
"depth20" => {
let asks = data
.get("asks")?
.as_array()?
.iter()
.map(|order| {
Ok(OrderRate {
rate: order.get(0)?.as_str()?.parse()?,
amount: order.get(1)?.as_str()?.parse()?,
})
})
.collect::<Result<Vec<OrderRate>, Error>>()?;
let bids = data
.get("bids")?
.as_array()?
.iter()
.map(|order| {
Ok(OrderRate {
rate: order.get(0)?.as_str()?.parse()?,
amount: order.get(1)?.as_str()?.parse()?,
})
})
.collect::<Result<Vec<OrderRate>, Error>>()?;
Ok(Some(Events {
market: self.subscribed.get(chan_id)?.clone(),
events: vec![Event::OrderBook(OrderBook {
pair: None,
asks: asks,
bids: bids,
})],
timestamp: Some(Utc::now().timestamp_millis() as u64),
}))
}
"trade" => {
let timestamp = data.get("E")?.as_u64()?;
let rate = data.get("p")?.as_str()?.parse::<f64>()?;
let amount = data.get("q")?.as_str()?.parse::<f64>()?;
let trade_id = data.get("t")?.as_i64()?;
let side = if data.get("m")?.as_bool()? {
OrderSide::Bid
} else {
OrderSide::Ask
};
Ok(Some(Events {
market: self.subscribed.get(chan_id)?.clone(),
events: vec![Event::Trade(Trade {
id: Some(trade_id),
order: Order {
side: side,
rate: rate,
amount: amount,
},
timestamp: timestamp,
})],
timestamp: Some(Utc::now().timestamp_millis() as u64),
}))
}
_ => Ok(None),
}
}
}
/// Builds the combined-stream websocket URL for the given pairs.
///
/// For each pair the stream id is the lower-cased concatenation of
/// `pair.quote()` and `pair.base()`, and two streams are requested:
/// `<id>@trade` and `<id>@depth20`. All stream names are joined with `/`
/// and appended to the `streams=` query parameter.
///
/// Takes a slice rather than `&Vec<_>`; callers holding a
/// `&Vec<CurrencyPair>` keep working through deref coercion.
fn subscription_address(pairs: &[CurrencyPair]) -> String {
    let streams = pairs
        .iter()
        .map(|pair| {
            let symbol = format!("{}{}", pair.quote(), pair.base()).to_lowercase();
            format!("{0}@trade/{0}@depth20", symbol)
        })
        .collect::<Vec<String>>()
        .join("/");
    format!("wss://stream.binance.com:9443/stream?streams={}", streams)
}