1#![feature(test)]
3
4#[cfg(test)]
5extern crate test;
6
7#[macro_use]
8extern crate failure;
9#[macro_use]
10extern crate log;
11#[macro_use]
12extern crate serde_derive;
13extern crate serde;
14extern crate serde_json;
15
16extern crate cxmr_balances;
17extern crate cxmr_exchanges;
18extern crate cxmr_feeds;
19
20#[cfg(test)]
21mod tests;
22
23use std::cmp::Ordering;
24use std::fmt::Debug;
25
26use serde::ser::Serialize;
27
28use cxmr_balances::{IntoPoints, IntoPrice, ReverseRate};
29use cxmr_exchanges::{Order, OrderSide, Trade};
30use cxmr_feeds::{self as feeds, Event, EventData, Events};
31
32static MAX_ORDERS: usize = 20;
33
34#[derive(Debug, Fail)]
36pub enum Error {
37 #[fail(display = "{} sequence is lower than previous: {}.", _0, _1)]
39 SequenceConflict(u64, u64),
40
41 #[fail(display = "{} timestamp is lower than previous: {}.", _0, _1)]
43 TimestampConflict(u64, u64),
44}
45
46#[derive(Clone, Deserialize, Serialize, PartialEq)]
48pub struct OrderBook {
49 asks: OrderBookSide<u64>,
50 bids: OrderBookSide<ReverseRate>,
51 last_seq: u64,
52 pub last_update: u64,
54}
55
56impl Default for OrderBook {
57 fn default() -> Self {
58 OrderBook {
59 asks: OrderBookSide::new(),
60 bids: OrderBookSide::new(),
61 last_seq: 0,
62 last_update: 0,
63 }
64 }
65}
66
67impl OrderBook {
68 pub fn asks(&self) -> &OrderBookSide<u64> {
70 &self.asks
71 }
72
73 pub fn bids(&self) -> &OrderBookSide<ReverseRate> {
75 &self.bids
76 }
77
78 pub fn spread(&self) -> Option<f64> {
80 self.spread_nth(0)
81 }
82
83 pub fn spread_nth(&self, n: usize) -> Option<f64> {
85 let ask_rate = self.asks.nth_rate(n)? as f64;
86 let bid_rate = self.bids.nth_rate(n)? as f64;
87 Some((ask_rate - bid_rate) / bid_rate)
88 }
89
90 pub fn update<'a, I>(&mut self, events: I)
92 where
93 I: Iterator<Item = &'a Event>,
94 {
95 events.for_each(|event| self.update_from_event(event));
96 }
97
98 pub fn consume(&mut self, mut ev: Events) -> Result<Events, Error> {
101 self.check_update_events(&ev)?;
102 ev.events = self.consume_events(ev.events);
103 Ok(ev)
104 }
105
106 pub fn consume_events(&mut self, events: Vec<Event>) -> Vec<Event> {
109 events
110 .into_iter()
111 .map(|event| self.consume_event(event))
112 .collect::<Vec<Vec<Event>>>()
113 .concat()
114 }
115
116 pub fn consume_event(&mut self, event: Event) -> Vec<Event> {
119 if let Event::OrderBook(ref book) = event {
120 return self.set_orderbook(book);
121 }
122 self.update_from_event(&event);
123 vec![event]
124 }
125
126 #[inline(always)]
128 fn check_update_events(&mut self, ev: &Events) -> Result<(), Error> {
129 if let Some(timestamp) = ev.timestamp {
130 if timestamp < self.last_update {
131 return Err(Error::TimestampConflict(timestamp, self.last_update));
132 } else {
133 self.last_update = timestamp;
134 }
135 }
136 Ok(())
137 }
138
139 pub fn update_from_event(&mut self, event: &Event) {
141 match event {
142 Event::InsertOrder(ref order) => self.insert_order(order),
143 Event::ResetOrder(ref order) => self.reset_order(order),
144 Event::RemoveOrder(ref order) => self.remove_order(order),
145 Event::Trade(ref trade) => self.process_trade(trade),
146 Event::OrderBook(ref book) => {
147 self.asks.set_orders(&book.asks, OrderSide::Ask);
148 self.bids.set_orders(&book.bids, OrderSide::Bid);
149 }
150 }
151 }
152
153 pub fn update_from_rows(&mut self, events: &[EventData]) {
155 for event in events {
156 self.update_from_row(event);
157 }
158 }
159
160 pub fn update_from_row(&mut self, event: &EventData) {
162 if self.last_update > event.ts {
163 warn!("Tried to update {} with {}", self.last_update, event.ts);
164 return;
165 }
166 if event.is_bid {
167 self.bids.update_from_row(event);
168 } else {
169 self.asks.update_from_row(event);
170 }
171 self.last_update = event.ts;
172 }
173
174 fn set_orderbook(&mut self, book: &feeds::OrderBook) -> Vec<Event> {
175 let change_asks = self.asks.set_orders(&book.asks, OrderSide::Ask);
176 let change_bids = self.bids.set_orders(&book.bids, OrderSide::Bid);
177 let events = [&change_asks[..], &change_bids[..]].concat();
178 events
179 }
180
181 fn insert_order(&mut self, order: &Order) {
182 match order.side {
183 OrderSide::Ask => self.asks.insert_order(order),
184 OrderSide::Bid => self.bids.insert_order(order),
185 }
186 }
187
188 fn reset_order(&mut self, order: &Order) {
189 match order.side {
190 OrderSide::Ask => self.asks.reset_order(order),
191 OrderSide::Bid => self.bids.reset_order(order),
192 }
193 }
194
195 fn remove_order(&mut self, order: &Order) {
196 match order.side {
197 OrderSide::Ask => self.asks.remove_order(order),
198 OrderSide::Bid => self.bids.remove_order(order),
199 }
200 }
201
202 fn process_trade(&mut self, trade: &Trade) {
203 match trade.order.side {
204 OrderSide::Ask => self.asks.process_trade(trade),
205 OrderSide::Bid => self.bids.process_trade(trade),
206 }
207 }
208}
209
210impl<'a> From<&'a [EventData]> for OrderBook {
211 fn from(events: &'a [EventData]) -> Self {
212 let mut book = OrderBook::default();
213 book.update_from_rows(events);
214 book
215 }
216}
217
218#[derive(Clone, Deserialize, Serialize, PartialEq)]
220pub struct OrderBookSide<
221 Rate: IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
222>(pub Vec<(Rate, u64)>);
223
224impl<Rate> OrderBookSide<Rate>
225where
226 Rate:
227 IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
228{
229 pub fn new() -> Self {
230 OrderBookSide(Vec::new())
231 }
232
233 pub fn depth(&self) -> (f64, f64) {
235 let (ratesum, amount) = self
236 .0
237 .iter()
238 .fold((0.0, 0.0), |(rsum, asum), (rate, amount)| {
239 let rate = rate.clone().into_price();
240 let amount = amount.clone().into_price();
241 (rsum + (rate * amount), asum + amount)
242 });
243 (ratesum / amount, amount)
244 }
245
246 pub fn orders(&self) -> &Vec<(Rate, u64)> {
248 &self.0
249 }
250
251 pub fn first(&self) -> Option<u64> {
253 self.0.get(0).map(|(rate, _)| rate.into_points())
254 }
255
256 pub fn nth_rate(&self, n: usize) -> Option<u64> {
258 self.0.get(n).map(|(rate, _)| rate.into_points())
259 }
260
261 pub fn is_empty(&self) -> bool {
263 self.0.len() == 0
264 }
265
266 fn check_limits(&mut self) {
267 if self.0.len() > MAX_ORDERS {
268 self.0.truncate(MAX_ORDERS);
269 }
270 }
271
272 fn update_from_row(&mut self, event: &EventData) {
273 if event.is_trade {
274 self.process_raw_trade(event.rate.into_points().into(), event.amount as f64);
275 } else if event.amount < 0.0 {
276 self.remove_rate_amount(event.rate.into_points().into(), event.amount.abs() as f64);
277 } else {
278 self.reset_rate_amount(event.rate.into_points().into(), event.amount as f64);
279 }
280 }
281
282 fn reset_order(&mut self, order: &Order) {
283 if order.amount < 0.0 {
284 self.remove_rate_amount(order.rate.into_points().into(), order.amount.abs());
285 } else {
286 self.reset_rate_amount(order.rate.into_points().into(), order.amount);
287 }
288 }
289
290 fn reset_rate_amount(&mut self, rate: Rate, amount: f64) -> Option<()> {
291 match self.0.iter().position(|(r, _)| r == &rate) {
292 Some(index) => {
293 if amount == 0.0 {
294 self.0.remove(index);
295 } else {
296 unsafe {
297 let (_, ref mut order) = self.0.get_unchecked_mut(index);
298 *order = amount.into_points();
299 }
300 }
301 }
302 None => {
303 if amount != 0.0 {
304 let amount = amount.into_points();
305 self.0.push((rate, amount));
306 self.sort_orders();
307 self.check_limits();
308 }
309 }
310 }
311 None
312 }
313
314 fn insert_order(&mut self, order: &Order) {
315 if order.amount == 0.0 {
316 warn!("Tried to insert order with zero amount.");
317 return;
318 }
319 let rate: Rate = order.rate.into_points().into();
320 let amount = order.amount.into_points();
321 match self.0.iter().position(|(r, _)| r == &rate) {
322 Some(index) => unsafe {
323 let (_, ref mut order) = self.0.get_unchecked_mut(index);
324 *order += amount;
325 },
326 None => {
327 self.0.push((rate, amount));
328 self.sort_orders();
329 self.check_limits();
330 }
331 }
332 self.check_limits();
333 }
334
335 fn remove_order(&mut self, order: &Order) {
336 self.remove_rate_amount(order.rate.into_points().into(), order.amount);
337 }
338
339 fn remove_rate_amount(&mut self, rate: Rate, amount: f64) {
340 let amount = amount.into_points();
341 if let Some(index) = self.0.iter().position(|(r, _)| r == &rate) {
342 let remove = unsafe {
343 let mut order = self.0.get_unchecked_mut(index);
344 if amount < order.1 {
345 order.1 -= amount;
346 false
347 } else {
348 true
349 }
350 };
351 if remove {
352 self.0.remove(index);
353 }
354 }
355 }
356
357 fn process_trade(&mut self, trade: &Trade) {
358 self.process_raw_trade(trade.order.rate.into_points().into(), trade.order.amount);
359 }
360
361 fn process_raw_trade(&mut self, rate: Rate, amount: f64) -> Option<()> {
362 let amount = amount.into_points();
363 match self.0.iter().position(|(r, _)| r == &rate) {
364 Some(index) => {
365 let remove = unsafe {
366 let (_, ref mut order) = self.0.get_unchecked_mut(index);
367 if amount < *order {
368 *order -= amount;
369 false
370 } else {
371 true
372 }
373 };
374 if remove {
375 if index == 0 {
376 self.0.remove(index);
377 } else if index < self.0.len() - 1 {
378 self.0 = self.0.split_off(index + 1);
379 } else {
380 self.0.clear();
381 }
382 } else if index > 0 {
383 self.0 = self.0.split_off(index);
384 }
385 }
386 None => {
387 let first_rate = self.0.iter().position(|(r, _)| {
388 match r.partial_cmp(&rate).unwrap_or(Ordering::Equal) {
389 Ordering::Greater | Ordering::Equal => true,
390 Ordering::Less => false,
391 }
392 })?;
393 self.0 = self.0.split_off(first_rate);
394 }
395 }
396 None
397 }
398
399 fn set_orders(&mut self, orders: &Vec<feeds::OrderRate>, side: OrderSide) -> Vec<Event> {
400 let mut old_book: Vec<(Rate, u64)> = orders
401 .iter()
402 .filter(|order| order.amount > 0.0)
403 .map(|order| (order.rate.into_points().into(), order.amount.into_points()))
404 .collect();
405 std::mem::swap(&mut old_book, &mut self.0);
406 self.sort_orders();
407
408 let removed: Vec<Event> = old_book
409 .iter()
410 .filter(|(rate, _)| self.0.iter().find(|(r, _)| r == rate).is_none())
411 .map(|(rate, _)| {
412 Event::ResetOrder(Order {
413 side: side.clone(),
414 rate: rate.into_price(),
415 amount: 0.0,
416 })
417 })
418 .collect();
419
420 let resets: Vec<Event> = self
421 .0
422 .iter()
423 .filter(
424 |(rate, amount)| match old_book.iter().find(|(r, _)| r == rate) {
425 Some((_, prev)) => amount != prev,
426 None => true,
427 },
428 )
429 .map(|(rate, amount)| {
430 Event::ResetOrder(Order {
431 side: side.clone(),
432 rate: rate.into_price(),
433 amount: amount.into_price(),
434 })
435 })
436 .collect();
437
438 [&removed[..], &resets[..]].concat()
439 }
440
441 fn sort_orders(&mut self) {
442 self.0.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
443 }
444}
445
446impl From<&OrderBook> for feeds::OrderBook {
447 fn from(book: &OrderBook) -> feeds::OrderBook {
448 let asks = book
449 .asks()
450 .0
451 .iter()
452 .map(|(rate, amount)| feeds::OrderRate {
453 rate: rate.into_price(),
454 amount: amount.into_price(),
455 })
456 .collect();
457 let bids = book
458 .bids()
459 .0
460 .iter()
461 .map(|(rate, amount)| feeds::OrderRate {
462 rate: rate.into_price(),
463 amount: amount.into_price(),
464 })
465 .collect();
466 feeds::OrderBook {
467 pair: None,
468 asks,
469 bids,
470 }
471 }
472}
473
474impl From<&OrderBook> for Vec<EventData> {
475 fn from(book: &OrderBook) -> Vec<EventData> {
476 let mut orders: Vec<EventData> = book
477 .asks()
478 .0
479 .iter()
480 .map(|(rate, amount)| EventData {
481 ts: book.last_update,
482 is_bid: false,
483 is_trade: false,
484 rate: rate.into_price() as f32,
485 amount: amount.into_price() as f32,
486 })
487 .collect();
488 orders.extend(book.bids().0.iter().map(|(rate, amount)| EventData {
489 ts: book.last_update,
490 is_bid: true,
491 is_trade: false,
492 rate: rate.into_price() as f32,
493 amount: amount.into_price() as f32,
494 }));
495 orders
496 }
497}
498
499impl From<&OrderBook> for Event {
500 fn from(book: &OrderBook) -> Event {
501 Event::OrderBook(book.into())
502 }
503}
504
505impl ::std::fmt::Debug for OrderBook {
506 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
507 let json = ::serde_json::to_string(self).map_err(|_| ::std::fmt::Error)?;
508 f.write_str(&json)
509 }
510}
511
512impl<Rate> Debug for OrderBookSide<Rate>
513where
514 Rate:
515 IntoPoints + IntoPrice + Serialize + From<u64> + Into<u64> + PartialEq + Copy + Ord + Debug,
516{
517 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
518 let json = ::serde_json::to_string(self).map_err(|_| ::std::fmt::Error)?;
519 f.write_str(&json)
520 }
521}