rustrade_data/streams/builder/dynamic/mod.rs
1use crate::{
2 Identifier,
3 error::DataError,
4 exchange::{
5 binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6 bitfinex::{Bitfinex, market::BitfinexMarket},
7 bitmex::{Bitmex, market::BitmexMarket},
8 bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9 coinbase::{Coinbase, market::CoinbaseMarket},
10 gateio::{
11 future::{GateioFuturesBtc, GateioFuturesUsd},
12 market::GateioMarket,
13 option::GateioOptions,
14 perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15 spot::GateioSpot,
16 },
17 kraken::{Kraken, market::KrakenMarket},
18 okx::{Okx, market::OkxMarket},
19 },
20 instrument::InstrumentData,
21 streams::{
22 consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23 reconnect::stream::ReconnectingStream,
24 },
25 subscriber::WebSocketSubscriber,
26 subscription::{
27 SubKind, Subscription,
28 book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
29 liquidation::{Liquidation, Liquidations},
30 trade::{PublicTrade, PublicTrades},
31 },
32};
33use fnv::FnvHashMap;
34use futures::{Stream, stream::SelectAll};
35use futures_util::{StreamExt, future::try_join_all};
36use itertools::Itertools;
37use rustrade_instrument::exchange::ExchangeId;
38use rustrade_integration::{
39 Validator,
40 channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
41 error::SocketError,
42};
43use std::{
44 fmt::{Debug, Display},
45 sync::Arc,
46};
47use tokio_stream::wrappers::UnboundedReceiverStream;
48use vecmap::VecMap;
49
50pub mod indexed;
51
52#[derive(Debug)]
53pub struct DynamicStreams<InstrumentKey> {
54 pub trades:
55 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
56 pub l1s:
57 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
58 pub l2s: VecMap<
59 ExchangeId,
60 UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
61 >,
62 pub liquidations:
63 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
64}
65
66impl<InstrumentKey> DynamicStreams<InstrumentKey> {
67 /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
68 ///
69 /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
70 /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
71 /// [`SubKind`], it will be further split under the hood for compile-time reasons.
72 ///
73 /// ## Examples
74 /// Please see rustrade-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
75 /// comprehensive example of how to use this market data stream initialiser.
76 #[allow(clippy::unwrap_used)] // Invariant: Channels::try_from creates entries for all exchanges in batches; lookups iterate over the same exchanges
77 pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
78 subscription_batches: SubBatchIter,
79 ) -> Result<Self, DataError>
80 where
81 SubBatchIter: IntoIterator<Item = SubIter>,
82 SubIter: IntoIterator<Item = Sub>,
83 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
84 Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
85 InstrumentKey: Debug + Clone + PartialEq + Send + Sync + 'static,
86 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
87 Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
88 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
89 Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
90 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
91 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
92 Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
93 Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
94 Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
95 Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
96 Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
97 Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
98 Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
99 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
100 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
101 Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
102 Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
103 Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
104 Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
105 Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
106 Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
107 Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
108 Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
109 Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
110 Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
111 {
112 // Validate & dedup Subscription batches
113 let batches = validate_batches(subscription_batches)?;
114
115 // Generate required Channels from Subscription batches
116 let channels = Channels::try_from(&batches)?;
117
118 let futures =
119 batches.into_iter().map(|mut batch| {
120 batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
121 let by_exchange_by_sub_kind =
122 batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
123
124 let batch_futures =
125 by_exchange_by_sub_kind
126 .into_iter()
127 .map(|((exchange, sub_kind), subs)| {
128 let subs = subs.into_iter().collect::<Vec<_>>();
129 let txs = Arc::clone(&channels.txs);
130 async move {
131 match (exchange, sub_kind) {
132 (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
133 init_market_stream(
134 STREAM_RECONNECTION_POLICY,
135 WebSocketSubscriber,
136 subs.into_iter()
137 .map(|sub| {
138 Subscription::new(
139 BinanceSpot::default(),
140 sub.instrument,
141 PublicTrades,
142 )
143 })
144 .collect(),
145 )
146 .await
147 .map(|stream| {
148 tokio::spawn(stream.forward_to(
149 txs.trades.get(&exchange).unwrap().clone(),
150 ))
151 })
152 }
153 (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
154 init_market_stream(
155 STREAM_RECONNECTION_POLICY,
156 WebSocketSubscriber,
157 subs.into_iter()
158 .map(|sub| {
159 Subscription::new(
160 BinanceSpot::default(),
161 sub.instrument,
162 OrderBooksL1,
163 )
164 })
165 .collect(),
166 )
167 .await
168 .map(|stream| {
169 tokio::spawn(stream.forward_to(
170 txs.l1s.get(&exchange).unwrap().clone(),
171 ))
172 })
173 }
174 (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
175 init_market_stream(
176 STREAM_RECONNECTION_POLICY,
177 WebSocketSubscriber,
178 subs.into_iter()
179 .map(|sub| {
180 Subscription::new(
181 BinanceSpot::default(),
182 sub.instrument,
183 OrderBooksL2,
184 )
185 })
186 .collect(),
187 )
188 .await
189 .map(|stream| {
190 tokio::spawn(stream.forward_to(
191 txs.l2s.get(&exchange).unwrap().clone(),
192 ))
193 })
194 }
195 (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
196 init_market_stream(
197 STREAM_RECONNECTION_POLICY,
198 WebSocketSubscriber,
199 subs.into_iter()
200 .map(|sub| {
201 Subscription::new(
202 BinanceFuturesUsd::default(),
203 sub.instrument,
204 PublicTrades,
205 )
206 })
207 .collect(),
208 )
209 .await
210 .map(|stream| {
211 tokio::spawn(stream.forward_to(
212 txs.trades.get(&exchange).unwrap().clone(),
213 ))
214 })
215 }
216 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
217 init_market_stream(
218 STREAM_RECONNECTION_POLICY,
219 WebSocketSubscriber,
220 subs.into_iter()
221 .map(|sub| {
222 Subscription::<_, Instrument, _>::new(
223 BinanceFuturesUsd::default(),
224 sub.instrument,
225 OrderBooksL1,
226 )
227 })
228 .collect(),
229 )
230 .await
231 .map(|stream| {
232 tokio::spawn(stream.forward_to(
233 txs.l1s.get(&exchange).unwrap().clone(),
234 ))
235 })
236 }
237 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
238 init_market_stream(
239 STREAM_RECONNECTION_POLICY,
240 WebSocketSubscriber,
241 subs.into_iter()
242 .map(|sub| {
243 Subscription::<_, Instrument, _>::new(
244 BinanceFuturesUsd::default(),
245 sub.instrument,
246 OrderBooksL2,
247 )
248 })
249 .collect(),
250 )
251 .await
252 .map(|stream| {
253 tokio::spawn(stream.forward_to(
254 txs.l2s.get(&exchange).unwrap().clone(),
255 ))
256 })
257 }
258 (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
259 init_market_stream(
260 STREAM_RECONNECTION_POLICY,
261 WebSocketSubscriber,
262 subs.into_iter()
263 .map(|sub| {
264 Subscription::<_, Instrument, _>::new(
265 BinanceFuturesUsd::default(),
266 sub.instrument,
267 Liquidations,
268 )
269 })
270 .collect(),
271 )
272 .await
273 .map(|stream| {
274 tokio::spawn(stream.forward_to(
275 txs.liquidations.get(&exchange).unwrap().clone(),
276 ))
277 })
278 }
279 (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
280 init_market_stream(
281 STREAM_RECONNECTION_POLICY,
282 WebSocketSubscriber,
283 subs.into_iter()
284 .map(|sub| {
285 Subscription::new(
286 Bitfinex,
287 sub.instrument,
288 PublicTrades,
289 )
290 })
291 .collect(),
292 )
293 .await
294 .map(|stream| {
295 tokio::spawn(stream.forward_to(
296 txs.trades.get(&exchange).unwrap().clone(),
297 ))
298 })
299 }
300 (ExchangeId::Bitmex, SubKind::PublicTrades) => {
301 init_market_stream(
302 STREAM_RECONNECTION_POLICY,
303 WebSocketSubscriber,
304 subs.into_iter()
305 .map(|sub| {
306 Subscription::new(
307 Bitmex,
308 sub.instrument,
309 PublicTrades,
310 )
311 })
312 .collect(),
313 )
314 .await
315 .map(|stream| {
316 tokio::spawn(stream.forward_to(
317 txs.trades.get(&exchange).unwrap().clone(),
318 ))
319 })
320 }
321 (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
322 init_market_stream(
323 STREAM_RECONNECTION_POLICY,
324 WebSocketSubscriber,
325 subs.into_iter()
326 .map(|sub| {
327 Subscription::new(
328 BybitSpot::default(),
329 sub.instrument,
330 PublicTrades,
331 )
332 })
333 .collect(),
334 )
335 .await
336 .map(|stream| {
337 tokio::spawn(stream.forward_to(
338 txs.trades.get(&exchange).unwrap().clone(),
339 ))
340 })
341 }
342 (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
343 init_market_stream(
344 STREAM_RECONNECTION_POLICY,
345 WebSocketSubscriber,
346 subs.into_iter()
347 .map(|sub| {
348 Subscription::new(
349 BybitSpot::default(),
350 sub.instrument,
351 OrderBooksL1,
352 )
353 })
354 .collect(),
355 )
356 .await
357 .map(|stream| {
358 tokio::spawn(stream.forward_to(
359 txs.l1s.get(&exchange).unwrap().clone(),
360 ))
361 })
362 }
363 (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
364 init_market_stream(
365 STREAM_RECONNECTION_POLICY,
366 WebSocketSubscriber,
367 subs.into_iter()
368 .map(|sub| {
369 Subscription::new(
370 BybitSpot::default(),
371 sub.instrument,
372 OrderBooksL2,
373 )
374 })
375 .collect(),
376 )
377 .await
378 .map(|stream| {
379 tokio::spawn(stream.forward_to(
380 txs.l2s.get(&exchange).unwrap().clone(),
381 ))
382 })
383 }
384 (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
385 init_market_stream(
386 STREAM_RECONNECTION_POLICY,
387 WebSocketSubscriber,
388 subs.into_iter()
389 .map(|sub| {
390 Subscription::new(
391 BybitPerpetualsUsd::default(),
392 sub.instrument,
393 PublicTrades,
394 )
395 })
396 .collect(),
397 )
398 .await
399 .map(|stream| {
400 tokio::spawn(stream.forward_to(
401 txs.trades.get(&exchange).unwrap().clone(),
402 ))
403 })
404 }
405 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
406 init_market_stream(
407 STREAM_RECONNECTION_POLICY,
408 WebSocketSubscriber,
409 subs.into_iter()
410 .map(|sub| {
411 Subscription::new(
412 BybitSpot::default(),
413 sub.instrument,
414 OrderBooksL1,
415 )
416 })
417 .collect(),
418 )
419 .await
420 .map(|stream| {
421 tokio::spawn(stream.forward_to(
422 txs.l1s.get(&exchange).unwrap().clone(),
423 ))
424 })
425 }
426 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
427 init_market_stream(
428 STREAM_RECONNECTION_POLICY,
429 WebSocketSubscriber,
430 subs.into_iter()
431 .map(|sub| {
432 Subscription::new(
433 BybitSpot::default(),
434 sub.instrument,
435 OrderBooksL2,
436 )
437 })
438 .collect(),
439 )
440 .await
441 .map(|stream| {
442 tokio::spawn(stream.forward_to(
443 txs.l2s.get(&exchange).unwrap().clone(),
444 ))
445 })
446 }
447 (ExchangeId::Coinbase, SubKind::PublicTrades) => {
448 init_market_stream(
449 STREAM_RECONNECTION_POLICY,
450 WebSocketSubscriber,
451 subs.into_iter()
452 .map(|sub| {
453 Subscription::new(
454 Coinbase,
455 sub.instrument,
456 PublicTrades,
457 )
458 })
459 .collect(),
460 )
461 .await
462 .map(|stream| {
463 tokio::spawn(stream.forward_to(
464 txs.trades.get(&exchange).unwrap().clone(),
465 ))
466 })
467 }
468 (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
469 init_market_stream(
470 STREAM_RECONNECTION_POLICY,
471 WebSocketSubscriber,
472 subs.into_iter()
473 .map(|sub| {
474 Subscription::new(
475 GateioSpot::default(),
476 sub.instrument,
477 PublicTrades,
478 )
479 })
480 .collect(),
481 )
482 .await
483 .map(|stream| {
484 tokio::spawn(stream.forward_to(
485 txs.trades.get(&exchange).unwrap().clone(),
486 ))
487 })
488 }
489 (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
490 init_market_stream(
491 STREAM_RECONNECTION_POLICY,
492 WebSocketSubscriber,
493 subs.into_iter()
494 .map(|sub| {
495 Subscription::new(
496 GateioFuturesUsd::default(),
497 sub.instrument,
498 PublicTrades,
499 )
500 })
501 .collect(),
502 )
503 .await
504 .map(|stream| {
505 tokio::spawn(stream.forward_to(
506 txs.trades.get(&exchange).unwrap().clone(),
507 ))
508 })
509 }
510 (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
511 init_market_stream(
512 STREAM_RECONNECTION_POLICY,
513 WebSocketSubscriber,
514 subs.into_iter()
515 .map(|sub| {
516 Subscription::new(
517 GateioFuturesBtc::default(),
518 sub.instrument,
519 PublicTrades,
520 )
521 })
522 .collect(),
523 )
524 .await
525 .map(|stream| {
526 tokio::spawn(stream.forward_to(
527 txs.trades.get(&exchange).unwrap().clone(),
528 ))
529 })
530 }
531 (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
532 init_market_stream(
533 STREAM_RECONNECTION_POLICY,
534 WebSocketSubscriber,
535 subs.into_iter()
536 .map(|sub| {
537 Subscription::new(
538 GateioPerpetualsUsd::default(),
539 sub.instrument,
540 PublicTrades,
541 )
542 })
543 .collect(),
544 )
545 .await
546 .map(|stream| {
547 tokio::spawn(stream.forward_to(
548 txs.trades.get(&exchange).unwrap().clone(),
549 ))
550 })
551 }
552 (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
553 init_market_stream(
554 STREAM_RECONNECTION_POLICY,
555 WebSocketSubscriber,
556 subs.into_iter()
557 .map(|sub| {
558 Subscription::new(
559 GateioPerpetualsBtc::default(),
560 sub.instrument,
561 PublicTrades,
562 )
563 })
564 .collect(),
565 )
566 .await
567 .map(|stream| {
568 tokio::spawn(stream.forward_to(
569 txs.trades.get(&exchange).unwrap().clone(),
570 ))
571 })
572 }
573 (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
574 init_market_stream(
575 STREAM_RECONNECTION_POLICY,
576 WebSocketSubscriber,
577 subs.into_iter()
578 .map(|sub| {
579 Subscription::new(
580 GateioOptions::default(),
581 sub.instrument,
582 PublicTrades,
583 )
584 })
585 .collect(),
586 )
587 .await
588 .map(|stream| {
589 tokio::spawn(stream.forward_to(
590 txs.trades.get(&exchange).unwrap().clone(),
591 ))
592 })
593 }
594 (ExchangeId::Kraken, SubKind::PublicTrades) => {
595 init_market_stream(
596 STREAM_RECONNECTION_POLICY,
597 WebSocketSubscriber,
598 subs.into_iter()
599 .map(|sub| {
600 Subscription::new(
601 Kraken,
602 sub.instrument,
603 PublicTrades,
604 )
605 })
606 .collect(),
607 )
608 .await
609 .map(|stream| {
610 tokio::spawn(stream.forward_to(
611 txs.trades.get(&exchange).unwrap().clone(),
612 ))
613 })
614 }
615 (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
616 init_market_stream(
617 STREAM_RECONNECTION_POLICY,
618 WebSocketSubscriber,
619 subs.into_iter()
620 .map(|sub| {
621 Subscription::new(
622 Kraken,
623 sub.instrument,
624 OrderBooksL1,
625 )
626 })
627 .collect(),
628 )
629 .await
630 .map(|stream| {
631 tokio::spawn(stream.forward_to(
632 txs.l1s.get(&exchange).unwrap().clone(),
633 ))
634 })
635 }
636 (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
637 STREAM_RECONNECTION_POLICY,
638 WebSocketSubscriber,
639 subs.into_iter()
640 .map(|sub| {
641 Subscription::new(Okx, sub.instrument, PublicTrades)
642 })
643 .collect(),
644 )
645 .await
646 .map(|stream| {
647 tokio::spawn(
648 stream.forward_to(
649 txs.trades.get(&exchange).unwrap().clone(),
650 ),
651 )
652 }),
653 (exchange, sub_kind) => {
654 Err(DataError::Unsupported { exchange, sub_kind })
655 }
656 }
657 }
658 });
659
660 try_join_all(batch_futures)
661 });
662
663 try_join_all(futures).await?;
664
665 Ok(Self {
666 trades: channels
667 .rxs
668 .trades
669 .into_iter()
670 .map(|(exchange, rx)| (exchange, rx.into_stream()))
671 .collect(),
672 l1s: channels
673 .rxs
674 .l1s
675 .into_iter()
676 .map(|(exchange, rx)| (exchange, rx.into_stream()))
677 .collect(),
678 l2s: channels
679 .rxs
680 .l2s
681 .into_iter()
682 .map(|(exchange, rx)| (exchange, rx.into_stream()))
683 .collect(),
684 liquidations: channels
685 .rxs
686 .liquidations
687 .into_iter()
688 .map(|(exchange, rx)| (exchange, rx.into_stream()))
689 .collect(),
690 })
691 }
692
693 /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
694 ///
695 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
696 pub fn select_trades(
697 &mut self,
698 exchange: ExchangeId,
699 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
700 self.trades.remove(&exchange)
701 }
702
703 /// Select and merge every exchange [`PublicTrade`] `Stream` using
704 /// [`SelectAll`](futures_util::stream::select_all::select_all).
705 pub fn select_all_trades(
706 &mut self,
707 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
708 futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
709 }
710
711 /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
712 ///
713 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
714 pub fn select_l1s(
715 &mut self,
716 exchange: ExchangeId,
717 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
718 self.l1s.remove(&exchange)
719 }
720
721 /// Select and merge every exchange [`OrderBookL1`] `Stream` using
722 /// [`SelectAll`](futures_util::stream::select_all::select_all).
723 pub fn select_all_l1s(
724 &mut self,
725 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
726 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
727 }
728
729 /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
730 ///
731 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
732 pub fn select_l2s(
733 &mut self,
734 exchange: ExchangeId,
735 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
736 self.l2s.remove(&exchange)
737 }
738
739 /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
740 /// [`SelectAll`](futures_util::stream::select_all::select_all).
741 pub fn select_all_l2s(
742 &mut self,
743 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
744 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
745 }
746
747 /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
748 ///
749 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
750 pub fn select_liquidations(
751 &mut self,
752 exchange: ExchangeId,
753 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
754 self.liquidations.remove(&exchange)
755 }
756
757 /// Select and merge every exchange [`Liquidation`] `Stream` using
758 /// [`SelectAll`](futures_util::stream::select_all::select_all).
759 pub fn select_all_liquidations(
760 &mut self,
761 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
762 futures_util::stream::select_all::select_all(
763 std::mem::take(&mut self.liquidations).into_values(),
764 )
765 }
766
767 /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
768 ///
769 /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
770 /// use cases.
771 pub fn select_all<Output>(self) -> impl Stream<Item = Output>
772 where
773 InstrumentKey: Send + 'static,
774 Output: 'static,
775 MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
776 MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
777 MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
778 MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
779 {
780 let Self {
781 trades,
782 l1s,
783 l2s,
784 liquidations,
785 } = self;
786
787 let trades = trades
788 .into_values()
789 .map(|stream| stream.map(MarketStreamResult::into).boxed());
790
791 let l1s = l1s
792 .into_values()
793 .map(|stream| stream.map(MarketStreamResult::into).boxed());
794
795 let l2s = l2s
796 .into_values()
797 .map(|stream| stream.map(MarketStreamResult::into).boxed());
798
799 let liquidations = liquidations
800 .into_values()
801 .map(|stream| stream.map(MarketStreamResult::into).boxed());
802
803 let all = trades.chain(l1s).chain(l2s).chain(liquidations);
804
805 futures_util::stream::select_all::select_all(all)
806 }
807}
808
809pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
810 batches: SubBatchIter,
811) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
812where
813 SubBatchIter: IntoIterator<Item = SubIter>,
814 SubIter: IntoIterator<Item = Sub>,
815 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
816 Instrument: InstrumentData + Ord,
817{
818 batches
819 .into_iter()
820 .map(validate_subscriptions::<SubIter, Sub, Instrument>)
821 .collect()
822}
823
824pub fn validate_subscriptions<SubIter, Sub, Instrument>(
825 batch: SubIter,
826) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
827where
828 SubIter: IntoIterator<Item = Sub>,
829 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
830 Instrument: InstrumentData + Ord,
831{
832 // Validate Subscriptions
833 let mut batch = batch
834 .into_iter()
835 .map(Sub::into)
836 .map(Validator::validate)
837 .collect::<Result<Vec<_>, SocketError>>()?;
838
839 // Remove duplicate Subscriptions
840 batch.sort();
841 batch.dedup();
842
843 Ok(batch)
844}
845
846struct Channels<InstrumentKey> {
847 txs: Arc<Txs<InstrumentKey>>,
848 rxs: Rxs<InstrumentKey>,
849}
850
851impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
852 for Channels<Instrument::Key>
853where
854 Instrument: InstrumentData,
855{
856 type Error = DataError;
857
858 fn try_from(
859 value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
860 ) -> Result<Self, Self::Error> {
861 let mut txs = Txs::default();
862 let mut rxs = Rxs::default();
863
864 for sub in value.iter().flatten() {
865 match sub.kind {
866 SubKind::PublicTrades => {
867 if let (None, None) =
868 (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
869 {
870 let (tx, rx) = mpsc_unbounded();
871 txs.trades.insert(sub.exchange, tx);
872 rxs.trades.insert(sub.exchange, rx);
873 }
874 }
875 SubKind::OrderBooksL1 => {
876 if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
877 let (tx, rx) = mpsc_unbounded();
878 txs.l1s.insert(sub.exchange, tx);
879 rxs.l1s.insert(sub.exchange, rx);
880 }
881 }
882 SubKind::OrderBooksL2 => {
883 if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
884 let (tx, rx) = mpsc_unbounded();
885 txs.l2s.insert(sub.exchange, tx);
886 rxs.l2s.insert(sub.exchange, rx);
887 }
888 }
889 SubKind::Liquidations => {
890 if let (None, None) = (
891 txs.liquidations.get(&sub.exchange),
892 rxs.liquidations.get(&sub.exchange),
893 ) {
894 let (tx, rx) = mpsc_unbounded();
895 txs.liquidations.insert(sub.exchange, tx);
896 rxs.liquidations.insert(sub.exchange, rx);
897 }
898 }
899 unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
900 }
901 }
902
903 Ok(Channels {
904 txs: Arc::new(txs),
905 rxs,
906 })
907 }
908}
909
910struct Txs<InstrumentKey> {
911 trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
912 l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
913 l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
914 liquidations:
915 FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
916}
917
918impl<InstrumentKey> Default for Txs<InstrumentKey> {
919 fn default() -> Self {
920 Self {
921 trades: Default::default(),
922 l1s: Default::default(),
923 l2s: Default::default(),
924 liquidations: Default::default(),
925 }
926 }
927}
928
929struct Rxs<InstrumentKey> {
930 trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
931 l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
932 l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
933 liquidations:
934 FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
935}
936
937impl<InstrumentKey> Default for Rxs<InstrumentKey> {
938 fn default() -> Self {
939 Self {
940 trades: Default::default(),
941 l1s: Default::default(),
942 l2s: Default::default(),
943 liquidations: Default::default(),
944 }
945 }
946}