1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Tectonic DTF and TCP codec.

use byteorder::{BigEndian, WriteBytesExt};
use rayon::prelude::*;

use cxmr_feeds::{EventData, Events};

use super::Error;

bitflags! {
    pub struct Flags: u8 {
        const FLAG_EMPTY   = 0b0000_0000;
        const FLAG_IS_BID   = 0b0000_0001;
        const FLAG_IS_TRADE = 0b0000_0010;
    }
}

impl Flags {
    pub fn to_bool(&self) -> bool {
        (self.bits == 0b0000_0001) || (self.bits == 0b0000_0010)
    }
}
/// Tectonic command.
#[derive(Clone)]
pub enum Command {
    UseDb(String),
    CreateDb(String),
    Insert(Events),
}

impl Command {
    /// Encodes tectonic command.
    pub fn serialize(self) -> Result<SerializedCommand, Error> {
        match self {
            Command::Insert(events) => Ok(SerializedCommand::Many(serialize_cmds(events)?)),
            Command::UseDb(name) => Ok(SerializedCommand::Single(format!("USE {}\n", name))),
            Command::CreateDb(name) => Ok(SerializedCommand::Single(format!("CREATE {}\n", name))),
        }
    }
}

/// Serialized command.
pub enum SerializedCommand {
    Single(String),
    Many(Vec<String>),
}

/// Serializes event data into dtf format.
pub fn serialize_dtf(event: &EventData, ref_ts: u64) -> Vec<u8> {
    let mut buf: Vec<u8> = Vec::new();
    let _ = buf.write_u16::<BigEndian>((event.ts - ref_ts) as u16);
    let _ = buf.write_u8(0); // seq

    let mut flags = Flags::FLAG_EMPTY;
    if event.is_bid {
        flags |= Flags::FLAG_IS_BID;
    }
    if event.is_trade {
        flags |= Flags::FLAG_IS_TRADE;
    }
    let _ = buf.write_u8(flags.bits());

    let _ = buf.write_f32::<BigEndian>(event.rate);
    let _ = buf.write_f32::<BigEndian>(event.amount);
    buf
}

/// Serializes event data into tectonic command.
pub fn serialze_cmd(event: &EventData, db: &str) -> String {
    let is_trade = if event.is_trade { "t" } else { "f" };
    let is_bid = if event.is_bid { "t" } else { "f" };
    format!(
        "ADD {}, 0, {}, {}, {}, {}; INTO {}\n",
        event.ts, is_trade, is_bid, event.rate, event.amount, db
    )
}

/// Serializes events into tectonic commands.
pub fn serialize_cmds(events: Events) -> Result<Vec<String>, Error> {
    let ts = events.timestamp.unwrap_or(0);
    let db = format!(
        "{}_{}",
        events.market.exchange().short(),
        events.currency_pair()
    );
    let rows = events
        .events
        .into_iter()
        .map(|ev| <Vec<EventData>>::from(&ev))
        .flatten()
        .map(|mut event| {
            if event.ts == 0 {
                event.ts = ts;
            }
            serialze_cmd(&event, &db)
        })
        .collect();
    Ok(rows)
}

pub trait IntoTradesCsv {
    fn into_trades_csv(&self) -> String;
}

impl IntoTradesCsv for Vec<EventData> {
    fn into_trades_csv(&self) -> String {
        self.into_par_iter()
            .map(|row| row.to_trade_csv())
            .collect::<Vec<String>>()
            .join("\n")
    }
}

pub trait IntoCsv {
    fn into_csv(&self) -> String;
}

impl IntoCsv for [EventData] {
    fn into_csv(&self) -> String {
        data_rows_into_csv(&self)
    }
}

impl IntoCsv for Vec<EventData> {
    fn into_csv(&self) -> String {
        data_rows_into_csv(&self)
    }
}

fn data_rows_into_csv(vecs: &[EventData]) -> String {
    vecs.into_par_iter()
        .map(|row| row.to_csv())
        .collect::<Vec<String>>()
        .join("\n")
}