Skip to main content

barter_data/subscription/
mod.rs

1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3    Keyed,
4    asset::name::AssetNameInternal,
5    exchange::ExchangeId,
6    instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9    Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17/// OrderBook [`SubscriptionKind`]s and the associated Barter output data models.
18pub mod book;
19
20/// Candle [`SubscriptionKind`] and the associated Barter output data model.
21pub mod candle;
22
23/// Liquidation [`SubscriptionKind`] and the associated Barter output data model.
24pub mod liquidation;
25
26/// Public trade [`SubscriptionKind`] and the associated Barter output data model.
27pub mod trade;
28
/// Defines the kind of a [`Subscription`], and the output [`Self::Event`] that it yields.
///
/// Implementors must also be [`Debug`] and [`Clone`].
pub trait SubscriptionKind: Debug + Clone {
    /// Output event type associated with this kind of [`Subscription`].
    type Event: Debug;

    /// Returns a `'static` string representation of this [`SubscriptionKind`].
    fn as_str(&self) -> &'static str;
}
37
38/// Barter [`Subscription`] used to subscribe to a [`SubscriptionKind`] for a particular exchange
39/// [`MarketDataInstrument`].
40#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42    pub exchange: Exchange,
43    #[serde(flatten)]
44    pub instrument: Inst,
45    #[serde(alias = "type")]
46    pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50    subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53    Instrument: std::fmt::Display,
54    Kind: std::fmt::Display,
55{
56    subscriptions
57        .iter()
58        .map(
59            |Subscription {
60                 exchange: _,
61                 instrument,
62                 kind,
63             }| { format_smolstr!("({instrument}, {kind})") },
64        )
65        .collect::<Vec<_>>()
66        .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71    Exchange: std::fmt::Display,
72    Instrument: std::fmt::Display,
73    Kind: std::fmt::Display,
74{
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77    }
78}
79
80#[derive(
81    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84    PublicTrades,
85    OrderBooksL1,
86    OrderBooksL2,
87    OrderBooksL3,
88    Liquidations,
89    Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93    for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95    S: Into<AssetNameInternal>,
96{
97    fn from(
98        (exchange, base, quote, instrument_kind, kind): (
99            Exchange,
100            S,
101            S,
102            MarketDataInstrumentKind,
103            Kind,
104        ),
105    ) -> Self {
106        Self::new(exchange, (base, quote, instrument_kind), kind)
107    }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111    From<(
112        InstrumentKey,
113        Exchange,
114        S,
115        S,
116        MarketDataInstrumentKind,
117        Kind,
118    )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120    S: Into<AssetNameInternal>,
121{
122    fn from(
123        (instrument_id, exchange, base, quote, instrument_kind, kind): (
124            InstrumentKey,
125            Exchange,
126            S,
127            S,
128            MarketDataInstrumentKind,
129            Kind,
130        ),
131    ) -> Self {
132        let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134        Self::new(exchange, instrument, kind)
135    }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139    for Subscription<Exchange, Instrument, Kind>
140where
141    I: Into<Instrument>,
142{
143    fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144        Self::new(exchange, instrument, kind)
145    }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149    /// Constructs a new [`Subscription`] using the provided configuration.
150    pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151    where
152        I: Into<Instrument>,
153    {
154        Self {
155            exchange,
156            instrument: instrument.into(),
157            kind,
158        }
159    }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164    Exchange: Connector,
165    Instrument: InstrumentData,
166{
167    type Error = SocketError;
168
169    fn validate(self) -> Result<Self, Self::Error>
170    where
171        Self: Sized,
172    {
173        // Validate the Exchange supports the Subscription InstrumentKind
174        if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
175            Ok(self)
176        } else {
177            Err(SocketError::Unsupported {
178                entity: Exchange::ID.to_string(),
179                item: self.instrument.kind().to_string(),
180            })
181        }
182    }
183}
184
185/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
186/// ingestion of market data for the provided [`MarketDataInstrumentKind`].
187#[allow(clippy::match_like_matches_macro)]
188pub fn exchange_supports_instrument_kind(
189    exchange: ExchangeId,
190    instrument_kind: &MarketDataInstrumentKind,
191) -> bool {
192    use barter_instrument::{
193        exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
194    };
195
196    match (exchange, instrument_kind) {
197        // Spot
198        (
199            BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
200            | GateioPerpetualsBtc,
201            Spot,
202        ) => false,
203        (_, Spot) => true,
204
205        // Future
206        (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
207        (_, Future { .. }) => false,
208
209        // Perpetual
210        (
211            BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
212            | GateioPerpetualsBtc,
213            Perpetual,
214        ) => true,
215        (_, Perpetual) => false,
216
217        // Option
218        (GateioOptions | Okx, Option { .. }) => true,
219        (_, Option { .. }) => false,
220    }
221}
222
223impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
224where
225    Instrument: InstrumentData,
226{
227    type Error = SocketError;
228
229    fn validate(self) -> Result<Self, Self::Error>
230    where
231        Self: Sized,
232    {
233        // Validate the Exchange supports the Subscription InstrumentKind
234        if exchange_supports_instrument_kind_sub_kind(
235            &self.exchange,
236            self.instrument.kind(),
237            self.kind,
238        ) {
239            Ok(self)
240        } else {
241            Err(SocketError::Unsupported {
242                entity: self.exchange.to_string(),
243                item: format!("({}, {})", self.instrument.kind(), self.kind),
244            })
245        }
246    }
247}
248
249/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
250/// ingestion of market data for the provided [`MarketDataInstrumentKind`] and [`SubKind`] combination.
251pub fn exchange_supports_instrument_kind_sub_kind(
252    exchange_id: &ExchangeId,
253    instrument_kind: &MarketDataInstrumentKind,
254    sub_kind: SubKind,
255) -> bool {
256    use ExchangeId::*;
257    use MarketDataInstrumentKind::*;
258    use SubKind::*;
259
260    match (exchange_id, instrument_kind, sub_kind) {
261        (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
262        (
263            BinanceFuturesUsd,
264            Perpetual,
265            PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
266        ) => true,
267        (Bitfinex, Spot, PublicTrades) => true,
268        (Bitmex, Perpetual, PublicTrades) => true,
269        (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
270        (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
271        (Coinbase, Spot, PublicTrades) => true,
272        (GateioSpot, Spot, PublicTrades) => true,
273        (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
274        (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
275        (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
276        (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
277        (GateioOptions, Option { .. }, PublicTrades) => true,
278        (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
279        (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
280
281        (_, _, _) => false,
282    }
283}
284
285/// Metadata generated from a collection of Barter [`Subscription`]s, including the exchange
286/// specific subscription payloads that are sent to the exchange.
287#[derive(Clone, Eq, PartialEq, Debug)]
288pub struct SubscriptionMeta<InstrumentKey> {
289    /// `HashMap` containing the mapping between a [`SubscriptionId`] and
290    /// it's associated Barter [`MarketDataInstrument`].
291    pub instrument_map: Map<InstrumentKey>,
292    /// Collection of [`WsMessage`]s containing exchange specific subscription payloads to be sent.
293    pub ws_subscriptions: Vec<WsMessage>,
294}
295
296/// New type`HashMap` that maps a [`SubscriptionId`] to some associated type `T`.
297///
298/// Used by [`ExchangeTransformer`](crate::transformer::ExchangeTransformer)s to identify the
299/// Barter [`MarketDataInstrument`] associated with incoming exchange messages.
300#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
301pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
302
303impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
304    fn from_iter<Iter>(iter: Iter) -> Self
305    where
306        Iter: IntoIterator<Item = (SubscriptionId, T)>,
307    {
308        Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
309    }
310}
311
312impl<T> Map<T> {
313    /// Find the `InstrumentKey` associated with the provided [`SubscriptionId`].
314    pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
315    where
316        SubscriptionId: Borrow<SubId>,
317        SubId: AsRef<str> + Hash + Eq + ?Sized,
318    {
319        self.0
320            .get(id)
321            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
322    }
323
324    /// Find the mutable reference to `T` associated with the provided [`SubscriptionId`].
325    pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
326    where
327        SubscriptionId: Borrow<SubId>,
328        SubId: AsRef<str> + Hash + Eq + ?Sized,
329    {
330        self.0
331            .get_mut(id)
332            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    mod subscription {
341        use super::*;
342        use crate::{
343            exchange::{coinbase::Coinbase, okx::Okx},
344            subscription::trade::PublicTrades,
345        };
346        use barter_instrument::instrument::market_data::MarketDataInstrument;
347
348        mod de {
349            use super::*;
350            use crate::{
351                exchange::{
352                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
353                    gateio::perpetual::GateioPerpetualsUsd,
354                    okx::Okx,
355                },
356                subscription::{book::OrderBooksL2, trade::PublicTrades},
357            };
358            use barter_instrument::instrument::market_data::MarketDataInstrument;
359
360            #[test]
361            fn test_subscription_okx_spot_public_trades() {
362                let input = r#"
363                {
364                    "exchange": "okx",
365                    "base": "btc",
366                    "quote": "usdt",
367                    "instrument_kind": "spot",
368                    "kind": "public_trades"
369                }
370                "#;
371
372                serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
373                    input,
374                )
375                .unwrap();
376            }
377
378            #[test]
379            fn test_subscription_binance_spot_public_trades() {
380                let input = r#"
381                {
382                    "exchange": "binance_spot",
383                    "base": "btc",
384                    "quote": "usdt",
385                    "instrument_kind": "spot",
386                    "kind": "public_trades"
387                }
388                "#;
389
390                serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
391                    .unwrap();
392            }
393
394            #[test]
395            fn test_subscription_binance_futures_usd_order_books_l2() {
396                let input = r#"
397                {
398                    "exchange": "binance_futures_usd",
399                    "base": "btc",
400                    "quote": "usdt",
401                    "instrument_kind": "perpetual",
402                    "kind": "order_books_l2"
403                }
404                "#;
405
406                serde_json::from_str::<
407                    Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
408                >(input)
409                .unwrap();
410            }
411
412            #[test]
413            fn subscription_gateio_futures_usd_public_trades() {
414                let input = r#"
415                {
416                    "exchange": "gateio_perpetuals_usd",
417                    "base": "btc",
418                    "quote": "usdt",
419                    "instrument_kind": "perpetual",
420                    "kind": "public_trades"
421                }
422                "#;
423
424                serde_json::from_str::<
425                    Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
426                >(input)
427                .unwrap();
428            }
429        }
430
431        #[test]
432        fn test_validate_bitfinex_public_trades() {
433            struct TestCase {
434                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
435                expected:
436                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
437            }
438
439            let tests = vec![
440                TestCase {
441                    // TC0: Valid Coinbase Spot PublicTrades subscription
442                    input: Subscription::from((
443                        Coinbase,
444                        "base",
445                        "quote",
446                        MarketDataInstrumentKind::Spot,
447                        PublicTrades,
448                    )),
449                    expected: Ok(Subscription::from((
450                        Coinbase,
451                        "base",
452                        "quote",
453                        MarketDataInstrumentKind::Spot,
454                        PublicTrades,
455                    ))),
456                },
457                TestCase {
458                    // TC1: Invalid Coinbase FuturePerpetual PublicTrades subscription
459                    input: Subscription::from((
460                        Coinbase,
461                        "base",
462                        "quote",
463                        MarketDataInstrumentKind::Perpetual,
464                        PublicTrades,
465                    )),
466                    expected: Err(SocketError::Unsupported {
467                        entity: "".to_string(),
468                        item: "".to_string(),
469                    }),
470                },
471            ];
472
473            for (index, test) in tests.into_iter().enumerate() {
474                let actual = test.input.validate();
475                match (actual, test.expected) {
476                    (Ok(actual), Ok(expected)) => {
477                        assert_eq!(actual, expected, "TC{} failed", index)
478                    }
479                    (Err(_), Err(_)) => {
480                        // Test passed
481                    }
482                    (actual, expected) => {
483                        // Test failed
484                        panic!(
485                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
486                        );
487                    }
488                }
489            }
490        }
491
492        #[test]
493        fn test_validate_okx_public_trades() {
494            struct TestCase {
495                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
496                expected:
497                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
498            }
499
500            let tests = vec![
501                TestCase {
502                    // TC0: Valid Okx Spot PublicTrades subscription
503                    input: Subscription::from((
504                        Okx,
505                        "base",
506                        "quote",
507                        MarketDataInstrumentKind::Spot,
508                        PublicTrades,
509                    )),
510                    expected: Ok(Subscription::from((
511                        Okx,
512                        "base",
513                        "quote",
514                        MarketDataInstrumentKind::Spot,
515                        PublicTrades,
516                    ))),
517                },
518                TestCase {
519                    // TC1: Valid Okx FuturePerpetual PublicTrades subscription
520                    input: Subscription::from((
521                        Okx,
522                        "base",
523                        "quote",
524                        MarketDataInstrumentKind::Perpetual,
525                        PublicTrades,
526                    )),
527                    expected: Ok(Subscription::from((
528                        Okx,
529                        "base",
530                        "quote",
531                        MarketDataInstrumentKind::Perpetual,
532                        PublicTrades,
533                    ))),
534                },
535            ];
536
537            for (index, test) in tests.into_iter().enumerate() {
538                let actual = test.input.validate();
539                match (actual, test.expected) {
540                    (Ok(actual), Ok(expected)) => {
541                        assert_eq!(actual, expected, "TC{} failed", index)
542                    }
543                    (Err(_), Err(_)) => {
544                        // Test passed
545                    }
546                    (actual, expected) => {
547                        // Test failed
548                        panic!(
549                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
550                        );
551                    }
552                }
553            }
554        }
555    }
556
557    mod instrument_map {
558        use super::*;
559        use barter_instrument::instrument::market_data::MarketDataInstrument;
560
561        #[test]
562        fn test_find_instrument() {
563            // Initialise SubscriptionId-InstrumentKey HashMap
564            let ids = Map(FnvHashMap::from_iter([(
565                SubscriptionId::from("present"),
566                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
567            )]));
568
569            struct TestCase {
570                input: SubscriptionId,
571                expected: Result<MarketDataInstrument, SocketError>,
572            }
573
574            let cases = vec![
575                TestCase {
576                    // TC0: SubscriptionId (channel) is present in the HashMap
577                    input: SubscriptionId::from("present"),
578                    expected: Ok(MarketDataInstrument::from((
579                        "base",
580                        "quote",
581                        MarketDataInstrumentKind::Spot,
582                    ))),
583                },
584                TestCase {
585                    // TC1: SubscriptionId (channel) is not present in the HashMap
586                    input: SubscriptionId::from("not present"),
587                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
588                        "not present",
589                    ))),
590                },
591            ];
592
593            for (index, test) in cases.into_iter().enumerate() {
594                let actual = ids.find(&test.input);
595                match (actual, test.expected) {
596                    (Ok(actual), Ok(expected)) => {
597                        assert_eq!(*actual, expected, "TC{} failed", index)
598                    }
599                    (Err(_), Err(_)) => {
600                        // Test passed
601                    }
602                    (actual, expected) => {
603                        // Test failed
604                        panic!(
605                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
606                        );
607                    }
608                }
609            }
610        }
611    }
612}