1use crate::{exchange::Connector, instrument::InstrumentData};
2use derive_more::Display;
3use fnv::FnvHashMap;
4use rustrade_instrument::{
5 Keyed,
6 asset::name::AssetNameInternal,
7 exchange::ExchangeId,
8 instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
9};
10use rustrade_integration::{
11 Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
12};
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
/// Order book subscription types (for example the `OrderBooksL2` type used by this file's tests).
pub mod book;

/// Candle subscription types.
pub mod candle;

/// Liquidation subscription types.
pub mod liquidation;

/// Trade subscription types (for example the `PublicTrades` type used by this file's tests).
pub mod trade;
28
/// Describes a kind of data that a [`Subscription`] can request.
///
/// Every implementor must also be [`Debug`] and [`Clone`].
pub trait SubscriptionKind: Debug + Clone {
    /// Event type associated with this subscription kind.
    type Event: Debug;

    /// Returns a static string naming this subscription kind.
    fn as_str(&self) -> &'static str;
}
37
38#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
41pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
42 pub exchange: Exchange,
43 #[serde(flatten)]
44 pub instrument: Inst,
45 #[serde(alias = "type")]
46 pub kind: Kind,
47}
48
49pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
50 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
51) -> String
52where
53 Instrument: std::fmt::Display,
54 Kind: std::fmt::Display,
55{
56 subscriptions
57 .iter()
58 .map(
59 |Subscription {
60 exchange: _,
61 instrument,
62 kind,
63 }| { format_smolstr!("({instrument}, {kind})") },
64 )
65 .collect::<Vec<_>>()
66 .join(",")
67}
68
69impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
70where
71 Exchange: std::fmt::Display,
72 Instrument: std::fmt::Display,
73 Kind: std::fmt::Display,
74{
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
77 }
78}
79
80#[derive(
81 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
82)]
83pub enum SubKind {
84 PublicTrades,
85 OrderBooksL1,
86 OrderBooksL2,
87 OrderBooksL3,
88 Liquidations,
89 Candles,
90}
91
92impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
93 for Subscription<Exchange, MarketDataInstrument, Kind>
94where
95 S: Into<AssetNameInternal>,
96{
97 fn from(
98 (exchange, base, quote, instrument_kind, kind): (
99 Exchange,
100 S,
101 S,
102 MarketDataInstrumentKind,
103 Kind,
104 ),
105 ) -> Self {
106 Self::new(exchange, (base, quote, instrument_kind), kind)
107 }
108}
109
110impl<InstrumentKey, Exchange, S, Kind>
111 From<(
112 InstrumentKey,
113 Exchange,
114 S,
115 S,
116 MarketDataInstrumentKind,
117 Kind,
118 )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
119where
120 S: Into<AssetNameInternal>,
121{
122 fn from(
123 (instrument_id, exchange, base, quote, instrument_kind, kind): (
124 InstrumentKey,
125 Exchange,
126 S,
127 S,
128 MarketDataInstrumentKind,
129 Kind,
130 ),
131 ) -> Self {
132 let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
133
134 Self::new(exchange, instrument, kind)
135 }
136}
137
138impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
139 for Subscription<Exchange, Instrument, Kind>
140where
141 I: Into<Instrument>,
142{
143 fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
144 Self::new(exchange, instrument, kind)
145 }
146}
147
148impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
149 pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
151 where
152 I: Into<Instrument>,
153 {
154 Self {
155 exchange,
156 instrument: instrument.into(),
157 kind,
158 }
159 }
160}
161
162impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
163where
164 Exchange: Connector,
165 Instrument: InstrumentData,
166{
167 type Error = SocketError;
168
169 fn validate(self) -> Result<Self, Self::Error>
170 where
171 Self: Sized,
172 {
173 if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
175 Ok(self)
176 } else {
177 Err(SocketError::Unsupported {
178 entity: Exchange::ID.to_string(),
179 item: self.instrument.kind().to_string(),
180 })
181 }
182 }
183}
184
185#[allow(clippy::match_like_matches_macro)]
188pub fn exchange_supports_instrument_kind(
189 exchange: ExchangeId,
190 instrument_kind: &MarketDataInstrumentKind,
191) -> bool {
192 use rustrade_instrument::{
193 exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
194 };
195
196 match (exchange, instrument_kind) {
197 (
199 BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
200 | GateioPerpetualsBtc,
201 Spot,
202 ) => false,
203 (_, Spot) => true,
204
205 (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
207 (_, Future { .. }) => false,
208
209 (
211 BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
212 | GateioPerpetualsBtc | HyperliquidPerp,
213 Perpetual,
214 ) => true,
215 (_, Perpetual) => false,
216
217 (GateioOptions | Okx, Option { .. }) => true,
219 (_, Option { .. }) => false,
220 }
221}
222
223impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
224where
225 Instrument: InstrumentData,
226{
227 type Error = SocketError;
228
229 fn validate(self) -> Result<Self, Self::Error>
230 where
231 Self: Sized,
232 {
233 if exchange_supports_instrument_kind_sub_kind(
235 &self.exchange,
236 self.instrument.kind(),
237 self.kind,
238 ) {
239 Ok(self)
240 } else {
241 Err(SocketError::Unsupported {
242 entity: self.exchange.to_string(),
243 item: format!("({}, {})", self.instrument.kind(), self.kind),
244 })
245 }
246 }
247}
248
249pub fn exchange_supports_instrument_kind_sub_kind(
252 exchange_id: &ExchangeId,
253 instrument_kind: &MarketDataInstrumentKind,
254 sub_kind: SubKind,
255) -> bool {
256 use ExchangeId::*;
257 use MarketDataInstrumentKind::*;
258 use SubKind::*;
259
260 match (exchange_id, instrument_kind, sub_kind) {
261 (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
262 (
263 BinanceFuturesUsd,
264 Perpetual,
265 PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
266 ) => true,
267 (Bitfinex, Spot, PublicTrades) => true,
268 (Bitmex, Perpetual, PublicTrades) => true,
269 (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
270 (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
271 (Coinbase, Spot, PublicTrades) => true,
272 (GateioSpot, Spot, PublicTrades) => true,
273 (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
274 (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
275 (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
276 (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
277 (GateioOptions, Option { .. }, PublicTrades) => true,
278 (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
279 (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
280 (HyperliquidPerp, Perpetual, PublicTrades | OrderBooksL2) => true,
281
282 (_, _, _) => false,
283 }
284}
285
286#[derive(Clone, Eq, PartialEq, Debug)]
289pub struct SubscriptionMeta<InstrumentKey> {
290 pub instrument_map: Map<InstrumentKey>,
293 pub ws_subscriptions: Vec<WsMessage>,
295}
296
297#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
302pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
303
304impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
305 fn from_iter<Iter>(iter: Iter) -> Self
306 where
307 Iter: IntoIterator<Item = (SubscriptionId, T)>,
308 {
309 Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
310 }
311}
312
313impl<T> Map<T> {
314 pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
316 where
317 SubscriptionId: Borrow<SubId>,
318 SubId: AsRef<str> + Hash + Eq + ?Sized,
319 {
320 self.0
321 .get(id)
322 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
323 }
324
325 pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
327 where
328 SubscriptionId: Borrow<SubId>,
329 SubId: AsRef<str> + Hash + Eq + ?Sized,
330 {
331 self.0
332 .get_mut(id)
333 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
334 }
335}
336
#[cfg(test)]
// Tests `unwrap()` on purpose: a panic is the failure signal.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    mod subscription {
        use super::*;
        use crate::{
            exchange::{coinbase::Coinbase, okx::Okx},
            subscription::trade::PublicTrades,
        };
        use rustrade_instrument::instrument::market_data::MarketDataInstrument;

        /// Deserialisation smoke tests: each case only checks that the JSON parses into the
        /// requested `Subscription` type.
        mod de {
            use super::*;
            use crate::{
                exchange::{
                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
                    gateio::perpetual::GateioPerpetualsUsd,
                    okx::Okx,
                },
                subscription::{book::OrderBooksL2, trade::PublicTrades},
            };
            use rustrade_instrument::instrument::market_data::MarketDataInstrument;

            #[test]
            fn test_subscription_okx_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "okx",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
                    input,
                )
                .unwrap();
            }

            #[test]
            fn test_subscription_binance_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "binance_spot",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<
                    Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>,
                >(input)
                .unwrap();
            }

            #[test]
            fn test_subscription_binance_futures_usd_order_books_l2() {
                let input = r#"
                {
                    "exchange": "binance_futures_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "order_books_l2"
                }
                "#;

                serde_json::from_str::<
                    Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
                >(input)
                .unwrap();
            }

            // Renamed: this case deserialises a Gateio *perpetuals* subscription, and the
            // other tests in this module use the `test_` prefix.
            #[test]
            fn test_subscription_gateio_perpetuals_usd_public_trades() {
                let input = r#"
                {
                    "exchange": "gateio_perpetuals_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<
                    Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
                >(input)
                .unwrap();
            }
        }

        // Renamed: the cases below validate `Coinbase` subscriptions, not Bitfinex ones.
        #[test]
        fn test_validate_coinbase_public_trades() {
            struct TestCase {
                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                // TC0: spot instrument is accepted.
                TestCase {
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                // TC1: perpetual instrument is rejected.
                TestCase {
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    // Placeholder error: only the `Err` outcome is checked below.
                    expected: Err(SocketError::Unsupported {
                        entity: "".to_string(),
                        item: "".to_string(),
                    }),
                },
            ];

            for (index, test) in tests.into_iter().enumerate() {
                let actual = test.input.validate();
                match (actual, test.expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{index} failed")
                    }
                    (Err(_), Err(_)) => {
                        // Both failed as expected; error contents are not compared.
                    }
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }

        #[test]
        fn test_validate_okx_public_trades() {
            struct TestCase {
                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                // TC0: spot instrument is accepted.
                TestCase {
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                // TC1: perpetual instrument is accepted.
                TestCase {
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    ))),
                },
            ];

            for (index, test) in tests.into_iter().enumerate() {
                let actual = test.input.validate();
                match (actual, test.expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{index} failed")
                    }
                    (Err(_), Err(_)) => {
                        // Both failed as expected; error contents are not compared.
                    }
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }

    mod instrument_map {
        use super::*;
        use rustrade_instrument::instrument::market_data::MarketDataInstrument;

        #[test]
        fn test_find_instrument() {
            // Map containing a single known subscription id.
            let ids = Map(FnvHashMap::from_iter([(
                SubscriptionId::from("present"),
                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
            )]));

            struct TestCase {
                input: SubscriptionId,
                expected: Result<MarketDataInstrument, SocketError>,
            }

            let cases = vec![
                // TC0: the id exists in the map.
                TestCase {
                    input: SubscriptionId::from("present"),
                    expected: Ok(MarketDataInstrument::from((
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                    ))),
                },
                // TC1: the id does not exist in the map.
                TestCase {
                    input: SubscriptionId::from("not present"),
                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
                        "not present",
                    ))),
                },
            ];

            for (index, test) in cases.into_iter().enumerate() {
                let actual = ids.find(&test.input);
                match (actual, test.expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(*actual, expected, "TC{index} failed")
                    }
                    // Compare the id carried by the error so the expected value is
                    // actually checked rather than merely "some error".
                    (
                        Err(SocketError::Unidentifiable(actual)),
                        Err(SocketError::Unidentifiable(expected)),
                    ) => {
                        assert_eq!(actual, expected, "TC{index} failed")
                    }
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }
}