use chrono::Utc;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::fmt;
use tokio::sync::broadcast;
/// A single market-data message, with one variant per payload type.
///
/// Serde encodes the variant in an internal `type` field using the snake_case
/// variant name (for example `OrderBook` becomes `"order_book"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MarketDataEvent {
/// Payload is a [`TradeEvent`].
Trade(TradeEvent),
/// Payload is an [`OrderBookEvent`].
OrderBook(OrderBookEvent),
/// Payload is a [`TickerEvent`].
Ticker(TickerEvent),
/// Payload is a [`LiquidationEvent`].
Liquidation(LiquidationEvent),
/// Payload is a [`FundingRateEvent`].
FundingRate(FundingRateEvent),
/// Payload is a [`KlineEvent`].
Kline(KlineEvent),
}
impl MarketDataEvent {
pub fn symbol(&self) -> &Symbol {
match self {
MarketDataEvent::Trade(e) => &e.symbol,
MarketDataEvent::OrderBook(e) => &e.symbol,
MarketDataEvent::Ticker(e) => &e.symbol,
MarketDataEvent::Liquidation(e) => &e.symbol,
MarketDataEvent::FundingRate(e) => &e.symbol,
MarketDataEvent::Kline(e) => &e.symbol,
}
}
pub fn exchange(&self) -> Exchange {
match self {
MarketDataEvent::Trade(e) => e.exchange,
MarketDataEvent::OrderBook(e) => e.exchange,
MarketDataEvent::Ticker(e) => e.exchange,
MarketDataEvent::Liquidation(e) => e.exchange,
MarketDataEvent::FundingRate(e) => e.exchange,
MarketDataEvent::Kline(e) => e.exchange,
}
}
pub fn timestamp(&self) -> i64 {
match self {
MarketDataEvent::Trade(e) => e.timestamp,
MarketDataEvent::OrderBook(e) => e.timestamp,
MarketDataEvent::Ticker(e) => e.timestamp,
MarketDataEvent::Liquidation(e) => e.timestamp,
MarketDataEvent::FundingRate(e) => e.timestamp,
MarketDataEvent::Kline(e) => e.close_time,
}
}
}
/// Payload of [`MarketDataEvent::Trade`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TradeEvent {
/// Exchange associated with the trade.
pub exchange: Exchange,
/// Instrument associated with the trade.
pub symbol: Symbol,
/// Event time supplied by the caller of `TradeEvent::new`.
/// NOTE(review): `latency_micros` subtracts this from `received_at`, so it is
/// presumably microseconds since the Unix epoch — confirm against the producers.
pub timestamp: i64,
/// Local receive time; `TradeEvent::new` fills it with
/// `Utc::now().timestamp_micros()`.
pub received_at: i64,
/// Trade price.
pub price: Decimal,
/// Traded quantity.
pub quantity: Decimal,
/// Side of the trade.
pub side: Side,
/// Trade identifier, stored as a string.
pub trade_id: String,
/// Optional maker flag; `TradeEvent::new` leaves it as `None`.
pub buyer_is_maker: Option<bool>,
}
impl TradeEvent {
pub fn new(
exchange: Exchange,
symbol: Symbol,
timestamp: i64,
price: Decimal,
quantity: Decimal,
side: Side,
trade_id: String,
) -> Self {
Self {
exchange,
symbol,
timestamp,
received_at: Utc::now().timestamp_micros(),
price,
quantity,
side,
trade_id,
buyer_is_maker: None,
}
}
pub fn notional(&self) -> Decimal {
self.price * self.quantity
}
pub fn latency_micros(&self) -> i64 {
self.received_at - self.timestamp
}
}
/// Payload of [`MarketDataEvent::OrderBook`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderBookEvent {
/// Exchange associated with the book.
pub exchange: Exchange,
/// Instrument associated with the book.
pub symbol: Symbol,
/// Event timestamp (units are not fixed by this type).
pub timestamp: i64,
/// Sequence number of this message.
pub sequence: u64,
/// NOTE(review): presumably `true` for a full snapshot and `false` for an
/// incremental update — confirm with the producers.
pub is_snapshot: bool,
/// Bid levels. `best_bid` returns the first element, so the best bid is
/// expected at index 0.
pub bids: Vec<PriceLevel>,
/// Ask levels. `best_ask` returns the first element, so the best ask is
/// expected at index 0.
pub asks: Vec<PriceLevel>,
}
impl OrderBookEvent {
    /// Returns the first bid level, or `None` when there are no bids.
    pub fn best_bid(&self) -> Option<&PriceLevel> {
        self.bids.first()
    }

    /// Returns the first ask level, or `None` when there are no asks.
    pub fn best_ask(&self) -> Option<&PriceLevel> {
        self.asks.first()
    }

    /// Returns the average of the best bid and best ask prices.
    ///
    /// `None` when either side of the book is empty.
    pub fn mid_price(&self) -> Option<Decimal> {
        let bid = self.best_bid()?;
        let ask = self.best_ask()?;
        Some((bid.price + ask.price) / Decimal::from(2))
    }

    /// Returns best ask minus best bid.
    ///
    /// `None` when either side of the book is empty.
    pub fn spread(&self) -> Option<Decimal> {
        self.best_bid()
            .zip(self.best_ask())
            .map(|(bid, ask)| ask.price - bid.price)
    }

    /// Returns the spread relative to the mid price, in basis points.
    ///
    /// `None` when either side is empty or the mid price is not positive.
    pub fn spread_bps(&self) -> Option<Decimal> {
        let mid = self.mid_price()?;
        let spread = self.spread()?;
        if mid > Decimal::ZERO {
            Some((spread / mid) * Decimal::from(10000))
        } else {
            None
        }
    }
}
/// One price/quantity pair, used for the bid and ask entries of an
/// [`OrderBookEvent`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceLevel {
/// Price of the level.
pub price: Decimal,
/// Quantity at this price.
pub quantity: Decimal,
}
impl PriceLevel {
pub fn new(price: Decimal, quantity: Decimal) -> Self {
Self { price, quantity }
}
pub fn notional(&self) -> Decimal {
self.price * self.quantity
}
}
/// Payload of [`MarketDataEvent::Ticker`].
///
/// Fields typed `Option` may be absent; this file does not show which
/// producers fill them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TickerEvent {
/// Exchange associated with the ticker.
pub exchange: Exchange,
/// Instrument associated with the ticker.
pub symbol: Symbol,
/// Event timestamp (units are not fixed by this type).
pub timestamp: i64,
/// Last price.
pub last_price: Decimal,
/// Best bid price, when available.
pub best_bid: Option<Decimal>,
/// Best ask price, when available.
pub best_ask: Option<Decimal>,
/// 24-hour volume. NOTE(review): presumably in base-asset units — confirm.
pub volume_24h: Decimal,
/// 24-hour volume. NOTE(review): presumably in quote-asset units — confirm.
pub quote_volume_24h: Decimal,
/// 24-hour price change, when available.
pub price_change_24h: Option<Decimal>,
/// 24-hour price change as a percentage, when available.
pub price_change_pct_24h: Option<Decimal>,
/// 24-hour high, when available.
pub high_24h: Option<Decimal>,
/// 24-hour low, when available.
pub low_24h: Option<Decimal>,
}
/// Payload of [`MarketDataEvent::Liquidation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiquidationEvent {
/// Exchange associated with the liquidation.
pub exchange: Exchange,
/// Instrument associated with the liquidation.
pub symbol: Symbol,
/// Event timestamp (units are not fixed by this type).
pub timestamp: i64,
/// Side of the liquidation.
/// NOTE(review): whether this is the liquidated position's side or the
/// closing order's side is not visible here — confirm.
pub side: Side,
/// Liquidation price.
pub price: Decimal,
/// Liquidated quantity.
pub quantity: Decimal,
/// Order identifier, when available.
pub order_id: Option<String>,
}
impl LiquidationEvent {
    /// Returns the liquidated value, `price * quantity`.
    pub fn notional(&self) -> Decimal {
        let (px, qty) = (self.price, self.quantity);
        px * qty
    }
}
/// Payload of [`MarketDataEvent::FundingRate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FundingRateEvent {
/// Exchange associated with the funding rate.
pub exchange: Exchange,
/// Instrument associated with the funding rate.
pub symbol: Symbol,
/// Event timestamp (units are not fixed by this type).
pub timestamp: i64,
/// Funding rate for a single funding period; `annualized_rate` scales it
/// by `365 * 3`.
pub rate: Decimal,
/// Time of the next funding (units are not fixed by this type).
pub next_funding_time: i64,
}
impl FundingRateEvent {
    /// Returns the simple (non-compounded) annualized rate.
    ///
    /// The per-period rate is multiplied by `365 * 3`, i.e. three funding
    /// periods per day are assumed.
    /// NOTE(review): that matches 8-hour funding intervals; confirm it holds
    /// for every exchange feeding this type.
    pub fn annualized_rate(&self) -> Decimal {
        let periods_per_year = Decimal::from(365 * 3);
        self.rate * periods_per_year
    }
}
/// Payload of [`MarketDataEvent::Kline`] (one OHLCV candle).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KlineEvent {
/// Exchange associated with the candle.
pub exchange: Exchange,
/// Instrument associated with the candle.
pub symbol: Symbol,
/// Candle interval label, stored as free-form text (the tests use `"1m"`).
pub interval: String,
/// Start time of the candle (units are not fixed by this type).
pub open_time: i64,
/// End time of the candle; `MarketDataEvent::timestamp` reports this value
/// for klines.
pub close_time: i64,
/// Opening price.
pub open: Decimal,
/// Highest price.
pub high: Decimal,
/// Lowest price.
pub low: Decimal,
/// Closing price.
pub close: Decimal,
/// Candle volume. NOTE(review): presumably in base-asset units — confirm.
pub volume: Decimal,
/// Quote-asset volume, when available.
pub quote_volume: Option<Decimal>,
/// Number of trades in the candle, when available.
pub trades: Option<u64>,
/// NOTE(review): presumably `true` once the candle is final — confirm.
pub is_closed: bool,
}
impl KlineEvent {
    /// Returns the typical price, `(high + low + close) / 3`.
    pub fn typical_price(&self) -> Decimal {
        let total = self.high + self.low + self.close;
        total / Decimal::from(3)
    }

    /// Returns the absolute move over the candle, `close - open`.
    pub fn price_change(&self) -> Decimal {
        self.close - self.open
    }

    /// Returns the move over the candle as a percentage of the open.
    ///
    /// Yields zero when the open is not strictly positive, which avoids a
    /// division by zero.
    pub fn price_change_pct(&self) -> Decimal {
        if self.open <= Decimal::ZERO {
            return Decimal::ZERO;
        }
        (self.price_change() / self.open) * Decimal::from(100)
    }

    /// Returns the candle's range, `high - low`.
    pub fn range(&self) -> Decimal {
        self.high - self.low
    }
}
/// A trading pair plus the kind of market it trades on.
///
/// Equality and hashing cover all three fields, so the same pair on two
/// market types compares as different symbols.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Symbol {
/// Base asset, e.g. `BTC` in `BTC/USDT`.
pub base: String,
/// Quote asset, e.g. `USDT` in `BTC/USDT`.
pub quote: String,
/// Market type; `Symbol::new` defaults it to `MarketType::Spot`.
pub market_type: MarketType,
}
impl Symbol {
    /// Creates a spot-market symbol from its base and quote assets.
    pub fn new(base: impl Into<String>, quote: impl Into<String>) -> Self {
        Self::new_with_type(base, quote, MarketType::Spot)
    }

    /// Creates a symbol with an explicit market type.
    pub fn new_with_type(
        base: impl Into<String>,
        quote: impl Into<String>,
        market_type: MarketType,
    ) -> Self {
        Self {
            base: base.into(),
            quote: quote.into(),
            market_type,
        }
    }

    /// Parses an exchange-native pair string into a spot [`Symbol`].
    ///
    /// Recognized layouts:
    /// - Binance: `<BASE>USDT` or `<BASE>BUSD`
    /// - Bybit: `<BASE>USDT`
    /// - Coinbase, OKX, KuCoin: `<BASE>-<QUOTE>`
    /// - Kraken: `<BASE>/<QUOTE>`
    ///
    /// Returns `None` when the string does not match the exchange's layout or
    /// when the base or quote would be empty. Matching is case-sensitive and
    /// the market type is always `MarketType::Spot`.
    pub fn from_exchange_format(s: &str, exchange: Exchange) -> Option<Self> {
        let (base, quote) = match exchange {
            Exchange::Binance => Self::split_known_quote(s, &["USDT", "BUSD"])?,
            Exchange::Bybit => Self::split_known_quote(s, &["USDT"])?,
            Exchange::Coinbase | Exchange::Okx | Exchange::Kucoin => {
                Self::split_delimited(s, '-')?
            }
            Exchange::Kraken => Self::split_delimited(s, '/')?,
        };
        Some(Symbol::new(base, quote))
    }

    /// Renders the pair in the exchange's native layout, upper-cased.
    ///
    /// The market type does not influence the output.
    pub fn to_exchange_format(&self, exchange: Exchange) -> String {
        let separator = match exchange {
            Exchange::Binance | Exchange::Bybit => "",
            Exchange::Coinbase | Exchange::Okx | Exchange::Kucoin => "-",
            Exchange::Kraken => "/",
        };
        format!(
            "{}{}{}",
            self.base.to_uppercase(),
            separator,
            self.quote.to_uppercase()
        )
    }

    /// Splits a concatenated pair on the first quote in `quotes` that `s` ends with.
    fn split_known_quote<'a>(
        s: &'a str,
        quotes: &[&'static str],
    ) -> Option<(&'a str, &'static str)> {
        quotes.iter().find_map(|&quote| {
            // `strip_suffix` removes the quote exactly once, so a base that
            // itself ends in the quote asset (e.g. "USDTUSDT") stays intact.
            let base = s.strip_suffix(quote)?;
            if base.is_empty() {
                None
            } else {
                Some((base, quote))
            }
        })
    }

    /// Splits a pair with exactly one `delimiter` and two non-empty legs.
    fn split_delimited(s: &str, delimiter: char) -> Option<(&str, &str)> {
        let (base, quote) = s.split_once(delimiter)?;
        // A second delimiter means more than two parts (e.g. "BTC-USDT-SWAP").
        if base.is_empty() || quote.is_empty() || quote.contains(delimiter) {
            None
        } else {
            Some((base, quote))
        }
    }
}
impl fmt::Display for Symbol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.base, self.quote)
}
}
/// The exchanges this module knows how to name and format symbols for.
///
/// `Display` writes the lowercase name and `FromStr` accepts the same names
/// case-insensitively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Exchange {
/// Displayed as `binance`.
Binance,
/// Displayed as `bybit`.
Bybit,
/// Displayed as `coinbase`.
Coinbase,
/// Displayed as `kraken`.
Kraken,
/// Displayed as `okx`.
Okx,
/// Displayed as `kucoin`.
Kucoin,
}
/// Writes the exchange's lowercase name.
impl fmt::Display for Exchange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Exchange::Binance => "binance",
            Exchange::Bybit => "bybit",
            Exchange::Coinbase => "coinbase",
            Exchange::Kraken => "kraken",
            Exchange::Okx => "okx",
            Exchange::Kucoin => "kucoin",
        };
        f.write_str(name)
    }
}
/// Parses an exchange name, ignoring case.
///
/// The input is lowercased and compared against the known names; it is not
/// trimmed. Unknown names produce `Err("Unknown exchange: <input>")` with the
/// input echoed unchanged.
impl std::str::FromStr for Exchange {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const KNOWN: [(&str, Exchange); 6] = [
            ("binance", Exchange::Binance),
            ("bybit", Exchange::Bybit),
            ("coinbase", Exchange::Coinbase),
            ("kraken", Exchange::Kraken),
            ("okx", Exchange::Okx),
            ("kucoin", Exchange::Kucoin),
        ];

        let wanted = s.to_lowercase();
        KNOWN
            .iter()
            .find(|(name, _)| *name == wanted)
            .map(|&(_, exchange)| exchange)
            .ok_or_else(|| format!("Unknown exchange: {}", s))
    }
}
/// The kind of market a [`Symbol`] trades on.
///
/// `Display` writes the lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MarketType {
/// Displayed as `spot`; the default chosen by `Symbol::new`.
Spot,
/// Displayed as `perpetual`.
Perpetual,
/// Displayed as `futures`.
Futures,
/// Displayed as `options`.
Options,
}
/// Writes the market type's lowercase name.
impl fmt::Display for MarketType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            MarketType::Spot => "spot",
            MarketType::Perpetual => "perpetual",
            MarketType::Futures => "futures",
            MarketType::Options => "options",
        };
        f.write_str(label)
    }
}
/// Direction of a trade or liquidation.
///
/// `Display` writes `buy` / `sell`; `FromStr` additionally accepts
/// `bid`/`long` for `Buy` and `ask`/`short` for `Sell`, in any letter case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Side {
/// Displayed as `buy`.
Buy,
/// Displayed as `sell`.
Sell,
}
/// Writes `buy` or `sell`.
impl fmt::Display for Side {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Side::Buy => "buy",
            Side::Sell => "sell",
        };
        f.write_str(label)
    }
}
/// Parses a side, ignoring case.
///
/// `buy`, `bid` and `long` map to [`Side::Buy`]; `sell`, `ask` and `short`
/// map to [`Side::Sell`]. Anything else yields `Err("Unknown side: <input>")`
/// with the input echoed unchanged. The input is not trimmed.
impl std::str::FromStr for Side {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const BUY_ALIASES: [&str; 3] = ["buy", "bid", "long"];
        const SELL_ALIASES: [&str; 3] = ["sell", "ask", "short"];

        let token = s.to_lowercase();
        if BUY_ALIASES.contains(&token.as_str()) {
            Ok(Side::Buy)
        } else if SELL_ALIASES.contains(&token.as_str()) {
            Ok(Side::Sell)
        } else {
            Err(format!("Unknown side: {}", s))
        }
    }
}
/// Fan-out bus for [`MarketDataEvent`]s built on a Tokio broadcast channel.
pub struct MarketDataBus {
// Sending half of the broadcast channel; receivers are created from it on demand.
tx: broadcast::Sender<MarketDataEvent>,
// The capacity passed to `new`, kept so `capacity()` can report it.
capacity: usize,
}
impl MarketDataBus {
pub fn new(capacity: usize) -> Self {
let (tx, _rx) = broadcast::channel(capacity);
Self { tx, capacity }
}
pub fn publish(&self, event: MarketDataEvent) -> crate::Result<usize> {
let receivers = self.tx.send(event)?;
Ok(receivers)
}
pub fn subscribe(&self) -> broadcast::Receiver<MarketDataEvent> {
self.tx.subscribe()
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for MarketDataBus {
fn default() -> Self {
Self::new(5000)
}
}
impl Clone for MarketDataBus {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
capacity: self.capacity,
}
}
}
#[cfg(test)]
mod market_data_bus_tests {
    use super::*;

    /// Subscribers are counted across clones because clones share one channel.
    #[test]
    fn test_market_data_bus() {
        let original = MarketDataBus::new(100);
        assert_eq!(original.subscriber_count(), 0);
        assert_eq!(original.capacity(), 100);

        // Receivers are bound to names so they stay alive for the counts below.
        let _first = original.subscribe();
        assert_eq!(original.subscriber_count(), 1);

        let cloned = original.clone();
        assert_eq!(cloned.subscriber_count(), 1);

        let _second = cloned.subscribe();
        assert_eq!(original.subscriber_count(), 2);
    }

    /// The `Default` implementation uses a capacity of 5000.
    #[test]
    fn test_market_data_bus_default() {
        let default_bus = MarketDataBus::default();
        assert_eq!(default_bus.capacity(), 5000);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Concatenated (Binance) and dash-separated (Coinbase) pairs both parse.
    #[test]
    fn test_symbol_parsing() {
        let binance = Symbol::from_exchange_format("BTCUSDT", Exchange::Binance).unwrap();
        assert_eq!(binance.base, "BTC");
        assert_eq!(binance.quote, "USDT");

        let coinbase = Symbol::from_exchange_format("BTC-USD", Exchange::Coinbase).unwrap();
        assert_eq!(coinbase.base, "BTC");
        assert_eq!(coinbase.quote, "USD");
    }

    /// Each exchange gets its own separator when formatting.
    #[test]
    fn test_symbol_formatting() {
        let btc_usdt = Symbol::new("BTC", "USDT");
        let expectations = [
            (Exchange::Binance, "BTCUSDT"),
            (Exchange::Coinbase, "BTC-USDT"),
            (Exchange::Kraken, "BTC/USDT"),
        ];
        for (exchange, expected) in expectations {
            assert_eq!(btc_usdt.to_exchange_format(exchange), expected);
        }
    }

    /// Spread and mid price are derived from the first bid and ask levels.
    #[test]
    fn test_spread_calculation() {
        let level = |price: i64| PriceLevel::new(Decimal::from(price), Decimal::from(1));
        let book = OrderBookEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            timestamp: 0,
            sequence: 1,
            is_snapshot: true,
            bids: vec![level(50000)],
            asks: vec![level(50010)],
        };

        assert_eq!(book.spread(), Some(Decimal::from(10)));
        assert_eq!(book.mid_price(), Some(Decimal::from(50005)));
    }

    /// Price change is close minus open; range is high minus low.
    #[test]
    fn test_kline_calculations() {
        let candle = KlineEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            interval: "1m".to_string(),
            open_time: 0,
            close_time: 60_000_000,
            open: Decimal::from(50000),
            high: Decimal::from(51000),
            low: Decimal::from(49000),
            close: Decimal::from(50500),
            volume: Decimal::from(100),
            quote_volume: None,
            trades: None,
            is_closed: true,
        };

        assert_eq!(candle.price_change(), Decimal::from(500));
        assert_eq!(candle.range(), Decimal::from(2000));
    }
}