use rust_decimal::Decimal;
use crate::core::types::{
OrderBook, OrderbookDelta as OrderbookDeltaData, PublicTrade, Ticker, TradeSide,
};
use crate::core::types::{Kline, StreamEvent};
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalTrade {
pub symbol: String,
pub price: Decimal,
pub quantity: Decimal,
pub side: TradeSide,
pub timestamp_ms: i64,
pub trade_id: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalTicker {
pub symbol: String,
pub last_price: Decimal,
pub bid_price: Option<Decimal>,
pub ask_price: Option<Decimal>,
pub volume_24h: Option<Decimal>,
pub timestamp_ms: i64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalLevel {
pub price: Decimal,
pub quantity: Decimal,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalOrderbook {
pub symbol: String,
pub bids: Vec<CanonicalLevel>,
pub asks: Vec<CanonicalLevel>,
pub sequence: Option<u64>,
pub timestamp_ms: i64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalOrderbookDelta {
pub symbol: String,
pub bid_updates: Vec<CanonicalLevel>,
pub ask_updates: Vec<CanonicalLevel>,
pub first_update_id: Option<u64>,
pub last_update_id: Option<u64>,
pub prev_update_id: Option<u64>,
pub timestamp_ms: i64,
}
#[derive(Debug, Clone, PartialEq)]
pub struct CanonicalKline {
pub symbol: String,
pub open: Decimal,
pub high: Decimal,
pub low: Decimal,
pub close: Decimal,
pub volume: Decimal,
pub open_time_ms: i64,
pub close_time_ms: i64,
pub interval: String,
}
#[derive(Debug, Clone, PartialEq)]
pub enum CanonicalEvent {
Trade(CanonicalTrade),
Ticker(CanonicalTicker),
Orderbook(CanonicalOrderbook),
OrderbookDelta(CanonicalOrderbookDelta),
Kline(CanonicalKline),
Other,
}
pub trait Canonicalize {
type Output;
fn canonicalize(&self) -> Option<Self::Output>;
}
pub fn normalize_ts_to_ms(ts: i64) -> i64 {
let abs = ts.unsigned_abs();
if abs > 10_000_000_000_000_000 {
ts / 1_000_000
} else if abs > 10_000_000_000_000 {
ts / 1_000
} else if abs > 10_000_000_000 {
ts
} else if abs > 0 {
ts * 1_000
} else {
0
}
}
#[inline]
fn f64_to_decimal(v: f64) -> Option<Decimal> {
Decimal::try_from(v).ok()
}
#[inline]
fn f64_to_decimal_opt(v: Option<f64>) -> Option<Decimal> {
v.and_then(|x| Decimal::try_from(x).ok())
}
fn to_canonical_level(price: f64, size: f64) -> Option<CanonicalLevel> {
Some(CanonicalLevel {
price: f64_to_decimal(price)?,
quantity: f64_to_decimal(size)?,
})
}
fn levels_from_book_levels(
levels: &[crate::core::types::OrderBookLevel],
) -> Vec<CanonicalLevel> {
levels
.iter()
.filter_map(|l| to_canonical_level(l.price, l.size))
.collect()
}
impl Canonicalize for PublicTrade {
type Output = CanonicalTrade;
fn canonicalize(&self) -> Option<CanonicalTrade> {
Some(CanonicalTrade {
symbol: self.symbol.clone(),
price: f64_to_decimal(self.price)?,
quantity: f64_to_decimal(self.quantity)?,
side: self.side,
timestamp_ms: normalize_ts_to_ms(self.timestamp),
trade_id: Some(self.id.clone()),
})
}
}
impl Canonicalize for Ticker {
type Output = CanonicalTicker;
fn canonicalize(&self) -> Option<CanonicalTicker> {
Some(CanonicalTicker {
symbol: self.symbol.clone(),
last_price: f64_to_decimal(self.last_price)?,
bid_price: f64_to_decimal_opt(self.bid_price),
ask_price: f64_to_decimal_opt(self.ask_price),
volume_24h: f64_to_decimal_opt(self.volume_24h),
timestamp_ms: normalize_ts_to_ms(self.timestamp),
})
}
}
impl Canonicalize for OrderBook {
type Output = CanonicalOrderbook;
fn canonicalize(&self) -> Option<CanonicalOrderbook> {
let mut bids = levels_from_book_levels(&self.bids);
let mut asks = levels_from_book_levels(&self.asks);
bids.sort_by(|a, b| b.price.cmp(&a.price)); asks.sort_by(|a, b| a.price.cmp(&b.price));
let sequence = self
.sequence
.as_deref()
.and_then(|s| s.parse::<u64>().ok())
.or(self.last_update_id);
Some(CanonicalOrderbook {
symbol: String::new(),
bids,
asks,
sequence,
timestamp_ms: normalize_ts_to_ms(self.timestamp),
})
}
}
impl Canonicalize for OrderbookDeltaData {
type Output = CanonicalOrderbookDelta;
fn canonicalize(&self) -> Option<CanonicalOrderbookDelta> {
Some(CanonicalOrderbookDelta {
symbol: String::new(),
bid_updates: levels_from_book_levels(&self.bids),
ask_updates: levels_from_book_levels(&self.asks),
first_update_id: self.first_update_id,
last_update_id: self.last_update_id,
prev_update_id: self.prev_update_id,
timestamp_ms: normalize_ts_to_ms(self.timestamp),
})
}
}
impl Canonicalize for Kline {
type Output = CanonicalKline;
fn canonicalize(&self) -> Option<CanonicalKline> {
Some(CanonicalKline {
symbol: String::new(),
open: f64_to_decimal(self.open)?,
high: f64_to_decimal(self.high)?,
low: f64_to_decimal(self.low)?,
close: f64_to_decimal(self.close)?,
volume: f64_to_decimal(self.volume)?,
open_time_ms: normalize_ts_to_ms(self.open_time),
close_time_ms: self
.close_time
.map(normalize_ts_to_ms)
.unwrap_or_else(|| normalize_ts_to_ms(self.open_time)),
interval: String::new(),
})
}
}
impl Canonicalize for StreamEvent {
type Output = CanonicalEvent;
fn canonicalize(&self) -> Option<CanonicalEvent> {
match self {
StreamEvent::Trade(t) => t.canonicalize().map(CanonicalEvent::Trade),
StreamEvent::Ticker(t) => t.canonicalize().map(CanonicalEvent::Ticker),
StreamEvent::OrderbookSnapshot(ob) => {
ob.canonicalize().map(CanonicalEvent::Orderbook)
}
StreamEvent::OrderbookDelta(delta) => {
delta.canonicalize().map(CanonicalEvent::OrderbookDelta)
}
StreamEvent::Kline(k) => {
k.canonicalize().map(CanonicalEvent::Kline)
}
StreamEvent::MarkPriceKline { kline, .. }
| StreamEvent::IndexPriceKline { kline, .. }
| StreamEvent::PremiumIndexKline { kline, .. } => {
let _ = kline;
Some(CanonicalEvent::Other)
}
_ => Some(CanonicalEvent::Other),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal::prelude::FromStr;
#[test]
fn timestamp_seconds_to_ms() {
assert_eq!(normalize_ts_to_ms(1_700_000_000), 1_700_000_000_000);
}
#[test]
fn timestamp_ms_identity() {
assert_eq!(normalize_ts_to_ms(1_700_000_000_000), 1_700_000_000_000);
}
#[test]
fn timestamp_us_to_ms() {
assert_eq!(normalize_ts_to_ms(1_700_000_000_000_000), 1_700_000_000_000);
}
#[test]
fn timestamp_ns_to_ms() {
assert_eq!(
normalize_ts_to_ms(1_700_000_000_000_000_000),
1_700_000_000_000
);
}
#[test]
fn timestamp_zero() {
assert_eq!(normalize_ts_to_ms(0), 0);
}
#[test]
fn trade_canonicalize_basic() {
let trade = PublicTrade {
id: "12345".to_string(),
symbol: "BTCUSDT".to_string(),
price: 65432.1,
quantity: 0.5,
side: TradeSide::Buy,
timestamp: 1_700_000_000_000,
};
let c = trade.canonicalize().expect("should canonicalize");
assert_eq!(c.symbol, "BTCUSDT");
assert_eq!(c.price, Decimal::try_from(65432.1_f64).unwrap());
assert_eq!(c.quantity, Decimal::try_from(0.5_f64).unwrap());
assert_eq!(c.side, TradeSide::Buy);
assert_eq!(c.timestamp_ms, 1_700_000_000_000);
assert_eq!(c.trade_id, Some("12345".to_string()));
}
#[test]
fn trade_canonicalize_sell_side() {
let trade = PublicTrade {
id: "99".to_string(),
symbol: "ETHUSDT".to_string(),
price: 3200.0,
quantity: 1.0,
side: TradeSide::Sell,
timestamp: 1_700_000_001_000,
};
let c = trade.canonicalize().expect("should canonicalize");
assert_eq!(c.side, TradeSide::Sell);
}
#[test]
fn ticker_canonicalize_missing_bid_ask() {
let ticker = Ticker {
symbol: "SOLUSDT".to_string(),
last_price: 180.0,
bid_price: None,
ask_price: None,
high_24h: None,
low_24h: None,
volume_24h: None,
quote_volume_24h: None,
price_change_24h: None,
price_change_percent_24h: None,
timestamp: 1_700_000_000_000,
};
let c = ticker.canonicalize().expect("should canonicalize");
assert_eq!(c.symbol, "SOLUSDT");
assert_eq!(c.last_price, Decimal::try_from(180.0_f64).unwrap());
assert!(c.bid_price.is_none());
assert!(c.ask_price.is_none());
assert!(c.volume_24h.is_none());
}
#[test]
fn ticker_canonicalize_with_bid_ask() {
let ticker = Ticker {
symbol: "BTCUSDT".to_string(),
last_price: 65000.0,
bid_price: Some(64999.0),
ask_price: Some(65001.0),
high_24h: None,
low_24h: None,
volume_24h: Some(1234.5),
quote_volume_24h: None,
price_change_24h: None,
price_change_percent_24h: None,
timestamp: 1_700_000_000_000,
};
let c = ticker.canonicalize().expect("should canonicalize");
assert!(c.bid_price.is_some());
assert!(c.ask_price.is_some());
assert!(c.volume_24h.is_some());
}
#[test]
fn kline_canonicalize_basic() {
let kline = Kline {
open_time: 1_700_000_000_000,
open: 64000.0,
high: 65000.0,
low: 63500.0,
close: 64800.0,
volume: 123.456,
quote_volume: None,
close_time: Some(1_700_000_059_999),
trades: None,
};
let c = kline.canonicalize().expect("should canonicalize");
assert_eq!(c.open, Decimal::try_from(64000.0_f64).unwrap());
assert_eq!(c.high, Decimal::try_from(65000.0_f64).unwrap());
assert_eq!(c.low, Decimal::try_from(63500.0_f64).unwrap());
assert_eq!(c.close, Decimal::try_from(64800.0_f64).unwrap());
assert_eq!(c.volume, Decimal::try_from(123.456_f64).unwrap());
assert_eq!(c.open_time_ms, 1_700_000_000_000);
assert_eq!(c.close_time_ms, 1_700_000_059_999);
}
#[test]
fn orderbook_canonical_sort_invariant() {
use crate::core::types::OrderBookLevel;
let ob = OrderBook {
bids: vec![
OrderBookLevel::new(100.0, 1.0),
OrderBookLevel::new(102.0, 0.5),
OrderBookLevel::new(101.0, 2.0),
],
asks: vec![
OrderBookLevel::new(105.0, 1.0),
OrderBookLevel::new(103.0, 3.0),
OrderBookLevel::new(104.0, 2.0),
],
timestamp: 1_700_000_000_000,
sequence: None,
last_update_id: None,
first_update_id: None,
prev_update_id: None,
event_time: None,
transaction_time: None,
checksum: None,
};
let c = ob.canonicalize().expect("should canonicalize");
assert_eq!(c.bids[0].price, Decimal::from_str("102").unwrap());
assert_eq!(c.bids[1].price, Decimal::from_str("101").unwrap());
assert_eq!(c.bids[2].price, Decimal::from_str("100").unwrap());
assert_eq!(c.asks[0].price, Decimal::from_str("103").unwrap());
assert_eq!(c.asks[1].price, Decimal::from_str("104").unwrap());
assert_eq!(c.asks[2].price, Decimal::from_str("105").unwrap());
}
#[test]
fn stream_event_trade_canonicalize() {
let event = StreamEvent::Trade(PublicTrade {
id: "1".to_string(),
symbol: "BTCUSDT".to_string(),
price: 65000.0,
quantity: 0.1,
side: TradeSide::Buy,
timestamp: 1_700_000_000_000,
});
match event.canonicalize() {
Some(CanonicalEvent::Trade(t)) => {
assert_eq!(t.symbol, "BTCUSDT");
}
other => panic!("expected CanonicalEvent::Trade, got {:?}", other),
}
}
#[test]
fn stream_event_ticker_canonicalize() {
let event = StreamEvent::Ticker(Ticker {
symbol: "ETHUSDT".to_string(),
last_price: 3000.0,
bid_price: None,
ask_price: None,
high_24h: None,
low_24h: None,
volume_24h: None,
quote_volume_24h: None,
price_change_24h: None,
price_change_percent_24h: None,
timestamp: 1_700_000_000_000,
});
match event.canonicalize() {
Some(CanonicalEvent::Ticker(t)) => assert_eq!(t.symbol, "ETHUSDT"),
other => panic!("expected CanonicalEvent::Ticker, got {:?}", other),
}
}
#[test]
fn stream_event_other_canonicalize() {
let event = StreamEvent::FundingRate {
symbol: "BTCUSDT".to_string(),
rate: 0.0001,
next_funding_time: None,
timestamp: 1_700_000_000_000,
};
assert!(matches!(event.canonicalize(), Some(CanonicalEvent::Other)));
}
}