1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3 Keyed,
4 asset::name::AssetNameInternal,
5 exchange::ExchangeId,
6 instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9 Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17pub mod book;
19
20pub mod candle;
22
23pub mod liquidation;
25
26pub mod trade;
28
/// Defines a kind of [`Subscription`], along with the event type associated with it.
///
/// Every implementor must also be [`Debug`] and [`Clone`].
pub trait SubscriptionKind: Debug + Clone {
    /// Event type associated with this kind of subscription.
    type Event: Debug;

    /// Returns a `'static` string identifying this subscription kind.
    fn as_str(&self) -> &'static str;
}
37
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42 pub exchange: Exchange,
43 #[serde(flatten)]
44 pub instrument: Inst,
45 #[serde(alias = "type")]
46 pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53 Instrument: std::fmt::Display,
54 Kind: std::fmt::Display,
55{
56 subscriptions
57 .iter()
58 .map(
59 |Subscription {
60 exchange: _,
61 instrument,
62 kind,
63 }| { format_smolstr!("({instrument}, {kind})") },
64 )
65 .collect::<Vec<_>>()
66 .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71 Exchange: std::fmt::Display,
72 Instrument: std::fmt::Display,
73 Kind: std::fmt::Display,
74{
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77 }
78}
79
80#[derive(
81 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84 PublicTrades,
85 OrderBooksL1,
86 OrderBooksL2,
87 OrderBooksL3,
88 Liquidations,
89 Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93 for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95 S: Into<AssetNameInternal>,
96{
97 fn from(
98 (exchange, base, quote, instrument_kind, kind): (
99 Exchange,
100 S,
101 S,
102 MarketDataInstrumentKind,
103 Kind,
104 ),
105 ) -> Self {
106 Self::new(exchange, (base, quote, instrument_kind), kind)
107 }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111 From<(
112 InstrumentKey,
113 Exchange,
114 S,
115 S,
116 MarketDataInstrumentKind,
117 Kind,
118 )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120 S: Into<AssetNameInternal>,
121{
122 fn from(
123 (instrument_id, exchange, base, quote, instrument_kind, kind): (
124 InstrumentKey,
125 Exchange,
126 S,
127 S,
128 MarketDataInstrumentKind,
129 Kind,
130 ),
131 ) -> Self {
132 let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134 Self::new(exchange, instrument, kind)
135 }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139 for Subscription<Exchange, Instrument, Kind>
140where
141 I: Into<Instrument>,
142{
143 fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144 Self::new(exchange, instrument, kind)
145 }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149 pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151 where
152 I: Into<Instrument>,
153 {
154 Self {
155 exchange,
156 instrument: instrument.into(),
157 kind,
158 }
159 }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164 Exchange: Connector,
165 Instrument: InstrumentData,
166{
167 fn validate(self) -> Result<Self, SocketError>
168 where
169 Self: Sized,
170 {
171 if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
173 Ok(self)
174 } else {
175 Err(SocketError::Unsupported {
176 entity: Exchange::ID.to_string(),
177 item: self.instrument.kind().to_string(),
178 })
179 }
180 }
181}
182
183#[allow(clippy::match_like_matches_macro)]
186pub fn exchange_supports_instrument_kind(
187 exchange: ExchangeId,
188 instrument_kind: &MarketDataInstrumentKind,
189) -> bool {
190 use barter_instrument::{
191 exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
192 };
193
194 match (exchange, instrument_kind) {
195 (
197 BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
198 | GateioPerpetualsBtc,
199 Spot,
200 ) => false,
201 (_, Spot) => true,
202
203 (GateioFuturesUsd | GateioFuturesBtc | Okx, Future(_)) => true,
205 (_, Future(_)) => false,
206
207 (
209 BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
210 | GateioPerpetualsBtc,
211 Perpetual,
212 ) => true,
213 (_, Perpetual) => false,
214
215 (GateioOptions | Okx, Option(_)) => true,
217 (_, Option(_)) => false,
218 }
219}
220
221impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
222where
223 Instrument: InstrumentData,
224{
225 fn validate(self) -> Result<Self, SocketError>
226 where
227 Self: Sized,
228 {
229 if exchange_supports_instrument_kind_sub_kind(
231 &self.exchange,
232 self.instrument.kind(),
233 self.kind,
234 ) {
235 Ok(self)
236 } else {
237 Err(SocketError::Unsupported {
238 entity: self.exchange.to_string(),
239 item: self.instrument.kind().to_string(),
240 })
241 }
242 }
243}
244
245pub fn exchange_supports_instrument_kind_sub_kind(
248 exchange_id: &ExchangeId,
249 instrument_kind: &MarketDataInstrumentKind,
250 sub_kind: SubKind,
251) -> bool {
252 use ExchangeId::*;
253 use MarketDataInstrumentKind::*;
254 use SubKind::*;
255
256 match (exchange_id, instrument_kind, sub_kind) {
257 (BinanceSpot, Spot, PublicTrades | OrderBooksL1) => true,
258 (BinanceFuturesUsd, Perpetual, PublicTrades | OrderBooksL1 | Liquidations) => true,
259 (Bitfinex, Spot, PublicTrades) => true,
260 (Bitmex, Perpetual, PublicTrades) => true,
261 (BybitSpot, Spot, PublicTrades) => true,
262 (BybitPerpetualsUsd, Perpetual, PublicTrades) => true,
263 (Coinbase, Spot, PublicTrades) => true,
264 (GateioSpot, Spot, PublicTrades) => true,
265 (GateioFuturesUsd, Future(_), PublicTrades) => true,
266 (GateioFuturesBtc, Future(_), PublicTrades) => true,
267 (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
268 (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
269 (GateioOptions, Option(_), PublicTrades) => true,
270 (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
271 (Okx, Spot | Future(_) | Perpetual | Option(_), PublicTrades) => true,
272
273 (_, _, _) => false,
274 }
275}
276
277#[derive(Clone, Eq, PartialEq, Debug)]
280pub struct SubscriptionMeta<InstrumentKey> {
281 pub instrument_map: Map<InstrumentKey>,
284 pub ws_subscriptions: Vec<WsMessage>,
286}
287
288#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
293pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
294
295impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
296 fn from_iter<Iter>(iter: Iter) -> Self
297 where
298 Iter: IntoIterator<Item = (SubscriptionId, T)>,
299 {
300 Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
301 }
302}
303
304impl<T> Map<T> {
305 pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
307 where
308 SubscriptionId: Borrow<SubId>,
309 SubId: AsRef<str> + Hash + Eq + ?Sized,
310 {
311 self.0
312 .get(id)
313 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
314 }
315
316 pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
318 where
319 SubscriptionId: Borrow<SubId>,
320 SubId: AsRef<str> + Hash + Eq + ?Sized,
321 {
322 self.0
323 .get_mut(id)
324 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331
332 mod subscription {
333 use super::*;
334 use crate::{
335 exchange::{coinbase::Coinbase, okx::Okx},
336 subscription::trade::PublicTrades,
337 };
338 use barter_instrument::instrument::market_data::MarketDataInstrument;
339
340 mod de {
341 use super::*;
342 use crate::{
343 exchange::{
344 binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
345 gateio::perpetual::GateioPerpetualsUsd,
346 okx::Okx,
347 },
348 subscription::{book::OrderBooksL2, trade::PublicTrades},
349 };
350 use barter_instrument::instrument::market_data::MarketDataInstrument;
351
352 #[test]
353 fn test_subscription_okx_spot_public_trades() {
354 let input = r#"
355 {
356 "exchange": "okx",
357 "base": "btc",
358 "quote": "usdt",
359 "instrument_kind": "spot",
360 "kind": "public_trades"
361 }
362 "#;
363
364 serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
365 input,
366 )
367 .unwrap();
368 }
369
370 #[test]
371 fn test_subscription_binance_spot_public_trades() {
372 let input = r#"
373 {
374 "exchange": "binance_spot",
375 "base": "btc",
376 "quote": "usdt",
377 "instrument_kind": "spot",
378 "kind": "public_trades"
379 }
380 "#;
381
382 serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
383 .unwrap();
384 }
385
386 #[test]
387 fn test_subscription_binance_futures_usd_order_books_l2() {
388 let input = r#"
389 {
390 "exchange": "binance_futures_usd",
391 "base": "btc",
392 "quote": "usdt",
393 "instrument_kind": "perpetual",
394 "kind": "order_books_l2"
395 }
396 "#;
397
398 serde_json::from_str::<
399 Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
400 >(input)
401 .unwrap();
402 }
403
404 #[test]
405 fn subscription_gateio_futures_usd_public_trades() {
406 let input = r#"
407 {
408 "exchange": "gateio_perpetuals_usd",
409 "base": "btc",
410 "quote": "usdt",
411 "instrument_kind": "perpetual",
412 "kind": "public_trades"
413 }
414 "#;
415
416 serde_json::from_str::<
417 Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
418 >(input)
419 .unwrap();
420 }
421 }
422
423 #[test]
424 fn test_validate_bitfinex_public_trades() {
425 struct TestCase {
426 input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
427 expected:
428 Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
429 }
430
431 let tests = vec![
432 TestCase {
433 input: Subscription::from((
435 Coinbase,
436 "base",
437 "quote",
438 MarketDataInstrumentKind::Spot,
439 PublicTrades,
440 )),
441 expected: Ok(Subscription::from((
442 Coinbase,
443 "base",
444 "quote",
445 MarketDataInstrumentKind::Spot,
446 PublicTrades,
447 ))),
448 },
449 TestCase {
450 input: Subscription::from((
452 Coinbase,
453 "base",
454 "quote",
455 MarketDataInstrumentKind::Perpetual,
456 PublicTrades,
457 )),
458 expected: Err(SocketError::Unsupported {
459 entity: "".to_string(),
460 item: "".to_string(),
461 }),
462 },
463 ];
464
465 for (index, test) in tests.into_iter().enumerate() {
466 let actual = test.input.validate();
467 match (actual, test.expected) {
468 (Ok(actual), Ok(expected)) => {
469 assert_eq!(actual, expected, "TC{} failed", index)
470 }
471 (Err(_), Err(_)) => {
472 }
474 (actual, expected) => {
475 panic!(
477 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
478 );
479 }
480 }
481 }
482 }
483
484 #[test]
485 fn test_validate_okx_public_trades() {
486 struct TestCase {
487 input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
488 expected:
489 Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
490 }
491
492 let tests = vec![
493 TestCase {
494 input: Subscription::from((
496 Okx,
497 "base",
498 "quote",
499 MarketDataInstrumentKind::Spot,
500 PublicTrades,
501 )),
502 expected: Ok(Subscription::from((
503 Okx,
504 "base",
505 "quote",
506 MarketDataInstrumentKind::Spot,
507 PublicTrades,
508 ))),
509 },
510 TestCase {
511 input: Subscription::from((
513 Okx,
514 "base",
515 "quote",
516 MarketDataInstrumentKind::Perpetual,
517 PublicTrades,
518 )),
519 expected: Ok(Subscription::from((
520 Okx,
521 "base",
522 "quote",
523 MarketDataInstrumentKind::Perpetual,
524 PublicTrades,
525 ))),
526 },
527 ];
528
529 for (index, test) in tests.into_iter().enumerate() {
530 let actual = test.input.validate();
531 match (actual, test.expected) {
532 (Ok(actual), Ok(expected)) => {
533 assert_eq!(actual, expected, "TC{} failed", index)
534 }
535 (Err(_), Err(_)) => {
536 }
538 (actual, expected) => {
539 panic!(
541 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
542 );
543 }
544 }
545 }
546 }
547 }
548
549 mod instrument_map {
550 use super::*;
551 use barter_instrument::instrument::market_data::MarketDataInstrument;
552
553 #[test]
554 fn test_find_instrument() {
555 let ids = Map(FnvHashMap::from_iter([(
557 SubscriptionId::from("present"),
558 MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
559 )]));
560
561 struct TestCase {
562 input: SubscriptionId,
563 expected: Result<MarketDataInstrument, SocketError>,
564 }
565
566 let cases = vec![
567 TestCase {
568 input: SubscriptionId::from("present"),
570 expected: Ok(MarketDataInstrument::from((
571 "base",
572 "quote",
573 MarketDataInstrumentKind::Spot,
574 ))),
575 },
576 TestCase {
577 input: SubscriptionId::from("not present"),
579 expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
580 "not present",
581 ))),
582 },
583 ];
584
585 for (index, test) in cases.into_iter().enumerate() {
586 let actual = ids.find(&test.input);
587 match (actual, test.expected) {
588 (Ok(actual), Ok(expected)) => {
589 assert_eq!(*actual, expected, "TC{} failed", index)
590 }
591 (Err(_), Err(_)) => {
592 }
594 (actual, expected) => {
595 panic!(
597 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
598 );
599 }
600 }
601 }
602 }
603 }
604}