barter_data/streams/builder/dynamic/mod.rs
1use crate::{
2 Identifier,
3 error::DataError,
4 exchange::{
5 binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6 bitfinex::{Bitfinex, market::BitfinexMarket},
7 bitmex::{Bitmex, market::BitmexMarket},
8 bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9 coinbase::{Coinbase, market::CoinbaseMarket},
10 gateio::{
11 future::{GateioFuturesBtc, GateioFuturesUsd},
12 market::GateioMarket,
13 option::GateioOptions,
14 perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15 spot::GateioSpot,
16 },
17 kraken::{Kraken, market::KrakenMarket},
18 okx::{Okx, market::OkxMarket},
19 },
20 instrument::InstrumentData,
21 streams::{
22 consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23 reconnect::stream::ReconnectingStream,
24 },
25 subscription::{
26 SubKind, Subscription,
27 book::{OrderBookEvent, OrderBookL1, OrderBooksL1},
28 liquidation::{Liquidation, Liquidations},
29 trade::{PublicTrade, PublicTrades},
30 },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34 Validator,
35 channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36 error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43 fmt::{Debug, Display},
44 sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
53 pub trades:
54 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
55 pub l1s:
56 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
57 pub l2s: VecMap<
58 ExchangeId,
59 UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60 >,
61 pub liquidations:
62 VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66 /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67 ///
68 /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69 /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70 /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71 ///
72 /// ## Examples
73 /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74 /// comprehensive example of how to use this market data stream initialiser.
75 pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76 subscription_batches: SubBatchIter,
77 ) -> Result<Self, DataError>
78 where
79 SubBatchIter: IntoIterator<Item = SubIter>,
80 SubIter: IntoIterator<Item = Sub>,
81 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82 Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83 InstrumentKey: Debug + Clone + Send + 'static,
84 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85 Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86 Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87 Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
88 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
89 Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
90 Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
91 Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
92 Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
93 Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
94 Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
95 Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
96 Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
97 Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
98 Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
99 Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
100 Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
101 Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
102 Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
103 Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
104 {
105 // Validate & dedup Subscription batches
106 let batches = validate_batches(subscription_batches)?;
107
108 // Generate required Channels from Subscription batches
109 let channels = Channels::try_from(&batches)?;
110
111 let futures =
112 batches.into_iter().map(|mut batch| {
113 batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
114 let by_exchange_by_sub_kind =
115 batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
116
117 let batch_futures =
118 by_exchange_by_sub_kind
119 .into_iter()
120 .map(|((exchange, sub_kind), subs)| {
121 let subs = subs.into_iter().collect::<Vec<_>>();
122 let txs = Arc::clone(&channels.txs);
123 async move {
124 match (exchange, sub_kind) {
125 (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
126 init_market_stream(
127 STREAM_RECONNECTION_POLICY,
128 subs.into_iter()
129 .map(|sub| {
130 Subscription::new(
131 BinanceSpot::default(),
132 sub.instrument,
133 PublicTrades,
134 )
135 })
136 .collect(),
137 )
138 .await
139 .map(|stream| {
140 tokio::spawn(stream.forward_to(
141 txs.trades.get(&exchange).unwrap().clone(),
142 ))
143 })
144 }
145 (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
146 init_market_stream(
147 STREAM_RECONNECTION_POLICY,
148 subs.into_iter()
149 .map(|sub| {
150 Subscription::new(
151 BinanceSpot::default(),
152 sub.instrument,
153 OrderBooksL1,
154 )
155 })
156 .collect(),
157 )
158 .await
159 .map(|stream| {
160 tokio::spawn(stream.forward_to(
161 txs.l1s.get(&exchange).unwrap().clone(),
162 ))
163 })
164 }
165 (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
166 init_market_stream(
167 STREAM_RECONNECTION_POLICY,
168 subs.into_iter()
169 .map(|sub| {
170 Subscription::new(
171 BinanceFuturesUsd::default(),
172 sub.instrument,
173 PublicTrades,
174 )
175 })
176 .collect(),
177 )
178 .await
179 .map(|stream| {
180 tokio::spawn(stream.forward_to(
181 txs.trades.get(&exchange).unwrap().clone(),
182 ))
183 })
184 }
185 (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
186 init_market_stream(
187 STREAM_RECONNECTION_POLICY,
188 subs.into_iter()
189 .map(|sub| {
190 Subscription::<_, Instrument, _>::new(
191 BinanceFuturesUsd::default(),
192 sub.instrument,
193 OrderBooksL1,
194 )
195 })
196 .collect(),
197 )
198 .await
199 .map(|stream| {
200 tokio::spawn(stream.forward_to(
201 txs.l1s.get(&exchange).unwrap().clone(),
202 ))
203 })
204 }
205 (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
206 init_market_stream(
207 STREAM_RECONNECTION_POLICY,
208 subs.into_iter()
209 .map(|sub| {
210 Subscription::<_, Instrument, _>::new(
211 BinanceFuturesUsd::default(),
212 sub.instrument,
213 Liquidations,
214 )
215 })
216 .collect(),
217 )
218 .await
219 .map(|stream| {
220 tokio::spawn(stream.forward_to(
221 txs.liquidations.get(&exchange).unwrap().clone(),
222 ))
223 })
224 }
225 (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
226 init_market_stream(
227 STREAM_RECONNECTION_POLICY,
228 subs.into_iter()
229 .map(|sub| {
230 Subscription::new(
231 Bitfinex,
232 sub.instrument,
233 PublicTrades,
234 )
235 })
236 .collect(),
237 )
238 .await
239 .map(|stream| {
240 tokio::spawn(stream.forward_to(
241 txs.trades.get(&exchange).unwrap().clone(),
242 ))
243 })
244 }
245 (ExchangeId::Bitmex, SubKind::PublicTrades) => {
246 init_market_stream(
247 STREAM_RECONNECTION_POLICY,
248 subs.into_iter()
249 .map(|sub| {
250 Subscription::new(
251 Bitmex,
252 sub.instrument,
253 PublicTrades,
254 )
255 })
256 .collect(),
257 )
258 .await
259 .map(|stream| {
260 tokio::spawn(stream.forward_to(
261 txs.trades.get(&exchange).unwrap().clone(),
262 ))
263 })
264 }
265 (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
266 init_market_stream(
267 STREAM_RECONNECTION_POLICY,
268 subs.into_iter()
269 .map(|sub| {
270 Subscription::new(
271 BybitSpot::default(),
272 sub.instrument,
273 PublicTrades,
274 )
275 })
276 .collect(),
277 )
278 .await
279 .map(|stream| {
280 tokio::spawn(stream.forward_to(
281 txs.trades.get(&exchange).unwrap().clone(),
282 ))
283 })
284 }
285 (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
286 init_market_stream(
287 STREAM_RECONNECTION_POLICY,
288 subs.into_iter()
289 .map(|sub| {
290 Subscription::new(
291 BybitPerpetualsUsd::default(),
292 sub.instrument,
293 PublicTrades,
294 )
295 })
296 .collect(),
297 )
298 .await
299 .map(|stream| {
300 tokio::spawn(stream.forward_to(
301 txs.trades.get(&exchange).unwrap().clone(),
302 ))
303 })
304 }
305 (ExchangeId::Coinbase, SubKind::PublicTrades) => {
306 init_market_stream(
307 STREAM_RECONNECTION_POLICY,
308 subs.into_iter()
309 .map(|sub| {
310 Subscription::new(
311 Coinbase,
312 sub.instrument,
313 PublicTrades,
314 )
315 })
316 .collect(),
317 )
318 .await
319 .map(|stream| {
320 tokio::spawn(stream.forward_to(
321 txs.trades.get(&exchange).unwrap().clone(),
322 ))
323 })
324 }
325 (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
326 init_market_stream(
327 STREAM_RECONNECTION_POLICY,
328 subs.into_iter()
329 .map(|sub| {
330 Subscription::new(
331 GateioSpot::default(),
332 sub.instrument,
333 PublicTrades,
334 )
335 })
336 .collect(),
337 )
338 .await
339 .map(|stream| {
340 tokio::spawn(stream.forward_to(
341 txs.trades.get(&exchange).unwrap().clone(),
342 ))
343 })
344 }
345 (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
346 init_market_stream(
347 STREAM_RECONNECTION_POLICY,
348 subs.into_iter()
349 .map(|sub| {
350 Subscription::new(
351 GateioFuturesUsd::default(),
352 sub.instrument,
353 PublicTrades,
354 )
355 })
356 .collect(),
357 )
358 .await
359 .map(|stream| {
360 tokio::spawn(stream.forward_to(
361 txs.trades.get(&exchange).unwrap().clone(),
362 ))
363 })
364 }
365 (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
366 init_market_stream(
367 STREAM_RECONNECTION_POLICY,
368 subs.into_iter()
369 .map(|sub| {
370 Subscription::new(
371 GateioFuturesBtc::default(),
372 sub.instrument,
373 PublicTrades,
374 )
375 })
376 .collect(),
377 )
378 .await
379 .map(|stream| {
380 tokio::spawn(stream.forward_to(
381 txs.trades.get(&exchange).unwrap().clone(),
382 ))
383 })
384 }
385 (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
386 init_market_stream(
387 STREAM_RECONNECTION_POLICY,
388 subs.into_iter()
389 .map(|sub| {
390 Subscription::new(
391 GateioPerpetualsUsd::default(),
392 sub.instrument,
393 PublicTrades,
394 )
395 })
396 .collect(),
397 )
398 .await
399 .map(|stream| {
400 tokio::spawn(stream.forward_to(
401 txs.trades.get(&exchange).unwrap().clone(),
402 ))
403 })
404 }
405 (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
406 init_market_stream(
407 STREAM_RECONNECTION_POLICY,
408 subs.into_iter()
409 .map(|sub| {
410 Subscription::new(
411 GateioPerpetualsBtc::default(),
412 sub.instrument,
413 PublicTrades,
414 )
415 })
416 .collect(),
417 )
418 .await
419 .map(|stream| {
420 tokio::spawn(stream.forward_to(
421 txs.trades.get(&exchange).unwrap().clone(),
422 ))
423 })
424 }
425 (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
426 init_market_stream(
427 STREAM_RECONNECTION_POLICY,
428 subs.into_iter()
429 .map(|sub| {
430 Subscription::new(
431 GateioOptions::default(),
432 sub.instrument,
433 PublicTrades,
434 )
435 })
436 .collect(),
437 )
438 .await
439 .map(|stream| {
440 tokio::spawn(stream.forward_to(
441 txs.trades.get(&exchange).unwrap().clone(),
442 ))
443 })
444 }
445 (ExchangeId::Kraken, SubKind::PublicTrades) => {
446 init_market_stream(
447 STREAM_RECONNECTION_POLICY,
448 subs.into_iter()
449 .map(|sub| {
450 Subscription::new(
451 Kraken,
452 sub.instrument,
453 PublicTrades,
454 )
455 })
456 .collect(),
457 )
458 .await
459 .map(|stream| {
460 tokio::spawn(stream.forward_to(
461 txs.trades.get(&exchange).unwrap().clone(),
462 ))
463 })
464 }
465 (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
466 init_market_stream(
467 STREAM_RECONNECTION_POLICY,
468 subs.into_iter()
469 .map(|sub| {
470 Subscription::new(
471 Kraken,
472 sub.instrument,
473 OrderBooksL1,
474 )
475 })
476 .collect(),
477 )
478 .await
479 .map(|stream| {
480 tokio::spawn(stream.forward_to(
481 txs.l1s.get(&exchange).unwrap().clone(),
482 ))
483 })
484 }
485 (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
486 STREAM_RECONNECTION_POLICY,
487 subs.into_iter()
488 .map(|sub| {
489 Subscription::new(Okx, sub.instrument, PublicTrades)
490 })
491 .collect(),
492 )
493 .await
494 .map(|stream| {
495 tokio::spawn(
496 stream.forward_to(
497 txs.trades.get(&exchange).unwrap().clone(),
498 ),
499 )
500 }),
501 (exchange, sub_kind) => {
502 Err(DataError::Unsupported { exchange, sub_kind })
503 }
504 }
505 }
506 });
507
508 try_join_all(batch_futures)
509 });
510
511 try_join_all(futures).await?;
512
513 Ok(Self {
514 trades: channels
515 .rxs
516 .trades
517 .into_iter()
518 .map(|(exchange, rx)| (exchange, rx.into_stream()))
519 .collect(),
520 l1s: channels
521 .rxs
522 .l1s
523 .into_iter()
524 .map(|(exchange, rx)| (exchange, rx.into_stream()))
525 .collect(),
526 l2s: channels
527 .rxs
528 .l2s
529 .into_iter()
530 .map(|(exchange, rx)| (exchange, rx.into_stream()))
531 .collect(),
532 liquidations: channels
533 .rxs
534 .liquidations
535 .into_iter()
536 .map(|(exchange, rx)| (exchange, rx.into_stream()))
537 .collect(),
538 })
539 }
540
541 /// Remove an execution [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
542 ///
543 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
544 pub fn select_trades(
545 &mut self,
546 exchange: ExchangeId,
547 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
548 self.trades.remove(&exchange)
549 }
550
551 /// Select and merge every execution [`PublicTrade`] `Stream` using
552 /// [`SelectAll`](futures_util::stream::select_all::select_all).
553 pub fn select_all_trades(
554 &mut self,
555 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
556 futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
557 }
558
559 /// Remove an execution [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
560 ///
561 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
562 pub fn select_l1s(
563 &mut self,
564 exchange: ExchangeId,
565 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
566 self.l1s.remove(&exchange)
567 }
568
569 /// Select and merge every execution [`OrderBookL1`] `Stream` using
570 /// [`SelectAll`](futures_util::stream::select_all::select_all).
571 pub fn select_all_l1s(
572 &mut self,
573 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
574 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
575 }
576
577 /// Remove an execution [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
578 ///
579 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
580 pub fn select_l2s(
581 &mut self,
582 exchange: ExchangeId,
583 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
584 self.l2s.remove(&exchange)
585 }
586
587 /// Select and merge every execution [`OrderBookEvent`] `Stream` using
588 /// [`SelectAll`](futures_util::stream::select_all::select_all).
589 pub fn select_all_l2s(
590 &mut self,
591 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
592 futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
593 }
594
595 /// Remove an execution [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
596 ///
597 /// Note that calling this method will permanently remove this `Stream` from [`Self`].
598 pub fn select_liquidations(
599 &mut self,
600 exchange: ExchangeId,
601 ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
602 self.liquidations.remove(&exchange)
603 }
604
605 /// Select and merge every execution [`Liquidation`] `Stream` using
606 /// [`SelectAll`](futures_util::stream::select_all::select_all).
607 pub fn select_all_liquidations(
608 &mut self,
609 ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
610 futures_util::stream::select_all::select_all(
611 std::mem::take(&mut self.liquidations).into_values(),
612 )
613 }
614
615 /// Select and merge every execution `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
616 ///
617 /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
618 /// use cases.
619 pub fn select_all<Output>(self) -> impl Stream<Item = Output>
620 where
621 InstrumentKey: Send + 'static,
622 Output: 'static,
623 MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
624 MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
625 MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
626 MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
627 {
628 let Self {
629 trades,
630 l1s,
631 l2s,
632 liquidations,
633 } = self;
634
635 let trades = trades
636 .into_values()
637 .map(|stream| stream.map(MarketStreamResult::into).boxed());
638
639 let l1s = l1s
640 .into_values()
641 .map(|stream| stream.map(MarketStreamResult::into).boxed());
642
643 let l2s = l2s
644 .into_values()
645 .map(|stream| stream.map(MarketStreamResult::into).boxed());
646
647 let liquidations = liquidations
648 .into_values()
649 .map(|stream| stream.map(MarketStreamResult::into).boxed());
650
651 let all = trades.chain(l1s).chain(l2s).chain(liquidations);
652
653 futures_util::stream::select_all::select_all(all)
654 }
655}
656
657pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
658 batches: SubBatchIter,
659) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
660where
661 SubBatchIter: IntoIterator<Item = SubIter>,
662 SubIter: IntoIterator<Item = Sub>,
663 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
664 Instrument: InstrumentData + Ord,
665{
666 batches
667 .into_iter()
668 .map(validate_subscriptions::<SubIter, Sub, Instrument>)
669 .collect()
670}
671
672pub fn validate_subscriptions<SubIter, Sub, Instrument>(
673 batch: SubIter,
674) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
675where
676 SubIter: IntoIterator<Item = Sub>,
677 Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
678 Instrument: InstrumentData + Ord,
679{
680 // Validate Subscriptions
681 let mut batch = batch
682 .into_iter()
683 .map(Sub::into)
684 .map(Validator::validate)
685 .collect::<Result<Vec<_>, SocketError>>()?;
686
687 // Remove duplicate Subscriptions
688 batch.sort();
689 batch.dedup();
690
691 Ok(batch)
692}
693
694struct Channels<InstrumentKey> {
695 txs: Arc<Txs<InstrumentKey>>,
696 rxs: Rxs<InstrumentKey>,
697}
698
699impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
700 for Channels<Instrument::Key>
701where
702 Instrument: InstrumentData,
703{
704 type Error = DataError;
705
706 fn try_from(
707 value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
708 ) -> Result<Self, Self::Error> {
709 let mut txs = Txs::default();
710 let mut rxs = Rxs::default();
711
712 for sub in value.iter().flatten() {
713 match sub.kind {
714 SubKind::PublicTrades => {
715 if let (None, None) =
716 (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
717 {
718 let (tx, rx) = mpsc_unbounded();
719 txs.trades.insert(sub.exchange, tx);
720 rxs.trades.insert(sub.exchange, rx);
721 }
722 }
723 SubKind::OrderBooksL1 => {
724 if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
725 let (tx, rx) = mpsc_unbounded();
726 txs.l1s.insert(sub.exchange, tx);
727 rxs.l1s.insert(sub.exchange, rx);
728 }
729 }
730 SubKind::OrderBooksL2 => {
731 if let (None, None) =
732 (txs.l2s.get(&sub.exchange), rxs.trades.get(&sub.exchange))
733 {
734 let (tx, rx) = mpsc_unbounded();
735 txs.l2s.insert(sub.exchange, tx);
736 rxs.l2s.insert(sub.exchange, rx);
737 }
738 }
739 SubKind::Liquidations => {
740 if let (None, None) = (
741 txs.liquidations.get(&sub.exchange),
742 rxs.liquidations.get(&sub.exchange),
743 ) {
744 let (tx, rx) = mpsc_unbounded();
745 txs.liquidations.insert(sub.exchange, tx);
746 rxs.liquidations.insert(sub.exchange, rx);
747 }
748 }
749 unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
750 }
751 }
752
753 Ok(Channels {
754 txs: Arc::new(txs),
755 rxs,
756 })
757 }
758}
759
760struct Txs<InstrumentKey> {
761 trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
762 l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
763 l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
764 liquidations:
765 FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
766}
767
768impl<InstrumentKey> Default for Txs<InstrumentKey> {
769 fn default() -> Self {
770 Self {
771 trades: Default::default(),
772 l1s: Default::default(),
773 l2s: Default::default(),
774 liquidations: Default::default(),
775 }
776 }
777}
778
779struct Rxs<InstrumentKey> {
780 trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
781 l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
782 l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
783 liquidations:
784 FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
785}
786
787impl<InstrumentKey> Default for Rxs<InstrumentKey> {
788 fn default() -> Self {
789 Self {
790 trades: Default::default(),
791 l1s: Default::default(),
792 l2s: Default::default(),
793 liquidations: Default::default(),
794 }
795 }
796}