Skip to main content

jflow_core/
market.rs

1//! Unified Market Data Types for JANUS
2//!
3//! This module defines normalized event types for market data across all exchanges.
4//! All exchange-specific connectors convert their data into these unified types.
5//!
6//! ## MarketDataBus
7//!
8//! The [`MarketDataBus`] provides an in-process broadcast channel for streaming
9//! live market data between JANUS modules. The Data module publishes
10//! [`MarketDataEvent`]s and the Forward module subscribes to consume them for
11//! indicator calculation and strategy-driven signal generation.
12
13use chrono::Utc;
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16use std::fmt;
17use tokio::sync::broadcast;
18
/// Unified market data event envelope.
///
/// Each variant wraps one normalized payload type. With the serde attributes
/// below the enum is internally tagged: the serialized form carries a `"type"`
/// field holding the snake_case variant name (for example `"order_book"` or
/// `"funding_rate"`) alongside the payload's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MarketDataEvent {
    /// Real-time trade execution
    Trade(TradeEvent),
    /// Order book snapshot or update
    OrderBook(OrderBookEvent),
    /// Ticker/24h statistics
    Ticker(TickerEvent),
    /// Liquidation event (futures)
    Liquidation(LiquidationEvent),
    /// Funding rate update (futures)
    FundingRate(FundingRateEvent),
    /// OHLCV candle/kline
    Kline(KlineEvent),
}
36
37impl MarketDataEvent {
38    /// Get the symbol for this event
39    pub fn symbol(&self) -> &Symbol {
40        match self {
41            MarketDataEvent::Trade(e) => &e.symbol,
42            MarketDataEvent::OrderBook(e) => &e.symbol,
43            MarketDataEvent::Ticker(e) => &e.symbol,
44            MarketDataEvent::Liquidation(e) => &e.symbol,
45            MarketDataEvent::FundingRate(e) => &e.symbol,
46            MarketDataEvent::Kline(e) => &e.symbol,
47        }
48    }
49
50    /// Get the exchange for this event
51    pub fn exchange(&self) -> Exchange {
52        match self {
53            MarketDataEvent::Trade(e) => e.exchange,
54            MarketDataEvent::OrderBook(e) => e.exchange,
55            MarketDataEvent::Ticker(e) => e.exchange,
56            MarketDataEvent::Liquidation(e) => e.exchange,
57            MarketDataEvent::FundingRate(e) => e.exchange,
58            MarketDataEvent::Kline(e) => e.exchange,
59        }
60    }
61
62    /// Get the timestamp for this event
63    pub fn timestamp(&self) -> i64 {
64        match self {
65            MarketDataEvent::Trade(e) => e.timestamp,
66            MarketDataEvent::OrderBook(e) => e.timestamp,
67            MarketDataEvent::Ticker(e) => e.timestamp,
68            MarketDataEvent::Liquidation(e) => e.timestamp,
69            MarketDataEvent::FundingRate(e) => e.timestamp,
70            MarketDataEvent::Kline(e) => e.close_time,
71        }
72    }
73}
74
/// Normalized trade event.
///
/// Carries both the exchange-reported execution time (`timestamp`) and the
/// local reception time (`received_at`), so feed latency can be derived from
/// a single event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TradeEvent {
    /// Exchange that produced this trade
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Trade execution timestamp (exchange time, Unix microseconds)
    pub timestamp: i64,
    /// Reception timestamp (our system time, Unix microseconds)
    pub received_at: i64,
    /// Execution price
    pub price: Decimal,
    /// Trade quantity/size
    pub quantity: Decimal,
    /// Trade side from taker perspective
    pub side: Side,
    /// Exchange-specific trade ID
    pub trade_id: String,
    /// Whether buyer was the market maker (if available)
    pub buyer_is_maker: Option<bool>,
}
97
98impl TradeEvent {
99    /// Create a new trade event with current reception time
100    pub fn new(
101        exchange: Exchange,
102        symbol: Symbol,
103        timestamp: i64,
104        price: Decimal,
105        quantity: Decimal,
106        side: Side,
107        trade_id: String,
108    ) -> Self {
109        Self {
110            exchange,
111            symbol,
112            timestamp,
113            received_at: Utc::now().timestamp_micros(),
114            price,
115            quantity,
116            side,
117            trade_id,
118            buyer_is_maker: None,
119        }
120    }
121
122    /// Calculate the notional value of this trade
123    pub fn notional(&self) -> Decimal {
124        self.price * self.quantity
125    }
126
127    /// Get the latency between exchange timestamp and reception
128    pub fn latency_micros(&self) -> i64 {
129        self.received_at - self.timestamp
130    }
131}
132
/// Order book snapshot or delta update.
///
/// The helper methods on this type treat the first element of `bids` and
/// `asks` as the best level.
/// NOTE(review): nothing in this file sorts the levels — confirm that
/// producers emit them best-first.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrderBookEvent {
    /// Exchange that produced this order book
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Event timestamp (Unix microseconds)
    pub timestamp: i64,
    /// Sequence number for ordering updates
    pub sequence: u64,
    /// Whether this is a snapshot (true) or delta (false)
    pub is_snapshot: bool,
    /// Bid levels, each a [`PriceLevel`] (price and quantity)
    pub bids: Vec<PriceLevel>,
    /// Ask levels, each a [`PriceLevel`] (price and quantity)
    pub asks: Vec<PriceLevel>,
}
151
152impl OrderBookEvent {
153    /// Get the best bid price
154    pub fn best_bid(&self) -> Option<&PriceLevel> {
155        self.bids.first()
156    }
157
158    /// Get the best ask price
159    pub fn best_ask(&self) -> Option<&PriceLevel> {
160        self.asks.first()
161    }
162
163    /// Calculate the mid price
164    pub fn mid_price(&self) -> Option<Decimal> {
165        match (self.best_bid(), self.best_ask()) {
166            (Some(bid), Some(ask)) => Some((bid.price + ask.price) / Decimal::from(2)),
167            _ => None,
168        }
169    }
170
171    /// Calculate the spread
172    pub fn spread(&self) -> Option<Decimal> {
173        match (self.best_bid(), self.best_ask()) {
174            (Some(bid), Some(ask)) => Some(ask.price - bid.price),
175            _ => None,
176        }
177    }
178
179    /// Calculate the spread in basis points
180    pub fn spread_bps(&self) -> Option<Decimal> {
181        match (self.mid_price(), self.spread()) {
182            (Some(mid), Some(spread)) if mid > Decimal::ZERO => {
183                Some((spread / mid) * Decimal::from(10000))
184            }
185            _ => None,
186        }
187    }
188}
189
/// Price level in order book.
///
/// Used as the element type of `OrderBookEvent::bids` and
/// `OrderBookEvent::asks`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceLevel {
    /// Price at this level
    pub price: Decimal,
    /// Total quantity available at this price
    pub quantity: Decimal,
}
198
199impl PriceLevel {
200    pub fn new(price: Decimal, quantity: Decimal) -> Self {
201        Self { price, quantity }
202    }
203
204    /// Calculate notional value at this level
205    pub fn notional(&self) -> Decimal {
206        self.price * self.quantity
207    }
208}
209
/// 24-hour ticker statistics.
///
/// Optional fields are `None` when the value is not provided.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TickerEvent {
    /// Exchange that produced this ticker
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Event timestamp.
    /// NOTE(review): other events in this file use Unix microseconds —
    /// confirm the same unit applies here.
    pub timestamp: i64,
    /// Last price
    pub last_price: Decimal,
    /// Best bid price, if available
    pub best_bid: Option<Decimal>,
    /// Best ask price, if available
    pub best_ask: Option<Decimal>,
    /// 24-hour volume.
    /// NOTE(review): presumably in base currency, as documented for
    /// `KlineEvent::volume` — confirm.
    pub volume_24h: Decimal,
    /// 24-hour quote volume
    pub quote_volume_24h: Decimal,
    /// 24-hour price change, if available
    pub price_change_24h: Option<Decimal>,
    /// 24-hour price change percentage, if available.
    /// NOTE(review): scale (1.0 = 1% vs 0.01 = 1%) is not established in
    /// this file — confirm against the connectors.
    pub price_change_pct_24h: Option<Decimal>,
    /// 24-hour high, if available
    pub high_24h: Option<Decimal>,
    /// 24-hour low, if available
    pub low_24h: Option<Decimal>,
}
226
/// Liquidation event (futures markets)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiquidationEvent {
    /// Exchange that produced this liquidation
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Event timestamp.
    /// NOTE(review): other events in this file use Unix microseconds —
    /// confirm the same unit applies here.
    pub timestamp: i64,
    /// Side of the liquidated position
    pub side: Side,
    /// Liquidation price
    pub price: Decimal,
    /// Liquidated quantity
    pub quantity: Decimal,
    /// Order ID (if available)
    pub order_id: Option<String>,
}
242
243impl LiquidationEvent {
244    /// Calculate the notional value of the liquidation
245    pub fn notional(&self) -> Decimal {
246        self.price * self.quantity
247    }
248}
249
/// Funding rate event (perpetual futures)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FundingRateEvent {
    /// Exchange that produced this funding rate
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Event timestamp.
    /// NOTE(review): other events in this file use Unix microseconds —
    /// confirm the same unit applies here.
    pub timestamp: i64,
    /// Current funding rate per funding period, expressed as a decimal
    /// fraction rather than a percentage (e.g., 0.01 = 1%)
    pub rate: Decimal,
    /// Next funding time (Unix timestamp).
    /// NOTE(review): the unit (seconds, milliseconds or microseconds) is not
    /// stated anywhere in this file — confirm against the connectors.
    pub next_funding_time: i64,
}
261
262impl FundingRateEvent {
263    /// Get annualized funding rate (assuming 8-hour funding intervals)
264    pub fn annualized_rate(&self) -> Decimal {
265        self.rate * Decimal::from(365 * 3) // 3 funding periods per day
266    }
267}
268
/// OHLCV candlestick/kline data.
///
/// A kline has no single `timestamp` field; `MarketDataEvent::timestamp`
/// reports `close_time` for this payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KlineEvent {
    /// Exchange that produced this candle
    pub exchange: Exchange,
    /// Trading pair symbol
    pub symbol: Symbol,
    /// Interval (e.g., "1m", "5m", "1h")
    pub interval: String,
    /// Candle open time (Unix microseconds)
    pub open_time: i64,
    /// Candle close time (Unix microseconds)
    pub close_time: i64,
    /// Opening price
    pub open: Decimal,
    /// Highest price
    pub high: Decimal,
    /// Lowest price
    pub low: Decimal,
    /// Closing price
    pub close: Decimal,
    /// Volume in base currency
    pub volume: Decimal,
    /// Quote volume (volume * price), if available
    pub quote_volume: Option<Decimal>,
    /// Number of trades in this candle, if available
    pub trades: Option<u64>,
    /// Whether this candle is closed/complete
    pub is_closed: bool,
}
297
298impl KlineEvent {
299    /// Get the typical price (HLC/3)
300    pub fn typical_price(&self) -> Decimal {
301        (self.high + self.low + self.close) / Decimal::from(3)
302    }
303
304    /// Get the price change for this candle
305    pub fn price_change(&self) -> Decimal {
306        self.close - self.open
307    }
308
309    /// Get the price change percentage
310    pub fn price_change_pct(&self) -> Decimal {
311        if self.open > Decimal::ZERO {
312            ((self.close - self.open) / self.open) * Decimal::from(100)
313        } else {
314            Decimal::ZERO
315        }
316    }
317
318    /// Get the candle range (high - low)
319    pub fn range(&self) -> Decimal {
320        self.high - self.low
321    }
322}
323
/// Trading pair symbol.
///
/// Equality and hashing cover all three fields, so the same base/quote pair
/// on different market types compares as different symbols. The `Display`
/// implementation prints only `base/quote` and omits the market type.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Symbol {
    /// Base currency (e.g., BTC)
    pub base: String,
    /// Quote currency (e.g., USDT)
    pub quote: String,
    /// Market type
    pub market_type: MarketType,
}
334
335impl Symbol {
336    pub fn new(base: impl Into<String>, quote: impl Into<String>) -> Self {
337        Self {
338            base: base.into(),
339            quote: quote.into(),
340            market_type: MarketType::Spot,
341        }
342    }
343
344    pub fn new_with_type(
345        base: impl Into<String>,
346        quote: impl Into<String>,
347        market_type: MarketType,
348    ) -> Self {
349        Self {
350            base: base.into(),
351            quote: quote.into(),
352            market_type,
353        }
354    }
355
356    /// Parse symbol from exchange-specific format
357    pub fn from_exchange_format(s: &str, exchange: Exchange) -> Option<Self> {
358        match exchange {
359            Exchange::Binance => {
360                // Binance: BTCUSD
361                if s.ends_with("USDT") {
362                    let base = s.trim_end_matches("USDT");
363                    Some(Symbol::new(base, "USDT"))
364                } else if s.ends_with("BUSD") {
365                    let base = s.trim_end_matches("BUSD");
366                    Some(Symbol::new(base, "BUSD"))
367                } else {
368                    None
369                }
370            }
371            Exchange::Bybit => {
372                // Bybit: BTCUSD
373                if s.ends_with("USDT") {
374                    let base = s.trim_end_matches("USDT");
375                    Some(Symbol::new(base, "USDT"))
376                } else {
377                    None
378                }
379            }
380            Exchange::Coinbase => {
381                // Coinbase: BTC-USD
382                let parts: Vec<&str> = s.split('-').collect();
383                if parts.len() == 2 {
384                    Some(Symbol::new(parts[0], parts[1]))
385                } else {
386                    None
387                }
388            }
389            Exchange::Kraken => {
390                // Kraken: BTC/USD or XXBTZUSD
391                if s.contains('/') {
392                    let parts: Vec<&str> = s.split('/').collect();
393                    if parts.len() == 2 {
394                        Some(Symbol::new(parts[0], parts[1]))
395                    } else {
396                        None
397                    }
398                } else {
399                    // Try to parse XXBTZUSD format
400                    None // Complex, implement if needed
401                }
402            }
403            Exchange::Okx => {
404                // OKX: BTC-USDT
405                let parts: Vec<&str> = s.split('-').collect();
406                if parts.len() == 2 {
407                    Some(Symbol::new(parts[0], parts[1]))
408                } else {
409                    None
410                }
411            }
412            Exchange::Kucoin => {
413                // Kucoin: BTC-USDT
414                let parts: Vec<&str> = s.split('-').collect();
415                if parts.len() == 2 {
416                    Some(Symbol::new(parts[0], parts[1]))
417                } else {
418                    None
419                }
420            }
421        }
422    }
423
424    /// Format symbol for specific exchange
425    pub fn to_exchange_format(&self, exchange: Exchange) -> String {
426        match exchange {
427            Exchange::Binance => {
428                format!("{}{}", self.base.to_uppercase(), self.quote.to_uppercase())
429            }
430            Exchange::Bybit => format!("{}{}", self.base.to_uppercase(), self.quote.to_uppercase()),
431            Exchange::Coinbase => {
432                format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase())
433            }
434            Exchange::Kraken => {
435                format!("{}/{}", self.base.to_uppercase(), self.quote.to_uppercase())
436            }
437            Exchange::Okx => format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase()),
438            Exchange::Kucoin => {
439                format!("{}-{}", self.base.to_uppercase(), self.quote.to_uppercase())
440            }
441        }
442    }
443}
444
445impl fmt::Display for Symbol {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        write!(f, "{}/{}", self.base, self.quote)
448    }
449}
450
/// Supported cryptocurrency exchanges.
///
/// The `Display` and `FromStr` implementations in this file use lowercase
/// names (`"binance"`), while the derived serde representation has no rename
/// attribute and therefore uses the variant identifiers as written
/// (`"Binance"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Exchange {
    /// Binance
    Binance,
    /// Bybit
    Bybit,
    /// Coinbase
    Coinbase,
    /// Kraken
    Kraken,
    /// OKX
    Okx,
    /// KuCoin
    Kucoin,
}
461
462impl fmt::Display for Exchange {
463    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464        match self {
465            Exchange::Binance => write!(f, "binance"),
466            Exchange::Bybit => write!(f, "bybit"),
467            Exchange::Coinbase => write!(f, "coinbase"),
468            Exchange::Kraken => write!(f, "kraken"),
469            Exchange::Okx => write!(f, "okx"),
470            Exchange::Kucoin => write!(f, "kucoin"),
471        }
472    }
473}
474
475impl std::str::FromStr for Exchange {
476    type Err = String;
477
478    fn from_str(s: &str) -> Result<Self, Self::Err> {
479        match s.to_lowercase().as_str() {
480            "binance" => Ok(Exchange::Binance),
481            "bybit" => Ok(Exchange::Bybit),
482            "coinbase" => Ok(Exchange::Coinbase),
483            "kraken" => Ok(Exchange::Kraken),
484            "okx" => Ok(Exchange::Okx),
485            "kucoin" => Ok(Exchange::Kucoin),
486            _ => Err(format!("Unknown exchange: {}", s)),
487        }
488    }
489}
490
/// Market type of a [`Symbol`].
///
/// `Symbol::new` defaults to [`MarketType::Spot`]; use
/// `Symbol::new_with_type` to select another variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MarketType {
    /// Spot market
    Spot,
    /// Perpetual futures
    Perpetual,
    /// Dated futures
    Futures,
    /// Options
    Options,
}
503
504impl fmt::Display for MarketType {
505    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
506        match self {
507            MarketType::Spot => write!(f, "spot"),
508            MarketType::Perpetual => write!(f, "perpetual"),
509            MarketType::Futures => write!(f, "futures"),
510            MarketType::Options => write!(f, "options"),
511        }
512    }
513}
514
/// Trade side.
///
/// `FromStr` for this type also accepts the aliases `bid`/`long` for
/// [`Side::Buy`] and `ask`/`short` for [`Side::Sell`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Side {
    /// Buy/Long
    Buy,
    /// Sell/Short
    Sell,
}
523
524impl fmt::Display for Side {
525    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526        match self {
527            Side::Buy => write!(f, "buy"),
528            Side::Sell => write!(f, "sell"),
529        }
530    }
531}
532
533impl std::str::FromStr for Side {
534    type Err = String;
535
536    fn from_str(s: &str) -> Result<Self, Self::Err> {
537        match s.to_lowercase().as_str() {
538            "buy" | "bid" | "long" => Ok(Side::Buy),
539            "sell" | "ask" | "short" => Ok(Side::Sell),
540            _ => Err(format!("Unknown side: {}", s)),
541        }
542    }
543}
544
545// ═══════════════════════════════════════════════════════════════════════════
546// MarketDataBus — broadcast channel for live market data between modules
547// ═══════════════════════════════════════════════════════════════════════════
548
/// Broadcast bus for streaming market data events between JANUS modules.
///
/// The Data module publishes [`MarketDataEvent`]s (trades, klines, order book
/// updates, etc.) and the Forward module subscribes to receive them for
/// real-time indicator calculation and strategy evaluation.
///
/// This is the in-process equivalent of the [`SignalBus`](crate::SignalBus)
/// but for raw/normalised market data rather than trading signals.
///
/// Cloning the bus clones the sender, so every clone publishes to and
/// subscribes on the same underlying channel.
///
/// # Example
///
/// ```rust,no_run
/// use janus_core::market::{MarketDataBus, MarketDataEvent};
///
/// let bus = MarketDataBus::new(5000);
/// let mut rx = bus.subscribe();
///
/// // Publisher (Data module)
/// // bus.publish(some_event).unwrap();
///
/// // Consumer (Forward module)
/// // let event = rx.recv().await.unwrap();
/// ```
pub struct MarketDataBus {
    // Sending half of the tokio broadcast channel; receivers are created
    // from it on demand.
    tx: broadcast::Sender<MarketDataEvent>,
    // Capacity the channel was created with, kept so `capacity()` can
    // report it.
    capacity: usize,
}
576
577impl MarketDataBus {
578    /// Create a new market data bus with the given channel capacity.
579    ///
580    /// A larger capacity reduces the chance of slow subscribers missing
581    /// events but consumes more memory. 5 000 is a reasonable default for
582    /// multi-asset 1-minute candle + trade ingestion.
583    pub fn new(capacity: usize) -> Self {
584        let (tx, _rx) = broadcast::channel(capacity);
585        Self { tx, capacity }
586    }
587
588    /// Publish a market data event to all subscribers.
589    ///
590    /// Returns the number of active receivers that will see the event.
591    pub fn publish(&self, event: MarketDataEvent) -> crate::Result<usize> {
592        let receivers = self.tx.send(event)?;
593        Ok(receivers)
594    }
595
596    /// Subscribe to the market data stream.
597    pub fn subscribe(&self) -> broadcast::Receiver<MarketDataEvent> {
598        self.tx.subscribe()
599    }
600
601    /// Current number of active subscribers.
602    pub fn subscriber_count(&self) -> usize {
603        self.tx.receiver_count()
604    }
605
606    /// Channel capacity.
607    pub fn capacity(&self) -> usize {
608        self.capacity
609    }
610}
611
612impl Default for MarketDataBus {
613    fn default() -> Self {
614        Self::new(5000)
615    }
616}
617
618impl Clone for MarketDataBus {
619    fn clone(&self) -> Self {
620        Self {
621            tx: self.tx.clone(),
622            capacity: self.capacity,
623        }
624    }
625}
626
#[cfg(test)]
mod market_data_bus_tests {
    use super::*;

    /// Subscriber counts are shared between a bus and its clones.
    #[test]
    fn test_market_data_bus() {
        let original = MarketDataBus::new(100);
        assert_eq!(original.subscriber_count(), 0);
        assert_eq!(original.capacity(), 100);

        let _first_rx = original.subscribe();
        assert_eq!(original.subscriber_count(), 1);

        // A clone is a second handle onto the same channel.
        let cloned = original.clone();
        assert_eq!(cloned.subscriber_count(), 1);

        let _second_rx = cloned.subscribe();
        assert_eq!(original.subscriber_count(), 2);
    }

    /// The default bus uses a capacity of 5000.
    #[test]
    fn test_market_data_bus_default() {
        let default_bus = MarketDataBus::default();
        assert_eq!(default_bus.capacity(), 5000);
    }
}
654
#[cfg(test)]
mod tests {
    use super::*;

    /// Exchange-native strings parse into base and quote parts.
    #[test]
    fn test_symbol_parsing() {
        let binance = Symbol::from_exchange_format("BTCUSDT", Exchange::Binance).unwrap();
        assert_eq!(binance.base, "BTC");
        assert_eq!(binance.quote, "USDT");

        let coinbase = Symbol::from_exchange_format("BTC-USD", Exchange::Coinbase).unwrap();
        assert_eq!(coinbase.base, "BTC");
        assert_eq!(coinbase.quote, "USD");
    }

    /// A symbol renders with each exchange's separator convention.
    #[test]
    fn test_symbol_formatting() {
        let btc_usdt = Symbol::new("BTC", "USDT");
        assert_eq!(btc_usdt.to_exchange_format(Exchange::Binance), "BTCUSDT");
        assert_eq!(btc_usdt.to_exchange_format(Exchange::Coinbase), "BTC-USDT");
        assert_eq!(btc_usdt.to_exchange_format(Exchange::Kraken), "BTC/USDT");
    }

    /// Spread and mid price come from the first bid and ask levels.
    #[test]
    fn test_spread_calculation() {
        let one = Decimal::from(1);
        let best_bid = PriceLevel::new(Decimal::from(50000), one);
        let best_ask = PriceLevel::new(Decimal::from(50010), one);

        let book = OrderBookEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            timestamp: 0,
            sequence: 1,
            is_snapshot: true,
            bids: vec![best_bid],
            asks: vec![best_ask],
        };

        assert_eq!(book.spread(), Some(Decimal::from(10)));
        assert_eq!(book.mid_price(), Some(Decimal::from(50005)));
    }

    /// Candle helpers derive the change and range from the OHLC fields.
    #[test]
    fn test_kline_calculations() {
        let candle = KlineEvent {
            exchange: Exchange::Binance,
            symbol: Symbol::new("BTC", "USDT"),
            interval: "1m".to_string(),
            open_time: 0,
            close_time: 60_000_000,
            open: Decimal::from(50000),
            high: Decimal::from(51000),
            low: Decimal::from(49000),
            close: Decimal::from(50500),
            volume: Decimal::from(100),
            quote_volume: None,
            trades: None,
            is_closed: true,
        };

        assert_eq!(candle.price_change(), Decimal::from(500));
        assert_eq!(candle.range(), Decimal::from(2000));
    }
}