1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3 Keyed,
4 asset::name::AssetNameInternal,
5 exchange::ExchangeId,
6 instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9 Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17pub mod book;
19
20pub mod candle;
22
23pub mod liquidation;
25
26pub mod trade;
28
29pub trait SubscriptionKind
31where
32 Self: Debug + Clone,
33{
34 type Event: Debug;
35 fn as_str(&self) -> &'static str;
36}
37
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42 pub exchange: Exchange,
43 #[serde(flatten)]
44 pub instrument: Inst,
45 #[serde(alias = "type")]
46 pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53 Instrument: std::fmt::Display,
54 Kind: std::fmt::Display,
55{
56 subscriptions
57 .iter()
58 .map(
59 |Subscription {
60 exchange: _,
61 instrument,
62 kind,
63 }| { format_smolstr!("({instrument}, {kind})") },
64 )
65 .collect::<Vec<_>>()
66 .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71 Exchange: std::fmt::Display,
72 Instrument: std::fmt::Display,
73 Kind: std::fmt::Display,
74{
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77 }
78}
79
80#[derive(
81 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84 PublicTrades,
85 OrderBooksL1,
86 OrderBooksL2,
87 OrderBooksL3,
88 Liquidations,
89 Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93 for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95 S: Into<AssetNameInternal>,
96{
97 fn from(
98 (exchange, base, quote, instrument_kind, kind): (
99 Exchange,
100 S,
101 S,
102 MarketDataInstrumentKind,
103 Kind,
104 ),
105 ) -> Self {
106 Self::new(exchange, (base, quote, instrument_kind), kind)
107 }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111 From<(
112 InstrumentKey,
113 Exchange,
114 S,
115 S,
116 MarketDataInstrumentKind,
117 Kind,
118 )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120 S: Into<AssetNameInternal>,
121{
122 fn from(
123 (instrument_id, exchange, base, quote, instrument_kind, kind): (
124 InstrumentKey,
125 Exchange,
126 S,
127 S,
128 MarketDataInstrumentKind,
129 Kind,
130 ),
131 ) -> Self {
132 let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134 Self::new(exchange, instrument, kind)
135 }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139 for Subscription<Exchange, Instrument, Kind>
140where
141 I: Into<Instrument>,
142{
143 fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144 Self::new(exchange, instrument, kind)
145 }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149 pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151 where
152 I: Into<Instrument>,
153 {
154 Self {
155 exchange,
156 instrument: instrument.into(),
157 kind,
158 }
159 }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164 Exchange: Connector,
165 Instrument: InstrumentData,
166{
167 fn validate(self) -> Result<Self, SocketError>
168 where
169 Self: Sized,
170 {
171 if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
173 Ok(self)
174 } else {
175 Err(SocketError::Unsupported {
176 entity: Exchange::ID.to_string(),
177 item: self.instrument.kind().to_string(),
178 })
179 }
180 }
181}
182
183#[allow(clippy::match_like_matches_macro)]
186pub fn exchange_supports_instrument_kind(
187 exchange: ExchangeId,
188 instrument_kind: &MarketDataInstrumentKind,
189) -> bool {
190 use barter_instrument::{
191 exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
192 };
193
194 match (exchange, instrument_kind) {
195 (
197 BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
198 | GateioPerpetualsBtc,
199 Spot,
200 ) => false,
201 (_, Spot) => true,
202
203 (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
205 (_, Future { .. }) => false,
206
207 (
209 BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
210 | GateioPerpetualsBtc,
211 Perpetual,
212 ) => true,
213 (_, Perpetual) => false,
214
215 (GateioOptions | Okx, Option { .. }) => true,
217 (_, Option { .. }) => false,
218 }
219}
220
221impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
222where
223 Instrument: InstrumentData,
224{
225 fn validate(self) -> Result<Self, SocketError>
226 where
227 Self: Sized,
228 {
229 if exchange_supports_instrument_kind_sub_kind(
231 &self.exchange,
232 self.instrument.kind(),
233 self.kind,
234 ) {
235 Ok(self)
236 } else {
237 Err(SocketError::Unsupported {
238 entity: self.exchange.to_string(),
239 item: format!("({}, {})", self.instrument.kind(), self.kind),
240 })
241 }
242 }
243}
244
245pub fn exchange_supports_instrument_kind_sub_kind(
248 exchange_id: &ExchangeId,
249 instrument_kind: &MarketDataInstrumentKind,
250 sub_kind: SubKind,
251) -> bool {
252 use ExchangeId::*;
253 use MarketDataInstrumentKind::*;
254 use SubKind::*;
255
256 match (exchange_id, instrument_kind, sub_kind) {
257 (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
258 (
259 BinanceFuturesUsd,
260 Perpetual,
261 PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
262 ) => true,
263 (Bitfinex, Spot, PublicTrades) => true,
264 (Bitmex, Perpetual, PublicTrades) => true,
265 (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
266 (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
267 (Coinbase, Spot, PublicTrades) => true,
268 (GateioSpot, Spot, PublicTrades) => true,
269 (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
270 (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
271 (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
272 (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
273 (GateioOptions, Option { .. }, PublicTrades) => true,
274 (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
275 (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
276
277 (_, _, _) => false,
278 }
279}
280
281#[derive(Clone, Eq, PartialEq, Debug)]
284pub struct SubscriptionMeta<InstrumentKey> {
285 pub instrument_map: Map<InstrumentKey>,
288 pub ws_subscriptions: Vec<WsMessage>,
290}
291
292#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
297pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
298
299impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
300 fn from_iter<Iter>(iter: Iter) -> Self
301 where
302 Iter: IntoIterator<Item = (SubscriptionId, T)>,
303 {
304 Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
305 }
306}
307
308impl<T> Map<T> {
309 pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
311 where
312 SubscriptionId: Borrow<SubId>,
313 SubId: AsRef<str> + Hash + Eq + ?Sized,
314 {
315 self.0
316 .get(id)
317 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
318 }
319
320 pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
322 where
323 SubscriptionId: Borrow<SubId>,
324 SubId: AsRef<str> + Hash + Eq + ?Sized,
325 {
326 self.0
327 .get_mut(id)
328 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335
336 mod subscription {
337 use super::*;
338 use crate::{
339 exchange::{coinbase::Coinbase, okx::Okx},
340 subscription::trade::PublicTrades,
341 };
342 use barter_instrument::instrument::market_data::MarketDataInstrument;
343
344 mod de {
345 use super::*;
346 use crate::{
347 exchange::{
348 binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
349 gateio::perpetual::GateioPerpetualsUsd,
350 okx::Okx,
351 },
352 subscription::{book::OrderBooksL2, trade::PublicTrades},
353 };
354 use barter_instrument::instrument::market_data::MarketDataInstrument;
355
356 #[test]
357 fn test_subscription_okx_spot_public_trades() {
358 let input = r#"
359 {
360 "exchange": "okx",
361 "base": "btc",
362 "quote": "usdt",
363 "instrument_kind": "spot",
364 "kind": "public_trades"
365 }
366 "#;
367
368 serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
369 input,
370 )
371 .unwrap();
372 }
373
374 #[test]
375 fn test_subscription_binance_spot_public_trades() {
376 let input = r#"
377 {
378 "exchange": "binance_spot",
379 "base": "btc",
380 "quote": "usdt",
381 "instrument_kind": "spot",
382 "kind": "public_trades"
383 }
384 "#;
385
386 serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
387 .unwrap();
388 }
389
390 #[test]
391 fn test_subscription_binance_futures_usd_order_books_l2() {
392 let input = r#"
393 {
394 "exchange": "binance_futures_usd",
395 "base": "btc",
396 "quote": "usdt",
397 "instrument_kind": "perpetual",
398 "kind": "order_books_l2"
399 }
400 "#;
401
402 serde_json::from_str::<
403 Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
404 >(input)
405 .unwrap();
406 }
407
408 #[test]
409 fn subscription_gateio_futures_usd_public_trades() {
410 let input = r#"
411 {
412 "exchange": "gateio_perpetuals_usd",
413 "base": "btc",
414 "quote": "usdt",
415 "instrument_kind": "perpetual",
416 "kind": "public_trades"
417 }
418 "#;
419
420 serde_json::from_str::<
421 Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
422 >(input)
423 .unwrap();
424 }
425 }
426
427 #[test]
428 fn test_validate_bitfinex_public_trades() {
429 struct TestCase {
430 input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
431 expected:
432 Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
433 }
434
435 let tests = vec![
436 TestCase {
437 input: Subscription::from((
439 Coinbase,
440 "base",
441 "quote",
442 MarketDataInstrumentKind::Spot,
443 PublicTrades,
444 )),
445 expected: Ok(Subscription::from((
446 Coinbase,
447 "base",
448 "quote",
449 MarketDataInstrumentKind::Spot,
450 PublicTrades,
451 ))),
452 },
453 TestCase {
454 input: Subscription::from((
456 Coinbase,
457 "base",
458 "quote",
459 MarketDataInstrumentKind::Perpetual,
460 PublicTrades,
461 )),
462 expected: Err(SocketError::Unsupported {
463 entity: "".to_string(),
464 item: "".to_string(),
465 }),
466 },
467 ];
468
469 for (index, test) in tests.into_iter().enumerate() {
470 let actual = test.input.validate();
471 match (actual, test.expected) {
472 (Ok(actual), Ok(expected)) => {
473 assert_eq!(actual, expected, "TC{} failed", index)
474 }
475 (Err(_), Err(_)) => {
476 }
478 (actual, expected) => {
479 panic!(
481 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
482 );
483 }
484 }
485 }
486 }
487
488 #[test]
489 fn test_validate_okx_public_trades() {
490 struct TestCase {
491 input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
492 expected:
493 Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
494 }
495
496 let tests = vec![
497 TestCase {
498 input: Subscription::from((
500 Okx,
501 "base",
502 "quote",
503 MarketDataInstrumentKind::Spot,
504 PublicTrades,
505 )),
506 expected: Ok(Subscription::from((
507 Okx,
508 "base",
509 "quote",
510 MarketDataInstrumentKind::Spot,
511 PublicTrades,
512 ))),
513 },
514 TestCase {
515 input: Subscription::from((
517 Okx,
518 "base",
519 "quote",
520 MarketDataInstrumentKind::Perpetual,
521 PublicTrades,
522 )),
523 expected: Ok(Subscription::from((
524 Okx,
525 "base",
526 "quote",
527 MarketDataInstrumentKind::Perpetual,
528 PublicTrades,
529 ))),
530 },
531 ];
532
533 for (index, test) in tests.into_iter().enumerate() {
534 let actual = test.input.validate();
535 match (actual, test.expected) {
536 (Ok(actual), Ok(expected)) => {
537 assert_eq!(actual, expected, "TC{} failed", index)
538 }
539 (Err(_), Err(_)) => {
540 }
542 (actual, expected) => {
543 panic!(
545 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
546 );
547 }
548 }
549 }
550 }
551 }
552
553 mod instrument_map {
554 use super::*;
555 use barter_instrument::instrument::market_data::MarketDataInstrument;
556
557 #[test]
558 fn test_find_instrument() {
559 let ids = Map(FnvHashMap::from_iter([(
561 SubscriptionId::from("present"),
562 MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
563 )]));
564
565 struct TestCase {
566 input: SubscriptionId,
567 expected: Result<MarketDataInstrument, SocketError>,
568 }
569
570 let cases = vec![
571 TestCase {
572 input: SubscriptionId::from("present"),
574 expected: Ok(MarketDataInstrument::from((
575 "base",
576 "quote",
577 MarketDataInstrumentKind::Spot,
578 ))),
579 },
580 TestCase {
581 input: SubscriptionId::from("not present"),
583 expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
584 "not present",
585 ))),
586 },
587 ];
588
589 for (index, test) in cases.into_iter().enumerate() {
590 let actual = ids.find(&test.input);
591 match (actual, test.expected) {
592 (Ok(actual), Ok(expected)) => {
593 assert_eq!(*actual, expected, "TC{} failed", index)
594 }
595 (Err(_), Err(_)) => {
596 }
598 (actual, expected) => {
599 panic!(
601 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
602 );
603 }
604 }
605 }
606 }
607 }
608}