barter_data/subscription/
mod.rs

1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3    Keyed,
4    asset::name::AssetNameInternal,
5    exchange::ExchangeId,
6    instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9    Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17/// OrderBook [`SubscriptionKind`]s and the associated Barter output data models.
18pub mod book;
19
20/// Candle [`SubscriptionKind`] and the associated Barter output data model.
21pub mod candle;
22
23/// Liquidation [`SubscriptionKind`] and the associated Barter output data model.
24pub mod liquidation;
25
26/// Public trade [`SubscriptionKind`] and the associated Barter output data model.
27pub mod trade;
28
/// Describes a kind of [`Subscription`], together with the output [`Self::Event`] that
/// it yields.
pub trait SubscriptionKind: Debug + Clone {
    /// Output data model associated with this kind of subscription.
    type Event: Debug;

    /// Returns a `'static` string representation of this subscription kind.
    fn as_str(&self) -> &'static str;
}
37
38/// Barter [`Subscription`] used to subscribe to a [`SubscriptionKind`] for a particular execution
39/// [`MarketDataInstrument`].
40#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42    pub exchange: Exchange,
43    #[serde(flatten)]
44    pub instrument: Inst,
45    #[serde(alias = "type")]
46    pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50    subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53    Instrument: std::fmt::Display,
54    Kind: std::fmt::Display,
55{
56    subscriptions
57        .iter()
58        .map(
59            |Subscription {
60                 exchange: _,
61                 instrument,
62                 kind,
63             }| { format_smolstr!("({instrument}, {kind})") },
64        )
65        .collect::<Vec<_>>()
66        .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71    Exchange: std::fmt::Display,
72    Instrument: std::fmt::Display,
73    Kind: std::fmt::Display,
74{
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77    }
78}
79
80#[derive(
81    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84    PublicTrades,
85    OrderBooksL1,
86    OrderBooksL2,
87    OrderBooksL3,
88    Liquidations,
89    Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93    for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95    S: Into<AssetNameInternal>,
96{
97    fn from(
98        (exchange, base, quote, instrument_kind, kind): (
99            Exchange,
100            S,
101            S,
102            MarketDataInstrumentKind,
103            Kind,
104        ),
105    ) -> Self {
106        Self::new(exchange, (base, quote, instrument_kind), kind)
107    }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111    From<(
112        InstrumentKey,
113        Exchange,
114        S,
115        S,
116        MarketDataInstrumentKind,
117        Kind,
118    )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120    S: Into<AssetNameInternal>,
121{
122    fn from(
123        (instrument_id, exchange, base, quote, instrument_kind, kind): (
124            InstrumentKey,
125            Exchange,
126            S,
127            S,
128            MarketDataInstrumentKind,
129            Kind,
130        ),
131    ) -> Self {
132        let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134        Self::new(exchange, instrument, kind)
135    }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139    for Subscription<Exchange, Instrument, Kind>
140where
141    I: Into<Instrument>,
142{
143    fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144        Self::new(exchange, instrument, kind)
145    }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149    /// Constructs a new [`Subscription`] using the provided configuration.
150    pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151    where
152        I: Into<Instrument>,
153    {
154        Self {
155            exchange,
156            instrument: instrument.into(),
157            kind,
158        }
159    }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164    Exchange: Connector,
165    Instrument: InstrumentData,
166{
167    fn validate(self) -> Result<Self, SocketError>
168    where
169        Self: Sized,
170    {
171        // Validate the Exchange supports the Subscription InstrumentKind
172        if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
173            Ok(self)
174        } else {
175            Err(SocketError::Unsupported {
176                entity: Exchange::ID.to_string(),
177                item: self.instrument.kind().to_string(),
178            })
179        }
180    }
181}
182
183/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
184/// ingestion of market data for the provided [`MarketDataInstrumentKind`].
185#[allow(clippy::match_like_matches_macro)]
186pub fn exchange_supports_instrument_kind(
187    exchange: ExchangeId,
188    instrument_kind: &MarketDataInstrumentKind,
189) -> bool {
190    use barter_instrument::{
191        exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
192    };
193
194    match (exchange, instrument_kind) {
195        // Spot
196        (
197            BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
198            | GateioPerpetualsBtc,
199            Spot,
200        ) => false,
201        (_, Spot) => true,
202
203        // Future
204        (GateioFuturesUsd | GateioFuturesBtc | Okx, Future(_)) => true,
205        (_, Future(_)) => false,
206
207        // Perpetual
208        (
209            BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
210            | GateioPerpetualsBtc,
211            Perpetual,
212        ) => true,
213        (_, Perpetual) => false,
214
215        // Option
216        (GateioOptions | Okx, Option(_)) => true,
217        (_, Option(_)) => false,
218    }
219}
220
221impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
222where
223    Instrument: InstrumentData,
224{
225    fn validate(self) -> Result<Self, SocketError>
226    where
227        Self: Sized,
228    {
229        // Validate the Exchange supports the Subscription InstrumentKind
230        if exchange_supports_instrument_kind_sub_kind(
231            &self.exchange,
232            self.instrument.kind(),
233            self.kind,
234        ) {
235            Ok(self)
236        } else {
237            Err(SocketError::Unsupported {
238                entity: self.exchange.to_string(),
239                item: self.instrument.kind().to_string(),
240            })
241        }
242    }
243}
244
245/// Determines whether the [`Connector`] associated with this [`ExchangeId`] supports the
246/// ingestion of market data for the provided [`MarketDataInstrumentKind`] and [`SubKind`] combination.
247pub fn exchange_supports_instrument_kind_sub_kind(
248    exchange_id: &ExchangeId,
249    instrument_kind: &MarketDataInstrumentKind,
250    sub_kind: SubKind,
251) -> bool {
252    use ExchangeId::*;
253    use MarketDataInstrumentKind::*;
254    use SubKind::*;
255
256    match (exchange_id, instrument_kind, sub_kind) {
257        (BinanceSpot, Spot, PublicTrades | OrderBooksL1) => true,
258        (BinanceFuturesUsd, Perpetual, PublicTrades | OrderBooksL1 | Liquidations) => true,
259        (Bitfinex, Spot, PublicTrades) => true,
260        (Bitmex, Perpetual, PublicTrades) => true,
261        (BybitSpot, Spot, PublicTrades) => true,
262        (BybitPerpetualsUsd, Perpetual, PublicTrades) => true,
263        (Coinbase, Spot, PublicTrades) => true,
264        (GateioSpot, Spot, PublicTrades) => true,
265        (GateioFuturesUsd, Future(_), PublicTrades) => true,
266        (GateioFuturesBtc, Future(_), PublicTrades) => true,
267        (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
268        (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
269        (GateioOptions, Option(_), PublicTrades) => true,
270        (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
271        (Okx, Spot | Future(_) | Perpetual | Option(_), PublicTrades) => true,
272
273        (_, _, _) => false,
274    }
275}
276
277/// Metadata generated from a collection of Barter [`Subscription`]s, including the execution
278/// specific subscription payloads that are sent to the execution.
279#[derive(Clone, Eq, PartialEq, Debug)]
280pub struct SubscriptionMeta<InstrumentKey> {
281    /// `HashMap` containing the mapping between a [`SubscriptionId`] and
282    /// it's associated Barter [`MarketDataInstrument`].
283    pub instrument_map: Map<InstrumentKey>,
284    /// Collection of [`WsMessage`]s containing execution specific subscription payloads to be sent.
285    pub ws_subscriptions: Vec<WsMessage>,
286}
287
288/// New type`HashMap` that maps a [`SubscriptionId`] to some associated type `T`.
289///
290/// Used by [`ExchangeTransformer`](crate::transformer::ExchangeTransformer)s to identify the
291/// Barter [`MarketDataInstrument`] associated with incoming execution messages.
292#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
293pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
294
295impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
296    fn from_iter<Iter>(iter: Iter) -> Self
297    where
298        Iter: IntoIterator<Item = (SubscriptionId, T)>,
299    {
300        Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
301    }
302}
303
304impl<T> Map<T> {
305    /// Find the `InstrumentKey` associated with the provided [`SubscriptionId`].
306    pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
307    where
308        SubscriptionId: Borrow<SubId>,
309        SubId: AsRef<str> + Hash + Eq + ?Sized,
310    {
311        self.0
312            .get(id)
313            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
314    }
315
316    /// Find the mutable reference to `T` associated with the provided [`SubscriptionId`].
317    pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
318    where
319        SubscriptionId: Borrow<SubId>,
320        SubId: AsRef<str> + Hash + Eq + ?Sized,
321    {
322        self.0
323            .get_mut(id)
324            .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    mod subscription {
        use super::*;
        use crate::{
            exchange::{coinbase::Coinbase, okx::Okx},
            subscription::trade::PublicTrades,
        };
        use barter_instrument::instrument::market_data::MarketDataInstrument;

        /// Deserialisation smoke tests: each case only checks that the JSON input
        /// deserialises into the expected concrete [`Subscription`] type.
        mod de {
            use super::*;
            use crate::{
                exchange::{
                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
                    gateio::perpetual::GateioPerpetualsUsd,
                    okx::Okx,
                },
                subscription::{book::OrderBooksL2, trade::PublicTrades},
            };
            use barter_instrument::instrument::market_data::MarketDataInstrument;

            #[test]
            fn test_subscription_okx_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "okx",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                let _: Subscription<Okx, MarketDataInstrument, PublicTrades> =
                    serde_json::from_str(input).unwrap();
            }

            #[test]
            fn test_subscription_binance_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "binance_spot",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                let _: Subscription<BinanceSpot, MarketDataInstrument, PublicTrades> =
                    serde_json::from_str(input).unwrap();
            }

            #[test]
            fn test_subscription_binance_futures_usd_order_books_l2() {
                let input = r#"
                {
                    "exchange": "binance_futures_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "order_books_l2"
                }
                "#;

                let _: Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2> =
                    serde_json::from_str(input).unwrap();
            }

            // Named after the exchange actually deserialised (`gateio_perpetuals_usd`).
            #[test]
            fn test_subscription_gateio_perpetuals_usd_public_trades() {
                let input = r#"
                {
                    "exchange": "gateio_perpetuals_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "public_trades"
                }
                "#;

                let _: Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades> =
                    serde_json::from_str(input).unwrap();
            }
        }

        // Named after the exchange actually validated (Coinbase).
        #[test]
        fn test_validate_coinbase_public_trades() {
            struct TestCase {
                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                TestCase {
                    // TC0: Valid Coinbase Spot PublicTrades subscription
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                TestCase {
                    // TC1: Invalid Coinbase FuturePerpetual PublicTrades subscription
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    // Only the Err-ness is compared below, so the payload is a placeholder
                    expected: Err(SocketError::Unsupported {
                        entity: "".to_string(),
                        item: "".to_string(),
                    }),
                },
            ];

            for (index, TestCase { input, expected }) in tests.into_iter().enumerate() {
                match (input.validate(), expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{} failed", index)
                    }
                    (Err(_), Err(_)) => {
                        // Test passed
                    }
                    (actual, expected) => {
                        // Test failed
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }

        #[test]
        fn test_validate_okx_public_trades() {
            struct TestCase {
                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                TestCase {
                    // TC0: Valid Okx Spot PublicTrades subscription
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                TestCase {
                    // TC1: Valid Okx FuturePerpetual PublicTrades subscription
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    ))),
                },
            ];

            for (index, TestCase { input, expected }) in tests.into_iter().enumerate() {
                match (input.validate(), expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{} failed", index)
                    }
                    (Err(_), Err(_)) => {
                        // Test passed
                    }
                    (actual, expected) => {
                        // Test failed
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }

    mod instrument_map {
        use super::*;
        use barter_instrument::instrument::market_data::MarketDataInstrument;

        #[test]
        fn test_find_instrument() {
            // Initialise SubscriptionId-InstrumentKey HashMap
            let ids = Map(FnvHashMap::from_iter([(
                SubscriptionId::from("present"),
                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
            )]));

            struct TestCase {
                input: SubscriptionId,
                expected: Result<MarketDataInstrument, SocketError>,
            }

            let cases = vec![
                TestCase {
                    // TC0: SubscriptionId (channel) is present in the HashMap
                    input: SubscriptionId::from("present"),
                    expected: Ok(MarketDataInstrument::from((
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                    ))),
                },
                TestCase {
                    // TC1: SubscriptionId (channel) is not present in the HashMap
                    input: SubscriptionId::from("not present"),
                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
                        "not present",
                    ))),
                },
            ];

            for (index, TestCase { input, expected }) in cases.into_iter().enumerate() {
                match (ids.find(&input), expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(*actual, expected, "TC{} failed", index)
                    }
                    (Err(_), Err(_)) => {
                        // Test passed
                    }
                    (actual, expected) => {
                        // Test failed
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }
}