1use crate::{exchange::Connector, instrument::InstrumentData};
2use derive_more::Display;
3use fnv::FnvHashMap;
4use rustrade_instrument::{
5 Keyed,
6 asset::name::AssetNameInternal,
7 exchange::ExchangeId,
8 instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
9};
10use rustrade_integration::{
11 Validator, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
12};
13use serde::{Deserialize, Serialize};
14use smol_str::{ToSmolStr, format_smolstr};
15use std::{borrow::Borrow, fmt::Debug, hash::Hash};
16
/// Order book subscription types, including `OrderBooksL2`.
pub mod book;

/// Candle subscription types (presumably backing [`SubKind::Candles`] — TODO confirm).
pub mod candle;

/// Greeks subscription types (contents are not visible from this file — TODO confirm).
pub mod greeks;

/// Liquidation subscription types (presumably backing [`SubKind::Liquidations`] — TODO confirm).
pub mod liquidation;

/// Quote subscription types (presumably backing [`SubKind::Quotes`] — TODO confirm).
pub mod quote;

/// Public trade subscription types, including `PublicTrades`.
pub mod trade;
34
/// Describes a kind of [`Subscription`] and the event type associated with it.
///
/// Every implementor must also be [`Debug`] and [`Clone`].
pub trait SubscriptionKind: Debug + Clone {
    /// Event type associated with this subscription kind.
    type Event: Debug;

    /// Returns a static string identifying this subscription kind.
    fn as_str(&self) -> &'static str;
}
43
/// A subscription to a `kind` of data for an `instrument` on an `exchange`.
///
/// The type parameters default to [`ExchangeId`], [`MarketDataInstrument`] and [`SubKind`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub struct Subscription<Exchange = ExchangeId, Inst = MarketDataInstrument, Kind = SubKind> {
    /// Exchange the subscription targets.
    pub exchange: Exchange,
    /// Instrument being subscribed to; its fields are flattened into the parent when
    /// serialising and deserialising.
    #[serde(flatten)]
    pub instrument: Inst,
    /// Kind of subscription; the key `type` is accepted as an alias when deserialising.
    #[serde(alias = "type")]
    pub kind: Kind,
}
54
55pub fn display_subscriptions_without_exchange<Exchange, Instrument, Kind>(
56 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
57) -> String
58where
59 Instrument: std::fmt::Display,
60 Kind: std::fmt::Display,
61{
62 subscriptions
63 .iter()
64 .map(
65 |Subscription {
66 exchange: _,
67 instrument,
68 kind,
69 }| { format_smolstr!("({instrument}, {kind})") },
70 )
71 .collect::<Vec<_>>()
72 .join(",")
73}
74
75impl<Exchange, Instrument, Kind> std::fmt::Display for Subscription<Exchange, Instrument, Kind>
76where
77 Exchange: std::fmt::Display,
78 Instrument: std::fmt::Display,
79 Kind: std::fmt::Display,
80{
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 write!(f, "({}|{}|{})", self.exchange, self.kind, self.instrument)
83 }
84}
85
/// Enumerates the subscription kinds that can be named at runtime.
///
/// This is the default `Kind` type parameter of [`Subscription`].
#[derive(
    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Display, Deserialize, Serialize,
)]
pub enum SubKind {
    /// Public trades.
    PublicTrades,
    /// Level 1 order books.
    OrderBooksL1,
    /// Level 2 order books.
    OrderBooksL2,
    /// Level 3 order books.
    OrderBooksL3,
    /// Liquidations.
    Liquidations,
    /// Candles.
    Candles,
    /// Quotes.
    Quotes,
}
100
101impl<Exchange, S, Kind> From<(Exchange, S, S, MarketDataInstrumentKind, Kind)>
102 for Subscription<Exchange, MarketDataInstrument, Kind>
103where
104 S: Into<AssetNameInternal>,
105{
106 fn from(
107 (exchange, base, quote, instrument_kind, kind): (
108 Exchange,
109 S,
110 S,
111 MarketDataInstrumentKind,
112 Kind,
113 ),
114 ) -> Self {
115 Self::new(exchange, (base, quote, instrument_kind), kind)
116 }
117}
118
119impl<InstrumentKey, Exchange, S, Kind>
120 From<(
121 InstrumentKey,
122 Exchange,
123 S,
124 S,
125 MarketDataInstrumentKind,
126 Kind,
127 )> for Subscription<Exchange, Keyed<InstrumentKey, MarketDataInstrument>, Kind>
128where
129 S: Into<AssetNameInternal>,
130{
131 fn from(
132 (instrument_id, exchange, base, quote, instrument_kind, kind): (
133 InstrumentKey,
134 Exchange,
135 S,
136 S,
137 MarketDataInstrumentKind,
138 Kind,
139 ),
140 ) -> Self {
141 let instrument = Keyed::new(instrument_id, (base, quote, instrument_kind).into());
142
143 Self::new(exchange, instrument, kind)
144 }
145}
146
147impl<Exchange, I, Instrument, Kind> From<(Exchange, I, Kind)>
148 for Subscription<Exchange, Instrument, Kind>
149where
150 I: Into<Instrument>,
151{
152 fn from((exchange, instrument, kind): (Exchange, I, Kind)) -> Self {
153 Self::new(exchange, instrument, kind)
154 }
155}
156
157impl<Instrument, Exchange, Kind> Subscription<Exchange, Instrument, Kind> {
158 pub fn new<I>(exchange: Exchange, instrument: I, kind: Kind) -> Self
160 where
161 I: Into<Instrument>,
162 {
163 Self {
164 exchange,
165 instrument: instrument.into(),
166 kind,
167 }
168 }
169}
170
171impl<Exchange, Instrument, Kind> Validator for Subscription<Exchange, Instrument, Kind>
172where
173 Exchange: Connector,
174 Instrument: InstrumentData,
175{
176 type Error = SocketError;
177
178 fn validate(self) -> Result<Self, Self::Error>
179 where
180 Self: Sized,
181 {
182 if exchange_supports_instrument_kind(Exchange::ID, self.instrument.kind()) {
184 Ok(self)
185 } else {
186 Err(SocketError::Unsupported {
187 entity: Exchange::ID.to_string(),
188 item: self.instrument.kind().to_string(),
189 })
190 }
191 }
192}
193
194#[allow(clippy::match_like_matches_macro)]
197pub fn exchange_supports_instrument_kind(
198 exchange: ExchangeId,
199 instrument_kind: &MarketDataInstrumentKind,
200) -> bool {
201 use rustrade_instrument::{
202 exchange::ExchangeId::*, instrument::market_data::kind::MarketDataInstrumentKind::*,
203 };
204
205 match (exchange, instrument_kind) {
206 (
208 BinanceFuturesUsd | Bitmex | BybitPerpetualsUsd | GateioPerpetualsUsd
209 | GateioPerpetualsBtc,
210 Spot,
211 ) => false,
212 (_, Spot) => true,
213
214 (GateioFuturesUsd | GateioFuturesBtc | Okx, Future { .. }) => true,
216 (_, Future { .. }) => false,
217
218 (
220 BinanceFuturesUsd | Bitmex | Okx | BybitPerpetualsUsd | GateioPerpetualsUsd
221 | GateioPerpetualsBtc | HyperliquidPerp,
222 Perpetual,
223 ) => true,
224 (_, Perpetual) => false,
225
226 (GateioOptions | Okx, Option { .. }) => true,
228 (_, Option { .. }) => false,
229 }
230}
231
232impl<Instrument> Validator for Subscription<ExchangeId, Instrument, SubKind>
233where
234 Instrument: InstrumentData,
235{
236 type Error = SocketError;
237
238 fn validate(self) -> Result<Self, Self::Error>
239 where
240 Self: Sized,
241 {
242 if exchange_supports_instrument_kind_sub_kind(
244 &self.exchange,
245 self.instrument.kind(),
246 self.kind,
247 ) {
248 Ok(self)
249 } else {
250 Err(SocketError::Unsupported {
251 entity: self.exchange.to_string(),
252 item: format!("({}, {})", self.instrument.kind(), self.kind),
253 })
254 }
255 }
256}
257
258pub fn exchange_supports_instrument_kind_sub_kind(
261 exchange_id: &ExchangeId,
262 instrument_kind: &MarketDataInstrumentKind,
263 sub_kind: SubKind,
264) -> bool {
265 use ExchangeId::*;
266 use MarketDataInstrumentKind::*;
267 use SubKind::*;
268
269 match (exchange_id, instrument_kind, sub_kind) {
270 (BinanceSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
271 (
272 BinanceFuturesUsd,
273 Perpetual,
274 PublicTrades | OrderBooksL1 | OrderBooksL2 | Liquidations,
275 ) => true,
276 (Bitfinex, Spot, PublicTrades) => true,
277 (Bitmex, Perpetual, PublicTrades) => true,
278 (BybitSpot, Spot, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
279 (BybitPerpetualsUsd, Perpetual, PublicTrades | OrderBooksL1 | OrderBooksL2) => true,
280 (Coinbase, Spot, PublicTrades) => true,
281 (GateioSpot, Spot, PublicTrades) => true,
282 (GateioFuturesUsd, Future { .. }, PublicTrades) => true,
283 (GateioFuturesBtc, Future { .. }, PublicTrades) => true,
284 (GateioPerpetualsUsd, Perpetual, PublicTrades) => true,
285 (GateioPerpetualsBtc, Perpetual, PublicTrades) => true,
286 (GateioOptions, Option { .. }, PublicTrades) => true,
287 (Kraken, Spot, PublicTrades | OrderBooksL1) => true,
288 (Okx, Spot | Future { .. } | Perpetual | Option { .. }, PublicTrades) => true,
289 (HyperliquidPerp, Perpetual, PublicTrades | OrderBooksL2) => true,
290
291 (_, _, _) => false,
292 }
293}
294
/// Bundles an instrument [`Map`] with a list of WebSocket messages.
///
/// NOTE(review): presumably produced while building the subscriptions for a connection —
/// confirm against the code that constructs it.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct SubscriptionMeta<InstrumentKey> {
    /// Lookup from [`SubscriptionId`] to `InstrumentKey`.
    pub instrument_map: Map<InstrumentKey>,
    /// WebSocket messages for the subscriptions (presumably the payloads sent to the
    /// exchange to subscribe — TODO confirm).
    pub ws_subscriptions: Vec<WsMessage>,
}
305
/// Newtype around an [`FnvHashMap`] that associates each [`SubscriptionId`] with a value of
/// type `T`.
///
/// Use [`Map::find`] / [`Map::find_mut`] for lookups that report a missing id as a
/// [`SocketError`].
#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct Map<T>(pub FnvHashMap<SubscriptionId, T>);
312
313impl<T> FromIterator<(SubscriptionId, T)> for Map<T> {
314 fn from_iter<Iter>(iter: Iter) -> Self
315 where
316 Iter: IntoIterator<Item = (SubscriptionId, T)>,
317 {
318 Self(iter.into_iter().collect::<FnvHashMap<SubscriptionId, T>>())
319 }
320}
321
322impl<T> Map<T> {
323 pub fn find<SubId>(&self, id: &SubId) -> Result<&T, SocketError>
325 where
326 SubscriptionId: Borrow<SubId>,
327 SubId: AsRef<str> + Hash + Eq + ?Sized,
328 {
329 self.0
330 .get(id)
331 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
332 }
333
334 pub fn find_mut<SubId>(&mut self, id: &SubId) -> Result<&mut T, SocketError>
336 where
337 SubscriptionId: Borrow<SubId>,
338 SubId: AsRef<str> + Hash + Eq + ?Sized,
339 {
340 self.0
341 .get_mut(id)
342 .ok_or_else(|| SocketError::Unidentifiable(SubscriptionId(id.as_ref().to_smolstr())))
343 }
344}
345
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Asserts that the `actual` outcome of test case `index` matches `expected`.
    ///
    /// `Ok` values must be equal. `Err` values must be the same [`SocketError`] variant; their
    /// payloads are not compared, so an expectation may carry placeholder payloads.
    fn assert_outcome<T>(
        index: usize,
        actual: Result<&T, &SocketError>,
        expected: Result<&T, &SocketError>,
    ) where
        T: Debug + PartialEq,
    {
        match (actual, expected) {
            (Ok(actual), Ok(expected)) => {
                assert_eq!(actual, expected, "TC{} failed", index)
            }
            (Err(actual), Err(expected)) => {
                // Comparing discriminants rejects a wrong error variant without requiring
                // `SocketError: PartialEq` or exact payload strings.
                assert_eq!(
                    std::mem::discriminant(actual),
                    std::mem::discriminant(expected),
                    "TC{index} failed because of an unexpected error variant: {actual:?}"
                )
            }
            (actual, expected) => {
                panic!(
                    "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                );
            }
        }
    }

    mod subscription {
        use super::*;
        use crate::{
            exchange::{coinbase::Coinbase, okx::Okx},
            subscription::trade::PublicTrades,
        };
        use rustrade_instrument::instrument::market_data::MarketDataInstrument;

        /// Deserialisation of [`Subscription`] from JSON.
        mod de {
            use super::*;
            use crate::{
                exchange::{
                    binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
                    gateio::perpetual::GateioPerpetualsUsd,
                    okx::Okx,
                },
                subscription::{book::OrderBooksL2, trade::PublicTrades},
            };
            use rustrade_instrument::instrument::market_data::MarketDataInstrument;

            #[test]
            fn test_subscription_okx_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "okx",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<Subscription<Okx, MarketDataInstrument, PublicTrades>>(
                    input,
                )
                .unwrap();
            }

            #[test]
            fn test_subscription_binance_spot_public_trades() {
                let input = r#"
                {
                    "exchange": "binance_spot",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "spot",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<
                    Subscription<BinanceSpot, MarketDataInstrument, PublicTrades>,
                >(input)
                .unwrap();
            }

            #[test]
            fn test_subscription_binance_futures_usd_order_books_l2() {
                let input = r#"
                {
                    "exchange": "binance_futures_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "order_books_l2"
                }
                "#;

                serde_json::from_str::<
                    Subscription<BinanceFuturesUsd, MarketDataInstrument, OrderBooksL2>,
                >(input)
                .unwrap();
            }

            // Named after the exchange actually exercised (`gateio_perpetuals_usd`).
            #[test]
            fn test_subscription_gateio_perpetuals_usd_public_trades() {
                let input = r#"
                {
                    "exchange": "gateio_perpetuals_usd",
                    "base": "btc",
                    "quote": "usdt",
                    "instrument_kind": "perpetual",
                    "kind": "public_trades"
                }
                "#;

                serde_json::from_str::<
                    Subscription<GateioPerpetualsUsd, MarketDataInstrument, PublicTrades>,
                >(input)
                .unwrap();
            }
        }

        // Named after the exchange actually exercised (`Coinbase`).
        #[test]
        fn test_validate_coinbase_public_trades() {
            struct TestCase {
                input: Subscription<Coinbase, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Coinbase, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                // TC0: spot is supported, so validation returns the subscription unchanged.
                TestCase {
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                // TC1: perpetuals are unsupported. Only the error variant is compared, so the
                // payload strings are placeholders.
                TestCase {
                    input: Subscription::from((
                        Coinbase,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    expected: Err(SocketError::Unsupported {
                        entity: "".to_string(),
                        item: "".to_string(),
                    }),
                },
            ];

            for (index, test) in tests.into_iter().enumerate() {
                let actual = test.input.validate();
                assert_outcome(index, actual.as_ref(), test.expected.as_ref());
            }
        }

        #[test]
        fn test_validate_okx_public_trades() {
            struct TestCase {
                input: Subscription<Okx, MarketDataInstrument, PublicTrades>,
                expected:
                    Result<Subscription<Okx, MarketDataInstrument, PublicTrades>, SocketError>,
            }

            let tests = vec![
                // TC0: spot is supported.
                TestCase {
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                        PublicTrades,
                    ))),
                },
                // TC1: perpetuals are supported.
                TestCase {
                    input: Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    )),
                    expected: Ok(Subscription::from((
                        Okx,
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Perpetual,
                        PublicTrades,
                    ))),
                },
            ];

            for (index, test) in tests.into_iter().enumerate() {
                let actual = test.input.validate();
                assert_outcome(index, actual.as_ref(), test.expected.as_ref());
            }
        }
    }

    mod instrument_map {
        use super::*;
        use rustrade_instrument::instrument::market_data::MarketDataInstrument;

        #[test]
        fn test_find_instrument() {
            // Map containing a single known subscription id.
            let ids = Map(FnvHashMap::from_iter([(
                SubscriptionId::from("present"),
                MarketDataInstrument::from(("base", "quote", MarketDataInstrumentKind::Spot)),
            )]));

            struct TestCase {
                input: SubscriptionId,
                expected: Result<MarketDataInstrument, SocketError>,
            }

            let cases = vec![
                // TC0: the id is present, so its instrument is returned.
                TestCase {
                    input: SubscriptionId::from("present"),
                    expected: Ok(MarketDataInstrument::from((
                        "base",
                        "quote",
                        MarketDataInstrumentKind::Spot,
                    ))),
                },
                // TC1: the id is absent, so the lookup is unidentifiable.
                TestCase {
                    input: SubscriptionId::from("not present"),
                    expected: Err(SocketError::Unidentifiable(SubscriptionId::from(
                        "not present",
                    ))),
                },
            ];

            for (index, test) in cases.into_iter().enumerate() {
                let actual = ids.find(&test.input);
                assert_outcome(
                    index,
                    // `find` yields `Result<&T, _>`; peel the extra reference `as_ref` adds.
                    actual.as_ref().map(|found| *found),
                    test.expected.as_ref(),
                );
            }
        }
    }
}