use memmap2::Mmap;
use smol_str::SmolStr;
use std::fs::File;
use std::path::Path;
/// A single market-data record made of four 64-bit integer fields.
///
/// `#[repr(C)]` fixes the field order; with four 8-byte fields the struct is
/// exactly 32 bytes with no padding.
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct Tick {
    /// Event timestamp. The field name indicates nanoseconds; the reference
    /// epoch is not established by this type.
    pub timestamp_nanos: i64,
    /// Price as an integer (the constructors truncate the `f64` input).
    pub price: i64,
    /// Volume as an integer (the constructors truncate the `f64` input).
    pub volume: i64,
    /// `0` for ticks built by [`Tick::from_trade`], `1` for ticks built by
    /// [`Tick::from_quote`].
    pub flags: u64,
}

impl Tick {
    /// `flags` value written by [`Tick::from_trade`].
    const TRADE_FLAGS: u64 = 0;
    /// `flags` value written by [`Tick::from_quote`].
    const QUOTE_FLAGS: u64 = 1;

    /// Builds a tick from a trade print.
    ///
    /// `price` and `volume` are converted with `as i64`: the fractional part
    /// is dropped (truncation toward zero), out-of-range values saturate at
    /// `i64::MIN` / `i64::MAX`, and NaN becomes `0`. No scaling is applied.
    pub fn from_trade(timestamp: i64, price: f64, volume: f64) -> Self {
        Self {
            timestamp_nanos: timestamp,
            price: price as i64,
            volume: volume as i64,
            flags: Self::TRADE_FLAGS,
        }
    }

    /// Builds a tick from a two-sided quote.
    ///
    /// The stored price is the midpoint `(bid + ask) / 2` and the stored
    /// volume is `bid_size + ask_size`; both are then converted with `as i64`
    /// under the same truncation/saturation rules as [`Tick::from_trade`].
    pub fn from_quote(timestamp: i64, bid: f64, ask: f64, bid_size: f64, ask_size: f64) -> Self {
        let mid_price = (bid + ask) / 2.0;
        let total_size = bid_size + ask_size;
        Self {
            timestamp_nanos: timestamp,
            price: mid_price as i64,
            volume: total_size as i64,
            flags: Self::QUOTE_FLAGS,
        }
    }
}
/// Selects a strategy for handling duplicate ticks.
///
/// NOTE(review): none of the code visible alongside this enum reads the
/// policy yet, so the per-variant descriptions below are inferred from the
/// variant names only — confirm against the intended design before relying
/// on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DuplicatePolicy {
    /// Presumably: keep the first tick seen for a given key.
    First,
    /// Presumably: keep the most recent tick seen for a given key.
    Last,
    /// Presumably: keep every duplicate.
    All,
    /// Presumably: treat a duplicate as an error.
    Error,
}
pub struct TickBuffer {
data: Vec<Tick>,
symbol: SmolStr,
price_scale: u8,
volume_scale: u8,
allow_unordered: bool,
duplicate_policy: DuplicatePolicy,
}
impl TickBuffer {
pub fn new(symbol: impl Into<SmolStr>) -> Self {
TickBuffer {
data: Vec::new(),
symbol: symbol.into(),
price_scale: 8,
volume_scale: 0,
allow_unordered: false,
duplicate_policy: DuplicatePolicy::First,
}
}
pub fn push(&mut self, tick: Tick) -> Result<(), super::Error> {
if !self.allow_unordered {
if let Some(last) = self.data.last()
&& tick.timestamp_nanos < last.timestamp_nanos
{
return Err(super::Error::OutOfOrderTick {
current: tick.timestamp_nanos,
previous: last.timestamp_nanos,
});
}
} else {
let pos = self
.data
.binary_search_by_key(&tick.timestamp_nanos, |t| t.timestamp_nanos)
.unwrap_or_else(|e| e);
self.data.insert(pos, tick);
return Ok(());
}
self.data.push(tick);
Ok(())
}
pub fn symbol(&self) -> &SmolStr {
&self.symbol
}
pub fn as_slice(&self) -> &[Tick] {
&self.data
}
pub fn into_inner(self) -> Vec<Tick> {
self.data
}
pub fn with_price_scale(mut self, scale: u8) -> Self {
self.price_scale = scale;
self
}
pub fn with_volume_scale(mut self, scale: u8) -> Self {
self.volume_scale = scale;
self
}
pub fn with_allow_unordered(mut self, allow: bool) -> Self {
self.allow_unordered = allow;
self
}
pub fn with_duplicate_policy(mut self, policy: DuplicatePolicy) -> Self {
self.duplicate_policy = policy;
self
}
}
/// Sequential reader that yields [`Tick`]s from a memory-mapped file.
///
/// The file is treated as a flat array of fixed-size records, each one the raw
/// in-memory bytes of a `Tick` (so fields are interpreted in the host's byte
/// order). Trailing bytes that do not form a whole record are ignored.
pub struct MmapTickReader {
    /// Read-only mapping of the whole file.
    mmap: Mmap,
    /// Byte offset of the next unread record.
    cursor: usize,
}

impl MmapTickReader {
    /// Size in bytes of one record. Tied to the layout of `Tick` because each
    /// record is read by reinterpreting its bytes as a `Tick`.
    const RECORD_SIZE: usize = std::mem::size_of::<Tick>();

    /// Opens `path` and maps it into memory, positioned at the first record.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened or
    /// mapped.
    pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let file = File::open(path.as_ref())?;
        // SAFETY: `Mmap::map` is unsafe because the mapped bytes can change
        // (or the mapping can fault) if the underlying file is modified or
        // truncated while mapped. Nothing here can prevent that; the caller
        // must ensure the file is not mutated for the lifetime of the reader.
        let mmap = unsafe { Mmap::map(&file)? };
        Ok(Self { mmap, cursor: 0 })
    }

    /// Number of whole records that have not been yielded yet.
    pub fn remaining(&self) -> usize {
        self.mmap.len().saturating_sub(self.cursor) / Self::RECORD_SIZE
    }
}

impl Iterator for MmapTickReader {
    type Item = Tick;

    /// Returns the next record, or `None` once fewer than one whole record
    /// remains.
    fn next(&mut self) -> Option<Self::Item> {
        // `checked_add` + `get` make the bounds test overflow-proof and hand
        // back a slice of exactly `RECORD_SIZE` bytes.
        let end = self.cursor.checked_add(Self::RECORD_SIZE)?;
        let bytes: &[u8] = &self.mmap;
        let record = bytes.get(self.cursor..end)?;
        // SAFETY: `record` is exactly `size_of::<Tick>()` initialized, readable
        // bytes. `Tick` is `#[repr(C)]`, `Copy`, and contains only integer
        // fields, so every bit pattern is a valid value. `read_unaligned`
        // places no alignment requirement on the source pointer.
        let tick = unsafe { std::ptr::read_unaligned(record.as_ptr().cast::<Tick>()) };
        self.cursor = end;
        Some(tick)
    }

    /// Exact number of records left; the mapping length is fixed once mapped.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.remaining();
        (left, Some(left))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A trade keeps its timestamp and drops the fractional part of the price.
    #[test]
    fn test_tick_from_trade() {
        let trade = Tick::from_trade(1_000_000, 100.5, 500.0);
        assert_eq!(
            (trade.timestamp_nanos, trade.price, trade.volume),
            (1_000_000, 100, 500)
        );
    }

    /// A quote stores the truncated bid/ask midpoint and is flagged with `1`.
    #[test]
    fn test_tick_from_quote() {
        let quote = Tick::from_quote(1_000_000, 100.0, 101.0, 1000.0, 500.0);
        assert_eq!(
            (quote.timestamp_nanos, quote.price, quote.flags),
            (1_000_000, 100, 1)
        );
    }

    /// Ticks arriving in timestamp order are all accepted.
    #[test]
    fn test_tick_buffer_push_ordered() {
        let mut buffer = TickBuffer::new("AAPL");
        for (ts, price, volume) in [(0, 100.0, 1000.0), (1_000_000_000, 101.0, 500.0)] {
            buffer.push(Tick::from_trade(ts, price, volume)).unwrap();
        }
        assert_eq!(buffer.as_slice().len(), 2);
    }

    /// With default settings, a tick older than the last one is rejected.
    #[test]
    fn test_tick_buffer_out_of_order_rejected() {
        let mut buffer = TickBuffer::new("AAPL");
        let newer = Tick::from_trade(1_000_000_000, 100.0, 1000.0);
        let older = Tick::from_trade(0, 101.0, 500.0);
        buffer.push(newer).unwrap();
        let rejected = buffer.push(older);
        assert!(rejected.is_err());
    }

    /// In unordered mode a late tick is placed at its sorted position.
    #[test]
    fn test_tick_buffer_allow_unordered() {
        let mut buffer = TickBuffer::new("AAPL").with_allow_unordered(true);
        let newer = Tick::from_trade(1_000_000_000, 100.0, 1000.0);
        let older = Tick::from_trade(0, 101.0, 500.0);
        buffer.push(newer).unwrap();
        buffer.push(older).unwrap();
        let ticks = buffer.as_slice();
        assert_eq!(ticks[0].timestamp_nanos, 0);
        assert_eq!(ticks[1].timestamp_nanos, 1_000_000_000);
    }
}