1use crate::{exchange::Connector, instrument::InstrumentData};
2use barter_instrument::{
3 Keyed,
4 asset::name::AssetNameInternal,
5 exchange::ExchangeId,
6 instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
7};
8use barter_integration::{
9 Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
10};
11use derive_more::Display;
12use fnv::FnvHashMap;
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
17pub mod book;
19
20pub mod candle;
22
23pub mod liquidation;
25
26pub mod trade;
28
29pub trait SubscriptionKind
31where
32 Self: Debug + Clone,
33{
34 type Event: Debug;
35 fn as_str(&self) -> &'static str;
36}
37
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42 pub exchange: Exchange,
43 #[serde(flatten)]
44 pub instrument: Inst,
45 #[serde(alias = "type")]
46 pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53 Instrument: std::fmt::Display,
54 Kind: std::fmt::Display,
55{
56 subscriptions
57 .iter()
58 .map(
59 |Subscription {
60 exchange: _,
61 instrument,
62 kind,
63 }| { format_smolstr!("({instrument}, {kind})") },
64 )
65 .collect::<Vec<_>>()
66 .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71 Exchange: std::fmt::Display,
72 Instrument: std::fmt::Display,
73 Kind: std::fmt::Display,
74{
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77 }
78}
79
80#[derive(
81 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84 PublicTrades,
85 OrderBooksL1,
86 OrderBooksL2,
87 OrderBooksL3,
88 Liquidations,
89 Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93 for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95 S: Into<AssetNameInternal>,
96{
97 fn from(
98 (exchange, base, quote, instrument_kind, kind): (
99 Exchange,
100 S,
101 S,
102 MarketDataInstrumentKind,
103 Kind,
104 ),
105 ) -> Self {
106 Self::new(exchange, (base, quote, instrument_kind), kind)
107 }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111 From<(
112 InstrumentKey,
113 Exchange,
114 S,
115 S,
116 MarketDataInstrumentKind,
117 Kind,
118 )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120 S: Into<AssetNameInternal>,
121{
122 fn from(
123 (instrument_id, exchange, base, quote, instrument_kind, kind): (
124 InstrumentKey,
125 Exchange,
126 S,
127 S,
128 MarketDataInstrumentKind,
129 Kind,
130 ),
131 ) -> Self {
132 let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134 Self::new(exchange, instrument, kind)
135 }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139 for Subscription<Exchange, Instrument, Kind>
140where
141 I: Into<Instrument>,
142{
143 fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144 Self::new(exchange, instrument, kind)
145 }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149 pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151 where
152 I: Into<Instrument>,
153 {
154 Self {
155 exchange,
156 instrument: instrument.into(),
157 kind,
158 }
159 }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164 Exchange: Connector,
165 Instrument: InstrumentData,
166{
167 type Error = SocketError;
168
169 fn validate(self) -> Result<Self, Self::Error>
170 where
171 Self: Sized,
172 {
173 if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
175 Ok(self)
176 } else {
177 Err(SocketError::Unsupported {
178 entity: Exchange::ID.to_string(),
179 item: self.instrument.kind().to_string(),
180 })
181 }
182 }
183}
184
185#[allow(clippy::match_like_matches_macro)]
188pub fn exchange_supports_instrument_kind(
189 exchange: ExchangeId,
190 instrument_kind: &MarketDataInstrumentKind,
191) -> bool {
192 use barter_instrument::{
193 exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
194 };
195
196 match (exchange, instrument_kind) {
197 (
199 BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
200 | GateioPerpetualsBtc,
201 Spot,
202 ) => false,
203 (_, Spot) => true,
204
205 (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
207 (_, Future { .. }) => false,
208
209 (
211 BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
212 | GateioPerpetualsBtc,
213 Perpetual,
214 ) => true,
215 (_, Perpetual) => false,
216
217 (GateioOptions | Okx, Option { .. }) => true,
219 (_, Option { .. }) => false,
220 }
221}
222
223impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
224where
225 Instrument: InstrumentData,
226{
227 type Error = SocketError;
228
229 fn validate(self) -> Result<Self, Self::Error>
230 where
231 Self: Sized,
232 {
233 if exchange_supports_instrument_kind_sub_kind(
235 &self.exchange,
236 self.instrument.kind(),
237 self.kind,
238 ) {
239 Ok(self)
240 } else {
241 Err(SocketError::Unsupported {
242 entity: self.exchange.to_string(),
243 item: format!("({}, {})", self.instrument.kind(), self.kind),
244 })
245 }
246 }
247}
248
249pub fn exchange_supports_instrument_kind_sub_kind(
252 exchange_id: &ExchangeId,
253 instrument_kind: &MarketDataInstrumentKind,
254 sub_kind: SubKind,
255) -> bool {
256 use ExchangeId::*;
257 use MarketDataInstrumentKind::*;
258 use SubKind::*;
259
260 match (exchange_id, instrument_kind, sub_kind) {
261 (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
262 (
263 BinanceFuturesUsd,
264 Perpetual,
265 PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
266 ) => true,
267 (Bitfinex, Spot, PublicTrades) => true,
268 (Bitmex, Perpetual, PublicTrades) => true,
269 (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
270 (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
271 (Coinbase, Spot, PublicTrades) => true,
272 (GateioSpot, Spot, PublicTrades) => true,
273 (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
274 (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
275 (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
276 (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
277 (GateioOptions, Option { .. }, PublicTrades) => true,
278 (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
279 (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
280
281 (_, _, _) => false,
282 }
283}
284
285#[derive(Clone, Eq, PartialEq, Debug)]
288pub struct SubscriptionMeta<InstrumentKey> {
289 pub instrument_map: Map<InstrumentKey>,
292 pub ws_subscriptions: Vec<WsMessage>,
294}
295
296#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
301pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
302
303impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
304 fn from_iter<Iter>(iter: Iter) -> Self
305 where
306 Iter: IntoIterator<Item = (SubscriptionId, T)>,
307 {
308 Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
309 }
310}
311
312impl<T> Map<T> {
313 pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
315 where
316 SubscriptionId: Borrow<SubId>,
317 SubId: AsRef<str> + Hash + Eq + ?Sized,
318 {
319 self.0
320 .get(id)
321 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
322 }
323
324 pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
326 where
327 SubscriptionId: Borrow<SubId>,
328 SubId: AsRef<str> + Hash + Eq + ?Sized,
329 {
330 self.0
331 .get_mut(id)
332 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use super::*;
339
340 mod subscription {
341 use super::*;
342 use crate::{
343 exchange::{coinbase::Coinbase, okx::Okx},
344 subscription::trade::PublicTrades,
345 };
346 use barter_instrument::instrument::market_data::MarketDataInstrument;
347
348 mod de {
349 use super::*;
350 use crate::{
351 exchange::{
352 binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
353 gateio::perpetual::GateioPerpetualsUsd,
354 okx::Okx,
355 },
356 subscription::{book::OrderBooksL2, trade::PublicTrades},
357 };
358 use barter_instrument::instrument::market_data::MarketDataInstrument;
359
360 #[test]
361 fn test_subscription_okx_spot_public_trades() {
362 let input = r#"
363 {
364 "exchange": "okx",
365 "base": "btc",
366 "quote": "usdt",
367 "instrument_kind": "spot",
368 "kind": "public_trades"
369 }
370 "#;
371
372 serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
373 input,
374 )
375 .unwrap();
376 }
377
378 #[test]
379 fn test_subscription_binance_spot_public_trades() {
380 let input = r#"
381 {
382 "exchange": "binance_spot",
383 "base": "btc",
384 "quote": "usdt",
385 "instrument_kind": "spot",
386 "kind": "public_trades"
387 }
388 "#;
389
390 serde_json::from_str::<Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>>(input)
391 .unwrap();
392 }
393
394 #[test]
395 fn test_subscription_binance_futures_usd_order_books_l2() {
396 let input = r#"
397 {
398 "exchange": "binance_futures_usd",
399 "base": "btc",
400 "quote": "usdt",
401 "instrument_kind": "perpetual",
402 "kind": "order_books_l2"
403 }
404 "#;
405
406 serde_json::from_str::<
407 Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
408 >(input)
409 .unwrap();
410 }
411
412 #[test]
413 fn subscription_gateio_futures_usd_public_trades() {
414 let input = r#"
415 {
416 "exchange": "gateio_perpetuals_usd",
417 "base": "btc",
418 "quote": "usdt",
419 "instrument_kind": "perpetual",
420 "kind": "public_trades"
421 }
422 "#;
423
424 serde_json::from_str::<
425 Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
426 >(input)
427 .unwrap();
428 }
429 }
430
431 #[test]
432 fn test_validate_bitfinex_public_trades() {
433 struct TestCase {
434 input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
435 expected:
436 Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
437 }
438
439 let tests = vec![
440 TestCase {
441 input: Subscription::from((
443 Coinbase,
444 "base",
445 "quote",
446 MarketDataInstrumentKind::Spot,
447 PublicTrades,
448 )),
449 expected: Ok(Subscription::from((
450 Coinbase,
451 "base",
452 "quote",
453 MarketDataInstrumentKind::Spot,
454 PublicTrades,
455 ))),
456 },
457 TestCase {
458 input: Subscription::from((
460 Coinbase,
461 "base",
462 "quote",
463 MarketDataInstrumentKind::Perpetual,
464 PublicTrades,
465 )),
466 expected: Err(SocketError::Unsupported {
467 entity: "".to_string(),
468 item: "".to_string(),
469 }),
470 },
471 ];
472
473 for (index, test) in tests.into_iter().enumerate() {
474 let actual = test.input.validate();
475 match (actual, test.expected) {
476 (Ok(actual), Ok(expected)) => {
477 assert_eq!(actual, expected, "TC{} failed", index)
478 }
479 (Err(_), Err(_)) => {
480 }
482 (actual, expected) => {
483 panic!(
485 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
486 );
487 }
488 }
489 }
490 }
491
492 #[test]
493 fn test_validate_okx_public_trades() {
494 struct TestCase {
495 input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
496 expected:
497 Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
498 }
499
500 let tests = vec![
501 TestCase {
502 input: Subscription::from((
504 Okx,
505 "base",
506 "quote",
507 MarketDataInstrumentKind::Spot,
508 PublicTrades,
509 )),
510 expected: Ok(Subscription::from((
511 Okx,
512 "base",
513 "quote",
514 MarketDataInstrumentKind::Spot,
515 PublicTrades,
516 ))),
517 },
518 TestCase {
519 input: Subscription::from((
521 Okx,
522 "base",
523 "quote",
524 MarketDataInstrumentKind::Perpetual,
525 PublicTrades,
526 )),
527 expected: Ok(Subscription::from((
528 Okx,
529 "base",
530 "quote",
531 MarketDataInstrumentKind::Perpetual,
532 PublicTrades,
533 ))),
534 },
535 ];
536
537 for (index, test) in tests.into_iter().enumerate() {
538 let actual = test.input.validate();
539 match (actual, test.expected) {
540 (Ok(actual), Ok(expected)) => {
541 assert_eq!(actual, expected, "TC{} failed", index)
542 }
543 (Err(_), Err(_)) => {
544 }
546 (actual, expected) => {
547 panic!(
549 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
550 );
551 }
552 }
553 }
554 }
555 }
556
557 mod instrument_map {
558 use super::*;
559 use barter_instrument::instrument::market_data::MarketDataInstrument;
560
561 #[test]
562 fn test_find_instrument() {
563 let ids = Map(FnvHashMap::from_iter([(
565 SubscriptionId::from("present"),
566 MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
567 )]));
568
569 struct TestCase {
570 input: SubscriptionId,
571 expected: Result<MarketDataInstrument, SocketError>,
572 }
573
574 let cases = vec![
575 TestCase {
576 input: SubscriptionId::from("present"),
578 expected: Ok(MarketDataInstrument::from((
579 "base",
580 "quote",
581 MarketDataInstrumentKind::Spot,
582 ))),
583 },
584 TestCase {
585 input: SubscriptionId::from("not present"),
587 expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
588 "not present",
589 ))),
590 },
591 ];
592
593 for (index, test) in cases.into_iter().enumerate() {
594 let actual = ids.find(&test.input);
595 match (actual, test.expected) {
596 (Ok(actual), Ok(expected)) => {
597 assert_eq!(*actual, expected, "TC{} failed", index)
598 }
599 (Err(_), Err(_)) => {
600 }
602 (actual, expected) => {
603 panic!(
605 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
606 );
607 }
608 }
609 }
610 }
611 }
612}