1use chrono::Utc;
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16use std::fmt;
17use tokio::sync::broadcast;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21#[serde(tag = "type", rename_all = "snake_case")]
22pub enum MarketDataEvent {
23 Trade(TradeEvent),
25 OrderBook(OrderBookEvent),
27 Ticker(TickerEvent),
29 Liquidation(LiquidationEvent),
31 FundingRate(FundingRateEvent),
33 Kline(KlineEvent),
35}
36
37impl MarketDataEvent {
38 pub fn symbol(&self) -> &Symbol {
40 match self {
41 MarketDataEvent::Trade(e) => &e.symbol,
42 MarketDataEvent::OrderBook(e) => &e.symbol,
43 MarketDataEvent::Ticker(e) => &e.symbol,
44 MarketDataEvent::Liquidation(e) => &e.symbol,
45 MarketDataEvent::FundingRate(e) => &e.symbol,
46 MarketDataEvent::Kline(e) => &e.symbol,
47 }
48 }
49
50 pub fn exchange(&self) -> Exchange {
52 match self {
53 MarketDataEvent::Trade(e) => e.exchange,
54 MarketDataEvent::OrderBook(e) => e.exchange,
55 MarketDataEvent::Ticker(e) => e.exchange,
56 MarketDataEvent::Liquidation(e) => e.exchange,
57 MarketDataEvent::FundingRate(e) => e.exchange,
58 MarketDataEvent::Kline(e) => e.exchange,
59 }
60 }
61
62 pub fn timestamp(&self) -> i64 {
64 match self {
65 MarketDataEvent::Trade(e) => e.timestamp,
66 MarketDataEvent::OrderBook(e) => e.timestamp,
67 MarketDataEvent::Ticker(e) => e.timestamp,
68 MarketDataEvent::Liquidation(e) => e.timestamp,
69 MarketDataEvent::FundingRate(e) => e.timestamp,
70 MarketDataEvent::Kline(e) => e.close_time,
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct TradeEvent {
78 pub exchange: Exchange,
80 pub symbol: Symbol,
82 pub timestamp: i64,
84 pub received_at: i64,
86 pub price: Decimal,
88 pub quantity: Decimal,
90 pub side: Side,
92 pub trade_id: String,
94 pub buyer_is_maker: Option<bool>,
96}
97
98impl TradeEvent {
99 pub fn new(
101 exchange: Exchange,
102 symbol: Symbol,
103 timestamp: i64,
104 price: Decimal,
105 quantity: Decimal,
106 side: Side,
107 trade_id: String,
108 ) -> Self {
109 Self {
110 exchange,
111 symbol,
112 timestamp,
113 received_at: Utc::now().timestamp_micros(),
114 price,
115 quantity,
116 side,
117 trade_id,
118 buyer_is_maker: None,
119 }
120 }
121
122 pub fn notional(&self) -> Decimal {
124 self.price * self.quantity
125 }
126
127 pub fn latency_micros(&self) -> i64 {
129 self.received_at - self.timestamp
130 }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct OrderBookEvent {
136 pub exchange: Exchange,
138 pub symbol: Symbol,
140 pub timestamp: i64,
142 pub sequence: u64,
144 pub is_snapshot: bool,
146 pub bids: Vec<PriceLevel>,
148 pub asks: Vec<PriceLevel>,
150}
151
152impl OrderBookEvent {
153 pub fn best_bid(&self) -> Option<&PriceLevel> {
155 self.bids.first()
156 }
157
158 pub fn best_ask(&self) -> Option<&PriceLevel> {
160 self.asks.first()
161 }
162
163 pub fn mid_price(&self) -> Option<Decimal> {
165 match (self.best_bid(), self.best_ask()) {
166 (Some(bid), Some(ask)) => Some((bid.price + ask.price) / Decimal::from(2)),
167 _ => None,
168 }
169 }
170
171 pub fn spread(&self) -> Option<Decimal> {
173 match (self.best_bid(), self.best_ask()) {
174 (Some(bid), Some(ask)) => Some(ask.price - bid.price),
175 _ => None,
176 }
177 }
178
179 pub fn spread_bps(&self) -> Option<Decimal> {
181 match (self.mid_price(), self.spread()) {
182 (Some(mid), Some(spread)) if mid > Decimal::ZERO => {
183 Some((spread / mid) * Decimal::from(10000))
184 }
185 _ => None,
186 }
187 }
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct PriceLevel {
193 pub price: Decimal,
195 pub quantity: Decimal,
197}
198
199impl PriceLevel {
200 pub fn new(price: Decimal, quantity: Decimal) -> Self {
201 Self { price, quantity }
202 }
203
204 pub fn notional(&self) -> Decimal {
206 self.price * self.quantity
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct TickerEvent {
213 pub exchange: Exchange,
214 pub symbol: Symbol,
215 pub timestamp: i64,
216 pub last_price: Decimal,
217 pub best_bid: Option<Decimal>,
218 pub best_ask: Option<Decimal>,
219 pub volume_24h: Decimal,
220 pub quote_volume_24h: Decimal,
221 pub price_change_24h: Option<Decimal>,
222 pub price_change_pct_24h: Option<Decimal>,
223 pub high_24h: Option<Decimal>,
224 pub low_24h: Option<Decimal>,
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct LiquidationEvent {
230 pub exchange: Exchange,
231 pub symbol: Symbol,
232 pub timestamp: i64,
233 pub side: Side,
235 pub price: Decimal,
237 pub quantity: Decimal,
239 pub order_id: Option<String>,
241}
242
243impl LiquidationEvent {
244 pub fn notional(&self) -> Decimal {
246 self.price * self.quantity
247 }
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
252pub struct FundingRateEvent {
253 pub exchange: Exchange,
254 pub symbol: Symbol,
255 pub timestamp: i64,
256 pub rate: Decimal,
258 pub next_funding_time: i64,
260}
261
262impl FundingRateEvent {
263 pub fn annualized_rate(&self) -> Decimal {
265 self.rate * Decimal::from(365 * 3) }
267}
268
269#[derive(Debug, Clone, Serialize, Deserialize)]
271pub struct KlineEvent {
272 pub exchange: Exchange,
273 pub symbol: Symbol,
274 pub interval: String,
276 pub open_time: i64,
278 pub close_time: i64,
280 pub open: Decimal,
282 pub high: Decimal,
284 pub low: Decimal,
286 pub close: Decimal,
288 pub volume: Decimal,
290 pub quote_volume: Option<Decimal>,
292 pub trades: Option<u64>,
294 pub is_closed: bool,
296}
297
298impl KlineEvent {
299 pub fn typical_price(&self) -> Decimal {
301 (self.high + self.low + self.close) / Decimal::from(3)
302 }
303
304 pub fn price_change(&self) -> Decimal {
306 self.close - self.open
307 }
308
309 pub fn price_change_pct(&self) -> Decimal {
311 if self.open > Decimal::ZERO {
312 ((self.close - self.open) / self.open) * Decimal::from(100)
313 } else {
314 Decimal::ZERO
315 }
316 }
317
318 pub fn range(&self) -> Decimal {
320 self.high - self.low
321 }
322}
323
324#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
326pub struct Symbol {
327 pub base: String,
329 pub quote: String,
331 pub market_type: MarketType,
333}
334
335impl Symbol {
336 pub fn new(base: impl Into<String>, quote: impl Into<String>) -> Self {
337 Self {
338 base: base.into(),
339 quote: quote.into(),
340 market_type: MarketType::Spot,
341 }
342 }
343
344 pub fn new_with_type(
345 base: impl Into<String>,
346 quote: impl Into<String>,
347 market_type: MarketType,
348 ) -> Self {
349 Self {
350 base: base.into(),
351 quote: quote.into(),
352 market_type,
353 }
354 }
355
356 pub fn from_exchange_format(s: &str, exchange: Exchange) -> Option<Self> {
358 match exchange {
359 Exchange::Binance => {
360 if s.ends_with("USDT") {
362 let base = s.trim_end_matches("USDT");
363 Some(Symbol::new(base, "USDT"))
364 } else if s.ends_with("BUSD") {
365 let base = s.trim_end_matches("BUSD");
366 Some(Symbol::new(base, "BUSD"))
367 } else {
368 None
369 }
370 }
371 Exchange::Bybit => {
372 if s.ends_with("USDT") {
374 let base = s.trim_end_matches("USDT");
375 Some(Symbol::new(base, "USDT"))
376 } else {
377 None
378 }
379 }
380 Exchange::Coinbase => {
381 let parts: Vec<&str> = s.split('-').collect();
383 if parts.len() == 2 {
384 Some(Symbol::new(parts[0], parts[1]))
385 } else {
386 None
387 }
388 }
389 Exchange::Kraken => {
390 if s.contains('/') {
392 let parts: Vec<&str> = s.split('/').collect();
393 if parts.len() == 2 {
394 Some(Symbol::new(parts[0], parts[1]))
395 } else {
396 None
397 }
398 } else {
399 None }
402 }
403 Exchange::Okx => {
404 let parts: Vec<&str> = s.split('-').collect();
406 if parts.len() == 2 {
407 Some(Symbol::new(parts[0], parts[1]))
408 } else {
409 None
410 }
411 }
412 Exchange::Kucoin => {
413 let parts: Vec<&str> = s.split('-').collect();
415 if parts.len() == 2 {
416 Some(Symbol::new(parts[0], parts[1]))
417 } else {
418 None
419 }
420 }
421 }
422 }
423
424 pub fn to_exchange_format(&self, exchange: Exchange) -> String {
426 match exchange {
427 Exchange::Binance => {
428 format!("{}{}", self.base.to_uppercase(), self.quote.to_uppercase())
429 }
430 Exchange::Bybit => format!("{}{}", self.base.to_uppercase(), self.quote.to_uppercase()),
431 Exchange::Coinbase => {
432 format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase())
433 }
434 Exchange::Kraken => {
435 format!("{}/{}", self.base.to_uppercase(), self.quote.to_uppercase())
436 }
437 Exchange::Okx => format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase()),
438 Exchange::Kucoin => {
439 format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase())
440 }
441 }
442 }
443}
444
445impl fmt::Display for Symbol {
446 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447 write!(f, "{}/{}", self.base, self.quote)
448 }
449}
450
451#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
453pub enum Exchange {
454 Binance,
455 Bybit,
456 Coinbase,
457 Kraken,
458 Okx,
459 Kucoin,
460}
461
462impl fmt::Display for Exchange {
463 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464 match self {
465 Exchange::Binance => write!(f, "binance"),
466 Exchange::Bybit => write!(f, "bybit"),
467 Exchange::Coinbase => write!(f, "coinbase"),
468 Exchange::Kraken => write!(f, "kraken"),
469 Exchange::Okx => write!(f, "okx"),
470 Exchange::Kucoin => write!(f, "kucoin"),
471 }
472 }
473}
474
475impl std::str::FromStr for Exchange {
476 type Err = String;
477
478 fn from_str(s: &str) -> Result<Self, Self::Err> {
479 match s.to_lowercase().as_str() {
480 "binance" => Ok(Exchange::Binance),
481 "bybit" => Ok(Exchange::Bybit),
482 "coinbase" => Ok(Exchange::Coinbase),
483 "kraken" => Ok(Exchange::Kraken),
484 "okx" => Ok(Exchange::Okx),
485 "kucoin" => Ok(Exchange::Kucoin),
486 _ => Err(format!("Unknown exchange: {}", s)),
487 }
488 }
489}
490
491#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
493pub enum MarketType {
494 Spot,
496 Perpetual,
498 Futures,
500 Options,
502}
503
504impl fmt::Display for MarketType {
505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506 match self {
507 MarketType::Spot => write!(f, "spot"),
508 MarketType::Perpetual => write!(f, "perpetual"),
509 MarketType::Futures => write!(f, "futures"),
510 MarketType::Options => write!(f, "options"),
511 }
512 }
513}
514
515#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
517pub enum Side {
518 Buy,
520 Sell,
522}
523
524impl fmt::Display for Side {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 match self {
527 Side::Buy => write!(f, "buy"),
528 Side::Sell => write!(f, "sell"),
529 }
530 }
531}
532
533impl std::str::FromStr for Side {
534 type Err = String;
535
536 fn from_str(s: &str) -> Result<Self, Self::Err> {
537 match s.to_lowercase().as_str() {
538 "buy" | "bid" | "long" => Ok(Side::Buy),
539 "sell" | "ask" | "short" => Ok(Side::Sell),
540 _ => Err(format!("Unknown side: {}", s)),
541 }
542 }
543}
544
545pub struct MarketDataBus {
573 tx: broadcast::Sender<MarketDataEvent>,
574 capacity: usize,
575}
576
577impl MarketDataBus {
578 pub fn new(capacity: usize) -> Self {
584 let (tx, _rx) = broadcast::channel(capacity);
585 Self { tx, capacity }
586 }
587
588 pub fn publish(&self, event: MarketDataEvent) -> crate::Result<usize> {
592 let receivers = self.tx.send(event)?;
593 Ok(receivers)
594 }
595
596 pub fn subscribe(&self) -> broadcast::Receiver<MarketDataEvent> {
598 self.tx.subscribe()
599 }
600
601 pub fn subscriber_count(&self) -> usize {
603 self.tx.receiver_count()
604 }
605
606 pub fn capacity(&self) -> usize {
608 self.capacity
609 }
610}
611
612impl Default for MarketDataBus {
613 fn default() -> Self {
614 Self::new(5000)
615 }
616}
617
618impl Clone for MarketDataBus {
619 fn clone(&self) -> Self {
620 Self {
621 tx: self.tx.clone(),
622 capacity: self.capacity,
623 }
624 }
625}
626
#[cfg(test)]
mod market_data_bus_tests {
    use super::*;

    /// A bus and its clones share one channel, so subscriber counts are visible
    /// from either handle.
    #[test]
    fn test_market_data_bus() {
        let original = MarketDataBus::new(100);
        assert_eq!(original.subscriber_count(), 0);
        assert_eq!(original.capacity(), 100);

        // Keep the receivers alive until the end of the test so they stay counted.
        let _first_rx = original.subscribe();
        assert_eq!(original.subscriber_count(), 1);

        let cloned = original.clone();
        assert_eq!(cloned.subscriber_count(), 1);

        let _second_rx = cloned.subscribe();
        assert_eq!(original.subscriber_count(), 2);
    }

    /// The default bus reports a capacity of 5000.
    #[test]
    fn test_market_data_bus_default() {
        let default_bus = MarketDataBus::default();
        assert_eq!(default_bus.capacity(), 5000);
    }
}
654
#[cfg(test)]
mod tests {
    use super::*;

    /// Exchange-native strings split into the expected base and quote.
    #[test]
    fn test_symbol_parsing() {
        let binance = Symbol::from_exchange_format("BTCUSDT", Exchange::Binance).unwrap();
        assert_eq!(binance.base, "BTC");
        assert_eq!(binance.quote, "USDT");

        let coinbase = Symbol::from_exchange_format("BTC-USD", Exchange::Coinbase).unwrap();
        assert_eq!(coinbase.base, "BTC");
        assert_eq!(coinbase.quote, "USD");
    }

    /// Each exchange uses its own delimiter when rendering a symbol.
    #[test]
    fn test_symbol_formatting() {
        let btc_usdt = Symbol::new("BTC", "USDT");

        assert_eq!(btc_usdt.to_exchange_format(Exchange::Binance), "BTCUSDT");
        assert_eq!(btc_usdt.to_exchange_format(Exchange::Coinbase), "BTC-USDT");
        assert_eq!(btc_usdt.to_exchange_format(Exchange::Kraken), "BTC/USDT");
    }

    /// Spread and mid price are derived from the first bid and ask levels.
    #[test]
    fn test_spread_calculation() {
        let one = Decimal::from(1);
        let best_bid = PriceLevel::new(Decimal::from(50000), one);
        let best_ask = PriceLevel::new(Decimal::from(50010), one);

        let book = OrderBookEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            timestamp: 0,
            sequence: 1,
            is_snapshot: true,
            bids: vec![best_bid],
            asks: vec![best_ask],
        };

        assert_eq!(book.spread(), Some(Decimal::from(10)));
        assert_eq!(book.mid_price(), Some(Decimal::from(50005)));
    }

    /// Price change is close minus open; range is high minus low.
    #[test]
    fn test_kline_calculations() {
        let candle = KlineEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            interval: "1m".to_string(),
            open_time: 0,
            close_time: 60_000_000,
            open: Decimal::from(50000),
            high: Decimal::from(51000),
            low: Decimal::from(49000),
            close: Decimal::from(50500),
            volume: Decimal::from(100),
            quote_volume: None,
            trades: None,
            is_closed: true,
        };

        assert_eq!(candle.price_change(), Decimal::from(500));
        assert_eq!(candle.range(), Decimal::from(2000));
    }
}