#![feature(try_trait, test)]
#[cfg(test)]
extern crate test;
#[macro_use]
extern crate lazy_static;
extern crate chrono;
extern crate hashbrown;
extern crate serde_json;
extern crate cxmr_currency;
extern crate cxmr_exchanges;
extern crate cxmr_feeds;
extern crate cxmr_ws_client;
mod pair;
use chrono::offset::Utc;
use std::convert::TryFrom;
use std::iter::FromIterator;
use hashbrown::HashMap;
use serde_json::Value;
use cxmr_currency::CurrencyPair;
use cxmr_exchanges::{Exchange, Market, Order, OrderSide, Trade};
use cxmr_feeds::{Event, Events, OrderBook, OrderRate};
use cxmr_ws_client::{Command, Error, Parser, Protocol, Subscription};
use self::pair::POLONIEX_PAIRS_MAP;
pub struct PoloniexPublicProtocol;
impl Protocol<Events> for PoloniexPublicProtocol {
type Parser = PoloniexPublicParser;
fn subscription(sub: &Subscription) -> Option<(Command, Self::Parser)> {
match sub {
Subscription::Pairs(pairs) => {
let address = "wss://api2.poloniex.com:443".to_owned();
let pairs: Vec<_> = pairs
.into_iter()
.filter(|pair| POLONIEX_PAIRS_MAP.contains_key(pair))
.map(|pair| pair.clone())
.collect();
let commands: Vec<String> = pairs.iter().map(|pair| subscribe(pair)).collect();
Some((
Command::Commands(address, commands),
PoloniexPublicParser::new(&pairs),
))
}
Subscription::User(_) => None,
}
}
}
pub struct PoloniexPublicParser {
subscribed: HashMap<u64, Market>,
}
impl PoloniexPublicParser {
fn new(pairs: &Vec<CurrencyPair>) -> Self {
let subscribed = HashMap::from_iter(pairs.iter().filter_map(|pair| {
let id = POLONIEX_PAIRS_MAP.get(pair)?;
let market = Market::new(Exchange::Poloniex, pair.clone());
Some((*id, market))
}));
PoloniexPublicParser { subscribed }
}
}
impl Parser<Events> for PoloniexPublicParser {
fn parse(&mut self, text: &str) -> Result<Option<Events>, Error> {
let msg = ::serde_json::from_str::<Value>(text).map_err(|e| Error::Client(e.into()))?;
let arr = msg.as_array()?;
if arr.len() <= 2 {
return Ok(None);
}
let chan_id = msg.get(0)?.as_u64()?;
let events = msg.get(2)?.as_array()?;
let mut results = Vec::with_capacity(events.len());
for event in events {
results.push(parse_event(event)?);
}
let market = self.subscribed.get(&chan_id)?.clone();
Ok(Some(Events {
market,
events: results,
timestamp: Some(Utc::now().timestamp_millis() as u64),
}))
}
}
fn subscribe(pair: &CurrencyPair) -> String {
format!(
"{{\"command\":\"subscribe\", \"channel\":\"{}\"}}",
pair.join_reversed()
)
}
fn parse_event(event: &Value) -> Result<Event, Error> {
match event.get(0)?.as_str()? {
"i" => Ok(Event::OrderBook(parse_order_book(event.get(1)?)?)),
"o" => Ok(Event::ResetOrder(parse_order(event)?)),
"t" => Ok(Event::Trade(parse_trade(event)?)),
any => Err(Error::UnexpectedEventKind(any.to_owned()).into()),
}
}
fn parse_order_book(event: &Value) -> Result<OrderBook, Error> {
let pair = event.get("currencyPair")?.as_str()?;
let pair = CurrencyPair::split_reversed(pair)?;
let books = event.get("orderBook")?.as_array()?;
let asks = books
.get(0)?
.as_object()?
.into_iter()
.map(|(rate, amount)| {
Ok(OrderRate {
rate: rate.as_str().parse()?,
amount: amount.as_str()?.parse()?,
})
})
.collect::<Result<_, Error>>()?;
let bids = books
.get(1)?
.as_object()?
.into_iter()
.map(|(rate, amount)| {
Ok(OrderRate {
rate: rate.as_str().parse()?,
amount: amount.as_str()?.parse()?,
})
})
.collect::<Result<_, Error>>()?;
Ok(OrderBook {
asks: asks,
bids: bids,
pair: Some(pair),
})
}
fn parse_order(event: &Value) -> Result<Order, Error> {
Ok(Order {
side: OrderSide::try_from(event.get(1)?.as_i64()?)?,
rate: event.get(2)?.as_str()?.parse()?,
amount: event.get(3)?.as_str()?.parse()?,
})
}
fn parse_trade(event: &Value) -> Result<Trade, Error> {
Ok(Trade {
id: Some(event.get(1)?.as_str()?.parse::<i64>()?),
order: Order {
side: OrderSide::try_from(event.get(2)?.as_i64()?)?,
rate: event.get(3)?.as_str()?.parse::<f64>()?,
amount: event.get(4)?.as_str()?.parse::<f64>()?,
},
timestamp: event.get(5)?.as_u64()? * 1000,
})
}
#[cfg(test)]
mod tests {
use super::*;
use test::Bencher;
fn create_order() -> Order {
Order {
side: OrderSide::Bid,
rate: 0.00002789,
amount: 1788.27536750,
}
}
fn create_trade() -> Trade {
Trade {
id: Some(14179278),
order: create_order(),
timestamp: 1509576585000,
}
}
#[test]
fn parse_test() {
let body = "[117,103957441,[[\"o\",1,\"0.00002789\",\"1788.27536750\"], \
[\"t\",\"14179278\",1,\"0.00002789\",\"1788.27536750\",1509576585]]]";
let mut parser = PoloniexPublicParser::new(&vec![CurrencyPair::XrpBtc]);
let ev = parser.parse(body).unwrap().unwrap();
assert_eq!(ev.market, Market::PoloniexXrpBtc);
assert_eq!(ev.events.len(), 2);
if let &Event::ResetOrder(ref o) = ev.events.get(0).unwrap() {
assert_eq!(o, &create_order());
} else {
panic!("expected order");
}
if let &Event::Trade(ref t) = ev.events.get(1).unwrap() {
assert_eq!(t, &create_trade());
} else {
panic!("expected trade");
}
}
#[bench]
fn parse_body(b: &mut Bencher) {
let body = "[117,103957441,[[\"o\",1,\"0.00002789\",\"1788.27536750\"]]]";
let mut parser = PoloniexPublicParser::new(&vec![CurrencyPair::XrpBtc]);
b.iter(|| {
parser.parse(body).unwrap();
});
}
#[bench]
fn parse_body_log(b: &mut Bencher) {
let body = "[117,103957441,[[\"o\",1,\"0.00002789\",\"1788.27536750\"]]]";
let mut parser = PoloniexPublicParser::new(&vec![CurrencyPair::XrpBtc]);
b.iter(|| {
parser.parse(body).unwrap();
});
}
#[bench]
fn parse_body_two(b: &mut Bencher) {
let body = "[117,103957441,[[\"o\",1,\"0.00002789\",\"1788.27536750\"], \
[\"o\",1,\"0.00002784\",\"82074.71641065\"]]]";
let mut parser = PoloniexPublicParser::new(&vec![CurrencyPair::XrpBtc]);
b.iter(|| {
parser.parse(body).unwrap();
});
}
}