use crate::books::Level;
use crate::subscription::{book::OrderBookL1, quote::Quote, trade::PublicTrade};
use chrono::{DateTime, TimeZone, Utc};
use databento::dbn::{Mbp1Msg, TradeMsg};
use rust_decimal::Decimal;
use rustrade_instrument::Side;
use smol_str::format_smolstr;
const UNDEF_PRICE: i64 = i64::MAX;
const UNDEF_SIZE: u32 = u32::MAX;
pub fn dbn_trade_to_public_trade(
trade: &TradeMsg,
) -> Result<(DateTime<Utc>, PublicTrade), &'static str> {
if trade.price == UNDEF_PRICE {
return Err("undefined price");
}
let price = Decimal::new(trade.price, 9);
let amount = Decimal::from(trade.size);
let time_exchange = nanos_to_datetime(trade.hd.ts_event)?;
#[allow(clippy::cast_sign_loss)]
let side = match trade.side as u8 {
b'A' => Some(Side::Sell),
b'B' => Some(Side::Buy),
b'N' => None,
_ => return Err("unknown trade side value"),
};
Ok((
time_exchange,
PublicTrade {
id: format_smolstr!("{}", trade.sequence),
price,
amount,
side,
},
))
}
pub fn dbn_mbp1_to_quote(msg: &Mbp1Msg) -> Result<(DateTime<Utc>, Quote), &'static str> {
let [level] = &msg.levels;
if level.bid_px == UNDEF_PRICE || level.ask_px == UNDEF_PRICE {
return Err("undefined bid or ask price");
}
let bid_price = Decimal::new(level.bid_px, 9);
let ask_price = Decimal::new(level.ask_px, 9);
let bid_amount = if level.bid_sz == UNDEF_SIZE {
Decimal::ZERO
} else {
Decimal::from(level.bid_sz)
};
let ask_amount = if level.ask_sz == UNDEF_SIZE {
Decimal::ZERO
} else {
Decimal::from(level.ask_sz)
};
let time_exchange = nanos_to_datetime(msg.hd.ts_event)?;
Ok((
time_exchange,
Quote {
bid_price,
bid_amount,
ask_price,
ask_amount,
},
))
}
pub fn dbn_mbp1_to_orderbook_l1(
msg: &Mbp1Msg,
) -> Result<(DateTime<Utc>, OrderBookL1), &'static str> {
let [level] = &msg.levels;
let time_exchange = nanos_to_datetime(msg.hd.ts_event)?;
let best_bid = if level.bid_px != UNDEF_PRICE {
let price = Decimal::new(level.bid_px, 9);
let amount = if level.bid_sz == UNDEF_SIZE {
Decimal::ZERO
} else {
Decimal::from(level.bid_sz)
};
Some(Level { price, amount })
} else {
None
};
let best_ask = if level.ask_px != UNDEF_PRICE {
let price = Decimal::new(level.ask_px, 9);
let amount = if level.ask_sz == UNDEF_SIZE {
Decimal::ZERO
} else {
Decimal::from(level.ask_sz)
};
Some(Level { price, amount })
} else {
None
};
Ok((
time_exchange,
OrderBookL1 {
last_update_time: time_exchange,
best_bid,
best_ask,
},
))
}
fn nanos_to_datetime(nanos: u64) -> Result<DateTime<Utc>, &'static str> {
let secs = i64::try_from(nanos / 1_000_000_000).map_err(|_| "timestamp out of i64 range")?;
let nsecs = (nanos % 1_000_000_000) as u32;
Utc.timestamp_opt(secs, nsecs)
.single()
.ok_or("invalid timestamp")
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn test_trade_conversion() {
use rust_decimal_macros::dec;
let mut trade = TradeMsg::default();
trade.hd.ts_event = 1_700_000_000_000_000_000;
trade.price = 150_250_000_000;
trade.size = 100;
trade.side = b'B' as i8;
trade.sequence = 12345;
let (time, public_trade) = dbn_trade_to_public_trade(&trade).unwrap();
assert_eq!(public_trade.price, dec!(150.25));
assert_eq!(public_trade.amount, dec!(100));
assert_eq!(public_trade.side, Some(Side::Buy));
assert_eq!(public_trade.id.as_str(), "12345");
assert_eq!(time.timestamp(), 1_700_000_000);
}
#[test]
fn test_trade_sell_side() {
let mut trade = TradeMsg::default();
trade.hd.ts_event = 1_700_000_000_000_000_000;
trade.price = 100_000_000_000;
trade.size = 50;
trade.side = b'A' as i8;
trade.sequence = 1;
let (_, public_trade) = dbn_trade_to_public_trade(&trade).unwrap();
assert_eq!(public_trade.side, Some(Side::Sell));
}
#[test]
fn test_trade_no_side() {
let mut trade = TradeMsg::default();
trade.hd.ts_event = 1_700_000_000_000_000_000;
trade.price = 100_000_000_000;
trade.size = 10;
trade.side = b'N' as i8;
trade.sequence = 1;
let (_, public_trade) = dbn_trade_to_public_trade(&trade).unwrap();
assert!(public_trade.side.is_none());
}
#[test]
fn test_trade_invalid_side_rejected() {
let mut trade = TradeMsg::default();
trade.hd.ts_event = 1_700_000_000_000_000_000;
trade.price = 100_000_000_000;
trade.size = 10;
trade.side = b'X' as i8;
assert!(dbn_trade_to_public_trade(&trade).is_err());
}
#[test]
fn test_trade_undefined_price() {
let mut trade = TradeMsg::default();
trade.hd.ts_event = 1_700_000_000_000_000_000;
trade.price = i64::MAX;
assert!(dbn_trade_to_public_trade(&trade).is_err());
}
#[test]
fn test_quote_conversion() {
use rust_decimal_macros::dec;
let mut msg = Mbp1Msg::default();
msg.hd.ts_event = 1_700_000_000_000_000_000;
msg.levels[0].bid_px = 100_000_000_000;
msg.levels[0].ask_px = 100_500_000_000;
msg.levels[0].bid_sz = 1000;
msg.levels[0].ask_sz = 500;
let (time, quote) = dbn_mbp1_to_quote(&msg).unwrap();
assert_eq!(quote.bid_price, dec!(100));
assert_eq!(quote.ask_price, dec!(100.5));
assert_eq!(quote.bid_amount, dec!(1000));
assert_eq!(quote.ask_amount, dec!(500));
assert_eq!(time.timestamp(), 1_700_000_000);
}
#[test]
fn test_orderbook_l1_conversion() {
let mut msg = Mbp1Msg::default();
msg.hd.ts_event = 1_700_000_000_000_000_000;
msg.levels[0].bid_px = 100_000_000_000;
msg.levels[0].ask_px = 100_500_000_000;
msg.levels[0].bid_sz = 1000;
msg.levels[0].ask_sz = 500;
let (time, l1) = dbn_mbp1_to_orderbook_l1(&msg).unwrap();
let best_bid = l1.best_bid.unwrap();
let best_ask = l1.best_ask.unwrap();
assert_eq!(best_bid.price, Decimal::from(100));
assert_eq!(best_ask.price, Decimal::from_str("100.5").unwrap());
assert_eq!(best_bid.amount, Decimal::from(1000));
assert_eq!(best_ask.amount, Decimal::from(500));
assert_eq!(time.timestamp(), 1_700_000_000);
assert_eq!(l1.last_update_time.timestamp(), 1_700_000_000);
}
#[test]
fn test_orderbook_l1_undefined_prices() {
let mut msg = Mbp1Msg::default();
msg.hd.ts_event = 1_700_000_000_000_000_000;
msg.levels[0].bid_px = i64::MAX; msg.levels[0].ask_px = i64::MAX; msg.levels[0].bid_sz = 1000;
msg.levels[0].ask_sz = 500;
let (_, l1) = dbn_mbp1_to_orderbook_l1(&msg).unwrap();
assert!(l1.best_bid.is_none());
assert!(l1.best_ask.is_none());
}
}