barter_data/subscription/
mod.rs

1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3    Keyed,
4    asset::name::AssetNameInternal,
5    exchange::ExchangeId,
6    instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9    Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17/// OrderBook [`SubscriptionKind`]s and the associated Barter output data models.
18pub mod book;
19
20/// Candle [`SubscriptionKind`] and the associated Barter output data model.
21pub mod candle;
22
23/// Liquidation [`SubscriptionKind`] and the associated Barter output data model.
24pub mod liquidation;
25
26/// Public trade [`SubscriptionKind`] and the associated Barter output data model.
27pub mod trade;
28
/// Describes a kind of [`Subscription`] together with the output [`Self::Event`] it yields.
///
/// Every implementor must also be [`Debug`] and [`Clone`].
pub trait SubscriptionKind: Debug + Clone {
    /// Output data model produced for this kind of [`Subscription`].
    type Event: Debug;

    /// Returns a `'static` string slice for this kind (presumably its name — confirm against
    /// the implementors).
    fn as_str(&self) -> &'static str;
}
37
38/// Barter [`Subscription`] used to subscribe to a [`SubscriptionKind`] for a particular exchange
39/// [`MarketDataInstrument`].
40#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42    pub exchange: Exchange,
43    #[serde(flatten)]
44    pub instrument: Inst,
45    #[serde(alias = "type")]
46    pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50    subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53    Instrument: std::fmt::Display,
54    Kind: std::fmt::Display,
55{
56    subscriptions
57        .iter()
58        .map(
59            |Subscription {
60                 exchange: _,
61                 instrument,
62                 kind,
63             }| { format_smolstr!("({instrument}, {kind})") },
64        )
65        .collect::<Vec<_>>()
66        .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71    Exchange: std::fmt::Display,
72    Instrument: std::fmt::Display,
73    Kind: std::fmt::Display,
74{
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77    }
78}
79
80#[derive(
81    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84    PublicTrades,
85    OrderBooksL1,
86    OrderBooksL2,
87    OrderBooksL3,
88    Liquidations,
89    Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93    for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95    S: Into<AssetNameInternal>,
96{
97    fn from(
98        (exchange, base, quote, instrument_kind, kind): (
99            Exchange,
100            S,
101            S,
102            MarketDataInstrumentKind,
103            Kind,
104        ),
105    ) -> Self {
106        Self::new(exchange, (base, quote, instrument_kind), kind)
107    }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111    From<(
112        InstrumentKey,
113        Exchange,
114        S,
115        S,
116        MarketDataInstrumentKind,
117        Kind,
118    )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120    S: Into<AssetNameInternal>,
121{
122    fn from(
123        (instrument_id, exchange, base, quote, instrument_kind, kind): (
124            InstrumentKey,
125            Exchange,
126            S,
127            S,
128            MarketDataInstrumentKind,
129            Kind,
130        ),
131    ) -> Self {
132        let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134        Self::new(exchange, instrument, kind)
135    }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139    for Subscription<Exchange, Instrument, Kind>
140where
141    I: Into<Instrument>,
142{
143    fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144        Self::new(exchange, instrument, kind)
145    }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149    /// Constructs a new [`Subscription`] using the provided configuration.
150    pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151    where
152        I: Into<Instrument>,
153    {
154        Self {
155            exchange,
156            instrument: instrument.into(),
157            kind,
158        }
159    }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164    Exchange: Connector,
165    Instrument: InstrumentData,
166{
167    fn validate(self) -> Result<Self, SocketError>
168    where
169        Self: Sized,
170    {
171        // Validate the Exchange supports the Subscription InstrumentKind
172        if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
173            Ok(self)
174        } else {
175            Err(SocketError::Unsupported {
176                entity: Exchange::ID.to_string(),
177                item: self.instrument.kind().to_string(),
178            })
179        }
180    }
181}
182
183/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
184/// ingestion of market data for the provided [`MarketDataInstrumentKind`].
185#[allow(clippy::match_like_matches_macro)]
186pub fn exchange_supports_instrument_kind(
187    exchange: ExchangeId,
188    instrument_kind: &MarketDataInstrumentKind,
189) -> bool {
190    use barter_instrument::{
191        exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
192    };
193
194    match (exchange, instrument_kind) {
195        // Spot
196        (
197            BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
198            | GateioPerpetualsBtc,
199            Spot,
200        ) => false,
201        (_, Spot) => true,
202
203        // Future
204        (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
205        (_, Future { .. }) => false,
206
207        // Perpetual
208        (
209            BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
210            | GateioPerpetualsBtc,
211            Perpetual,
212        ) => true,
213        (_, Perpetual) => false,
214
215        // Option
216        (GateioOptions | Okx, Option { .. }) => true,
217        (_, Option { .. }) => false,
218    }
219}
220
221impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
222where
223    Instrument: InstrumentData,
224{
225    fn validate(self) -> Result<Self, SocketError>
226    where
227        Self: Sized,
228    {
229        // Validate the Exchange supports the Subscription InstrumentKind
230        if exchange_supports_instrument_kind_sub_kind(
231            &self.exchange,
232            self.instrument.kind(),
233            self.kind,
234        ) {
235            Ok(self)
236        } else {
237            Err(SocketError::Unsupported {
238                entity: self.exchange.to_string(),
239                item: format!("({}, {})", self.instrument.kind(), self.kind),
240            })
241        }
242    }
243}
244
245/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
246/// ingestion of market data for the provided [`MarketDataInstrumentKind`] and [`SubKind`] combination.
247pub fn exchange_supports_instrument_kind_sub_kind(
248    exchange_id: &ExchangeId,
249    instrument_kind: &MarketDataInstrumentKind,
250    sub_kind: SubKind,
251) -> bool {
252    use ExchangeId::*;
253    use MarketDataInstrumentKind::*;
254    use SubKind::*;
255
256    match (exchange_id, instrument_kind, sub_kind) {
257        (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
258        (
259            BinanceFuturesUsd,
260            Perpetual,
261            PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
262        ) => true,
263        (Bitfinex, Spot, PublicTrades) => true,
264        (Bitmex, Perpetual, PublicTrades) => true,
265        (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
266        (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
267        (Coinbase, Spot, PublicTrades) => true,
268        (GateioSpot, Spot, PublicTrades) => true,
269        (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
270        (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
271        (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
272        (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
273        (GateioOptions, Option { .. }, PublicTrades) => true,
274        (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
275        (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
276
277        (_, _, _) => false,
278    }
279}
280
281/// Metadata generated from a collection of Barter [`Subscription`]s, including the exchange
282/// specific subscription payloads that are sent to the exchange.
283#[derive(Clone, Eq, PartialEq, Debug)]
284pub struct SubscriptionMeta<InstrumentKey> {
285    /// `HashMap` containing the mapping between a [`SubscriptionId`] and
286    /// it's associated Barter [`MarketDataInstrument`].
287    pub instrument_map: Map<InstrumentKey>,
288    /// Collection of [`WsMessage`]s containing exchange specific subscription payloads to be sent.
289    pub ws_subscriptions: Vec<WsMessage>,
290}
291
292/// New type`HashMap` that maps a [`SubscriptionId`] to some associated type `T`.
293///
294/// Used by [`ExchangeTransformer`](crate::transformer::ExchangeTransformer)s to identify the
295/// Barter [`MarketDataInstrument`] associated with incoming exchange messages.
296#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
297pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
298
299impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
300    fn from_iter<Iter>(iter: Iter) -> Self
301    where
302        Iter: IntoIterator<Item = (SubscriptionId, T)>,
303    {
304        Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
305    }
306}
307
308impl<T> Map<T> {
309    /// Find the `InstrumentKey` associated with the provided [`SubscriptionId`].
310    pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
311    where
312        SubscriptionId: Borrow<SubId>,
313        SubId: AsRef<str> + Hash + Eq + ?Sized,
314    {
315        self.0
316            .get(id)
317            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
318    }
319
320    /// Find the mutable reference to `T` associated with the provided [`SubscriptionId`].
321    pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
322    where
323        SubscriptionId: Borrow<SubId>,
324        SubId: AsRef<str> + Hash + Eq + ?Sized,
325    {
326        self.0
327            .get_mut(id)
328            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335
336    mod subscription {
337        use super::*;
338        use crate::{
339            exchange::{coinbase::Coinbase, okx::Okx},
340            subscription::trade::PublicTrades,
341        };
342        use barter_instrument::instrument::market_data::MarketDataInstrument;
343
344        mod de {
345            use super::*;
346            use crate::{
347                exchange::{
348                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
349                    gateio::perpetual::GateioPerpetualsUsd,
350                    okx::Okx,
351                },
352                subscription::{book::OrderBooksL2, trade::PublicTrades},
353            };
354            use barter_instrument::instrument::market_data::MarketDataInstrument;
355
356            #[test]
357            fn test_subscription_okx_spot_public_trades() {
358                let input = r#"
359                {
360                    "exchange": "okx",
361                    "base": "btc",
362                    "quote": "usdt",
363                    "instrument_kind": "spot",
364                    "kind": "public_trades"
365                }
366                "#;
367
368                serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
369                    input,
370                )
371                .unwrap();
372            }
373
374            #[test]
375            fn test_subscription_binance_spot_public_trades() {
376                let input = r#"
377                {
378                    "exchange": "binance_spot",
379                    "base": "btc",
380                    "quote": "usdt",
381                    "instrument_kind": "spot",
382                    "kind": "public_trades"
383                }
384                "#;
385
386                serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
387                    .unwrap();
388            }
389
390            #[test]
391            fn test_subscription_binance_futures_usd_order_books_l2() {
392                let input = r#"
393                {
394                    "exchange": "binance_futures_usd",
395                    "base": "btc",
396                    "quote": "usdt",
397                    "instrument_kind": "perpetual",
398                    "kind": "order_books_l2"
399                }
400                "#;
401
402                serde_json::from_str::<
403                    Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
404                >(input)
405                .unwrap();
406            }
407
408            #[test]
409            fn subscription_gateio_futures_usd_public_trades() {
410                let input = r#"
411                {
412                    "exchange": "gateio_perpetuals_usd",
413                    "base": "btc",
414                    "quote": "usdt",
415                    "instrument_kind": "perpetual",
416                    "kind": "public_trades"
417                }
418                "#;
419
420                serde_json::from_str::<
421                    Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
422                >(input)
423                .unwrap();
424            }
425        }
426
427        #[test]
428        fn test_validate_bitfinex_public_trades() {
429            struct TestCase {
430                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
431                expected:
432                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
433            }
434
435            let tests = vec![
436                TestCase {
437                    // TC0: Valid Coinbase Spot PublicTrades subscription
438                    input: Subscription::from((
439                        Coinbase,
440                        "base",
441                        "quote",
442                        MarketDataInstrumentKind::Spot,
443                        PublicTrades,
444                    )),
445                    expected: Ok(Subscription::from((
446                        Coinbase,
447                        "base",
448                        "quote",
449                        MarketDataInstrumentKind::Spot,
450                        PublicTrades,
451                    ))),
452                },
453                TestCase {
454                    // TC1: Invalid Coinbase FuturePerpetual PublicTrades subscription
455                    input: Subscription::from((
456                        Coinbase,
457                        "base",
458                        "quote",
459                        MarketDataInstrumentKind::Perpetual,
460                        PublicTrades,
461                    )),
462                    expected: Err(SocketError::Unsupported {
463                        entity: "".to_string(),
464                        item: "".to_string(),
465                    }),
466                },
467            ];
468
469            for (index, test) in tests.into_iter().enumerate() {
470                let actual = test.input.validate();
471                match (actual, test.expected) {
472                    (Ok(actual), Ok(expected)) => {
473                        assert_eq!(actual, expected, "TC{} failed", index)
474                    }
475                    (Err(_), Err(_)) => {
476                        // Test passed
477                    }
478                    (actual, expected) => {
479                        // Test failed
480                        panic!(
481                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
482                        );
483                    }
484                }
485            }
486        }
487
488        #[test]
489        fn test_validate_okx_public_trades() {
490            struct TestCase {
491                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
492                expected:
493                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
494            }
495
496            let tests = vec![
497                TestCase {
498                    // TC0: Valid Okx Spot PublicTrades subscription
499                    input: Subscription::from((
500                        Okx,
501                        "base",
502                        "quote",
503                        MarketDataInstrumentKind::Spot,
504                        PublicTrades,
505                    )),
506                    expected: Ok(Subscription::from((
507                        Okx,
508                        "base",
509                        "quote",
510                        MarketDataInstrumentKind::Spot,
511                        PublicTrades,
512                    ))),
513                },
514                TestCase {
515                    // TC1: Valid Okx FuturePerpetual PublicTrades subscription
516                    input: Subscription::from((
517                        Okx,
518                        "base",
519                        "quote",
520                        MarketDataInstrumentKind::Perpetual,
521                        PublicTrades,
522                    )),
523                    expected: Ok(Subscription::from((
524                        Okx,
525                        "base",
526                        "quote",
527                        MarketDataInstrumentKind::Perpetual,
528                        PublicTrades,
529                    ))),
530                },
531            ];
532
533            for (index, test) in tests.into_iter().enumerate() {
534                let actual = test.input.validate();
535                match (actual, test.expected) {
536                    (Ok(actual), Ok(expected)) => {
537                        assert_eq!(actual, expected, "TC{} failed", index)
538                    }
539                    (Err(_), Err(_)) => {
540                        // Test passed
541                    }
542                    (actual, expected) => {
543                        // Test failed
544                        panic!(
545                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
546                        );
547                    }
548                }
549            }
550        }
551    }
552
553    mod instrument_map {
554        use super::*;
555        use barter_instrument::instrument::market_data::MarketDataInstrument;
556
557        #[test]
558        fn test_find_instrument() {
559            // Initialise SubscriptionId-InstrumentKey HashMap
560            let ids = Map(FnvHashMap::from_iter([(
561                SubscriptionId::from("present"),
562                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
563            )]));
564
565            struct TestCase {
566                input: SubscriptionId,
567                expected: Result<MarketDataInstrument, SocketError>,
568            }
569
570            let cases = vec![
571                TestCase {
572                    // TC0: SubscriptionId (channel) is present in the HashMap
573                    input: SubscriptionId::from("present"),
574                    expected: Ok(MarketDataInstrument::from((
575                        "base",
576                        "quote",
577                        MarketDataInstrumentKind::Spot,
578                    ))),
579                },
580                TestCase {
581                    // TC1: SubscriptionId (channel) is not present in the HashMap
582                    input: SubscriptionId::from("not present"),
583                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
584                        "not present",
585                    ))),
586                },
587            ];
588
589            for (index, test) in cases.into_iter().enumerate() {
590                let actual = ids.find(&test.input);
591                match (actual, test.expected) {
592                    (Ok(actual), Ok(expected)) => {
593                        assert_eq!(*actual, expected, "TC{} failed", index)
594                    }
595                    (Err(_), Err(_)) => {
596                        // Test passed
597                    }
598                    (actual, expected) => {
599                        // Test failed
600                        panic!(
601                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
602                        );
603                    }
604                }
605            }
606        }
607    }
608}