rustrade_data/streams/builder/dynamic/mod.rs
1use crate::{
2 Identifier,
3 error::DataError,
4 exchange::{
5 binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6 bitfinex::{Bitfinex, market::BitfinexMarket},
7 bitmex::{Bitmex, market::BitmexMarket},
8 bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9 coinbase::{Coinbase, market::CoinbaseMarket},
10 gateio::{
11 future::{GateioFuturesBtc, GateioFuturesUsd},
12 market::GateioMarket,
13 option::GateioOptions,
14 perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15 spot::GateioSpot,
16 },
17 kraken::{Kraken, market::KrakenMarket},
18 okx::{Okx, market::OkxMarket},
19 },
20 instrument::InstrumentData,
21 streams::{
22 consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23 reconnect::stream::ReconnectingStream,
24 },
25 subscription::{
26 SubKind, Subscription,
27 book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28 liquidation::{Liquidation, Liquidations},
29 trade::{PublicTrade, PublicTrades},
30 },
31};
32use fnv::FnvHashMap;
33use futures::{Stream, stream::SelectAll};
34use futures_util::{StreamExt, future::try_join_all};
35use itertools::Itertools;
36use rustrade_instrument::exchange::ExchangeId;
37use rustrade_integration::{
38 Validator,
39 channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
40 error::SocketError,
41};
42use std::{
43 fmt::{Debug, Display},
44 sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
53 pub trades:
54 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
55 pub l1s:
56 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
57 pub l2s: VecMap<
58 ExchangeId,
59 UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60 >,
61 pub liquidations:
62 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66 /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67 ///
68 /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69 /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70 /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71 ///
72 /// ## Examples
73 /// Please see rustrade-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74 /// comprehensive example of how to use this market data stream initialiser.
75 #[allow(clippy::unwrap_used)] // Invariant: Channels::try_from creates entries for all exchanges in batches; lookups iterate over the same exchanges
76 pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
77 subscription_batches: SubBatchIter,
78 ) -> Result<Self, DataError>
79 where
80 SubBatchIter: IntoIterator<Item = SubIter>,
81 SubIter: IntoIterator<Item = Sub>,
82 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
83 Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
84 InstrumentKey: Debug + Clone + Send + 'static,
85 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86 Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
88 Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
89 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
90 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
91 Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
92 Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
93 Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
94 Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
95 Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
96 Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
97 Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
98 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
99 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
100 Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
101 Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
102 Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
103 Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
104 Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
105 Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
106 Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
107 Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
108 Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
109 Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
110 {
111 // Validate & dedup Subscription batches
112 let batches = validate_batches(subscription_batches)?;
113
114 // Generate required Channels from Subscription batches
115 let channels = Channels::try_from(&batches)?;
116
117 let futures =
118 batches.into_iter().map(|mut batch| {
119 batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
120 let by_exchange_by_sub_kind =
121 batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
122
123 let batch_futures =
124 by_exchange_by_sub_kind
125 .into_iter()
126 .map(|((exchange, sub_kind), subs)| {
127 let subs = subs.into_iter().collect::<Vec<_>>();
128 let txs = Arc::clone(&channels.txs);
129 async move {
130 match (exchange, sub_kind) {
131 (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
132 init_market_stream(
133 STREAM_RECONNECTION_POLICY,
134 subs.into_iter()
135 .map(|sub| {
136 Subscription::new(
137 BinanceSpot::default(),
138 sub.instrument,
139 PublicTrades,
140 )
141 })
142 .collect(),
143 )
144 .await
145 .map(|stream| {
146 tokio::spawn(stream.forward_to(
147 txs.trades.get(&exchange).unwrap().clone(),
148 ))
149 })
150 }
151 (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
152 init_market_stream(
153 STREAM_RECONNECTION_POLICY,
154 subs.into_iter()
155 .map(|sub| {
156 Subscription::new(
157 BinanceSpot::default(),
158 sub.instrument,
159 OrderBooksL1,
160 )
161 })
162 .collect(),
163 )
164 .await
165 .map(|stream| {
166 tokio::spawn(stream.forward_to(
167 txs.l1s.get(&exchange).unwrap().clone(),
168 ))
169 })
170 }
171 (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
172 init_market_stream(
173 STREAM_RECONNECTION_POLICY,
174 subs.into_iter()
175 .map(|sub| {
176 Subscription::new(
177 BinanceSpot::default(),
178 sub.instrument,
179 OrderBooksL2,
180 )
181 })
182 .collect(),
183 )
184 .await
185 .map(|stream| {
186 tokio::spawn(stream.forward_to(
187 txs.l2s.get(&exchange).unwrap().clone(),
188 ))
189 })
190 }
191 (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
192 init_market_stream(
193 STREAM_RECONNECTION_POLICY,
194 subs.into_iter()
195 .map(|sub| {
196 Subscription::new(
197 BinanceFuturesUsd::default(),
198 sub.instrument,
199 PublicTrades,
200 )
201 })
202 .collect(),
203 )
204 .await
205 .map(|stream| {
206 tokio::spawn(stream.forward_to(
207 txs.trades.get(&exchange).unwrap().clone(),
208 ))
209 })
210 }
211 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
212 init_market_stream(
213 STREAM_RECONNECTION_POLICY,
214 subs.into_iter()
215 .map(|sub| {
216 Subscription::<_, Instrument, _>::new(
217 BinanceFuturesUsd::default(),
218 sub.instrument,
219 OrderBooksL1,
220 )
221 })
222 .collect(),
223 )
224 .await
225 .map(|stream| {
226 tokio::spawn(stream.forward_to(
227 txs.l1s.get(&exchange).unwrap().clone(),
228 ))
229 })
230 }
231 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
232 init_market_stream(
233 STREAM_RECONNECTION_POLICY,
234 subs.into_iter()
235 .map(|sub| {
236 Subscription::<_, Instrument, _>::new(
237 BinanceFuturesUsd::default(),
238 sub.instrument,
239 OrderBooksL2,
240 )
241 })
242 .collect(),
243 )
244 .await
245 .map(|stream| {
246 tokio::spawn(stream.forward_to(
247 txs.l2s.get(&exchange).unwrap().clone(),
248 ))
249 })
250 }
251 (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
252 init_market_stream(
253 STREAM_RECONNECTION_POLICY,
254 subs.into_iter()
255 .map(|sub| {
256 Subscription::<_, Instrument, _>::new(
257 BinanceFuturesUsd::default(),
258 sub.instrument,
259 Liquidations,
260 )
261 })
262 .collect(),
263 )
264 .await
265 .map(|stream| {
266 tokio::spawn(stream.forward_to(
267 txs.liquidations.get(&exchange).unwrap().clone(),
268 ))
269 })
270 }
271 (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
272 init_market_stream(
273 STREAM_RECONNECTION_POLICY,
274 subs.into_iter()
275 .map(|sub| {
276 Subscription::new(
277 Bitfinex,
278 sub.instrument,
279 PublicTrades,
280 )
281 })
282 .collect(),
283 )
284 .await
285 .map(|stream| {
286 tokio::spawn(stream.forward_to(
287 txs.trades.get(&exchange).unwrap().clone(),
288 ))
289 })
290 }
291 (ExchangeId::Bitmex, SubKind::PublicTrades) => {
292 init_market_stream(
293 STREAM_RECONNECTION_POLICY,
294 subs.into_iter()
295 .map(|sub| {
296 Subscription::new(
297 Bitmex,
298 sub.instrument,
299 PublicTrades,
300 )
301 })
302 .collect(),
303 )
304 .await
305 .map(|stream| {
306 tokio::spawn(stream.forward_to(
307 txs.trades.get(&exchange).unwrap().clone(),
308 ))
309 })
310 }
311 (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
312 init_market_stream(
313 STREAM_RECONNECTION_POLICY,
314 subs.into_iter()
315 .map(|sub| {
316 Subscription::new(
317 BybitSpot::default(),
318 sub.instrument,
319 PublicTrades,
320 )
321 })
322 .collect(),
323 )
324 .await
325 .map(|stream| {
326 tokio::spawn(stream.forward_to(
327 txs.trades.get(&exchange).unwrap().clone(),
328 ))
329 })
330 }
331 (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
332 init_market_stream(
333 STREAM_RECONNECTION_POLICY,
334 subs.into_iter()
335 .map(|sub| {
336 Subscription::new(
337 BybitSpot::default(),
338 sub.instrument,
339 OrderBooksL1,
340 )
341 })
342 .collect(),
343 )
344 .await
345 .map(|stream| {
346 tokio::spawn(stream.forward_to(
347 txs.l1s.get(&exchange).unwrap().clone(),
348 ))
349 })
350 }
351 (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
352 init_market_stream(
353 STREAM_RECONNECTION_POLICY,
354 subs.into_iter()
355 .map(|sub| {
356 Subscription::new(
357 BybitSpot::default(),
358 sub.instrument,
359 OrderBooksL2,
360 )
361 })
362 .collect(),
363 )
364 .await
365 .map(|stream| {
366 tokio::spawn(stream.forward_to(
367 txs.l2s.get(&exchange).unwrap().clone(),
368 ))
369 })
370 }
371 (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
372 init_market_stream(
373 STREAM_RECONNECTION_POLICY,
374 subs.into_iter()
375 .map(|sub| {
376 Subscription::new(
377 BybitPerpetualsUsd::default(),
378 sub.instrument,
379 PublicTrades,
380 )
381 })
382 .collect(),
383 )
384 .await
385 .map(|stream| {
386 tokio::spawn(stream.forward_to(
387 txs.trades.get(&exchange).unwrap().clone(),
388 ))
389 })
390 }
391 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
392 init_market_stream(
393 STREAM_RECONNECTION_POLICY,
394 subs.into_iter()
395 .map(|sub| {
396 Subscription::new(
397 BybitSpot::default(),
398 sub.instrument,
399 OrderBooksL1,
400 )
401 })
402 .collect(),
403 )
404 .await
405 .map(|stream| {
406 tokio::spawn(stream.forward_to(
407 txs.l1s.get(&exchange).unwrap().clone(),
408 ))
409 })
410 }
411 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
412 init_market_stream(
413 STREAM_RECONNECTION_POLICY,
414 subs.into_iter()
415 .map(|sub| {
416 Subscription::new(
417 BybitSpot::default(),
418 sub.instrument,
419 OrderBooksL2,
420 )
421 })
422 .collect(),
423 )
424 .await
425 .map(|stream| {
426 tokio::spawn(stream.forward_to(
427 txs.l2s.get(&exchange).unwrap().clone(),
428 ))
429 })
430 }
431 (ExchangeId::Coinbase, SubKind::PublicTrades) => {
432 init_market_stream(
433 STREAM_RECONNECTION_POLICY,
434 subs.into_iter()
435 .map(|sub| {
436 Subscription::new(
437 Coinbase,
438 sub.instrument,
439 PublicTrades,
440 )
441 })
442 .collect(),
443 )
444 .await
445 .map(|stream| {
446 tokio::spawn(stream.forward_to(
447 txs.trades.get(&exchange).unwrap().clone(),
448 ))
449 })
450 }
451 (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
452 init_market_stream(
453 STREAM_RECONNECTION_POLICY,
454 subs.into_iter()
455 .map(|sub| {
456 Subscription::new(
457 GateioSpot::default(),
458 sub.instrument,
459 PublicTrades,
460 )
461 })
462 .collect(),
463 )
464 .await
465 .map(|stream| {
466 tokio::spawn(stream.forward_to(
467 txs.trades.get(&exchange).unwrap().clone(),
468 ))
469 })
470 }
471 (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
472 init_market_stream(
473 STREAM_RECONNECTION_POLICY,
474 subs.into_iter()
475 .map(|sub| {
476 Subscription::new(
477 GateioFuturesUsd::default(),
478 sub.instrument,
479 PublicTrades,
480 )
481 })
482 .collect(),
483 )
484 .await
485 .map(|stream| {
486 tokio::spawn(stream.forward_to(
487 txs.trades.get(&exchange).unwrap().clone(),
488 ))
489 })
490 }
491 (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
492 init_market_stream(
493 STREAM_RECONNECTION_POLICY,
494 subs.into_iter()
495 .map(|sub| {
496 Subscription::new(
497 GateioFuturesBtc::default(),
498 sub.instrument,
499 PublicTrades,
500 )
501 })
502 .collect(),
503 )
504 .await
505 .map(|stream| {
506 tokio::spawn(stream.forward_to(
507 txs.trades.get(&exchange).unwrap().clone(),
508 ))
509 })
510 }
511 (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
512 init_market_stream(
513 STREAM_RECONNECTION_POLICY,
514 subs.into_iter()
515 .map(|sub| {
516 Subscription::new(
517 GateioPerpetualsUsd::default(),
518 sub.instrument,
519 PublicTrades,
520 )
521 })
522 .collect(),
523 )
524 .await
525 .map(|stream| {
526 tokio::spawn(stream.forward_to(
527 txs.trades.get(&exchange).unwrap().clone(),
528 ))
529 })
530 }
531 (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
532 init_market_stream(
533 STREAM_RECONNECTION_POLICY,
534 subs.into_iter()
535 .map(|sub| {
536 Subscription::new(
537 GateioPerpetualsBtc::default(),
538 sub.instrument,
539 PublicTrades,
540 )
541 })
542 .collect(),
543 )
544 .await
545 .map(|stream| {
546 tokio::spawn(stream.forward_to(
547 txs.trades.get(&exchange).unwrap().clone(),
548 ))
549 })
550 }
551 (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
552 init_market_stream(
553 STREAM_RECONNECTION_POLICY,
554 subs.into_iter()
555 .map(|sub| {
556 Subscription::new(
557 GateioOptions::default(),
558 sub.instrument,
559 PublicTrades,
560 )
561 })
562 .collect(),
563 )
564 .await
565 .map(|stream| {
566 tokio::spawn(stream.forward_to(
567 txs.trades.get(&exchange).unwrap().clone(),
568 ))
569 })
570 }
571 (ExchangeId::Kraken, SubKind::PublicTrades) => {
572 init_market_stream(
573 STREAM_RECONNECTION_POLICY,
574 subs.into_iter()
575 .map(|sub| {
576 Subscription::new(
577 Kraken,
578 sub.instrument,
579 PublicTrades,
580 )
581 })
582 .collect(),
583 )
584 .await
585 .map(|stream| {
586 tokio::spawn(stream.forward_to(
587 txs.trades.get(&exchange).unwrap().clone(),
588 ))
589 })
590 }
591 (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
592 init_market_stream(
593 STREAM_RECONNECTION_POLICY,
594 subs.into_iter()
595 .map(|sub| {
596 Subscription::new(
597 Kraken,
598 sub.instrument,
599 OrderBooksL1,
600 )
601 })
602 .collect(),
603 )
604 .await
605 .map(|stream| {
606 tokio::spawn(stream.forward_to(
607 txs.l1s.get(&exchange).unwrap().clone(),
608 ))
609 })
610 }
611 (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
612 STREAM_RECONNECTION_POLICY,
613 subs.into_iter()
614 .map(|sub| {
615 Subscription::new(Okx, sub.instrument, PublicTrades)
616 })
617 .collect(),
618 )
619 .await
620 .map(|stream| {
621 tokio::spawn(
622 stream.forward_to(
623 txs.trades.get(&exchange).unwrap().clone(),
624 ),
625 )
626 }),
627 (exchange, sub_kind) => {
628 Err(DataError::Unsupported { exchange, sub_kind })
629 }
630 }
631 }
632 });
633
634 try_join_all(batch_futures)
635 });
636
637 try_join_all(futures).await?;
638
639 Ok(Self {
640 trades: channels
641 .rxs
642 .trades
643 .into_iter()
644 .map(|(exchange, rx)| (exchange, rx.into_stream()))
645 .collect(),
646 l1s: channels
647 .rxs
648 .l1s
649 .into_iter()
650 .map(|(exchange, rx)| (exchange, rx.into_stream()))
651 .collect(),
652 l2s: channels
653 .rxs
654 .l2s
655 .into_iter()
656 .map(|(exchange, rx)| (exchange, rx.into_stream()))
657 .collect(),
658 liquidations: channels
659 .rxs
660 .liquidations
661 .into_iter()
662 .map(|(exchange, rx)| (exchange, rx.into_stream()))
663 .collect(),
664 })
665 }
666
667 /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
668 ///
669 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
670 pub fn select_trades(
671 &mut self,
672 exchange: ExchangeId,
673 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
674 self.trades.remove(&exchange)
675 }
676
677 /// Select and merge every exchange [`PublicTrade`] `Stream` using
678 /// [`SelectAll`](futures_util::stream::select_all::select_all).
679 pub fn select_all_trades(
680 &mut self,
681 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
682 futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
683 }
684
685 /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
686 ///
687 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
688 pub fn select_l1s(
689 &mut self,
690 exchange: ExchangeId,
691 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
692 self.l1s.remove(&exchange)
693 }
694
695 /// Select and merge every exchange [`OrderBookL1`] `Stream` using
696 /// [`SelectAll`](futures_util::stream::select_all::select_all).
697 pub fn select_all_l1s(
698 &mut self,
699 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
700 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
701 }
702
703 /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
704 ///
705 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
706 pub fn select_l2s(
707 &mut self,
708 exchange: ExchangeId,
709 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
710 self.l2s.remove(&exchange)
711 }
712
713 /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
714 /// [`SelectAll`](futures_util::stream::select_all::select_all).
715 pub fn select_all_l2s(
716 &mut self,
717 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
718 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
719 }
720
721 /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
722 ///
723 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
724 pub fn select_liquidations(
725 &mut self,
726 exchange: ExchangeId,
727 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
728 self.liquidations.remove(&exchange)
729 }
730
731 /// Select and merge every exchange [`Liquidation`] `Stream` using
732 /// [`SelectAll`](futures_util::stream::select_all::select_all).
733 pub fn select_all_liquidations(
734 &mut self,
735 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
736 futures_util::stream::select_all::select_all(
737 std::mem::take(&mut self.liquidations).into_values(),
738 )
739 }
740
741 /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
742 ///
743 /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
744 /// use cases.
745 pub fn select_all<Output>(self) -> impl Stream<Item = Output>
746 where
747 InstrumentKey: Send + 'static,
748 Output: 'static,
749 MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
750 MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
751 MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
752 MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
753 {
754 let Self {
755 trades,
756 l1s,
757 l2s,
758 liquidations,
759 } = self;
760
761 let trades = trades
762 .into_values()
763 .map(|stream| stream.map(MarketStreamResult::into).boxed());
764
765 let l1s = l1s
766 .into_values()
767 .map(|stream| stream.map(MarketStreamResult::into).boxed());
768
769 let l2s = l2s
770 .into_values()
771 .map(|stream| stream.map(MarketStreamResult::into).boxed());
772
773 let liquidations = liquidations
774 .into_values()
775 .map(|stream| stream.map(MarketStreamResult::into).boxed());
776
777 let all = trades.chain(l1s).chain(l2s).chain(liquidations);
778
779 futures_util::stream::select_all::select_all(all)
780 }
781}
782
783pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
784 batches: SubBatchIter,
785) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
786where
787 SubBatchIter: IntoIterator<Item = SubIter>,
788 SubIter: IntoIterator<Item = Sub>,
789 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
790 Instrument: InstrumentData + Ord,
791{
792 batches
793 .into_iter()
794 .map(validate_subscriptions::<SubIter, Sub, Instrument>)
795 .collect()
796}
797
798pub fn validate_subscriptions<SubIter, Sub, Instrument>(
799 batch: SubIter,
800) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
801where
802 SubIter: IntoIterator<Item = Sub>,
803 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
804 Instrument: InstrumentData + Ord,
805{
806 // Validate Subscriptions
807 let mut batch = batch
808 .into_iter()
809 .map(Sub::into)
810 .map(Validator::validate)
811 .collect::<Result<Vec<_>, SocketError>>()?;
812
813 // Remove duplicate Subscriptions
814 batch.sort();
815 batch.dedup();
816
817 Ok(batch)
818}
819
820struct Channels<InstrumentKey> {
821 txs: Arc<Txs<InstrumentKey>>,
822 rxs: Rxs<InstrumentKey>,
823}
824
825impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
826 for Channels<Instrument::Key>
827where
828 Instrument: InstrumentData,
829{
830 type Error = DataError;
831
832 fn try_from(
833 value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
834 ) -> Result<Self, Self::Error> {
835 let mut txs = Txs::default();
836 let mut rxs = Rxs::default();
837
838 for sub in value.iter().flatten() {
839 match sub.kind {
840 SubKind::PublicTrades => {
841 if let (None, None) =
842 (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
843 {
844 let (tx, rx) = mpsc_unbounded();
845 txs.trades.insert(sub.exchange, tx);
846 rxs.trades.insert(sub.exchange, rx);
847 }
848 }
849 SubKind::OrderBooksL1 => {
850 if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
851 let (tx, rx) = mpsc_unbounded();
852 txs.l1s.insert(sub.exchange, tx);
853 rxs.l1s.insert(sub.exchange, rx);
854 }
855 }
856 SubKind::OrderBooksL2 => {
857 if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
858 let (tx, rx) = mpsc_unbounded();
859 txs.l2s.insert(sub.exchange, tx);
860 rxs.l2s.insert(sub.exchange, rx);
861 }
862 }
863 SubKind::Liquidations => {
864 if let (None, None) = (
865 txs.liquidations.get(&sub.exchange),
866 rxs.liquidations.get(&sub.exchange),
867 ) {
868 let (tx, rx) = mpsc_unbounded();
869 txs.liquidations.insert(sub.exchange, tx);
870 rxs.liquidations.insert(sub.exchange, rx);
871 }
872 }
873 unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
874 }
875 }
876
877 Ok(Channels {
878 txs: Arc::new(txs),
879 rxs,
880 })
881 }
882}
883
884struct Txs<InstrumentKey> {
885 trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
886 l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
887 l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
888 liquidations:
889 FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
890}
891
892impl<InstrumentKey> Default for Txs<InstrumentKey> {
893 fn default() -> Self {
894 Self {
895 trades: Default::default(),
896 l1s: Default::default(),
897 l2s: Default::default(),
898 liquidations: Default::default(),
899 }
900 }
901}
902
903struct Rxs<InstrumentKey> {
904 trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
905 l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
906 l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
907 liquidations:
908 FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
909}
910
911impl<InstrumentKey> Default for Rxs<InstrumentKey> {
912 fn default() -> Self {
913 Self {
914 trades: Default::default(),
915 l1s: Default::default(),
916 l2s: Default::default(),
917 liquidations: Default::default(),
918 }
919 }
920}