barter_data/streams/builder/dynamic/mod.rs
1use crate::{
2 Identifier,
3 error::DataError,
4 exchange::{
5 binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6 bitfinex::{Bitfinex, market::BitfinexMarket},
7 bitmex::{Bitmex, market::BitmexMarket},
8 bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9 coinbase::{Coinbase, market::CoinbaseMarket},
10 gateio::{
11 future::{GateioFuturesBtc, GateioFuturesUsd},
12 market::GateioMarket,
13 option::GateioOptions,
14 perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15 spot::GateioSpot,
16 },
17 kraken::{Kraken, market::KrakenMarket},
18 okx::{Okx, market::OkxMarket},
19 },
20 instrument::InstrumentData,
21 streams::{
22 consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23 reconnect::stream::ReconnectingStream,
24 },
25 subscription::{
26 SubKind, Subscription,
27 book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28 liquidation::{Liquidation, Liquidations},
29 trade::{PublicTrade, PublicTrades},
30 },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34 Validator,
35 channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36 error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43 fmt::{Debug, Display},
44 sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
53 pub trades:
54 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
55 pub l1s:
56 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
57 pub l2s: VecMap<
58 ExchangeId,
59 UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60 >,
61 pub liquidations:
62 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66 /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67 ///
68 /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69 /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70 /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71 ///
72 /// ## Examples
73 /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74 /// comprehensive example of how to use this market data stream initialiser.
75 pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76 subscription_batches: SubBatchIter,
77 ) -> Result<Self, DataError>
78 where
79 SubBatchIter: IntoIterator<Item = SubIter>,
80 SubIter: IntoIterator<Item = Sub>,
81 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82 Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83 InstrumentKey: Debug + Clone + Send + 'static,
84 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85 Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
86 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
87 Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
88 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
89 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
90 Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
91 Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
92 Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
93 Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
94 Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
95 Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
96 Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
97 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
98 Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
99 Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
100 Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
101 Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
102 Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
103 Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
104 Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
105 Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
106 Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
107 Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
108 Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
109 {
110 // Validate & dedup Subscription batches
111 let batches = validate_batches(subscription_batches)?;
112
113 // Generate required Channels from Subscription batches
114 let channels = Channels::try_from(&batches)?;
115
116 let futures =
117 batches.into_iter().map(|mut batch| {
118 batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
119 let by_exchange_by_sub_kind =
120 batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
121
122 let batch_futures =
123 by_exchange_by_sub_kind
124 .into_iter()
125 .map(|((exchange, sub_kind), subs)| {
126 let subs = subs.into_iter().collect::<Vec<_>>();
127 let txs = Arc::clone(&channels.txs);
128 async move {
129 match (exchange, sub_kind) {
130 (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
131 init_market_stream(
132 STREAM_RECONNECTION_POLICY,
133 subs.into_iter()
134 .map(|sub| {
135 Subscription::new(
136 BinanceSpot::default(),
137 sub.instrument,
138 PublicTrades,
139 )
140 })
141 .collect(),
142 )
143 .await
144 .map(|stream| {
145 tokio::spawn(stream.forward_to(
146 txs.trades.get(&exchange).unwrap().clone(),
147 ))
148 })
149 }
150 (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
151 init_market_stream(
152 STREAM_RECONNECTION_POLICY,
153 subs.into_iter()
154 .map(|sub| {
155 Subscription::new(
156 BinanceSpot::default(),
157 sub.instrument,
158 OrderBooksL1,
159 )
160 })
161 .collect(),
162 )
163 .await
164 .map(|stream| {
165 tokio::spawn(stream.forward_to(
166 txs.l1s.get(&exchange).unwrap().clone(),
167 ))
168 })
169 }
170 (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
171 init_market_stream(
172 STREAM_RECONNECTION_POLICY,
173 subs.into_iter()
174 .map(|sub| {
175 Subscription::new(
176 BinanceSpot::default(),
177 sub.instrument,
178 OrderBooksL2,
179 )
180 })
181 .collect(),
182 )
183 .await
184 .map(|stream| {
185 tokio::spawn(stream.forward_to(
186 txs.l2s.get(&exchange).unwrap().clone(),
187 ))
188 })
189 }
190 (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
191 init_market_stream(
192 STREAM_RECONNECTION_POLICY,
193 subs.into_iter()
194 .map(|sub| {
195 Subscription::new(
196 BinanceFuturesUsd::default(),
197 sub.instrument,
198 PublicTrades,
199 )
200 })
201 .collect(),
202 )
203 .await
204 .map(|stream| {
205 tokio::spawn(stream.forward_to(
206 txs.trades.get(&exchange).unwrap().clone(),
207 ))
208 })
209 }
210 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
211 init_market_stream(
212 STREAM_RECONNECTION_POLICY,
213 subs.into_iter()
214 .map(|sub| {
215 Subscription::<_, Instrument, _>::new(
216 BinanceFuturesUsd::default(),
217 sub.instrument,
218 OrderBooksL1,
219 )
220 })
221 .collect(),
222 )
223 .await
224 .map(|stream| {
225 tokio::spawn(stream.forward_to(
226 txs.l1s.get(&exchange).unwrap().clone(),
227 ))
228 })
229 }
230 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
231 init_market_stream(
232 STREAM_RECONNECTION_POLICY,
233 subs.into_iter()
234 .map(|sub| {
235 Subscription::<_, Instrument, _>::new(
236 BinanceFuturesUsd::default(),
237 sub.instrument,
238 OrderBooksL2,
239 )
240 })
241 .collect(),
242 )
243 .await
244 .map(|stream| {
245 tokio::spawn(stream.forward_to(
246 txs.l2s.get(&exchange).unwrap().clone(),
247 ))
248 })
249 }
250 (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
251 init_market_stream(
252 STREAM_RECONNECTION_POLICY,
253 subs.into_iter()
254 .map(|sub| {
255 Subscription::<_, Instrument, _>::new(
256 BinanceFuturesUsd::default(),
257 sub.instrument,
258 Liquidations,
259 )
260 })
261 .collect(),
262 )
263 .await
264 .map(|stream| {
265 tokio::spawn(stream.forward_to(
266 txs.liquidations.get(&exchange).unwrap().clone(),
267 ))
268 })
269 }
270 (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
271 init_market_stream(
272 STREAM_RECONNECTION_POLICY,
273 subs.into_iter()
274 .map(|sub| {
275 Subscription::new(
276 Bitfinex,
277 sub.instrument,
278 PublicTrades,
279 )
280 })
281 .collect(),
282 )
283 .await
284 .map(|stream| {
285 tokio::spawn(stream.forward_to(
286 txs.trades.get(&exchange).unwrap().clone(),
287 ))
288 })
289 }
290 (ExchangeId::Bitmex, SubKind::PublicTrades) => {
291 init_market_stream(
292 STREAM_RECONNECTION_POLICY,
293 subs.into_iter()
294 .map(|sub| {
295 Subscription::new(
296 Bitmex,
297 sub.instrument,
298 PublicTrades,
299 )
300 })
301 .collect(),
302 )
303 .await
304 .map(|stream| {
305 tokio::spawn(stream.forward_to(
306 txs.trades.get(&exchange).unwrap().clone(),
307 ))
308 })
309 }
310 (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
311 init_market_stream(
312 STREAM_RECONNECTION_POLICY,
313 subs.into_iter()
314 .map(|sub| {
315 Subscription::new(
316 BybitSpot::default(),
317 sub.instrument,
318 PublicTrades,
319 )
320 })
321 .collect(),
322 )
323 .await
324 .map(|stream| {
325 tokio::spawn(stream.forward_to(
326 txs.trades.get(&exchange).unwrap().clone(),
327 ))
328 })
329 }
330 (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
331 init_market_stream(
332 STREAM_RECONNECTION_POLICY,
333 subs.into_iter()
334 .map(|sub| {
335 Subscription::new(
336 BybitSpot::default(),
337 sub.instrument,
338 OrderBooksL1,
339 )
340 })
341 .collect(),
342 )
343 .await
344 .map(|stream| {
345 tokio::spawn(stream.forward_to(
346 txs.l1s.get(&exchange).unwrap().clone(),
347 ))
348 })
349 }
350 (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
351 init_market_stream(
352 STREAM_RECONNECTION_POLICY,
353 subs.into_iter()
354 .map(|sub| {
355 Subscription::new(
356 BybitSpot::default(),
357 sub.instrument,
358 OrderBooksL2,
359 )
360 })
361 .collect(),
362 )
363 .await
364 .map(|stream| {
365 tokio::spawn(stream.forward_to(
366 txs.l2s.get(&exchange).unwrap().clone(),
367 ))
368 })
369 }
370 (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
371 init_market_stream(
372 STREAM_RECONNECTION_POLICY,
373 subs.into_iter()
374 .map(|sub| {
375 Subscription::new(
376 BybitPerpetualsUsd::default(),
377 sub.instrument,
378 PublicTrades,
379 )
380 })
381 .collect(),
382 )
383 .await
384 .map(|stream| {
385 tokio::spawn(stream.forward_to(
386 txs.trades.get(&exchange).unwrap().clone(),
387 ))
388 })
389 }
390 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
391 init_market_stream(
392 STREAM_RECONNECTION_POLICY,
393 subs.into_iter()
394 .map(|sub| {
395 Subscription::new(
396 BybitSpot::default(),
397 sub.instrument,
398 OrderBooksL1,
399 )
400 })
401 .collect(),
402 )
403 .await
404 .map(|stream| {
405 tokio::spawn(stream.forward_to(
406 txs.l1s.get(&exchange).unwrap().clone(),
407 ))
408 })
409 }
410 (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
411 init_market_stream(
412 STREAM_RECONNECTION_POLICY,
413 subs.into_iter()
414 .map(|sub| {
415 Subscription::new(
416 BybitSpot::default(),
417 sub.instrument,
418 OrderBooksL2,
419 )
420 })
421 .collect(),
422 )
423 .await
424 .map(|stream| {
425 tokio::spawn(stream.forward_to(
426 txs.l2s.get(&exchange).unwrap().clone(),
427 ))
428 })
429 }
430 (ExchangeId::Coinbase, SubKind::PublicTrades) => {
431 init_market_stream(
432 STREAM_RECONNECTION_POLICY,
433 subs.into_iter()
434 .map(|sub| {
435 Subscription::new(
436 Coinbase,
437 sub.instrument,
438 PublicTrades,
439 )
440 })
441 .collect(),
442 )
443 .await
444 .map(|stream| {
445 tokio::spawn(stream.forward_to(
446 txs.trades.get(&exchange).unwrap().clone(),
447 ))
448 })
449 }
450 (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
451 init_market_stream(
452 STREAM_RECONNECTION_POLICY,
453 subs.into_iter()
454 .map(|sub| {
455 Subscription::new(
456 GateioSpot::default(),
457 sub.instrument,
458 PublicTrades,
459 )
460 })
461 .collect(),
462 )
463 .await
464 .map(|stream| {
465 tokio::spawn(stream.forward_to(
466 txs.trades.get(&exchange).unwrap().clone(),
467 ))
468 })
469 }
470 (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
471 init_market_stream(
472 STREAM_RECONNECTION_POLICY,
473 subs.into_iter()
474 .map(|sub| {
475 Subscription::new(
476 GateioFuturesUsd::default(),
477 sub.instrument,
478 PublicTrades,
479 )
480 })
481 .collect(),
482 )
483 .await
484 .map(|stream| {
485 tokio::spawn(stream.forward_to(
486 txs.trades.get(&exchange).unwrap().clone(),
487 ))
488 })
489 }
490 (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
491 init_market_stream(
492 STREAM_RECONNECTION_POLICY,
493 subs.into_iter()
494 .map(|sub| {
495 Subscription::new(
496 GateioFuturesBtc::default(),
497 sub.instrument,
498 PublicTrades,
499 )
500 })
501 .collect(),
502 )
503 .await
504 .map(|stream| {
505 tokio::spawn(stream.forward_to(
506 txs.trades.get(&exchange).unwrap().clone(),
507 ))
508 })
509 }
510 (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
511 init_market_stream(
512 STREAM_RECONNECTION_POLICY,
513 subs.into_iter()
514 .map(|sub| {
515 Subscription::new(
516 GateioPerpetualsUsd::default(),
517 sub.instrument,
518 PublicTrades,
519 )
520 })
521 .collect(),
522 )
523 .await
524 .map(|stream| {
525 tokio::spawn(stream.forward_to(
526 txs.trades.get(&exchange).unwrap().clone(),
527 ))
528 })
529 }
530 (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
531 init_market_stream(
532 STREAM_RECONNECTION_POLICY,
533 subs.into_iter()
534 .map(|sub| {
535 Subscription::new(
536 GateioPerpetualsBtc::default(),
537 sub.instrument,
538 PublicTrades,
539 )
540 })
541 .collect(),
542 )
543 .await
544 .map(|stream| {
545 tokio::spawn(stream.forward_to(
546 txs.trades.get(&exchange).unwrap().clone(),
547 ))
548 })
549 }
550 (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
551 init_market_stream(
552 STREAM_RECONNECTION_POLICY,
553 subs.into_iter()
554 .map(|sub| {
555 Subscription::new(
556 GateioOptions::default(),
557 sub.instrument,
558 PublicTrades,
559 )
560 })
561 .collect(),
562 )
563 .await
564 .map(|stream| {
565 tokio::spawn(stream.forward_to(
566 txs.trades.get(&exchange).unwrap().clone(),
567 ))
568 })
569 }
570 (ExchangeId::Kraken, SubKind::PublicTrades) => {
571 init_market_stream(
572 STREAM_RECONNECTION_POLICY,
573 subs.into_iter()
574 .map(|sub| {
575 Subscription::new(
576 Kraken,
577 sub.instrument,
578 PublicTrades,
579 )
580 })
581 .collect(),
582 )
583 .await
584 .map(|stream| {
585 tokio::spawn(stream.forward_to(
586 txs.trades.get(&exchange).unwrap().clone(),
587 ))
588 })
589 }
590 (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
591 init_market_stream(
592 STREAM_RECONNECTION_POLICY,
593 subs.into_iter()
594 .map(|sub| {
595 Subscription::new(
596 Kraken,
597 sub.instrument,
598 OrderBooksL1,
599 )
600 })
601 .collect(),
602 )
603 .await
604 .map(|stream| {
605 tokio::spawn(stream.forward_to(
606 txs.l1s.get(&exchange).unwrap().clone(),
607 ))
608 })
609 }
610 (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
611 STREAM_RECONNECTION_POLICY,
612 subs.into_iter()
613 .map(|sub| {
614 Subscription::new(Okx, sub.instrument, PublicTrades)
615 })
616 .collect(),
617 )
618 .await
619 .map(|stream| {
620 tokio::spawn(
621 stream.forward_to(
622 txs.trades.get(&exchange).unwrap().clone(),
623 ),
624 )
625 }),
626 (exchange, sub_kind) => {
627 Err(DataError::Unsupported { exchange, sub_kind })
628 }
629 }
630 }
631 });
632
633 try_join_all(batch_futures)
634 });
635
636 try_join_all(futures).await?;
637
638 Ok(Self {
639 trades: channels
640 .rxs
641 .trades
642 .into_iter()
643 .map(|(exchange, rx)| (exchange, rx.into_stream()))
644 .collect(),
645 l1s: channels
646 .rxs
647 .l1s
648 .into_iter()
649 .map(|(exchange, rx)| (exchange, rx.into_stream()))
650 .collect(),
651 l2s: channels
652 .rxs
653 .l2s
654 .into_iter()
655 .map(|(exchange, rx)| (exchange, rx.into_stream()))
656 .collect(),
657 liquidations: channels
658 .rxs
659 .liquidations
660 .into_iter()
661 .map(|(exchange, rx)| (exchange, rx.into_stream()))
662 .collect(),
663 })
664 }
665
666 /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
667 ///
668 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
669 pub fn select_trades(
670 &mut self,
671 exchange: ExchangeId,
672 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
673 self.trades.remove(&exchange)
674 }
675
676 /// Select and merge every exchange [`PublicTrade`] `Stream` using
677 /// [`SelectAll`](futures_util::stream::select_all::select_all).
678 pub fn select_all_trades(
679 &mut self,
680 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
681 futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
682 }
683
684 /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
685 ///
686 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
687 pub fn select_l1s(
688 &mut self,
689 exchange: ExchangeId,
690 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
691 self.l1s.remove(&exchange)
692 }
693
694 /// Select and merge every exchange [`OrderBookL1`] `Stream` using
695 /// [`SelectAll`](futures_util::stream::select_all::select_all).
696 pub fn select_all_l1s(
697 &mut self,
698 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
699 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
700 }
701
702 /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
703 ///
704 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
705 pub fn select_l2s(
706 &mut self,
707 exchange: ExchangeId,
708 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
709 self.l2s.remove(&exchange)
710 }
711
712 /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
713 /// [`SelectAll`](futures_util::stream::select_all::select_all).
714 pub fn select_all_l2s(
715 &mut self,
716 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
717 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
718 }
719
720 /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
721 ///
722 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
723 pub fn select_liquidations(
724 &mut self,
725 exchange: ExchangeId,
726 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
727 self.liquidations.remove(&exchange)
728 }
729
730 /// Select and merge every exchange [`Liquidation`] `Stream` using
731 /// [`SelectAll`](futures_util::stream::select_all::select_all).
732 pub fn select_all_liquidations(
733 &mut self,
734 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
735 futures_util::stream::select_all::select_all(
736 std::mem::take(&mut self.liquidations).into_values(),
737 )
738 }
739
740 /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
741 ///
742 /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
743 /// use cases.
744 pub fn select_all<Output>(self) -> impl Stream<Item = Output>
745 where
746 InstrumentKey: Send + 'static,
747 Output: 'static,
748 MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
749 MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
750 MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
751 MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
752 {
753 let Self {
754 trades,
755 l1s,
756 l2s,
757 liquidations,
758 } = self;
759
760 let trades = trades
761 .into_values()
762 .map(|stream| stream.map(MarketStreamResult::into).boxed());
763
764 let l1s = l1s
765 .into_values()
766 .map(|stream| stream.map(MarketStreamResult::into).boxed());
767
768 let l2s = l2s
769 .into_values()
770 .map(|stream| stream.map(MarketStreamResult::into).boxed());
771
772 let liquidations = liquidations
773 .into_values()
774 .map(|stream| stream.map(MarketStreamResult::into).boxed());
775
776 let all = trades.chain(l1s).chain(l2s).chain(liquidations);
777
778 futures_util::stream::select_all::select_all(all)
779 }
780}
781
782pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
783 batches: SubBatchIter,
784) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
785where
786 SubBatchIter: IntoIterator<Item = SubIter>,
787 SubIter: IntoIterator<Item = Sub>,
788 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
789 Instrument: InstrumentData + Ord,
790{
791 batches
792 .into_iter()
793 .map(validate_subscriptions::<SubIter, Sub, Instrument>)
794 .collect()
795}
796
797pub fn validate_subscriptions<SubIter, Sub, Instrument>(
798 batch: SubIter,
799) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
800where
801 SubIter: IntoIterator<Item = Sub>,
802 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
803 Instrument: InstrumentData + Ord,
804{
805 // Validate Subscriptions
806 let mut batch = batch
807 .into_iter()
808 .map(Sub::into)
809 .map(Validator::validate)
810 .collect::<Result<Vec<_>, SocketError>>()?;
811
812 // Remove duplicate Subscriptions
813 batch.sort();
814 batch.dedup();
815
816 Ok(batch)
817}
818
819struct Channels<InstrumentKey> {
820 txs: Arc<Txs<InstrumentKey>>,
821 rxs: Rxs<InstrumentKey>,
822}
823
824impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
825 for Channels<Instrument::Key>
826where
827 Instrument: InstrumentData,
828{
829 type Error = DataError;
830
831 fn try_from(
832 value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
833 ) -> Result<Self, Self::Error> {
834 let mut txs = Txs::default();
835 let mut rxs = Rxs::default();
836
837 for sub in value.iter().flatten() {
838 match sub.kind {
839 SubKind::PublicTrades => {
840 if let (None, None) =
841 (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
842 {
843 let (tx, rx) = mpsc_unbounded();
844 txs.trades.insert(sub.exchange, tx);
845 rxs.trades.insert(sub.exchange, rx);
846 }
847 }
848 SubKind::OrderBooksL1 => {
849 if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
850 let (tx, rx) = mpsc_unbounded();
851 txs.l1s.insert(sub.exchange, tx);
852 rxs.l1s.insert(sub.exchange, rx);
853 }
854 }
855 SubKind::OrderBooksL2 => {
856 if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
857 let (tx, rx) = mpsc_unbounded();
858 txs.l2s.insert(sub.exchange, tx);
859 rxs.l2s.insert(sub.exchange, rx);
860 }
861 }
862 SubKind::Liquidations => {
863 if let (None, None) = (
864 txs.liquidations.get(&sub.exchange),
865 rxs.liquidations.get(&sub.exchange),
866 ) {
867 let (tx, rx) = mpsc_unbounded();
868 txs.liquidations.insert(sub.exchange, tx);
869 rxs.liquidations.insert(sub.exchange, rx);
870 }
871 }
872 unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
873 }
874 }
875
876 Ok(Channels {
877 txs: Arc::new(txs),
878 rxs,
879 })
880 }
881}
882
883struct Txs<InstrumentKey> {
884 trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
885 l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
886 l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
887 liquidations:
888 FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
889}
890
891impl<InstrumentKey> Default for Txs<InstrumentKey> {
892 fn default() -> Self {
893 Self {
894 trades: Default::default(),
895 l1s: Default::default(),
896 l2s: Default::default(),
897 liquidations: Default::default(),
898 }
899 }
900}
901
902struct Rxs<InstrumentKey> {
903 trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
904 l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
905 l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
906 liquidations:
907 FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
908}
909
910impl<InstrumentKey> Default for Rxs<InstrumentKey> {
911 fn default() -> Self {
912 Self {
913 trades: Default::default(),
914 l1s: Default::default(),
915 l2s: Default::default(),
916 liquidations: Default::default(),
917 }
918 }
919}