barter_data/streams/builder/dynamic/mod.rs
1use crate::{
2 Identifier,
3 error::DataError,
4 exchange::{
5 binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6 bitfinex::{Bitfinex, market::BitfinexMarket},
7 bitmex::{Bitmex, market::BitmexMarket},
8 bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9 coinbase::{Coinbase, market::CoinbaseMarket},
10 gateio::{
11 future::{GateioFuturesBtc, GateioFuturesUsd},
12 market::GateioMarket,
13 option::GateioOptions,
14 perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15 spot::GateioSpot,
16 },
17 kraken::{Kraken, market::KrakenMarket},
18 okx::{Okx, market::OkxMarket},
19 },
20 instrument::InstrumentData,
21 streams::{
22 consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23 reconnect::stream::ReconnectingStream,
24 },
25 subscription::{
26 SubKind, Subscription,
27 book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28 liquidation::{Liquidation, Liquidations},
29 trade::{PublicTrade, PublicTrades},
30 },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34 Validator,
35 channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36 error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43 fmt::{Debug, Display},
44 sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
/// Collection of market data `Stream`s, grouped by data kind and keyed by [`ExchangeId`].
///
/// Each field holds at most one receiver `Stream` per [`ExchangeId`] for that data kind. Use the
/// `select_*` methods to take ownership of individual or merged `Stream`s.
#[derive(Debug)]
pub struct DynamicStreams<InstrumentKey> {
    /// [`PublicTrade`] `Stream`s, one per [`ExchangeId`].
    pub trades:
        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// [`OrderBookL1`] `Stream`s, one per [`ExchangeId`].
    pub l1s:
        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// [`OrderBookEvent`] (L2) `Stream`s, one per [`ExchangeId`].
    pub l2s: VecMap<
        ExchangeId,
        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
    >,
    /// [`Liquidation`] `Stream`s, one per [`ExchangeId`].
    pub liquidations:
        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66 /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67 ///
68 /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69 /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70 /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71 ///
72 /// ## Examples
73 /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74 /// comprehensive example of how to use this market data stream initialiser.
75 pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76 subscription_batches: SubBatchIter,
77 ) -> Result<Self, DataError>
78 where
79 SubBatchIter: IntoIterator<Item = SubIter>,
80 SubIter: IntoIterator<Item = Sub>,
81 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82 Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83 InstrumentKey: Debug + Clone + Send + 'static,
84 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86 Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
88 Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
89 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
90 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
91 Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
92 Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
93 Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
94 Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
95 Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
96 Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
97 Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
98 Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
99 Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
100 Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
101 Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
102 Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
103 Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
104 Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
105 Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
106 {
107 // Validate & dedup Subscription batches
108 let batches = validate_batches(subscription_batches)?;
109
110 // Generate required Channels from Subscription batches
111 let channels = Channels::try_from(&batches)?;
112
113 let futures =
114 batches.into_iter().map(|mut batch| {
115 batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
116 let by_exchange_by_sub_kind =
117 batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
118
119 let batch_futures =
120 by_exchange_by_sub_kind
121 .into_iter()
122 .map(|((exchange, sub_kind), subs)| {
123 let subs = subs.into_iter().collect::<Vec<_>>();
124 let txs = Arc::clone(&channels.txs);
125 async move {
126 match (exchange, sub_kind) {
127 (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
128 init_market_stream(
129 STREAM_RECONNECTION_POLICY,
130 subs.into_iter()
131 .map(|sub| {
132 Subscription::new(
133 BinanceSpot::default(),
134 sub.instrument,
135 PublicTrades,
136 )
137 })
138 .collect(),
139 )
140 .await
141 .map(|stream| {
142 tokio::spawn(stream.forward_to(
143 txs.trades.get(&exchange).unwrap().clone(),
144 ))
145 })
146 }
147 (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
148 init_market_stream(
149 STREAM_RECONNECTION_POLICY,
150 subs.into_iter()
151 .map(|sub| {
152 Subscription::new(
153 BinanceSpot::default(),
154 sub.instrument,
155 OrderBooksL1,
156 )
157 })
158 .collect(),
159 )
160 .await
161 .map(|stream| {
162 tokio::spawn(stream.forward_to(
163 txs.l1s.get(&exchange).unwrap().clone(),
164 ))
165 })
166 }
167 (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
168 init_market_stream(
169 STREAM_RECONNECTION_POLICY,
170 subs.into_iter()
171 .map(|sub| {
172 Subscription::new(
173 BinanceSpot::default(),
174 sub.instrument,
175 OrderBooksL2,
176 )
177 })
178 .collect(),
179 )
180 .await
181 .map(|stream| {
182 tokio::spawn(stream.forward_to(
183 txs.l2s.get(&exchange).unwrap().clone(),
184 ))
185 })
186 }
187 (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
188 init_market_stream(
189 STREAM_RECONNECTION_POLICY,
190 subs.into_iter()
191 .map(|sub| {
192 Subscription::new(
193 BinanceFuturesUsd::default(),
194 sub.instrument,
195 PublicTrades,
196 )
197 })
198 .collect(),
199 )
200 .await
201 .map(|stream| {
202 tokio::spawn(stream.forward_to(
203 txs.trades.get(&exchange).unwrap().clone(),
204 ))
205 })
206 }
207 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
208 init_market_stream(
209 STREAM_RECONNECTION_POLICY,
210 subs.into_iter()
211 .map(|sub| {
212 Subscription::<_, Instrument, _>::new(
213 BinanceFuturesUsd::default(),
214 sub.instrument,
215 OrderBooksL1,
216 )
217 })
218 .collect(),
219 )
220 .await
221 .map(|stream| {
222 tokio::spawn(stream.forward_to(
223 txs.l1s.get(&exchange).unwrap().clone(),
224 ))
225 })
226 }
227 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
228 init_market_stream(
229 STREAM_RECONNECTION_POLICY,
230 subs.into_iter()
231 .map(|sub| {
232 Subscription::<_, Instrument, _>::new(
233 BinanceFuturesUsd::default(),
234 sub.instrument,
235 OrderBooksL2,
236 )
237 })
238 .collect(),
239 )
240 .await
241 .map(|stream| {
242 tokio::spawn(stream.forward_to(
243 txs.l2s.get(&exchange).unwrap().clone(),
244 ))
245 })
246 }
247 (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
248 init_market_stream(
249 STREAM_RECONNECTION_POLICY,
250 subs.into_iter()
251 .map(|sub| {
252 Subscription::<_, Instrument, _>::new(
253 BinanceFuturesUsd::default(),
254 sub.instrument,
255 Liquidations,
256 )
257 })
258 .collect(),
259 )
260 .await
261 .map(|stream| {
262 tokio::spawn(stream.forward_to(
263 txs.liquidations.get(&exchange).unwrap().clone(),
264 ))
265 })
266 }
267 (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
268 init_market_stream(
269 STREAM_RECONNECTION_POLICY,
270 subs.into_iter()
271 .map(|sub| {
272 Subscription::new(
273 Bitfinex,
274 sub.instrument,
275 PublicTrades,
276 )
277 })
278 .collect(),
279 )
280 .await
281 .map(|stream| {
282 tokio::spawn(stream.forward_to(
283 txs.trades.get(&exchange).unwrap().clone(),
284 ))
285 })
286 }
287 (ExchangeId::Bitmex, SubKind::PublicTrades) => {
288 init_market_stream(
289 STREAM_RECONNECTION_POLICY,
290 subs.into_iter()
291 .map(|sub| {
292 Subscription::new(
293 Bitmex,
294 sub.instrument,
295 PublicTrades,
296 )
297 })
298 .collect(),
299 )
300 .await
301 .map(|stream| {
302 tokio::spawn(stream.forward_to(
303 txs.trades.get(&exchange).unwrap().clone(),
304 ))
305 })
306 }
307 (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
308 init_market_stream(
309 STREAM_RECONNECTION_POLICY,
310 subs.into_iter()
311 .map(|sub| {
312 Subscription::new(
313 BybitSpot::default(),
314 sub.instrument,
315 PublicTrades,
316 )
317 })
318 .collect(),
319 )
320 .await
321 .map(|stream| {
322 tokio::spawn(stream.forward_to(
323 txs.trades.get(&exchange).unwrap().clone(),
324 ))
325 })
326 }
327 (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
328 init_market_stream(
329 STREAM_RECONNECTION_POLICY,
330 subs.into_iter()
331 .map(|sub| {
332 Subscription::new(
333 BybitPerpetualsUsd::default(),
334 sub.instrument,
335 PublicTrades,
336 )
337 })
338 .collect(),
339 )
340 .await
341 .map(|stream| {
342 tokio::spawn(stream.forward_to(
343 txs.trades.get(&exchange).unwrap().clone(),
344 ))
345 })
346 }
347 (ExchangeId::Coinbase, SubKind::PublicTrades) => {
348 init_market_stream(
349 STREAM_RECONNECTION_POLICY,
350 subs.into_iter()
351 .map(|sub| {
352 Subscription::new(
353 Coinbase,
354 sub.instrument,
355 PublicTrades,
356 )
357 })
358 .collect(),
359 )
360 .await
361 .map(|stream| {
362 tokio::spawn(stream.forward_to(
363 txs.trades.get(&exchange).unwrap().clone(),
364 ))
365 })
366 }
367 (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
368 init_market_stream(
369 STREAM_RECONNECTION_POLICY,
370 subs.into_iter()
371 .map(|sub| {
372 Subscription::new(
373 GateioSpot::default(),
374 sub.instrument,
375 PublicTrades,
376 )
377 })
378 .collect(),
379 )
380 .await
381 .map(|stream| {
382 tokio::spawn(stream.forward_to(
383 txs.trades.get(&exchange).unwrap().clone(),
384 ))
385 })
386 }
387 (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
388 init_market_stream(
389 STREAM_RECONNECTION_POLICY,
390 subs.into_iter()
391 .map(|sub| {
392 Subscription::new(
393 GateioFuturesUsd::default(),
394 sub.instrument,
395 PublicTrades,
396 )
397 })
398 .collect(),
399 )
400 .await
401 .map(|stream| {
402 tokio::spawn(stream.forward_to(
403 txs.trades.get(&exchange).unwrap().clone(),
404 ))
405 })
406 }
407 (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
408 init_market_stream(
409 STREAM_RECONNECTION_POLICY,
410 subs.into_iter()
411 .map(|sub| {
412 Subscription::new(
413 GateioFuturesBtc::default(),
414 sub.instrument,
415 PublicTrades,
416 )
417 })
418 .collect(),
419 )
420 .await
421 .map(|stream| {
422 tokio::spawn(stream.forward_to(
423 txs.trades.get(&exchange).unwrap().clone(),
424 ))
425 })
426 }
427 (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
428 init_market_stream(
429 STREAM_RECONNECTION_POLICY,
430 subs.into_iter()
431 .map(|sub| {
432 Subscription::new(
433 GateioPerpetualsUsd::default(),
434 sub.instrument,
435 PublicTrades,
436 )
437 })
438 .collect(),
439 )
440 .await
441 .map(|stream| {
442 tokio::spawn(stream.forward_to(
443 txs.trades.get(&exchange).unwrap().clone(),
444 ))
445 })
446 }
447 (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
448 init_market_stream(
449 STREAM_RECONNECTION_POLICY,
450 subs.into_iter()
451 .map(|sub| {
452 Subscription::new(
453 GateioPerpetualsBtc::default(),
454 sub.instrument,
455 PublicTrades,
456 )
457 })
458 .collect(),
459 )
460 .await
461 .map(|stream| {
462 tokio::spawn(stream.forward_to(
463 txs.trades.get(&exchange).unwrap().clone(),
464 ))
465 })
466 }
467 (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
468 init_market_stream(
469 STREAM_RECONNECTION_POLICY,
470 subs.into_iter()
471 .map(|sub| {
472 Subscription::new(
473 GateioOptions::default(),
474 sub.instrument,
475 PublicTrades,
476 )
477 })
478 .collect(),
479 )
480 .await
481 .map(|stream| {
482 tokio::spawn(stream.forward_to(
483 txs.trades.get(&exchange).unwrap().clone(),
484 ))
485 })
486 }
487 (ExchangeId::Kraken, SubKind::PublicTrades) => {
488 init_market_stream(
489 STREAM_RECONNECTION_POLICY,
490 subs.into_iter()
491 .map(|sub| {
492 Subscription::new(
493 Kraken,
494 sub.instrument,
495 PublicTrades,
496 )
497 })
498 .collect(),
499 )
500 .await
501 .map(|stream| {
502 tokio::spawn(stream.forward_to(
503 txs.trades.get(&exchange).unwrap().clone(),
504 ))
505 })
506 }
507 (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
508 init_market_stream(
509 STREAM_RECONNECTION_POLICY,
510 subs.into_iter()
511 .map(|sub| {
512 Subscription::new(
513 Kraken,
514 sub.instrument,
515 OrderBooksL1,
516 )
517 })
518 .collect(),
519 )
520 .await
521 .map(|stream| {
522 tokio::spawn(stream.forward_to(
523 txs.l1s.get(&exchange).unwrap().clone(),
524 ))
525 })
526 }
527 (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
528 STREAM_RECONNECTION_POLICY,
529 subs.into_iter()
530 .map(|sub| {
531 Subscription::new(Okx, sub.instrument, PublicTrades)
532 })
533 .collect(),
534 )
535 .await
536 .map(|stream| {
537 tokio::spawn(
538 stream.forward_to(
539 txs.trades.get(&exchange).unwrap().clone(),
540 ),
541 )
542 }),
543 (exchange, sub_kind) => {
544 Err(DataError::Unsupported { exchange, sub_kind })
545 }
546 }
547 }
548 });
549
550 try_join_all(batch_futures)
551 });
552
553 try_join_all(futures).await?;
554
555 Ok(Self {
556 trades: channels
557 .rxs
558 .trades
559 .into_iter()
560 .map(|(exchange, rx)| (exchange, rx.into_stream()))
561 .collect(),
562 l1s: channels
563 .rxs
564 .l1s
565 .into_iter()
566 .map(|(exchange, rx)| (exchange, rx.into_stream()))
567 .collect(),
568 l2s: channels
569 .rxs
570 .l2s
571 .into_iter()
572 .map(|(exchange, rx)| (exchange, rx.into_stream()))
573 .collect(),
574 liquidations: channels
575 .rxs
576 .liquidations
577 .into_iter()
578 .map(|(exchange, rx)| (exchange, rx.into_stream()))
579 .collect(),
580 })
581 }
582
583 /// Remove an execution [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
584 ///
585 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
586 pub fn select_trades(
587 &mut self,
588 exchange: ExchangeId,
589 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
590 self.trades.remove(&exchange)
591 }
592
593 /// Select and merge every execution [`PublicTrade`] `Stream` using
594 /// [`SelectAll`](futures_util::stream::select_all::select_all).
595 pub fn select_all_trades(
596 &mut self,
597 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
598 futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
599 }
600
601 /// Remove an execution [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
602 ///
603 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
604 pub fn select_l1s(
605 &mut self,
606 exchange: ExchangeId,
607 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
608 self.l1s.remove(&exchange)
609 }
610
611 /// Select and merge every execution [`OrderBookL1`] `Stream` using
612 /// [`SelectAll`](futures_util::stream::select_all::select_all).
613 pub fn select_all_l1s(
614 &mut self,
615 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
616 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
617 }
618
619 /// Remove an execution [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
620 ///
621 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
622 pub fn select_l2s(
623 &mut self,
624 exchange: ExchangeId,
625 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
626 self.l2s.remove(&exchange)
627 }
628
629 /// Select and merge every execution [`OrderBookEvent`] `Stream` using
630 /// [`SelectAll`](futures_util::stream::select_all::select_all).
631 pub fn select_all_l2s(
632 &mut self,
633 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
634 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
635 }
636
637 /// Remove an execution [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
638 ///
639 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
640 pub fn select_liquidations(
641 &mut self,
642 exchange: ExchangeId,
643 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
644 self.liquidations.remove(&exchange)
645 }
646
647 /// Select and merge every execution [`Liquidation`] `Stream` using
648 /// [`SelectAll`](futures_util::stream::select_all::select_all).
649 pub fn select_all_liquidations(
650 &mut self,
651 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
652 futures_util::stream::select_all::select_all(
653 std::mem::take(&mut self.liquidations).into_values(),
654 )
655 }
656
657 /// Select and merge every execution `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
658 ///
659 /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
660 /// use cases.
661 pub fn select_all<Output>(self) -> impl Stream<Item = Output>
662 where
663 InstrumentKey: Send + 'static,
664 Output: 'static,
665 MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
666 MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
667 MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
668 MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
669 {
670 let Self {
671 trades,
672 l1s,
673 l2s,
674 liquidations,
675 } = self;
676
677 let trades = trades
678 .into_values()
679 .map(|stream| stream.map(MarketStreamResult::into).boxed());
680
681 let l1s = l1s
682 .into_values()
683 .map(|stream| stream.map(MarketStreamResult::into).boxed());
684
685 let l2s = l2s
686 .into_values()
687 .map(|stream| stream.map(MarketStreamResult::into).boxed());
688
689 let liquidations = liquidations
690 .into_values()
691 .map(|stream| stream.map(MarketStreamResult::into).boxed());
692
693 let all = trades.chain(l1s).chain(l2s).chain(liquidations);
694
695 futures_util::stream::select_all::select_all(all)
696 }
697}
698
699pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
700 batches: SubBatchIter,
701) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
702where
703 SubBatchIter: IntoIterator<Item = SubIter>,
704 SubIter: IntoIterator<Item = Sub>,
705 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
706 Instrument: InstrumentData + Ord,
707{
708 batches
709 .into_iter()
710 .map(validate_subscriptions::<SubIter, Sub, Instrument>)
711 .collect()
712}
713
714pub fn validate_subscriptions<SubIter, Sub, Instrument>(
715 batch: SubIter,
716) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
717where
718 SubIter: IntoIterator<Item = Sub>,
719 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
720 Instrument: InstrumentData + Ord,
721{
722 // Validate Subscriptions
723 let mut batch = batch
724 .into_iter()
725 .map(Sub::into)
726 .map(Validator::validate)
727 .collect::<Result<Vec<_>, SocketError>>()?;
728
729 // Remove duplicate Subscriptions
730 batch.sort();
731 batch.dedup();
732
733 Ok(batch)
734}
735
/// Paired sender and receiver halves of the channels used to build [`DynamicStreams`].
struct Channels<InstrumentKey> {
    /// Sender halves, wrapped in an [`Arc`] so they can be shared between stream forwarding tasks.
    txs: Arc<Txs<InstrumentKey>>,
    /// Receiver halves, converted into the output `Stream`s once initialisation has succeeded.
    rxs: Rxs<InstrumentKey>,
}
740
741impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
742 for Channels<Instrument::Key>
743where
744 Instrument: InstrumentData,
745{
746 type Error = DataError;
747
748 fn try_from(
749 value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
750 ) -> Result<Self, Self::Error> {
751 let mut txs = Txs::default();
752 let mut rxs = Rxs::default();
753
754 for sub in value.iter().flatten() {
755 match sub.kind {
756 SubKind::PublicTrades => {
757 if let (None, None) =
758 (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
759 {
760 let (tx, rx) = mpsc_unbounded();
761 txs.trades.insert(sub.exchange, tx);
762 rxs.trades.insert(sub.exchange, rx);
763 }
764 }
765 SubKind::OrderBooksL1 => {
766 if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
767 let (tx, rx) = mpsc_unbounded();
768 txs.l1s.insert(sub.exchange, tx);
769 rxs.l1s.insert(sub.exchange, rx);
770 }
771 }
772 SubKind::OrderBooksL2 => {
773 if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
774 let (tx, rx) = mpsc_unbounded();
775 txs.l2s.insert(sub.exchange, tx);
776 rxs.l2s.insert(sub.exchange, rx);
777 }
778 }
779 SubKind::Liquidations => {
780 if let (None, None) = (
781 txs.liquidations.get(&sub.exchange),
782 rxs.liquidations.get(&sub.exchange),
783 ) {
784 let (tx, rx) = mpsc_unbounded();
785 txs.liquidations.insert(sub.exchange, tx);
786 rxs.liquidations.insert(sub.exchange, rx);
787 }
788 }
789 unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
790 }
791 }
792
793 Ok(Channels {
794 txs: Arc::new(txs),
795 rxs,
796 })
797 }
798}
799
/// Sender halves of the market data channels, grouped by data kind and keyed by [`ExchangeId`].
struct Txs<InstrumentKey> {
    /// [`PublicTrade`] senders.
    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// [`OrderBookL1`] senders.
    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// [`OrderBookEvent`] (L2) senders.
    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
    /// [`Liquidation`] senders.
    liquidations:
        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
}
807
808impl<InstrumentKey> Default for Txs<InstrumentKey> {
809 fn default() -> Self {
810 Self {
811 trades: Default::default(),
812 l1s: Default::default(),
813 l2s: Default::default(),
814 liquidations: Default::default(),
815 }
816 }
817}
818
/// Receiver halves of the market data channels, grouped by data kind and keyed by [`ExchangeId`].
struct Rxs<InstrumentKey> {
    /// [`PublicTrade`] receivers.
    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// [`OrderBookL1`] receivers.
    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// [`OrderBookEvent`] (L2) receivers.
    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
    /// [`Liquidation`] receivers.
    liquidations:
        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
}
826
827impl<InstrumentKey> Default for Rxs<InstrumentKey> {
828 fn default() -> Self {
829 Self {
830 trades: Default::default(),
831 l1s: Default::default(),
832 l2s: Default::default(),
833 liquidations: Default::default(),
834 }
835 }
836}