Skip to main content

rustrade_data/subscription/
mod.rs

1use crate::{exchange::Connector, instrument::InstrumentData};
2use derive_more::Display;
3use fnv::FnvHashMap;
4use rustrade_instrument::{
5    Keyed,
6    asset::name::AssetNameInternal,
7    exchange::ExchangeId,
8    instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
9};
10use rustrade_integration::{
11    Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
12};
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17/// OrderBook [`SubscriptionKind`]s and the associated Barter output data models.
18pub mod book;
19
20/// Candle [`SubscriptionKind`] and the associated Barter output data model.
21pub mod candle;
22
23/// Option Greeks data model for options analytics.
24pub mod greeks;
25
26/// Liquidation [`SubscriptionKind`] and the associated Barter output data model.
27pub mod liquidation;
28
29/// Quote [`SubscriptionKind`] and the associated Barter output data model.
30pub mod quote;
31
32/// Public trade [`SubscriptionKind`] and the associated Barter output data model.
33pub mod trade;
34
35/// Defines kind of a [`Subscription`], and the output [`Self::Event`] that it yields.
36pub trait SubscriptionKind
37where
38    Self: Debug + Clone,
39{
40    type Event: Debug;
41    fn as_str(&self) -> &'static str;
42}
43
44/// Barter [`Subscription`] used to subscribe to a [`SubscriptionKind`] for a particular exchange
45/// [`MarketDataInstrument`].
46#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
47pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
48    pub exchange: Exchange,
49    #[serde(flatten)]
50    pub instrument: Inst,
51    #[serde(alias = "type")]
52    pub kind: Kind,
53}
54
55pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
56    subscriptions: &[Subscription<Exchange, Instrument, Kind>],
57) -> String
58where
59    Instrument: std::fmt::Display,
60    Kind: std::fmt::Display,
61{
62    subscriptions
63        .iter()
64        .map(
65            |Subscription {
66                 exchange: _,
67                 instrument,
68                 kind,
69             }| { format_smolstr!("({instrument}, {kind})") },
70        )
71        .collect::<Vec<_>>()
72        .join(",")
73}
74
75impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
76where
77    Exchange: std::fmt::Display,
78    Instrument: std::fmt::Display,
79    Kind: std::fmt::Display,
80{
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
83    }
84}
85
86#[derive(
87    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
88)]
89pub enum SubKind {
90    PublicTrades,
91    OrderBooksL1,
92    OrderBooksL2,
93    OrderBooksL3,
94    Liquidations,
95    Candles,
96    /// Real-time top-of-book quotes (best bid/ask). Generic subscription kind
97    /// that may be supported by multiple exchanges providing quote data.
98    Quotes,
99}
100
101impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
102    for Subscription<Exchange, MarketDataInstrument, Kind>
103where
104    S: Into<AssetNameInternal>,
105{
106    fn from(
107        (exchange, base, quote, instrument_kind, kind): (
108            Exchange,
109            S,
110            S,
111            MarketDataInstrumentKind,
112            Kind,
113        ),
114    ) -> Self {
115        Self::new(exchange, (base, quote, instrument_kind), kind)
116    }
117}
118
119impl<InstrumentKey, Exchange, S, Kind>
120    From<(
121        InstrumentKey,
122        Exchange,
123        S,
124        S,
125        MarketDataInstrumentKind,
126        Kind,
127    )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
128where
129    S: Into<AssetNameInternal>,
130{
131    fn from(
132        (instrument_id, exchange, base, quote, instrument_kind, kind): (
133            InstrumentKey,
134            Exchange,
135            S,
136            S,
137            MarketDataInstrumentKind,
138            Kind,
139        ),
140    ) -> Self {
141        let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
142
143        Self::new(exchange, instrument, kind)
144    }
145}
146
147impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
148    for Subscription<Exchange, Instrument, Kind>
149where
150    I: Into<Instrument>,
151{
152    fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
153        Self::new(exchange, instrument, kind)
154    }
155}
156
157impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
158    /// Constructs a new [`Subscription`] using the provided configuration.
159    pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
160    where
161        I: Into<Instrument>,
162    {
163        Self {
164            exchange,
165            instrument: instrument.into(),
166            kind,
167        }
168    }
169}
170
171impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
172where
173    Exchange: Connector,
174    Instrument: InstrumentData,
175{
176    type Error = SocketError;
177
178    fn validate(self) -> Result<Self, Self::Error>
179    where
180        Self: Sized,
181    {
182        // Validate the Exchange supports the Subscription InstrumentKind
183        if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
184            Ok(self)
185        } else {
186            Err(SocketError::Unsupported {
187                entity: Exchange::ID.to_string(),
188                item: self.instrument.kind().to_string(),
189            })
190        }
191    }
192}
193
194/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
195/// ingestion of market data for the provided [`MarketDataInstrumentKind`].
196#[allow(clippy::match_like_matches_macro)]
197pub fn exchange_supports_instrument_kind(
198    exchange: ExchangeId,
199    instrument_kind: &MarketDataInstrumentKind,
200) -> bool {
201    use rustrade_instrument::{
202        exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
203    };
204
205    match (exchange, instrument_kind) {
206        // Spot
207        (
208            BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
209            | GateioPerpetualsBtc,
210            Spot,
211        ) => false,
212        (_, Spot) => true,
213
214        // Future
215        (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
216        (_, Future { .. }) => false,
217
218        // Perpetual
219        (
220            BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
221            | GateioPerpetualsBtc | HyperliquidPerp,
222            Perpetual,
223        ) => true,
224        (_, Perpetual) => false,
225
226        // Option
227        (GateioOptions | Okx, Option { .. }) => true,
228        (_, Option { .. }) => false,
229    }
230}
231
232impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
233where
234    Instrument: InstrumentData,
235{
236    type Error = SocketError;
237
238    fn validate(self) -> Result<Self, Self::Error>
239    where
240        Self: Sized,
241    {
242        // Validate the Exchange supports the Subscription InstrumentKind
243        if exchange_supports_instrument_kind_sub_kind(
244            &self.exchange,
245            self.instrument.kind(),
246            self.kind,
247        ) {
248            Ok(self)
249        } else {
250            Err(SocketError::Unsupported {
251                entity: self.exchange.to_string(),
252                item: format!("({}, {})", self.instrument.kind(), self.kind),
253            })
254        }
255    }
256}
257
258/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
259/// ingestion of market data for the provided [`MarketDataInstrumentKind`] and [`SubKind`] combination.
260pub fn exchange_supports_instrument_kind_sub_kind(
261    exchange_id: &ExchangeId,
262    instrument_kind: &MarketDataInstrumentKind,
263    sub_kind: SubKind,
264) -> bool {
265    use ExchangeId::*;
266    use MarketDataInstrumentKind::*;
267    use SubKind::*;
268
269    match (exchange_id, instrument_kind, sub_kind) {
270        (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
271        (
272            BinanceFuturesUsd,
273            Perpetual,
274            PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
275        ) => true,
276        (Bitfinex, Spot, PublicTrades) => true,
277        (Bitmex, Perpetual, PublicTrades) => true,
278        (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
279        (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
280        (Coinbase, Spot, PublicTrades) => true,
281        (GateioSpot, Spot, PublicTrades) => true,
282        (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
283        (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
284        (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
285        (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
286        (GateioOptions, Option { .. }, PublicTrades) => true,
287        (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
288        (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
289        (HyperliquidPerp, Perpetual, PublicTrades | OrderBooksL2) => true,
290
291        (_, _, _) => false,
292    }
293}
294
295/// Metadata generated from a collection of Barter [`Subscription`]s, including the exchange
296/// specific subscription payloads that are sent to the exchange.
297#[derive(Clone, Eq, PartialEq, Debug)]
298pub struct SubscriptionMeta<InstrumentKey> {
299    /// `HashMap` containing the mapping between a [`SubscriptionId`] and
300    /// it's associated Barter [`MarketDataInstrument`].
301    pub instrument_map: Map<InstrumentKey>,
302    /// Collection of [`WsMessage`]s containing exchange specific subscription payloads to be sent.
303    pub ws_subscriptions: Vec<WsMessage>,
304}
305
306/// New type`HashMap` that maps a [`SubscriptionId`] to some associated type `T`.
307///
308/// Used by [`ExchangeTransformer`](crate::transformer::ExchangeTransformer)s to identify the
309/// Barter [`MarketDataInstrument`] associated with incoming exchange messages.
310#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
311pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
312
313impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
314    fn from_iter<Iter>(iter: Iter) -> Self
315    where
316        Iter: IntoIterator<Item = (SubscriptionId, T)>,
317    {
318        Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
319    }
320}
321
322impl<T> Map<T> {
323    /// Find the `InstrumentKey` associated with the provided [`SubscriptionId`].
324    pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
325    where
326        SubscriptionId: Borrow<SubId>,
327        SubId: AsRef<str> + Hash + Eq + ?Sized,
328    {
329        self.0
330            .get(id)
331            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
332    }
333
334    /// Find the mutable reference to `T` associated with the provided [`SubscriptionId`].
335    pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
336    where
337        SubscriptionId: Borrow<SubId>,
338        SubId: AsRef<str> + Hash + Eq + ?Sized,
339    {
340        self.0
341            .get_mut(id)
342            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
343    }
344}
345
346#[cfg(test)]
347#[allow(clippy::unwrap_used)] // Test code: panics on bad input are acceptable
348mod tests {
349    use super::*;
350
351    mod subscription {
352        use super::*;
353        use crate::{
354            exchange::{coinbase::Coinbase, okx::Okx},
355            subscription::trade::PublicTrades,
356        };
357        use rustrade_instrument::instrument::market_data::MarketDataInstrument;
358
359        mod de {
360            use super::*;
361            use crate::{
362                exchange::{
363                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
364                    gateio::perpetual::GateioPerpetualsUsd,
365                    okx::Okx,
366                },
367                subscription::{book::OrderBooksL2, trade::PublicTrades},
368            };
369            use rustrade_instrument::instrument::market_data::MarketDataInstrument;
370
371            #[test]
372            fn test_subscription_okx_spot_public_trades() {
373                let input = r#"
374                {
375                    "exchange": "okx",
376                    "base": "btc",
377                    "quote": "usdt",
378                    "instrument_kind": "spot",
379                    "kind": "public_trades"
380                }
381                "#;
382
383                serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
384                    input,
385                )
386                .unwrap();
387            }
388
389            #[test]
390            fn test_subscription_binance_spot_public_trades() {
391                let input = r#"
392                {
393                    "exchange": "binance_spot",
394                    "base": "btc",
395                    "quote": "usdt",
396                    "instrument_kind": "spot",
397                    "kind": "public_trades"
398                }
399                "#;
400
401                serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
402                    .unwrap();
403            }
404
405            #[test]
406            fn test_subscription_binance_futures_usd_order_books_l2() {
407                let input = r#"
408                {
409                    "exchange": "binance_futures_usd",
410                    "base": "btc",
411                    "quote": "usdt",
412                    "instrument_kind": "perpetual",
413                    "kind": "order_books_l2"
414                }
415                "#;
416
417                serde_json::from_str::<
418                    Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
419                >(input)
420                .unwrap();
421            }
422
423            #[test]
424            fn subscription_gateio_futures_usd_public_trades() {
425                let input = r#"
426                {
427                    "exchange": "gateio_perpetuals_usd",
428                    "base": "btc",
429                    "quote": "usdt",
430                    "instrument_kind": "perpetual",
431                    "kind": "public_trades"
432                }
433                "#;
434
435                serde_json::from_str::<
436                    Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
437                >(input)
438                .unwrap();
439            }
440        }
441
442        #[test]
443        fn test_validate_bitfinex_public_trades() {
444            struct TestCase {
445                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
446                expected:
447                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
448            }
449
450            let tests = vec![
451                TestCase {
452                    // TC0: Valid Coinbase Spot PublicTrades subscription
453                    input: Subscription::from((
454                        Coinbase,
455                        "base",
456                        "quote",
457                        MarketDataInstrumentKind::Spot,
458                        PublicTrades,
459                    )),
460                    expected: Ok(Subscription::from((
461                        Coinbase,
462                        "base",
463                        "quote",
464                        MarketDataInstrumentKind::Spot,
465                        PublicTrades,
466                    ))),
467                },
468                TestCase {
469                    // TC1: Invalid Coinbase FuturePerpetual PublicTrades subscription
470                    input: Subscription::from((
471                        Coinbase,
472                        "base",
473                        "quote",
474                        MarketDataInstrumentKind::Perpetual,
475                        PublicTrades,
476                    )),
477                    expected: Err(SocketError::Unsupported {
478                        entity: "".to_string(),
479                        item: "".to_string(),
480                    }),
481                },
482            ];
483
484            for (index, test) in tests.into_iter().enumerate() {
485                let actual = test.input.validate();
486                match (actual, test.expected) {
487                    (Ok(actual), Ok(expected)) => {
488                        assert_eq!(actual, expected, "TC{} failed", index)
489                    }
490                    (Err(_), Err(_)) => {
491                        // Test passed
492                    }
493                    (actual, expected) => {
494                        // Test failed
495                        panic!(
496                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
497                        );
498                    }
499                }
500            }
501        }
502
503        #[test]
504        fn test_validate_okx_public_trades() {
505            struct TestCase {
506                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
507                expected:
508                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
509            }
510
511            let tests = vec![
512                TestCase {
513                    // TC0: Valid Okx Spot PublicTrades subscription
514                    input: Subscription::from((
515                        Okx,
516                        "base",
517                        "quote",
518                        MarketDataInstrumentKind::Spot,
519                        PublicTrades,
520                    )),
521                    expected: Ok(Subscription::from((
522                        Okx,
523                        "base",
524                        "quote",
525                        MarketDataInstrumentKind::Spot,
526                        PublicTrades,
527                    ))),
528                },
529                TestCase {
530                    // TC1: Valid Okx FuturePerpetual PublicTrades subscription
531                    input: Subscription::from((
532                        Okx,
533                        "base",
534                        "quote",
535                        MarketDataInstrumentKind::Perpetual,
536                        PublicTrades,
537                    )),
538                    expected: Ok(Subscription::from((
539                        Okx,
540                        "base",
541                        "quote",
542                        MarketDataInstrumentKind::Perpetual,
543                        PublicTrades,
544                    ))),
545                },
546            ];
547
548            for (index, test) in tests.into_iter().enumerate() {
549                let actual = test.input.validate();
550                match (actual, test.expected) {
551                    (Ok(actual), Ok(expected)) => {
552                        assert_eq!(actual, expected, "TC{} failed", index)
553                    }
554                    (Err(_), Err(_)) => {
555                        // Test passed
556                    }
557                    (actual, expected) => {
558                        // Test failed
559                        panic!(
560                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
561                        );
562                    }
563                }
564            }
565        }
566    }
567
568    mod instrument_map {
569        use super::*;
570        use rustrade_instrument::instrument::market_data::MarketDataInstrument;
571
572        #[test]
573        fn test_find_instrument() {
574            // Initialise SubscriptionId-InstrumentKey HashMap
575            let ids = Map(FnvHashMap::from_iter([(
576                SubscriptionId::from("present"),
577                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
578            )]));
579
580            struct TestCase {
581                input: SubscriptionId,
582                expected: Result<MarketDataInstrument, SocketError>,
583            }
584
585            let cases = vec![
586                TestCase {
587                    // TC0: SubscriptionId (channel) is present in the HashMap
588                    input: SubscriptionId::from("present"),
589                    expected: Ok(MarketDataInstrument::from((
590                        "base",
591                        "quote",
592                        MarketDataInstrumentKind::Spot,
593                    ))),
594                },
595                TestCase {
596                    // TC1: SubscriptionId (channel) is not present in the HashMap
597                    input: SubscriptionId::from("not present"),
598                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
599                        "not present",
600                    ))),
601                },
602            ];
603
604            for (index, test) in cases.into_iter().enumerate() {
605                let actual = ids.find(&test.input);
606                match (actual, test.expected) {
607                    (Ok(actual), Ok(expected)) => {
608                        assert_eq!(*actual, expected, "TC{} failed", index)
609                    }
610                    (Err(_), Err(_)) => {
611                        // Test passed
612                    }
613                    (actual, expected) => {
614                        // Test failed
615                        panic!(
616                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
617                        );
618                    }
619                }
620            }
621        }
622    }
623}