barter_data/streams/builder/dynamic/
mod.rs

1use crate::{
2    Identifier,
3    error::DataError,
4    exchange::{
5        binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6        bitfinex::{Bitfinex, market::BitfinexMarket},
7        bitmex::{Bitmex, market::BitmexMarket},
8        bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9        coinbase::{Coinbase, market::CoinbaseMarket},
10        gateio::{
11            future::{GateioFuturesBtc, GateioFuturesUsd},
12            market::GateioMarket,
13            option::GateioOptions,
14            perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15            spot::GateioSpot,
16        },
17        kraken::{Kraken, market::KrakenMarket},
18        okx::{Okx, market::OkxMarket},
19    },
20    instrument::InstrumentData,
21    streams::{
22        consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23        reconnect::stream::ReconnectingStream,
24    },
25    subscription::{
26        SubKind, Subscription,
27        book::{OrderBookEvent, OrderBookL1, OrderBooksL1},
28        liquidation::{Liquidation, Liquidations},
29        trade::{PublicTrade, PublicTrades},
30    },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34    Validator,
35    channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36    error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43    fmt::{Debug, Display},
44    sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
53    pub trades:
54        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
55    pub l1s:
56        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
57    pub l2s: VecMap<
58        ExchangeId,
59        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60    >,
61    pub liquidations:
62        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66    /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67    ///
68    /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69    /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70    /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71    ///
72    /// ## Examples
73    /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74    /// comprehensive example of how to use this market data stream initialiser.
75    pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76        subscription_batches: SubBatchIter,
77    ) -> Result<Self, DataError>
78    where
79        SubBatchIter: IntoIterator<Item = SubIter>,
80        SubIter: IntoIterator<Item = Sub>,
81        Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83        InstrumentKey: Debug + Clone + Send + 'static,
84        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86        Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87        Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
88        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
89        Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
90        Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
91        Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
92        Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
93        Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
94        Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
95        Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
96        Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
97        Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
98        Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
99        Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
100        Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
101        Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
102        Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
103        Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
104    {
105        // Validate & dedup Subscription batches
106        let batches = validate_batches(subscription_batches)?;
107
108        // Generate required Channels from Subscription batches
109        let channels = Channels::try_from(&batches)?;
110
111        let futures =
112            batches.into_iter().map(|mut batch| {
113                batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
114                let by_exchange_by_sub_kind =
115                    batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
116
117                let batch_futures =
118                    by_exchange_by_sub_kind
119                        .into_iter()
120                        .map(|((exchange, sub_kind), subs)| {
121                            let subs = subs.into_iter().collect::<Vec<_>>();
122                            let txs = Arc::clone(&channels.txs);
123                            async move {
124                                match (exchange, sub_kind) {
125                                    (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
126                                        init_market_stream(
127                                            STREAM_RECONNECTION_POLICY,
128                                            subs.into_iter()
129                                                .map(|sub| {
130                                                    Subscription::new(
131                                                        BinanceSpot::default(),
132                                                        sub.instrument,
133                                                        PublicTrades,
134                                                    )
135                                                })
136                                                .collect(),
137                                        )
138                                        .await
139                                        .map(|stream| {
140                                            tokio::spawn(stream.forward_to(
141                                                txs.trades.get(&exchange).unwrap().clone(),
142                                            ))
143                                        })
144                                    }
145                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
146                                        init_market_stream(
147                                            STREAM_RECONNECTION_POLICY,
148                                            subs.into_iter()
149                                                .map(|sub| {
150                                                    Subscription::new(
151                                                        BinanceSpot::default(),
152                                                        sub.instrument,
153                                                        OrderBooksL1,
154                                                    )
155                                                })
156                                                .collect(),
157                                        )
158                                        .await
159                                        .map(|stream| {
160                                            tokio::spawn(stream.forward_to(
161                                                txs.l1s.get(&exchange).unwrap().clone(),
162                                            ))
163                                        })
164                                    }
165                                    (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
166                                        init_market_stream(
167                                            STREAM_RECONNECTION_POLICY,
168                                            subs.into_iter()
169                                                .map(|sub| {
170                                                    Subscription::new(
171                                                        BinanceFuturesUsd::default(),
172                                                        sub.instrument,
173                                                        PublicTrades,
174                                                    )
175                                                })
176                                                .collect(),
177                                        )
178                                        .await
179                                        .map(|stream| {
180                                            tokio::spawn(stream.forward_to(
181                                                txs.trades.get(&exchange).unwrap().clone(),
182                                            ))
183                                        })
184                                    }
185                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
186                                        init_market_stream(
187                                            STREAM_RECONNECTION_POLICY,
188                                            subs.into_iter()
189                                                .map(|sub| {
190                                                    Subscription::<_, Instrument, _>::new(
191                                                        BinanceFuturesUsd::default(),
192                                                        sub.instrument,
193                                                        OrderBooksL1,
194                                                    )
195                                                })
196                                                .collect(),
197                                        )
198                                        .await
199                                        .map(|stream| {
200                                            tokio::spawn(stream.forward_to(
201                                                txs.l1s.get(&exchange).unwrap().clone(),
202                                            ))
203                                        })
204                                    }
205                                    (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
206                                        init_market_stream(
207                                            STREAM_RECONNECTION_POLICY,
208                                            subs.into_iter()
209                                                .map(|sub| {
210                                                    Subscription::<_, Instrument, _>::new(
211                                                        BinanceFuturesUsd::default(),
212                                                        sub.instrument,
213                                                        Liquidations,
214                                                    )
215                                                })
216                                                .collect(),
217                                        )
218                                        .await
219                                        .map(|stream| {
220                                            tokio::spawn(stream.forward_to(
221                                                txs.liquidations.get(&exchange).unwrap().clone(),
222                                            ))
223                                        })
224                                    }
225                                    (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
226                                        init_market_stream(
227                                            STREAM_RECONNECTION_POLICY,
228                                            subs.into_iter()
229                                                .map(|sub| {
230                                                    Subscription::new(
231                                                        Bitfinex,
232                                                        sub.instrument,
233                                                        PublicTrades,
234                                                    )
235                                                })
236                                                .collect(),
237                                        )
238                                        .await
239                                        .map(|stream| {
240                                            tokio::spawn(stream.forward_to(
241                                                txs.trades.get(&exchange).unwrap().clone(),
242                                            ))
243                                        })
244                                    }
245                                    (ExchangeId::Bitmex, SubKind::PublicTrades) => {
246                                        init_market_stream(
247                                            STREAM_RECONNECTION_POLICY,
248                                            subs.into_iter()
249                                                .map(|sub| {
250                                                    Subscription::new(
251                                                        Bitmex,
252                                                        sub.instrument,
253                                                        PublicTrades,
254                                                    )
255                                                })
256                                                .collect(),
257                                        )
258                                        .await
259                                        .map(|stream| {
260                                            tokio::spawn(stream.forward_to(
261                                                txs.trades.get(&exchange).unwrap().clone(),
262                                            ))
263                                        })
264                                    }
265                                    (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
266                                        init_market_stream(
267                                            STREAM_RECONNECTION_POLICY,
268                                            subs.into_iter()
269                                                .map(|sub| {
270                                                    Subscription::new(
271                                                        BybitSpot::default(),
272                                                        sub.instrument,
273                                                        PublicTrades,
274                                                    )
275                                                })
276                                                .collect(),
277                                        )
278                                        .await
279                                        .map(|stream| {
280                                            tokio::spawn(stream.forward_to(
281                                                txs.trades.get(&exchange).unwrap().clone(),
282                                            ))
283                                        })
284                                    }
285                                    (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
286                                        init_market_stream(
287                                            STREAM_RECONNECTION_POLICY,
288                                            subs.into_iter()
289                                                .map(|sub| {
290                                                    Subscription::new(
291                                                        BybitPerpetualsUsd::default(),
292                                                        sub.instrument,
293                                                        PublicTrades,
294                                                    )
295                                                })
296                                                .collect(),
297                                        )
298                                        .await
299                                        .map(|stream| {
300                                            tokio::spawn(stream.forward_to(
301                                                txs.trades.get(&exchange).unwrap().clone(),
302                                            ))
303                                        })
304                                    }
305                                    (ExchangeId::Coinbase, SubKind::PublicTrades) => {
306                                        init_market_stream(
307                                            STREAM_RECONNECTION_POLICY,
308                                            subs.into_iter()
309                                                .map(|sub| {
310                                                    Subscription::new(
311                                                        Coinbase,
312                                                        sub.instrument,
313                                                        PublicTrades,
314                                                    )
315                                                })
316                                                .collect(),
317                                        )
318                                        .await
319                                        .map(|stream| {
320                                            tokio::spawn(stream.forward_to(
321                                                txs.trades.get(&exchange).unwrap().clone(),
322                                            ))
323                                        })
324                                    }
325                                    (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
326                                        init_market_stream(
327                                            STREAM_RECONNECTION_POLICY,
328                                            subs.into_iter()
329                                                .map(|sub| {
330                                                    Subscription::new(
331                                                        GateioSpot::default(),
332                                                        sub.instrument,
333                                                        PublicTrades,
334                                                    )
335                                                })
336                                                .collect(),
337                                        )
338                                        .await
339                                        .map(|stream| {
340                                            tokio::spawn(stream.forward_to(
341                                                txs.trades.get(&exchange).unwrap().clone(),
342                                            ))
343                                        })
344                                    }
345                                    (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
346                                        init_market_stream(
347                                            STREAM_RECONNECTION_POLICY,
348                                            subs.into_iter()
349                                                .map(|sub| {
350                                                    Subscription::new(
351                                                        GateioFuturesUsd::default(),
352                                                        sub.instrument,
353                                                        PublicTrades,
354                                                    )
355                                                })
356                                                .collect(),
357                                        )
358                                        .await
359                                        .map(|stream| {
360                                            tokio::spawn(stream.forward_to(
361                                                txs.trades.get(&exchange).unwrap().clone(),
362                                            ))
363                                        })
364                                    }
365                                    (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
366                                        init_market_stream(
367                                            STREAM_RECONNECTION_POLICY,
368                                            subs.into_iter()
369                                                .map(|sub| {
370                                                    Subscription::new(
371                                                        GateioFuturesBtc::default(),
372                                                        sub.instrument,
373                                                        PublicTrades,
374                                                    )
375                                                })
376                                                .collect(),
377                                        )
378                                        .await
379                                        .map(|stream| {
380                                            tokio::spawn(stream.forward_to(
381                                                txs.trades.get(&exchange).unwrap().clone(),
382                                            ))
383                                        })
384                                    }
385                                    (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
386                                        init_market_stream(
387                                            STREAM_RECONNECTION_POLICY,
388                                            subs.into_iter()
389                                                .map(|sub| {
390                                                    Subscription::new(
391                                                        GateioPerpetualsUsd::default(),
392                                                        sub.instrument,
393                                                        PublicTrades,
394                                                    )
395                                                })
396                                                .collect(),
397                                        )
398                                        .await
399                                        .map(|stream| {
400                                            tokio::spawn(stream.forward_to(
401                                                txs.trades.get(&exchange).unwrap().clone(),
402                                            ))
403                                        })
404                                    }
405                                    (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
406                                        init_market_stream(
407                                            STREAM_RECONNECTION_POLICY,
408                                            subs.into_iter()
409                                                .map(|sub| {
410                                                    Subscription::new(
411                                                        GateioPerpetualsBtc::default(),
412                                                        sub.instrument,
413                                                        PublicTrades,
414                                                    )
415                                                })
416                                                .collect(),
417                                        )
418                                        .await
419                                        .map(|stream| {
420                                            tokio::spawn(stream.forward_to(
421                                                txs.trades.get(&exchange).unwrap().clone(),
422                                            ))
423                                        })
424                                    }
425                                    (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
426                                        init_market_stream(
427                                            STREAM_RECONNECTION_POLICY,
428                                            subs.into_iter()
429                                                .map(|sub| {
430                                                    Subscription::new(
431                                                        GateioOptions::default(),
432                                                        sub.instrument,
433                                                        PublicTrades,
434                                                    )
435                                                })
436                                                .collect(),
437                                        )
438                                        .await
439                                        .map(|stream| {
440                                            tokio::spawn(stream.forward_to(
441                                                txs.trades.get(&exchange).unwrap().clone(),
442                                            ))
443                                        })
444                                    }
445                                    (ExchangeId::Kraken, SubKind::PublicTrades) => {
446                                        init_market_stream(
447                                            STREAM_RECONNECTION_POLICY,
448                                            subs.into_iter()
449                                                .map(|sub| {
450                                                    Subscription::new(
451                                                        Kraken,
452                                                        sub.instrument,
453                                                        PublicTrades,
454                                                    )
455                                                })
456                                                .collect(),
457                                        )
458                                        .await
459                                        .map(|stream| {
460                                            tokio::spawn(stream.forward_to(
461                                                txs.trades.get(&exchange).unwrap().clone(),
462                                            ))
463                                        })
464                                    }
465                                    (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
466                                        init_market_stream(
467                                            STREAM_RECONNECTION_POLICY,
468                                            subs.into_iter()
469                                                .map(|sub| {
470                                                    Subscription::new(
471                                                        Kraken,
472                                                        sub.instrument,
473                                                        OrderBooksL1,
474                                                    )
475                                                })
476                                                .collect(),
477                                        )
478                                        .await
479                                        .map(|stream| {
480                                            tokio::spawn(stream.forward_to(
481                                                txs.l1s.get(&exchange).unwrap().clone(),
482                                            ))
483                                        })
484                                    }
485                                    (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
486                                        STREAM_RECONNECTION_POLICY,
487                                        subs.into_iter()
488                                            .map(|sub| {
489                                                Subscription::new(Okx, sub.instrument, PublicTrades)
490                                            })
491                                            .collect(),
492                                    )
493                                    .await
494                                    .map(|stream| {
495                                        tokio::spawn(
496                                            stream.forward_to(
497                                                txs.trades.get(&exchange).unwrap().clone(),
498                                            ),
499                                        )
500                                    }),
501                                    (exchange, sub_kind) => {
502                                        Err(DataError::Unsupported { exchange, sub_kind })
503                                    }
504                                }
505                            }
506                        });
507
508                try_join_all(batch_futures)
509            });
510
511        try_join_all(futures).await?;
512
513        Ok(Self {
514            trades: channels
515                .rxs
516                .trades
517                .into_iter()
518                .map(|(exchange, rx)| (exchange, rx.into_stream()))
519                .collect(),
520            l1s: channels
521                .rxs
522                .l1s
523                .into_iter()
524                .map(|(exchange, rx)| (exchange, rx.into_stream()))
525                .collect(),
526            l2s: channels
527                .rxs
528                .l2s
529                .into_iter()
530                .map(|(exchange, rx)| (exchange, rx.into_stream()))
531                .collect(),
532            liquidations: channels
533                .rxs
534                .liquidations
535                .into_iter()
536                .map(|(exchange, rx)| (exchange, rx.into_stream()))
537                .collect(),
538        })
539    }
540
541    /// Remove an execution [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
542    ///
543    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
544    pub fn select_trades(
545        &mut self,
546        exchange: ExchangeId,
547    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
548        self.trades.remove(&exchange)
549    }
550
551    /// Select and merge every execution [`PublicTrade`] `Stream` using
552    /// [`SelectAll`](futures_util::stream::select_all::select_all).
553    pub fn select_all_trades(
554        &mut self,
555    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
556        futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
557    }
558
559    /// Remove an execution [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
560    ///
561    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
562    pub fn select_l1s(
563        &mut self,
564        exchange: ExchangeId,
565    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
566        self.l1s.remove(&exchange)
567    }
568
569    /// Select and merge every execution [`OrderBookL1`] `Stream` using
570    /// [`SelectAll`](futures_util::stream::select_all::select_all).
571    pub fn select_all_l1s(
572        &mut self,
573    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
574        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
575    }
576
577    /// Remove an execution [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
578    ///
579    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
580    pub fn select_l2s(
581        &mut self,
582        exchange: ExchangeId,
583    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
584        self.l2s.remove(&exchange)
585    }
586
587    /// Select and merge every execution [`OrderBookEvent`] `Stream` using
588    /// [`SelectAll`](futures_util::stream::select_all::select_all).
589    pub fn select_all_l2s(
590        &mut self,
591    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
592        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
593    }
594
595    /// Remove an execution [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
596    ///
597    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
598    pub fn select_liquidations(
599        &mut self,
600        exchange: ExchangeId,
601    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
602        self.liquidations.remove(&exchange)
603    }
604
605    /// Select and merge every execution [`Liquidation`] `Stream` using
606    /// [`SelectAll`](futures_util::stream::select_all::select_all).
607    pub fn select_all_liquidations(
608        &mut self,
609    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
610        futures_util::stream::select_all::select_all(
611            std::mem::take(&mut self.liquidations).into_values(),
612        )
613    }
614
615    /// Select and merge every execution `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
616    ///
617    /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
618    /// use cases.
619    pub fn select_all<Output>(self) -> impl Stream<Item = Output>
620    where
621        InstrumentKey: Send + 'static,
622        Output: 'static,
623        MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
624        MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
625        MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
626        MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
627    {
628        let Self {
629            trades,
630            l1s,
631            l2s,
632            liquidations,
633        } = self;
634
635        let trades = trades
636            .into_values()
637            .map(|stream| stream.map(MarketStreamResult::into).boxed());
638
639        let l1s = l1s
640            .into_values()
641            .map(|stream| stream.map(MarketStreamResult::into).boxed());
642
643        let l2s = l2s
644            .into_values()
645            .map(|stream| stream.map(MarketStreamResult::into).boxed());
646
647        let liquidations = liquidations
648            .into_values()
649            .map(|stream| stream.map(MarketStreamResult::into).boxed());
650
651        let all = trades.chain(l1s).chain(l2s).chain(liquidations);
652
653        futures_util::stream::select_all::select_all(all)
654    }
655}
656
657pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
658    batches: SubBatchIter,
659) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
660where
661    SubBatchIter: IntoIterator<Item = SubIter>,
662    SubIter: IntoIterator<Item = Sub>,
663    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
664    Instrument: InstrumentData + Ord,
665{
666    batches
667        .into_iter()
668        .map(validate_subscriptions::<SubIter, Sub, Instrument>)
669        .collect()
670}
671
672pub fn validate_subscriptions<SubIter, Sub, Instrument>(
673    batch: SubIter,
674) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
675where
676    SubIter: IntoIterator<Item = Sub>,
677    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
678    Instrument: InstrumentData + Ord,
679{
680    // Validate Subscriptions
681    let mut batch = batch
682        .into_iter()
683        .map(Sub::into)
684        .map(Validator::validate)
685        .collect::<Result<Vec<_>, SocketError>>()?;
686
687    // Remove duplicate Subscriptions
688    batch.sort();
689    batch.dedup();
690
691    Ok(batch)
692}
693
694struct Channels<InstrumentKey> {
695    txs: Arc<Txs<InstrumentKey>>,
696    rxs: Rxs<InstrumentKey>,
697}
698
699impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
700    for Channels<Instrument::Key>
701where
702    Instrument: InstrumentData,
703{
704    type Error = DataError;
705
706    fn try_from(
707        value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
708    ) -> Result<Self, Self::Error> {
709        let mut txs = Txs::default();
710        let mut rxs = Rxs::default();
711
712        for sub in value.iter().flatten() {
713            match sub.kind {
714                SubKind::PublicTrades => {
715                    if let (None, None) =
716                        (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
717                    {
718                        let (tx, rx) = mpsc_unbounded();
719                        txs.trades.insert(sub.exchange, tx);
720                        rxs.trades.insert(sub.exchange, rx);
721                    }
722                }
723                SubKind::OrderBooksL1 => {
724                    if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
725                        let (tx, rx) = mpsc_unbounded();
726                        txs.l1s.insert(sub.exchange, tx);
727                        rxs.l1s.insert(sub.exchange, rx);
728                    }
729                }
730                SubKind::OrderBooksL2 => {
731                    if let (None, None) =
732                        (txs.l2s.get(&sub.exchange), rxs.trades.get(&sub.exchange))
733                    {
734                        let (tx, rx) = mpsc_unbounded();
735                        txs.l2s.insert(sub.exchange, tx);
736                        rxs.l2s.insert(sub.exchange, rx);
737                    }
738                }
739                SubKind::Liquidations => {
740                    if let (None, None) = (
741                        txs.liquidations.get(&sub.exchange),
742                        rxs.liquidations.get(&sub.exchange),
743                    ) {
744                        let (tx, rx) = mpsc_unbounded();
745                        txs.liquidations.insert(sub.exchange, tx);
746                        rxs.liquidations.insert(sub.exchange, rx);
747                    }
748                }
749                unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
750            }
751        }
752
753        Ok(Channels {
754            txs: Arc::new(txs),
755            rxs,
756        })
757    }
758}
759
760struct Txs<InstrumentKey> {
761    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
762    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
763    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
764    liquidations:
765        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
766}
767
768impl<InstrumentKey> Default for Txs<InstrumentKey> {
769    fn default() -> Self {
770        Self {
771            trades: Default::default(),
772            l1s: Default::default(),
773            l2s: Default::default(),
774            liquidations: Default::default(),
775        }
776    }
777}
778
779struct Rxs<InstrumentKey> {
780    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
781    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
782    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
783    liquidations:
784        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
785}
786
787impl<InstrumentKey> Default for Rxs<InstrumentKey> {
788    fn default() -> Self {
789        Self {
790            trades: Default::default(),
791            l1s: Default::default(),
792            l2s: Default::default(),
793            liquidations: Default::default(),
794        }
795    }
796}