#![feature(test)]
#[cfg(test)]
extern crate test;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate cxmr_balances;
extern crate cxmr_exchanges;
extern crate cxmr_feeds;
#[cfg(test)]
mod tests;
use std::cmp::Ordering;
use std::fmt::Debug;
use serde::ser::Serialize;
use cxmr_balances::{IntoPoints, IntoPrice, ReverseRate};
use cxmr_exchanges::{Order, OrderSide, Trade};
use cxmr_feeds::{self as feeds, Event, EventData, Events};
/// Maximum number of price levels kept on each side of the book; longer sides
/// are truncated to this length by `OrderBookSide::check_limits`.
const MAX_ORDERS: usize = 20;
/// Errors reported while applying event batches to an `OrderBook`.
#[derive(Debug, Fail)]
pub enum Error {
/// A sequence number was lower than the previous one.
/// Per the display message, the first field is the offending sequence and
/// the second is the previous one. Not constructed in this file.
#[fail(display = "{} sequence is lower than previous: {}.", _0, _1)]
SequenceConflict(u64, u64),
/// A batch timestamp was lower than the book's `last_update`.
/// The first field is the batch timestamp, the second the previous
/// `last_update` value.
#[fail(display = "{} timestamp is lower than previous: {}.", _0, _1)]
TimestampConflict(u64, u64),
}
/// Two-sided order book: one `OrderBookSide` of asks and one of bids, plus
/// bookkeeping about the last applied update.
#[derive(Clone, Deserialize, Serialize, PartialEq)]
pub struct OrderBook {
// Ask levels, keyed by a plain `u64` rate.
asks: OrderBookSide<u64>,
// Bid levels, keyed by `ReverseRate`.
// NOTE(review): presumably `ReverseRate` orders descending so the best bid
// sorts first — confirm against cxmr_balances.
bids: OrderBookSide<ReverseRate>,
// Initialised to 0 by `Default`; not read or updated anywhere else in this file.
last_seq: u64,
/// Timestamp of the last accepted batch or row; older updates are rejected
/// or skipped by the update methods.
pub last_update: u64,
}
impl Default for OrderBook {
    /// Builds an empty book: both sides empty, sequence and timestamp at zero.
    fn default() -> Self {
        let asks = OrderBookSide::new();
        let bids = OrderBookSide::new();
        OrderBook {
            asks,
            bids,
            last_seq: 0,
            last_update: 0,
        }
    }
}
impl OrderBook {
    /// Returns the ask side of the book.
    pub fn asks(&self) -> &OrderBookSide<u64> {
        &self.asks
    }

    /// Returns the bid side of the book.
    pub fn bids(&self) -> &OrderBookSide<ReverseRate> {
        &self.bids
    }

    /// Relative spread between the first ask level and the first bid level.
    ///
    /// Equivalent to `spread_nth(0)`.
    pub fn spread(&self) -> Option<f64> {
        self.spread_nth(0)
    }

    /// Relative spread `(ask - bid) / bid` between the `n`-th ask level and
    /// the `n`-th bid level.
    ///
    /// Returns `None` when either side has no level at index `n`.
    pub fn spread_nth(&self, n: usize) -> Option<f64> {
        let ask_rate = self.asks.nth_rate(n)? as f64;
        let bid_rate = self.bids.nth_rate(n)? as f64;
        Some((ask_rate - bid_rate) / bid_rate)
    }

    /// Applies every event of `events` to the book, in iteration order.
    pub fn update<'a, I>(&mut self, events: I)
    where
        I: Iterator<Item = &'a Event>,
    {
        for event in events {
            self.update_from_event(event);
        }
    }

    /// Applies a batch of events and returns the batch with its `events`
    /// replaced by the output of [`consume_events`](#method.consume_events).
    ///
    /// # Errors
    ///
    /// Returns `Error::TimestampConflict` when the batch carries a timestamp
    /// lower than `last_update`; in that case no event is applied.
    pub fn consume(&mut self, mut ev: Events) -> Result<Events, Error> {
        self.check_update_events(&ev)?;
        ev.events = self.consume_events(ev.events);
        Ok(ev)
    }

    /// Applies `events` one by one and returns the flattened list of events
    /// produced by `consume_event` for each of them, preserving order.
    pub fn consume_events(&mut self, events: Vec<Event>) -> Vec<Event> {
        // `flat_map` moves the produced events straight into the result; no
        // intermediate `Vec<Vec<Event>>` and no cloning.
        events
            .into_iter()
            .flat_map(|event| self.consume_event(event))
            .collect()
    }

    /// Applies a single event.
    ///
    /// An `Event::OrderBook` snapshot replaces both sides and is returned as
    /// the list of `ResetOrder` events describing the difference to the
    /// previous state. Any other event is applied and returned unchanged as a
    /// one-element vector.
    pub fn consume_event(&mut self, event: Event) -> Vec<Event> {
        if let Event::OrderBook(book) = &event {
            return self.set_orderbook(book);
        }
        self.update_from_event(&event);
        vec![event]
    }

    /// Rejects a batch whose timestamp is older than `last_update`; otherwise
    /// records the batch timestamp as the new `last_update`.
    ///
    /// Batches without a timestamp are always accepted and leave
    /// `last_update` untouched.
    #[inline(always)]
    fn check_update_events(&mut self, ev: &Events) -> Result<(), Error> {
        if let Some(timestamp) = ev.timestamp {
            if timestamp < self.last_update {
                return Err(Error::TimestampConflict(timestamp, self.last_update));
            }
            self.last_update = timestamp;
        }
        Ok(())
    }

    /// Applies one event to the matching side of the book.
    ///
    /// For an `Event::OrderBook` snapshot both sides are replaced and the
    /// resulting difference events are discarded.
    pub fn update_from_event(&mut self, event: &Event) {
        match event {
            Event::InsertOrder(order) => self.insert_order(order),
            Event::ResetOrder(order) => self.reset_order(order),
            Event::RemoveOrder(order) => self.remove_order(order),
            Event::Trade(trade) => self.process_trade(trade),
            Event::OrderBook(book) => {
                self.asks.set_orders(&book.asks, OrderSide::Ask);
                self.bids.set_orders(&book.bids, OrderSide::Bid);
            }
        }
    }

    /// Applies every row of `events` via `update_from_row`, in order.
    pub fn update_from_rows(&mut self, events: &[EventData]) {
        for event in events {
            self.update_from_row(event);
        }
    }

    /// Applies one raw row to the bid or ask side, chosen by `event.is_bid`,
    /// and records `event.ts` as `last_update`.
    ///
    /// Rows older than `last_update` are skipped with a warning.
    pub fn update_from_row(&mut self, event: &EventData) {
        if self.last_update > event.ts {
            warn!("Tried to update {} with {}", self.last_update, event.ts);
            return;
        }
        if event.is_bid {
            self.bids.update_from_row(event);
        } else {
            self.asks.update_from_row(event);
        }
        self.last_update = event.ts;
    }

    /// Replaces both sides with the snapshot `book` and returns the
    /// difference events: those of the ask side first, then the bid side.
    fn set_orderbook(&mut self, book: &feeds::OrderBook) -> Vec<Event> {
        let mut events = self.asks.set_orders(&book.asks, OrderSide::Ask);
        events.extend(self.bids.set_orders(&book.bids, OrderSide::Bid));
        events
    }

    /// Adds `order` to the side named by `order.side`.
    fn insert_order(&mut self, order: &Order) {
        match order.side {
            OrderSide::Ask => self.asks.insert_order(order),
            OrderSide::Bid => self.bids.insert_order(order),
        }
    }

    /// Resets the level of `order` on the side named by `order.side`.
    fn reset_order(&mut self, order: &Order) {
        match order.side {
            OrderSide::Ask => self.asks.reset_order(order),
            OrderSide::Bid => self.bids.reset_order(order),
        }
    }

    /// Removes `order` from the side named by `order.side`.
    fn remove_order(&mut self, order: &Order) {
        match order.side {
            OrderSide::Ask => self.asks.remove_order(order),
            OrderSide::Bid => self.bids.remove_order(order),
        }
    }

    /// Applies `trade` to the side named by `trade.order.side`.
    fn process_trade(&mut self, trade: &Trade) {
        match trade.order.side {
            OrderSide::Ask => self.asks.process_trade(trade),
            OrderSide::Bid => self.bids.process_trade(trade),
        }
    }
}
impl<'a> From<&'a [EventData]> for OrderBook {
    /// Replays `events` row by row onto an empty book.
    fn from(events: &'a [EventData]) -> Self {
        let mut book = OrderBook::default();
        for row in events {
            book.update_from_row(row);
        }
        book
    }
}
/// One side of an order book: a list of `(rate, amount)` levels.
///
/// The methods in this file keep the list sorted ascending by `Rate` after
/// every insertion and store amounts as the `u64` produced by `into_points`.
#[derive(Clone, Deserialize, Serialize, PartialEq)]
pub struct OrderBookSide<
Rate: IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
>(pub Vec<(Rate, u64)>);
impl<Rate> OrderBookSide<Rate>
where
    Rate:
        IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
{
    /// Creates an empty side.
    pub fn new() -> Self {
        OrderBookSide(Vec::new())
    }

    /// Returns `(amount-weighted average rate, total amount)` over all levels.
    ///
    /// For an empty side the total is `0.0` and the average is `0.0 / 0.0`,
    /// i.e. NaN.
    pub fn depth(&self) -> (f64, f64) {
        let (ratesum, amount) = self
            .0
            .iter()
            .fold((0.0, 0.0), |(rsum, asum), (rate, amount)| {
                let rate = (*rate).into_price();
                let amount = (*amount).into_price();
                (rsum + (rate * amount), asum + amount)
            });
        (ratesum / amount, amount)
    }

    /// Returns the underlying list of `(rate, amount)` levels.
    pub fn orders(&self) -> &Vec<(Rate, u64)> {
        &self.0
    }

    /// Rate of the first level in points, or `None` when the side is empty.
    pub fn first(&self) -> Option<u64> {
        self.nth_rate(0)
    }

    /// Rate of the `n`-th level in points, or `None` when there is no such level.
    pub fn nth_rate(&self, n: usize) -> Option<u64> {
        self.0.get(n).map(|(rate, _)| rate.into_points())
    }

    /// Returns `true` when the side holds no levels.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Drops every level past the first `MAX_ORDERS`.
    fn check_limits(&mut self) {
        // `truncate` is a no-op when the list is already short enough.
        self.0.truncate(MAX_ORDERS);
    }

    /// Applies one raw row: a trade, a removal (negative amount) or a reset
    /// of the level at the row's rate.
    fn update_from_row(&mut self, event: &EventData) {
        let rate: Rate = event.rate.into_points().into();
        if event.is_trade {
            self.process_raw_trade(rate, event.amount as f64);
        } else if event.amount < 0.0 {
            self.remove_rate_amount(rate, event.amount.abs() as f64);
        } else {
            self.reset_rate_amount(rate, event.amount as f64);
        }
    }

    /// Applies a reset order: a negative amount subtracts its absolute value
    /// from the level, any other amount overwrites the level.
    fn reset_order(&mut self, order: &Order) {
        let rate: Rate = order.rate.into_points().into();
        if order.amount < 0.0 {
            self.remove_rate_amount(rate, order.amount.abs());
        } else {
            self.reset_rate_amount(rate, order.amount);
        }
    }

    /// Sets the level at `rate` to `amount`.
    ///
    /// A zero amount removes an existing level and is ignored for a missing
    /// one. A new level is inserted in sorted position and the side is then
    /// trimmed to `MAX_ORDERS`. Always returns `None`.
    fn reset_rate_amount(&mut self, rate: Rate, amount: f64) -> Option<()> {
        match self.0.iter().position(|(r, _)| r == &rate) {
            Some(index) if amount == 0.0 => {
                self.0.remove(index);
            }
            Some(index) => {
                self.0[index].1 = amount.into_points();
            }
            None if amount != 0.0 => {
                self.0.push((rate, amount.into_points()));
                self.sort_orders();
                self.check_limits();
            }
            None => {}
        }
        None
    }

    /// Adds the amount of `order` to the level at its rate, creating the
    /// level (in sorted position) when it does not exist yet.
    ///
    /// Orders with a zero amount are ignored with a warning.
    fn insert_order(&mut self, order: &Order) {
        if order.amount == 0.0 {
            warn!("Tried to insert order with zero amount.");
            return;
        }
        let rate: Rate = order.rate.into_points().into();
        let amount = order.amount.into_points();
        match self.0.iter().position(|(r, _)| r == &rate) {
            Some(index) => self.0[index].1 += amount,
            None => {
                self.0.push((rate, amount));
                self.sort_orders();
            }
        }
        // Single trim for both branches; it also cuts down a book that was
        // loaded past the limit by `set_orders`.
        self.check_limits();
    }

    /// Subtracts the amount of `order` from the level at its rate.
    fn remove_order(&mut self, order: &Order) {
        self.remove_rate_amount(order.rate.into_points().into(), order.amount);
    }

    /// Subtracts `amount` from the level at `rate`; the level is removed
    /// entirely when `amount` is not smaller than what it holds. A missing
    /// level is ignored.
    fn remove_rate_amount(&mut self, rate: Rate, amount: f64) {
        let amount = amount.into_points();
        if let Some(index) = self.0.iter().position(|(r, _)| r == &rate) {
            let resting = self.0[index].1;
            if amount < resting {
                self.0[index].1 = resting - amount;
            } else {
                self.0.remove(index);
            }
        }
    }

    /// Applies an executed trade to this side.
    fn process_trade(&mut self, trade: &Trade) {
        self.process_raw_trade(trade.order.rate.into_points().into(), trade.order.amount);
    }

    /// Applies a trade of `amount` at `rate`.
    ///
    /// * Level found and only partially consumed: its amount is reduced and
    ///   every level sorted before it is dropped.
    /// * Level found and fully consumed: it is dropped together with every
    ///   level sorted before it.
    /// * Level not found: every level sorted before the first level whose
    ///   rate is not lower than `rate` is dropped.
    ///
    /// Always returns `None`.
    ///
    /// NOTE(review): when no level exists at `rate` and every level sorts
    /// before `rate`, the `?` below returns early and the side is left
    /// untouched — confirm this is intended.
    fn process_raw_trade(&mut self, rate: Rate, amount: f64) -> Option<()> {
        let amount = amount.into_points();
        match self.0.iter().position(|(r, _)| r == &rate) {
            Some(index) => {
                let resting = self.0[index].1;
                if amount < resting {
                    self.0[index].1 = resting - amount;
                    self.0.drain(..index);
                } else {
                    self.0.drain(..=index);
                }
            }
            None => {
                let first_rate = self.0.iter().position(|(r, _)| {
                    r.partial_cmp(&rate).unwrap_or(Ordering::Equal) != Ordering::Less
                })?;
                self.0.drain(..first_rate);
            }
        }
        None
    }

    /// Replaces the whole side with `orders` (entries with a non-positive
    /// amount are skipped), sorts it, and returns the difference to the
    /// previous state as `Event::ResetOrder` events tagged with `side`:
    /// first a zero-amount reset for every rate that disappeared, then a
    /// reset for every rate that is new or whose amount changed.
    ///
    /// NOTE(review): unlike the insertion paths this does not trim the side
    /// to `MAX_ORDERS` — confirm that is intended.
    fn set_orders(&mut self, orders: &Vec<feeds::OrderRate>, side: OrderSide) -> Vec<Event> {
        let incoming: Vec<(Rate, u64)> = orders
            .iter()
            .filter(|order| order.amount > 0.0)
            .map(|order| (order.rate.into_points().into(), order.amount.into_points()))
            .collect();
        let previous = std::mem::replace(&mut self.0, incoming);
        self.sort_orders();

        // Rates that were present before and are gone now.
        let mut events: Vec<Event> = previous
            .iter()
            .filter(|(rate, _)| !self.0.iter().any(|(r, _)| r == rate))
            .map(|(rate, _)| {
                Event::ResetOrder(Order {
                    side: side.clone(),
                    rate: rate.into_price(),
                    amount: 0.0,
                })
            })
            .collect();

        // Rates that are new or carry a different amount than before.
        events.extend(
            self.0
                .iter()
                .filter(|(rate, amount)| {
                    previous
                        .iter()
                        .find(|(r, _)| r == rate)
                        .map_or(true, |(_, prev)| amount != prev)
                })
                .map(|(rate, amount)| {
                    Event::ResetOrder(Order {
                        side: side.clone(),
                        rate: rate.into_price(),
                        amount: amount.into_price(),
                    })
                }),
        );
        events
    }

    /// Sorts the levels ascending by `Rate`.
    fn sort_orders(&mut self) {
        self.0.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
    }
}
impl From<&OrderBook> for feeds::OrderBook {
    /// Converts the book into a feed snapshot with prices instead of points;
    /// `pair` is left unset.
    fn from(book: &OrderBook) -> feeds::OrderBook {
        let asks = book
            .asks
            .orders()
            .iter()
            .map(|(rate, amount)| feeds::OrderRate {
                rate: rate.into_price(),
                amount: amount.into_price(),
            })
            .collect();
        let bids = book
            .bids
            .orders()
            .iter()
            .map(|(rate, amount)| feeds::OrderRate {
                rate: rate.into_price(),
                amount: amount.into_price(),
            })
            .collect();
        feeds::OrderBook {
            pair: None,
            asks,
            bids,
        }
    }
}
impl From<&OrderBook> for Vec<EventData> {
    /// Flattens the book into raw rows: all ask levels first, then all bid
    /// levels, each stamped with the book's `last_update` and marked as a
    /// non-trade.
    fn from(book: &OrderBook) -> Vec<EventData> {
        let ts = book.last_update;
        let ask_rows = book.asks().0.iter().map(|(rate, amount)| EventData {
            ts,
            is_bid: false,
            is_trade: false,
            rate: rate.into_price() as f32,
            amount: amount.into_price() as f32,
        });
        let bid_rows = book.bids().0.iter().map(|(rate, amount)| EventData {
            ts,
            is_bid: true,
            is_trade: false,
            rate: rate.into_price() as f32,
            amount: amount.into_price() as f32,
        });
        ask_rows.chain(bid_rows).collect()
    }
}
impl From<&OrderBook> for Event {
    /// Wraps a snapshot of the book in an `Event::OrderBook`.
    fn from(book: &OrderBook) -> Event {
        let snapshot = book.into();
        Event::OrderBook(snapshot)
    }
}
impl ::std::fmt::Debug for OrderBook {
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
let json = ::serde_json::to_string(self).map_err(|_| ::std::fmt::Error)?;
f.write_str(&json)
}
}
impl<Rate> Debug for OrderBookSide<Rate>
where
    Rate:
        IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
{
    /// Formats the side as its JSON serialization; a serialization failure is
    /// reported as a formatting error.
    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
        match ::serde_json::to_string(self) {
            Ok(json) => f.write_str(&json),
            Err(_) => Err(::std::fmt::Error),
        }
    }
}