barter_data/streams/builder/dynamic/
mod.rs

1use crate::{
2    Identifier,
3    error::DataError,
4    exchange::{
5        binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6        bitfinex::{Bitfinex, market::BitfinexMarket},
7        bitmex::{Bitmex, market::BitmexMarket},
8        bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9        coinbase::{Coinbase, market::CoinbaseMarket},
10        gateio::{
11            future::{GateioFuturesBtc, GateioFuturesUsd},
12            market::GateioMarket,
13            option::GateioOptions,
14            perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15            spot::GateioSpot,
16        },
17        kraken::{Kraken, market::KrakenMarket},
18        okx::{Okx, market::OkxMarket},
19    },
20    instrument::InstrumentData,
21    streams::{
22        consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23        reconnect::stream::ReconnectingStream,
24    },
25    subscription::{
26        SubKind, Subscription,
27        book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28        liquidation::{Liquidation, Liquidations},
29        trade::{PublicTrade, PublicTrades},
30    },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34    Validator,
35    channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36    error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43    fmt::{Debug, Display},
44    sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
53    pub trades:
54        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
55    pub l1s:
56        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
57    pub l2s: VecMap<
58        ExchangeId,
59        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60    >,
61    pub liquidations:
62        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66    /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67    ///
    /// Each batch (i.e. an `impl IntoIterator` whose items convert into a [`Subscription`]) will
    /// initialise at least one WebSocket `Stream` under the hood. If the batch contains more than
    /// one [`ExchangeId`] and/or [`SubKind`], it will be further split under the hood into one
    /// group per `(ExchangeId, SubKind)` pair for compile-time reasons.
71    ///
72    /// ## Examples
73    /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74    /// comprehensive example of how to use this market data stream initialiser.
75    pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76        subscription_batches: SubBatchIter,
77    ) -> Result<Self, DataError>
78    where
79        SubBatchIter: IntoIterator<Item = SubIter>,
80        SubIter: IntoIterator<Item = Sub>,
81        Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83        InstrumentKey: Debug + Clone + Send + 'static,
84        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86        Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87        Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
88        Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
89        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
90        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
91        Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
92        Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
93        Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
94        Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
95        Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
96        Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
97        Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
98        Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
99        Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
100        Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
101        Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
102        Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
103        Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
104        Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
105        Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
106    {
107        // Validate & dedup Subscription batches
108        let batches = validate_batches(subscription_batches)?;
109
110        // Generate required Channels from Subscription batches
111        let channels = Channels::try_from(&batches)?;
112
113        let futures =
114            batches.into_iter().map(|mut batch| {
115                batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
116                let by_exchange_by_sub_kind =
117                    batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
118
119                let batch_futures =
120                    by_exchange_by_sub_kind
121                        .into_iter()
122                        .map(|((exchange, sub_kind), subs)| {
123                            let subs = subs.into_iter().collect::<Vec<_>>();
124                            let txs = Arc::clone(&channels.txs);
125                            async move {
126                                match (exchange, sub_kind) {
127                                    (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
128                                        init_market_stream(
129                                            STREAM_RECONNECTION_POLICY,
130                                            subs.into_iter()
131                                                .map(|sub| {
132                                                    Subscription::new(
133                                                        BinanceSpot::default(),
134                                                        sub.instrument,
135                                                        PublicTrades,
136                                                    )
137                                                })
138                                                .collect(),
139                                        )
140                                        .await
141                                        .map(|stream| {
142                                            tokio::spawn(stream.forward_to(
143                                                txs.trades.get(&exchange).unwrap().clone(),
144                                            ))
145                                        })
146                                    }
147                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
148                                        init_market_stream(
149                                            STREAM_RECONNECTION_POLICY,
150                                            subs.into_iter()
151                                                .map(|sub| {
152                                                    Subscription::new(
153                                                        BinanceSpot::default(),
154                                                        sub.instrument,
155                                                        OrderBooksL1,
156                                                    )
157                                                })
158                                                .collect(),
159                                        )
160                                        .await
161                                        .map(|stream| {
162                                            tokio::spawn(stream.forward_to(
163                                                txs.l1s.get(&exchange).unwrap().clone(),
164                                            ))
165                                        })
166                                    }
167                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
168                                        init_market_stream(
169                                            STREAM_RECONNECTION_POLICY,
170                                            subs.into_iter()
171                                                .map(|sub| {
172                                                    Subscription::new(
173                                                        BinanceSpot::default(),
174                                                        sub.instrument,
175                                                        OrderBooksL2,
176                                                    )
177                                                })
178                                                .collect(),
179                                        )
180                                        .await
181                                        .map(|stream| {
182                                            tokio::spawn(stream.forward_to(
183                                                txs.l2s.get(&exchange).unwrap().clone(),
184                                            ))
185                                        })
186                                    }
187                                    (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
188                                        init_market_stream(
189                                            STREAM_RECONNECTION_POLICY,
190                                            subs.into_iter()
191                                                .map(|sub| {
192                                                    Subscription::new(
193                                                        BinanceFuturesUsd::default(),
194                                                        sub.instrument,
195                                                        PublicTrades,
196                                                    )
197                                                })
198                                                .collect(),
199                                        )
200                                        .await
201                                        .map(|stream| {
202                                            tokio::spawn(stream.forward_to(
203                                                txs.trades.get(&exchange).unwrap().clone(),
204                                            ))
205                                        })
206                                    }
207                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
208                                        init_market_stream(
209                                            STREAM_RECONNECTION_POLICY,
210                                            subs.into_iter()
211                                                .map(|sub| {
212                                                    Subscription::<_, Instrument, _>::new(
213                                                        BinanceFuturesUsd::default(),
214                                                        sub.instrument,
215                                                        OrderBooksL1,
216                                                    )
217                                                })
218                                                .collect(),
219                                        )
220                                        .await
221                                        .map(|stream| {
222                                            tokio::spawn(stream.forward_to(
223                                                txs.l1s.get(&exchange).unwrap().clone(),
224                                            ))
225                                        })
226                                    }
227                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
228                                        init_market_stream(
229                                            STREAM_RECONNECTION_POLICY,
230                                            subs.into_iter()
231                                                .map(|sub| {
232                                                    Subscription::<_, Instrument, _>::new(
233                                                        BinanceFuturesUsd::default(),
234                                                        sub.instrument,
235                                                        OrderBooksL2,
236                                                    )
237                                                })
238                                                .collect(),
239                                        )
240                                        .await
241                                        .map(|stream| {
242                                            tokio::spawn(stream.forward_to(
243                                                txs.l2s.get(&exchange).unwrap().clone(),
244                                            ))
245                                        })
246                                    }
247                                    (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
248                                        init_market_stream(
249                                            STREAM_RECONNECTION_POLICY,
250                                            subs.into_iter()
251                                                .map(|sub| {
252                                                    Subscription::<_, Instrument, _>::new(
253                                                        BinanceFuturesUsd::default(),
254                                                        sub.instrument,
255                                                        Liquidations,
256                                                    )
257                                                })
258                                                .collect(),
259                                        )
260                                        .await
261                                        .map(|stream| {
262                                            tokio::spawn(stream.forward_to(
263                                                txs.liquidations.get(&exchange).unwrap().clone(),
264                                            ))
265                                        })
266                                    }
267                                    (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
268                                        init_market_stream(
269                                            STREAM_RECONNECTION_POLICY,
270                                            subs.into_iter()
271                                                .map(|sub| {
272                                                    Subscription::new(
273                                                        Bitfinex,
274                                                        sub.instrument,
275                                                        PublicTrades,
276                                                    )
277                                                })
278                                                .collect(),
279                                        )
280                                        .await
281                                        .map(|stream| {
282                                            tokio::spawn(stream.forward_to(
283                                                txs.trades.get(&exchange).unwrap().clone(),
284                                            ))
285                                        })
286                                    }
287                                    (ExchangeId::Bitmex, SubKind::PublicTrades) => {
288                                        init_market_stream(
289                                            STREAM_RECONNECTION_POLICY,
290                                            subs.into_iter()
291                                                .map(|sub| {
292                                                    Subscription::new(
293                                                        Bitmex,
294                                                        sub.instrument,
295                                                        PublicTrades,
296                                                    )
297                                                })
298                                                .collect(),
299                                        )
300                                        .await
301                                        .map(|stream| {
302                                            tokio::spawn(stream.forward_to(
303                                                txs.trades.get(&exchange).unwrap().clone(),
304                                            ))
305                                        })
306                                    }
307                                    (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
308                                        init_market_stream(
309                                            STREAM_RECONNECTION_POLICY,
310                                            subs.into_iter()
311                                                .map(|sub| {
312                                                    Subscription::new(
313                                                        BybitSpot::default(),
314                                                        sub.instrument,
315                                                        PublicTrades,
316                                                    )
317                                                })
318                                                .collect(),
319                                        )
320                                        .await
321                                        .map(|stream| {
322                                            tokio::spawn(stream.forward_to(
323                                                txs.trades.get(&exchange).unwrap().clone(),
324                                            ))
325                                        })
326                                    }
327                                    (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
328                                        init_market_stream(
329                                            STREAM_RECONNECTION_POLICY,
330                                            subs.into_iter()
331                                                .map(|sub| {
332                                                    Subscription::new(
333                                                        BybitPerpetualsUsd::default(),
334                                                        sub.instrument,
335                                                        PublicTrades,
336                                                    )
337                                                })
338                                                .collect(),
339                                        )
340                                        .await
341                                        .map(|stream| {
342                                            tokio::spawn(stream.forward_to(
343                                                txs.trades.get(&exchange).unwrap().clone(),
344                                            ))
345                                        })
346                                    }
347                                    (ExchangeId::Coinbase, SubKind::PublicTrades) => {
348                                        init_market_stream(
349                                            STREAM_RECONNECTION_POLICY,
350                                            subs.into_iter()
351                                                .map(|sub| {
352                                                    Subscription::new(
353                                                        Coinbase,
354                                                        sub.instrument,
355                                                        PublicTrades,
356                                                    )
357                                                })
358                                                .collect(),
359                                        )
360                                        .await
361                                        .map(|stream| {
362                                            tokio::spawn(stream.forward_to(
363                                                txs.trades.get(&exchange).unwrap().clone(),
364                                            ))
365                                        })
366                                    }
367                                    (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
368                                        init_market_stream(
369                                            STREAM_RECONNECTION_POLICY,
370                                            subs.into_iter()
371                                                .map(|sub| {
372                                                    Subscription::new(
373                                                        GateioSpot::default(),
374                                                        sub.instrument,
375                                                        PublicTrades,
376                                                    )
377                                                })
378                                                .collect(),
379                                        )
380                                        .await
381                                        .map(|stream| {
382                                            tokio::spawn(stream.forward_to(
383                                                txs.trades.get(&exchange).unwrap().clone(),
384                                            ))
385                                        })
386                                    }
387                                    (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
388                                        init_market_stream(
389                                            STREAM_RECONNECTION_POLICY,
390                                            subs.into_iter()
391                                                .map(|sub| {
392                                                    Subscription::new(
393                                                        GateioFuturesUsd::default(),
394                                                        sub.instrument,
395                                                        PublicTrades,
396                                                    )
397                                                })
398                                                .collect(),
399                                        )
400                                        .await
401                                        .map(|stream| {
402                                            tokio::spawn(stream.forward_to(
403                                                txs.trades.get(&exchange).unwrap().clone(),
404                                            ))
405                                        })
406                                    }
407                                    (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
408                                        init_market_stream(
409                                            STREAM_RECONNECTION_POLICY,
410                                            subs.into_iter()
411                                                .map(|sub| {
412                                                    Subscription::new(
413                                                        GateioFuturesBtc::default(),
414                                                        sub.instrument,
415                                                        PublicTrades,
416                                                    )
417                                                })
418                                                .collect(),
419                                        )
420                                        .await
421                                        .map(|stream| {
422                                            tokio::spawn(stream.forward_to(
423                                                txs.trades.get(&exchange).unwrap().clone(),
424                                            ))
425                                        })
426                                    }
427                                    (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
428                                        init_market_stream(
429                                            STREAM_RECONNECTION_POLICY,
430                                            subs.into_iter()
431                                                .map(|sub| {
432                                                    Subscription::new(
433                                                        GateioPerpetualsUsd::default(),
434                                                        sub.instrument,
435                                                        PublicTrades,
436                                                    )
437                                                })
438                                                .collect(),
439                                        )
440                                        .await
441                                        .map(|stream| {
442                                            tokio::spawn(stream.forward_to(
443                                                txs.trades.get(&exchange).unwrap().clone(),
444                                            ))
445                                        })
446                                    }
447                                    (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
448                                        init_market_stream(
449                                            STREAM_RECONNECTION_POLICY,
450                                            subs.into_iter()
451                                                .map(|sub| {
452                                                    Subscription::new(
453                                                        GateioPerpetualsBtc::default(),
454                                                        sub.instrument,
455                                                        PublicTrades,
456                                                    )
457                                                })
458                                                .collect(),
459                                        )
460                                        .await
461                                        .map(|stream| {
462                                            tokio::spawn(stream.forward_to(
463                                                txs.trades.get(&exchange).unwrap().clone(),
464                                            ))
465                                        })
466                                    }
467                                    (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
468                                        init_market_stream(
469                                            STREAM_RECONNECTION_POLICY,
470                                            subs.into_iter()
471                                                .map(|sub| {
472                                                    Subscription::new(
473                                                        GateioOptions::default(),
474                                                        sub.instrument,
475                                                        PublicTrades,
476                                                    )
477                                                })
478                                                .collect(),
479                                        )
480                                        .await
481                                        .map(|stream| {
482                                            tokio::spawn(stream.forward_to(
483                                                txs.trades.get(&exchange).unwrap().clone(),
484                                            ))
485                                        })
486                                    }
487                                    (ExchangeId::Kraken, SubKind::PublicTrades) => {
488                                        init_market_stream(
489                                            STREAM_RECONNECTION_POLICY,
490                                            subs.into_iter()
491                                                .map(|sub| {
492                                                    Subscription::new(
493                                                        Kraken,
494                                                        sub.instrument,
495                                                        PublicTrades,
496                                                    )
497                                                })
498                                                .collect(),
499                                        )
500                                        .await
501                                        .map(|stream| {
502                                            tokio::spawn(stream.forward_to(
503                                                txs.trades.get(&exchange).unwrap().clone(),
504                                            ))
505                                        })
506                                    }
507                                    (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
508                                        init_market_stream(
509                                            STREAM_RECONNECTION_POLICY,
510                                            subs.into_iter()
511                                                .map(|sub| {
512                                                    Subscription::new(
513                                                        Kraken,
514                                                        sub.instrument,
515                                                        OrderBooksL1,
516                                                    )
517                                                })
518                                                .collect(),
519                                        )
520                                        .await
521                                        .map(|stream| {
522                                            tokio::spawn(stream.forward_to(
523                                                txs.l1s.get(&exchange).unwrap().clone(),
524                                            ))
525                                        })
526                                    }
527                                    (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
528                                        STREAM_RECONNECTION_POLICY,
529                                        subs.into_iter()
530                                            .map(|sub| {
531                                                Subscription::new(Okx, sub.instrument, PublicTrades)
532                                            })
533                                            .collect(),
534                                    )
535                                    .await
536                                    .map(|stream| {
537                                        tokio::spawn(
538                                            stream.forward_to(
539                                                txs.trades.get(&exchange).unwrap().clone(),
540                                            ),
541                                        )
542                                    }),
543                                    (exchange, sub_kind) => {
544                                        Err(DataError::Unsupported { exchange, sub_kind })
545                                    }
546                                }
547                            }
548                        });
549
550                try_join_all(batch_futures)
551            });
552
553        try_join_all(futures).await?;
554
555        Ok(Self {
556            trades: channels
557                .rxs
558                .trades
559                .into_iter()
560                .map(|(exchange, rx)| (exchange, rx.into_stream()))
561                .collect(),
562            l1s: channels
563                .rxs
564                .l1s
565                .into_iter()
566                .map(|(exchange, rx)| (exchange, rx.into_stream()))
567                .collect(),
568            l2s: channels
569                .rxs
570                .l2s
571                .into_iter()
572                .map(|(exchange, rx)| (exchange, rx.into_stream()))
573                .collect(),
574            liquidations: channels
575                .rxs
576                .liquidations
577                .into_iter()
578                .map(|(exchange, rx)| (exchange, rx.into_stream()))
579                .collect(),
580        })
581    }
582
583    /// Remove an execution [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
584    ///
585    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
586    pub fn select_trades(
587        &mut self,
588        exchange: ExchangeId,
589    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
590        self.trades.remove(&exchange)
591    }
592
593    /// Select and merge every execution [`PublicTrade`] `Stream` using
594    /// [`SelectAll`](futures_util::stream::select_all::select_all).
595    pub fn select_all_trades(
596        &mut self,
597    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
598        futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
599    }
600
601    /// Remove an execution [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
602    ///
603    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
604    pub fn select_l1s(
605        &mut self,
606        exchange: ExchangeId,
607    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
608        self.l1s.remove(&exchange)
609    }
610
611    /// Select and merge every execution [`OrderBookL1`] `Stream` using
612    /// [`SelectAll`](futures_util::stream::select_all::select_all).
613    pub fn select_all_l1s(
614        &mut self,
615    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
616        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
617    }
618
619    /// Remove an execution [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
620    ///
621    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
622    pub fn select_l2s(
623        &mut self,
624        exchange: ExchangeId,
625    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
626        self.l2s.remove(&exchange)
627    }
628
629    /// Select and merge every execution [`OrderBookEvent`] `Stream` using
630    /// [`SelectAll`](futures_util::stream::select_all::select_all).
631    pub fn select_all_l2s(
632        &mut self,
633    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
634        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
635    }
636
637    /// Remove an execution [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
638    ///
639    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
640    pub fn select_liquidations(
641        &mut self,
642        exchange: ExchangeId,
643    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
644        self.liquidations.remove(&exchange)
645    }
646
647    /// Select and merge every execution [`Liquidation`] `Stream` using
648    /// [`SelectAll`](futures_util::stream::select_all::select_all).
649    pub fn select_all_liquidations(
650        &mut self,
651    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
652        futures_util::stream::select_all::select_all(
653            std::mem::take(&mut self.liquidations).into_values(),
654        )
655    }
656
657    /// Select and merge every execution `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
658    ///
659    /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
660    /// use cases.
661    pub fn select_all<Output>(self) -> impl Stream<Item = Output>
662    where
663        InstrumentKey: Send + 'static,
664        Output: 'static,
665        MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
666        MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
667        MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
668        MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
669    {
670        let Self {
671            trades,
672            l1s,
673            l2s,
674            liquidations,
675        } = self;
676
677        let trades = trades
678            .into_values()
679            .map(|stream| stream.map(MarketStreamResult::into).boxed());
680
681        let l1s = l1s
682            .into_values()
683            .map(|stream| stream.map(MarketStreamResult::into).boxed());
684
685        let l2s = l2s
686            .into_values()
687            .map(|stream| stream.map(MarketStreamResult::into).boxed());
688
689        let liquidations = liquidations
690            .into_values()
691            .map(|stream| stream.map(MarketStreamResult::into).boxed());
692
693        let all = trades.chain(l1s).chain(l2s).chain(liquidations);
694
695        futures_util::stream::select_all::select_all(all)
696    }
697}
698
699pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
700    batches: SubBatchIter,
701) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
702where
703    SubBatchIter: IntoIterator<Item = SubIter>,
704    SubIter: IntoIterator<Item = Sub>,
705    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
706    Instrument: InstrumentData + Ord,
707{
708    batches
709        .into_iter()
710        .map(validate_subscriptions::<SubIter, Sub, Instrument>)
711        .collect()
712}
713
714pub fn validate_subscriptions<SubIter, Sub, Instrument>(
715    batch: SubIter,
716) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
717where
718    SubIter: IntoIterator<Item = Sub>,
719    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
720    Instrument: InstrumentData + Ord,
721{
722    // Validate Subscriptions
723    let mut batch = batch
724        .into_iter()
725        .map(Sub::into)
726        .map(Validator::validate)
727        .collect::<Result<Vec<_>, SocketError>>()?;
728
729    // Remove duplicate Subscriptions
730    batch.sort();
731    batch.dedup();
732
733    Ok(batch)
734}
735
/// Pairs of channel transmitters and receivers used to build a `DynamicStreams`, grouped by
/// data kind and keyed by [`ExchangeId`].
struct Channels<InstrumentKey> {
    /// Transmitter halves, held behind an [`Arc`] so a single set can be shared.
    txs: Arc<Txs<InstrumentKey>>,
    /// Receiver halves corresponding to the transmitters in `txs`.
    rxs: Rxs<InstrumentKey>,
}
740
741impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
742    for Channels<Instrument::Key>
743where
744    Instrument: InstrumentData,
745{
746    type Error = DataError;
747
748    fn try_from(
749        value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
750    ) -> Result<Self, Self::Error> {
751        let mut txs = Txs::default();
752        let mut rxs = Rxs::default();
753
754        for sub in value.iter().flatten() {
755            match sub.kind {
756                SubKind::PublicTrades => {
757                    if let (None, None) =
758                        (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
759                    {
760                        let (tx, rx) = mpsc_unbounded();
761                        txs.trades.insert(sub.exchange, tx);
762                        rxs.trades.insert(sub.exchange, rx);
763                    }
764                }
765                SubKind::OrderBooksL1 => {
766                    if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
767                        let (tx, rx) = mpsc_unbounded();
768                        txs.l1s.insert(sub.exchange, tx);
769                        rxs.l1s.insert(sub.exchange, rx);
770                    }
771                }
772                SubKind::OrderBooksL2 => {
773                    if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
774                        let (tx, rx) = mpsc_unbounded();
775                        txs.l2s.insert(sub.exchange, tx);
776                        rxs.l2s.insert(sub.exchange, rx);
777                    }
778                }
779                SubKind::Liquidations => {
780                    if let (None, None) = (
781                        txs.liquidations.get(&sub.exchange),
782                        rxs.liquidations.get(&sub.exchange),
783                    ) {
784                        let (tx, rx) = mpsc_unbounded();
785                        txs.liquidations.insert(sub.exchange, tx);
786                        rxs.liquidations.insert(sub.exchange, rx);
787                    }
788                }
789                unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
790            }
791        }
792
793        Ok(Channels {
794            txs: Arc::new(txs),
795            rxs,
796        })
797    }
798}
799
/// Channel transmitters for each supported data kind, keyed by [`ExchangeId`].
struct Txs<InstrumentKey> {
    /// Transmitters of [`PublicTrade`] market stream results.
    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// Transmitters of [`OrderBookL1`] market stream results.
    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// Transmitters of [`OrderBookEvent`] (L2) market stream results.
    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
    /// Transmitters of [`Liquidation`] market stream results.
    liquidations:
        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
}
807
808impl<InstrumentKey> Default for Txs<InstrumentKey> {
809    fn default() -> Self {
810        Self {
811            trades: Default::default(),
812            l1s: Default::default(),
813            l2s: Default::default(),
814            liquidations: Default::default(),
815        }
816    }
817}
818
/// Channel receivers for each supported data kind, keyed by [`ExchangeId`].
struct Rxs<InstrumentKey> {
    /// Receivers of [`PublicTrade`] market stream results.
    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// Receivers of [`OrderBookL1`] market stream results.
    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// Receivers of [`OrderBookEvent`] (L2) market stream results.
    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
    /// Receivers of [`Liquidation`] market stream results.
    liquidations:
        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
}
826
827impl<InstrumentKey> Default for Rxs<InstrumentKey> {
828    fn default() -> Self {
829        Self {
830            trades: Default::default(),
831            l1s: Default::default(),
832            l2s: Default::default(),
833            liquidations: Default::default(),
834        }
835    }
836}