use std::collections::VecDeque;
use std::fs::{self, File};
use std::io::ErrorKind::InvalidData;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Instant;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use cxmr_currency::CurrencyPair;
use cxmr_exchanges::{Exchange, Market};
use cxmr_feeds::EventData;
use super::{serialize_dtf, BatchMetadata, Error, Flags, Metadata};
// On-disk DTF header layout (byte offsets from start of file):
//   [0..5)   magic bytes 0x44 0x54 0x46 ("DTF") + 0x90 0x01
//   [5..25)  space-padded symbol string (SYMBOL_LEN = 20 bytes)
//   [25..33) row count, big-endian u64
//   [33..41) maximum timestamp, big-endian u64
//   [80..)   batched row data (see write_batches / read_one_batch)
static MAGIC_VALUE: &[u8] = &[0x44, 0x54, 0x46, 0x90, 0x01];
const SYMBOL_LEN: usize = 20;
static SYMBOL_OFFSET: u64 = 5;
static LEN_OFFSET: u64 = 25;
pub static MAX_TS_OFFSET: u64 = 33;
pub static MAIN_OFFSET: u64 = 80;
/// A parsed DTF file: where it came from, its header metadata, and
/// (optionally) the decoded event rows.
#[derive(Debug)]
pub struct Contents {
    /// Path the file was read from.
    pub path: PathBuf,
    /// Header fields: market, event count, min/max timestamps.
    pub metadata: Metadata,
    /// `None` when only the header was read (metadata-only mode).
    pub events: Option<Vec<EventData>>,
}
impl PartialOrd for Contents {
    /// Orders file contents by their earliest event timestamp.
    fn partial_cmp(&self, other: &Contents) -> Option<std::cmp::Ordering> {
        let (lhs, rhs) = (self.metadata.min_ts, other.metadata.min_ts);
        lhs.partial_cmp(&rhs)
    }
}
impl PartialEq for Contents {
    /// Two contents compare equal when their earliest event timestamps match.
    fn eq(&self, other: &Contents) -> bool {
        self.metadata.min_ts.eq(&other.metadata.min_ts)
    }
}
/// Reads a DTF file in full: header metadata plus every event row.
pub fn read_dtf<P: AsRef<Path>>(path: P) -> Result<Contents, Error> {
    read_dtf_file(path, true)
}
/// Reads a DTF file's header metadata, and also its event rows when
/// `contents` is true (metadata-only reads skip the data section).
pub fn read_dtf_file<P: AsRef<Path>>(path: P, contents: bool) -> Result<Contents, Error> {
    let path = path.as_ref();
    if contents {
        let (metadata, events) = read_file(path)?;
        Ok(Contents {
            path: path.to_path_buf(),
            events: Some(events),
            metadata,
        })
    } else {
        Ok(Contents {
            path: path.to_path_buf(),
            events: None,
            metadata: read_meta(path)?,
        })
    }
}
/// Reads only the header metadata from the DTF file at `fname`.
pub fn read_meta<P: AsRef<Path>>(fname: P) -> Result<Metadata, Error> {
    read_meta_from_buf(&mut file_reader(fname)?)
}
/// Reads the whole file: every row, no trade filter, no row limit.
pub fn read_file<P: AsRef<Path>>(fname: P) -> Result<(Metadata, Vec<EventData>), Error> {
    read_file_rows(fname, false, None)
}
/// Reads at most `limit` rows from `fname`, optionally keeping trades only.
/// `None` (or any non-positive limit) reads everything.
pub fn read_file_limit<P: AsRef<Path>>(
    fname: P,
    trades_only: bool,
    limit: Option<i64>,
) -> Result<(Metadata, Vec<EventData>), Error> {
    read_file_rows(fname, trades_only, limit)
}
/// Reads events from `fname` whose timestamps fall within `[min_ts, max_ts]`.
pub fn read_file_range<P: AsRef<Path>>(
    fname: P,
    min_ts: u64,
    max_ts: u64,
    trades_only: bool,
) -> Result<Vec<EventData>, Error> {
    let mut reader = file_reader(fname)?;
    range(&mut reader, min_ts, max_ts, trades_only)
}
/// Creates (or truncates) a DTF file at `fname` holding `symbol` and `ups`.
pub fn create_dtf<P: AsRef<Path>>(fname: P, symbol: &str, ups: &[EventData]) -> Result<(), Error> {
    let mut writer = file_writer(fname, true)?;
    // Fixed header first: magic bytes, padded symbol, count/max-ts fields.
    write_magic_value(&mut writer)?;
    write_symbol(&mut writer, symbol)?;
    write_metadata(&mut writer, ups)?;
    // Row data goes after the header, starting at MAIN_OFFSET.
    write_main(&mut writer, ups)?;
    writer.flush()?;
    Ok(())
}
/// Appends `ups` to an existing DTF file, keeping only events strictly
/// newer than the file's recorded `max_ts`.
///
/// Updates the length and max-ts header fields, then writes the surviving
/// rows as batches at the end of the data section. Events at or before the
/// stored `max_ts` are silently dropped; if nothing survives the filter
/// this is a no-op.
///
/// # Errors
/// Any I/O error from reading the existing header or writing the new data.
pub fn append_dtf(fname: &Path, ups: &[EventData]) -> Result<(), Error> {
    let (ups, new_max_ts, cur_len) = {
        let mut rdr = file_reader(fname)?;
        let old_max_ts = read_max_ts(&mut rdr)?;
        // Keep only events newer than anything already on disk.
        // (`iter()` rather than `into_iter()` on a slice reference.)
        let ups: Vec<EventData> = ups
            .iter()
            .filter(|up| up.ts > old_max_ts)
            .cloned()
            .collect();
        if ups.is_empty() {
            return Ok(());
        }
        let new_min_ts = ups[0].ts;
        let new_max_ts = ups[ups.len() - 1].ts;
        // Defensive only: the filter above guarantees new_min_ts > old_max_ts,
        // so this branch is unreachable unless the filter changes.
        if new_min_ts <= old_max_ts {
            panic!("Cannot append data!(not implemented)");
        }
        let cur_len = read_len(&mut rdr)?;
        (ups, new_max_ts, cur_len)
    };
    let new_len = cur_len + ups.len() as u64;
    let mut wtr = file_writer(fname, false)?;
    write_len(&mut wtr, new_len)?;
    write_max_ts(&mut wtr, new_max_ts)?;
    if cur_len == 0 {
        // Empty file: the data section starts right after the fixed header.
        wtr.seek(SeekFrom::Start(MAIN_OFFSET))?;
    } else {
        wtr.seek(SeekFrom::End(0))?;
    }
    write_batches(&mut wtr, &ups)?;
    wtr.flush()?;
    Ok(())
}
/// Reads header metadata plus up to `limit` rows from a DTF file.
///
/// `only_trades` drops non-trade rows from the result; a `limit` of `None`
/// (or any value <= 0) means "read everything". Rows within each batch are
/// sorted by timestamp before being appended to the output.
fn read_file_rows<P: AsRef<Path>>(
    fname: P,
    only_trades: bool,
    limit: Option<i64>,
) -> Result<(Metadata, Vec<EventData>), Error> {
    let start = Instant::now();
    let mut rdr = file_reader(fname.as_ref())?;
    let rdr = &mut rdr;
    let metadata = read_meta_from_buf(rdr)?;
    // Jump past the fixed-size header to the first batch marker.
    rdr.seek(SeekFrom::Start(MAIN_OFFSET)).expect("SEEKING");
    // -1 is the internal sentinel for "no limit".
    let mut limit = limit.unwrap_or(-1);
    let capacity = if limit <= 0 {
        metadata.events as usize
    } else {
        limit as usize
    };
    let mut rows: Vec<EventData> = Vec::with_capacity(capacity);
    if metadata.events == 0 {
        return Ok((metadata, rows));
    }
    // Each batch is introduced by a 0x1 marker byte; anything else (or EOF)
    // ends the scan.
    while let Ok(is_ref) = rdr.read_u8() {
        if is_ref != 0x1 {
            break;
        }
        let meta = read_one_batch_meta(rdr)?;
        let mut batch = Vec::with_capacity(meta.count as usize);
        for _i in 0..meta.count {
            let row = read_one_row(rdr, &meta)?;
            if !only_trades || row.is_trade {
                batch.push(row);
            }
            // NOTE(review): breaking at limit == 1 stops mid-batch, leaving
            // the reader positioned inside the batch body; the next marker
            // read then sees row bytes and relies on them not being 0x1 to
            // terminate — confirm this is intended. Also note the counter
            // decrements even for rows filtered out by `only_trades`, so
            // `limit` bounds rows *read*, not rows returned.
            if limit == 1 {
                break;
            } else if limit != -1 {
                limit -= 1;
            }
        }
        // Rows inside a batch are not assumed ordered; sort by timestamp.
        batch.sort_by(|a, b| a.ts.cmp(&b.ts));
        rows.extend(batch);
    }
    debug!(
        "Finished reading {} rows from {:?} in {:?}",
        metadata.events,
        fname.as_ref(),
        start.elapsed(),
    );
    Ok((metadata, rows))
}
/// Scans a DTF file for gaps in time coverage.
///
/// Returns `(metadata, gaps)` where each gap is `(ts, ts - prev_ts)`: the
/// timestamp at which a discontinuity was detected and its distance from
/// the previous distinct timestamp. A boundary is flagged when consecutive
/// distinct timestamps differ by more than `max_diff`, or when exactly 40
/// non-trade events share one timestamp (presumably an order-book snapshot
/// marker — TODO confirm the meaning of the constant 40).
pub fn read_missing_periods<P: AsRef<Path>>(
    fname: P,
    max_diff: u64,
) -> Result<(Metadata, Vec<(u64, u64)>), Error> {
    let mut rdr = file_reader(fname)?;
    let rdr = &mut rdr;
    let metadata = read_meta_from_buf(rdr)?;
    rdr.seek(SeekFrom::Start(MAIN_OFFSET)).expect("SEEKING");
    let mut vres: Vec<(u64, u64)> = Vec::new();
    if metadata.events == 0 {
        return Ok((metadata, vres));
    }
    // Set once the first boundary has been seen, so the initial boundary is
    // not itself reported as a gap.
    let mut first_book = false;
    // Number of non-trade events seen at the current timestamp.
    let mut ts_orders: u64 = 0;
    let mut prev_ts: u64 = 0;
    let mut last_ts: u64 = 0;
    while let Ok(is_ref) = rdr.read_u8() {
        // Each batch is introduced by a 0x1 marker byte.
        if is_ref != 0x1 {
            break;
        }
        let meta = read_one_batch_meta(rdr)?;
        for _i in 0..meta.count {
            let row = read_one_row(rdr, &meta)?;
            // Ignore rows that go backwards in time.
            if row.ts < last_ts {
                continue;
            }
            // Count order (non-trade) events piling up on one timestamp.
            if !row.is_trade && row.ts == last_ts {
                ts_orders += 1;
                continue;
            }
            // Boundary: 40 orders at one timestamp (snapshot heuristic —
            // see doc note) or a time jump larger than max_diff.
            if row.ts == last_ts && ts_orders == 40 || (row.ts - last_ts) > max_diff {
                if !first_book {
                    first_book = true;
                } else {
                    vres.push((row.ts, row.ts - prev_ts));
                }
            }
            // Advance the distinct-timestamp trackers.
            if row.ts != last_ts {
                prev_ts = last_ts;
                last_ts = row.ts;
                ts_orders = 0;
            }
        }
    }
    Ok((metadata, vres))
}
/// Decodes consecutive batches from `buf` until the stream is exhausted or
/// a batch fails to parse, returning all rows in read order.
pub fn decode_buffer(mut buf: &mut dyn Read) -> Vec<EventData> {
    let mut rows = Vec::new();
    loop {
        match read_one_batch(&mut buf) {
            Ok(batch) => rows.extend(batch),
            Err(_) => break,
        }
    }
    rows
}
/// Reads one batch: a marker byte, then (when the marker is 0x1) the batch
/// header followed by its rows. Any other marker yields an empty vec.
pub fn read_one_batch(rdr: &mut impl Read) -> Result<Vec<EventData>, Error> {
    let marker = rdr.read_u8()?;
    if marker != 0x1 {
        return Ok(Vec::new());
    }
    let meta = read_one_batch_meta(rdr)?;
    read_one_batch_main(rdr, &meta)
}
/// Reads a batch header: reference timestamp, reference sequence number,
/// and row count — all big-endian, in that on-disk order.
pub fn read_one_batch_meta(rdr: &mut impl Read) -> Result<BatchMetadata, Error> {
    // Struct-literal fields evaluate top-to-bottom, matching the wire order.
    Ok(BatchMetadata {
        ref_ts: rdr.read_u64::<BigEndian>()?,
        ref_seq: rdr.read_u32::<BigEndian>()?,
        count: rdr.read_u16::<BigEndian>()?,
    })
}
/// Reads the `meta.count` rows belonging to a single batch.
pub fn read_one_batch_main(
    rdr: &mut impl Read,
    meta: &BatchMetadata,
) -> Result<Vec<EventData>, Error> {
    // Collecting into Result<Vec<_>, _> short-circuits on the first bad row.
    (0..meta.count).map(|_| read_one_row(rdr, meta)).collect()
}
/// Reads one batch of rows into a `VecDeque`.
///
/// After reading, a single adjacent-swap pass moves earlier-timestamped
/// rows toward the back.
/// NOTE(review): one bubble pass only fixes single adjacent inversions and
/// does not fully sort the deque — confirm whether a full sort was intended.
pub fn read_one_batch_deque(
    rdr: &mut impl Read,
    meta: &BatchMetadata,
) -> Result<VecDeque<EventData>, Error> {
    let mut v: VecDeque<EventData> = VecDeque::with_capacity(meta.count as usize);
    for _i in 0..meta.count {
        v.push_back(read_one_row(rdr, meta)?);
    }
    if v.len() >= 2 {
        for i in 0..v.len() - 1 {
            let a = v.get(i).unwrap();
            let b = v.get(i + 1).unwrap();
            // Swap when the earlier element has the smaller timestamp.
            if a.ts < b.ts {
                v.swap(i, i + 1);
            }
        }
    }
    Ok(v)
}
/// Deserializes one 12-byte row: u16 timestamp delta, u8 sequence delta,
/// u8 flags, then f32 rate and f32 amount (all big-endian). Deltas are
/// rebased onto the batch's `ref_ts` / `ref_seq`.
///
/// # Errors
/// I/O errors from the reader, or `InvalidData` when the flags byte
/// contains bits outside the `Flags` set.
pub fn read_row(rdr: &mut dyn Read, ref_ts: u64, ref_seq: u32) -> Result<EventData, Error> {
    let ts = rdr.read_u16::<BigEndian>()? as u64 + ref_ts;
    let _seq = rdr.read_u8()? as u32 + ref_seq;
    let flag_bits = rdr.read_u8()?;
    // Decode the bitfield once (the old code parsed it twice).
    let flags = Flags::from_bits(flag_bits).ok_or(InvalidData)?;
    let is_trade = (flags & Flags::FLAG_IS_TRADE).to_bool();
    let is_bid = (flags & Flags::FLAG_IS_BID).to_bool();
    let rate = rdr.read_f32::<BigEndian>()?;
    let amount = rdr.read_f32::<BigEndian>()?;
    Ok(EventData {
        ts,
        is_trade,
        is_bid,
        rate,
        amount,
    })
}
fn read_one_row(rdr: &mut dyn Read, meta: &BatchMetadata) -> Result<EventData, Error> {
read_row(rdr, meta.ref_ts, meta.ref_seq)
}
/// Opens `fname` for buffered reading and validates its DTF magic bytes.
pub fn file_reader<P: AsRef<Path>>(fname: P) -> Result<BufReader<File>, Error> {
    debug!("Opening DTF file {:?}", fname.as_ref());
    let mut rdr = BufReader::new(File::open(fname)?);
    if read_magic_value(&mut rdr)? {
        Ok(rdr)
    } else {
        Err(Error::InvalidInput("incorrect magic value".to_owned()))
    }
}
/// Reads the first stored row of the file.
///
/// # Errors
/// `InvalidData` when the first batch marker is missing, plus any I/O
/// error — including seek failures, which are now propagated instead of
/// panicking via `expect`.
pub fn read_first<T: BufRead + Seek>(rdr: &mut T) -> Result<EventData, Error> {
    rdr.seek(SeekFrom::Start(MAIN_OFFSET))?;
    let is_ref = rdr.read_u8()?;
    if is_ref != 0x1 {
        return Err(io::ErrorKind::InvalidData.into());
    }
    let meta = read_one_batch_meta(rdr)?;
    read_one_row(rdr, &meta)
}
/// Checks that the stream begins with the 5-byte DTF magic prefix.
///
/// Reads into a fixed stack array instead of allocating a `Vec` per call.
pub fn read_magic_value<T: BufRead + Seek>(rdr: &mut T) -> Result<bool, Error> {
    rdr.seek(SeekFrom::Start(0))?;
    let mut buf = [0u8; 5];
    rdr.read_exact(&mut buf)?;
    Ok(buf == MAGIC_VALUE)
}
/// Reads the stored row count from the header's length field.
pub fn read_len<T: BufRead + Seek>(rdr: &mut T) -> Result<u64, Error> {
    rdr.seek(SeekFrom::Start(LEN_OFFSET))?;
    Ok(rdr.read_u64::<BigEndian>()?)
}
/// Smallest timestamp in the file, taken from the first stored row.
pub fn read_min_ts<T: BufRead + Seek>(rdr: &mut T) -> Result<u64, Error> {
    read_first(rdr).map(|row| row.ts)
}
/// Reads the stored maximum timestamp from the header.
pub fn read_max_ts<T: BufRead + Seek>(rdr: &mut T) -> Result<u64, Error> {
    rdr.seek(SeekFrom::Start(MAX_TS_OFFSET))?;
    Ok(rdr.read_u64::<BigEndian>()?)
}
/// Reads the space-padded symbol string from the header and trims it.
pub fn read_symbol<T: BufRead + Seek>(rdr: &mut T) -> Result<String, Error> {
    rdr.seek(SeekFrom::Start(SYMBOL_OFFSET))?;
    let mut raw = [0u8; SYMBOL_LEN];
    rdr.read_exact(&mut raw)?;
    let text = ::std::str::from_utf8(&raw)
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "symbol read"))?;
    Ok(text.trim().to_owned())
}
/// Opens `fname` for buffered writing. `create` truncates or creates the
/// file; otherwise the existing file is opened for in-place writes.
fn file_writer<P: AsRef<Path>>(fname: P, create: bool) -> Result<BufWriter<File>, Error> {
    let file = if create {
        File::create(fname)?
    } else {
        fs::OpenOptions::new().write(true).open(fname)?
    };
    Ok(BufWriter::new(file))
}
/// Writes the 5-byte DTF magic prefix.
///
/// Uses `write_all` so a short write is an error instead of silent header
/// truncation; returns the byte count to preserve the original signature.
fn write_magic_value(wtr: &mut dyn Write) -> Result<usize, Error> {
    wtr.write_all(MAGIC_VALUE)?;
    Ok(MAGIC_VALUE.len())
}
/// Writes `symbol` right-padded with spaces to the fixed on-disk width.
///
/// Uses `write_all` so a short write fails loudly instead of truncating
/// the header field; returns the byte count to preserve the signature.
///
/// # Errors
/// `InvalidInput` when the symbol exceeds `SYMBOL_LEN` bytes, or any I/O
/// error from the writer.
fn write_symbol(wtr: &mut dyn Write, symbol: &str) -> Result<usize, Error> {
    if symbol.len() > SYMBOL_LEN {
        return Err(Error::InvalidInput(format!(
            "Symbol length is longer than {}",
            SYMBOL_LEN
        )));
    }
    // NOTE(review): the length check counts bytes while the padding counts
    // chars, so this assumes ASCII symbols — a multi-byte symbol would trip
    // the assert below. Confirm symbols are always ASCII.
    let padded_symbol = format!("{:width$}", symbol, width = SYMBOL_LEN);
    assert_eq!(padded_symbol.len(), SYMBOL_LEN);
    wtr.write_all(padded_symbol.as_bytes())?;
    Ok(SYMBOL_LEN)
}
/// Writes the row-count header field at `LEN_OFFSET`.
///
/// Propagates seek failures (the old code discarded the seek result, which
/// could silently write the length at the wrong offset).
fn write_len<T: Write + Seek>(wtr: &mut BufWriter<T>, len: u64) -> Result<(), Error> {
    wtr.seek(SeekFrom::Start(LEN_OFFSET))?;
    Ok(wtr.write_u64::<BigEndian>(len)?)
}
/// Writes the maximum-timestamp header field at `MAX_TS_OFFSET`.
///
/// Propagates seek failures (the old code discarded the seek result, which
/// could silently write the field at the wrong offset).
fn write_max_ts<T: Write + Seek>(wtr: &mut BufWriter<T>, max_ts: u64) -> Result<(), Error> {
    wtr.seek(SeekFrom::Start(MAX_TS_OFFSET))?;
    Ok(wtr.write_u64::<BigEndian>(max_ts)?)
}
/// Writes the header fields derived from `ups`: row count, then max ts.
fn write_metadata<T: Write + Seek>(wtr: &mut BufWriter<T>, ups: &[EventData]) -> Result<(), Error> {
    let count = ups.len() as u64;
    write_len(wtr, count)?;
    write_max_ts(wtr, get_max_ts(ups))
}
/// Writes a batch header: 0x1 marker, reference ts/seq, and row count.
fn write_reference(wtr: &mut dyn Write, ref_ts: u64, ref_seq: u32, len: u16) -> Result<(), Error> {
    wtr.write_u8(0x1)?;
    wtr.write_u64::<BigEndian>(ref_ts)?;
    wtr.write_u32::<BigEndian>(ref_seq)?;
    Ok(wtr.write_u16::<BigEndian>(len)?)
}
pub fn write_batches(mut wtr: &mut dyn Write, ups: &[EventData]) -> Result<(), Error> {
if ups.len() == 0 {
return Ok(());
}
let mut buf: Vec<u8> = Vec::new();
let mut ref_ts = ups[0].ts;
let mut count = 0;
for elem in ups.iter() {
if count != 0
&& (
elem.ts >= ref_ts + 0xFFFF
|| elem.ts < ref_ts
)
{
write_reference(&mut wtr, ref_ts, 0, count)?;
let _ = wtr.write(buf.as_slice());
buf.clear();
ref_ts = elem.ts;
count = 0;
}
let serialized = serialize_dtf(&elem, ref_ts);
let _ = buf.write(serialized.as_slice());
count += 1;
}
write_reference(&mut wtr, ref_ts, 0, count)?;
wtr.write_all(buf.as_slice())?;
Ok(())
}
/// Seeks to the data section and writes all rows as batches.
fn write_main<T: Write + Seek>(wtr: &mut BufWriter<T>, ups: &[EventData]) -> Result<(), Error> {
    wtr.seek(SeekFrom::Start(MAIN_OFFSET))?;
    if ups.is_empty() {
        return Ok(());
    }
    write_batches(wtr, ups)
}
/// Timestamp of the last row (the maximum when rows are ascending by ts);
/// 0 for an empty slice.
fn get_max_ts(rows: &[EventData]) -> u64 {
    match rows.last() {
        Some(row) => row.ts,
        None => 0,
    }
}
/// Reads all events in `[min_ts, max_ts]` from an open DTF reader.
///
/// Walks batch headers, skipping the bodies of batches that cannot overlap
/// the requested window and only deserializing rows from batches that can.
///
/// Fixes over the previous version: the skip distance is computed in i64
/// (`count * 12` previously multiplied in u16 and overflowed for counts
/// above 5461), a mangled `&current_meta` reference is repaired, and seek
/// failures are propagated instead of panicking.
///
/// # Errors
/// `Error::InvalidTimeRange` when `min_ts > max_ts`, plus any I/O or
/// decoding error encountered while scanning.
pub fn range<T: BufRead + Seek>(
    rdr: &mut T,
    min_ts: u64,
    max_ts: u64,
    trades_only: bool,
) -> Result<Vec<EventData>, Error> {
    if min_ts > max_ts {
        return Err(Error::InvalidTimeRange);
    }
    rdr.seek(SeekFrom::Start(MAIN_OFFSET))?;
    let mut v: Vec<EventData> = Vec::new();
    loop {
        // A batch starts with a 0x1 marker; anything else (or EOF) ends the scan.
        match rdr.read_u8() {
            Ok(byte) if byte == 0x1 => {}
            _ => return Ok(v),
        }
        let current_meta = read_one_batch_meta(rdr)?;
        let current_ref_ts = current_meta.ref_ts;
        let current_count = current_meta.count;
        // Each serialized row is 12 bytes: u16 ts delta + u8 seq + u8 flags
        // + two f32s. Widen before multiplying to avoid u16 overflow.
        let bytes_to_skip = current_count as i64 * 12;
        rdr.seek(SeekFrom::Current(bytes_to_skip))?;
        // Peek at the next batch header to bound this batch's time window.
        // NOTE(review): hitting EOF here abandons the batch we just skipped,
        // so rows in a file's final batch are never scanned — confirm the
        // format guarantees a trailing batch, or handle EOF as "scan current".
        match rdr.read_u8() {
            Ok(byte) => {
                if byte != 0x1 {
                    return Ok(v);
                }
            }
            Err(_e) => {
                return Ok(v);
            }
        };
        let next_meta = read_one_batch_meta(rdr)?;
        let next_ref_ts = next_meta.ref_ts;
        if min_ts <= current_ref_ts && max_ts <= current_ref_ts {
            // Whole window lies at/before this batch: later batches are
            // even newer, so nothing further can match.
            return Ok(v);
        } else if (min_ts <= current_ref_ts && max_ts <= next_ref_ts)
            || (min_ts < next_ref_ts && max_ts >= next_ref_ts)
            || (min_ts > current_ref_ts && max_ts < next_ref_ts)
        {
            // Overlap: rewind past the next header (14 bytes), its marker
            // (1 byte), and the current batch body, then scan the rows.
            let bytes_to_scrollback = -bytes_to_skip - 14 - 1;
            rdr.seek(SeekFrom::Current(bytes_to_scrollback))?;
            let mut batch: Vec<EventData> = Vec::with_capacity(current_meta.count as usize);
            for _i in 0..current_meta.count {
                let row = read_one_row(rdr, &current_meta)?;
                if trades_only && !row.is_trade {
                    continue;
                }
                if row.ts <= max_ts && row.ts >= min_ts {
                    batch.push(row);
                }
            }
            v.extend(batch);
        } else if min_ts >= next_ref_ts {
            // Window starts after the next batch's reference ts: step back
            // over the next header + marker so the loop treats it as current.
            rdr.seek(SeekFrom::Current(-14 - 1))?;
        } else {
            panic!(
                "Should have covered all the cases: {}, {}, {}, {}",
                min_ts, max_ts, current_ref_ts, next_ref_ts
            );
        }
    }
}
/// Parses header metadata (symbol, row count, min/max ts) from an open reader.
pub fn read_meta_from_buf<T: BufRead + Seek>(mut rdr: &mut T) -> Result<Metadata, Error> {
    let symbol = read_symbol(&mut rdr)?;
    let events = read_len(&mut rdr)?;
    let max_ts = read_max_ts(&mut rdr)?;
    // An empty file has no first row to take the min from; reuse max_ts.
    let min_ts = if events == 0 {
        max_ts
    } else {
        read_min_ts(&mut rdr)?
    };
    Ok(Metadata {
        market: split_symbol(&symbol)?,
        events,
        max_ts,
        min_ts,
    })
}
fn split_symbol(symbol: &str) -> Result<Market, Error> {
let symbol = symbol.split('.').next()?;
let index = symbol.find('_')?;
let exchange = Exchange::from_str(&symbol[..index])?;
let pair = CurrencyPair::from_str(&symbol[index + 1..])?;
Ok(Market::new(exchange, pair))
}
#[cfg(test)]
mod test {
    use super::*;
    // Sanity check against a committed fixture: the header fields must
    // agree with the decoded rows (count matches, min/max ts as recorded).
    #[test]
    fn read_dtf_file() {
        let (meta, records) = read_file("../../tests/data-002/bnc_ETH_PAX.dtf").unwrap();
        assert_eq!(meta.market.as_str(), "bnc_ETH_PAX");
        assert_eq!(meta.events, records.len() as u64);
        assert_eq!(meta.min_ts, 1547996975975);
        assert_eq!(meta.max_ts, 1548077909379);
    }
}