Skip to main content

rustrade_data/streams/builder/dynamic/
mod.rs

1use crate::{
2    Identifier,
3    error::DataError,
4    exchange::{
5        binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6        bitfinex::{Bitfinex, market::BitfinexMarket},
7        bitmex::{Bitmex, market::BitmexMarket},
8        bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9        coinbase::{Coinbase, market::CoinbaseMarket},
10        gateio::{
11            future::{GateioFuturesBtc, GateioFuturesUsd},
12            market::GateioMarket,
13            option::GateioOptions,
14            perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15            spot::GateioSpot,
16        },
17        kraken::{Kraken, market::KrakenMarket},
18        okx::{Okx, market::OkxMarket},
19    },
20    instrument::InstrumentData,
21    streams::{
22        consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23        reconnect::stream::ReconnectingStream,
24    },
25    subscriber::WebSocketSubscriber,
26    subscription::{
27        SubKind, Subscription,
28        book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
29        liquidation::{Liquidation, Liquidations},
30        trade::{PublicTrade, PublicTrades},
31    },
32};
33use fnv::FnvHashMap;
34use futures::{Stream, stream::SelectAll};
35use futures_util::{StreamExt, future::try_join_all};
36use itertools::Itertools;
37use rustrade_instrument::exchange::ExchangeId;
38use rustrade_integration::{
39    Validator,
40    channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
41    error::SocketError,
42};
43use std::{
44    fmt::{Debug, Display},
45    sync::Arc,
46};
47use tokio_stream::wrappers::UnboundedReceiverStream;
48use vecmap::VecMap;
49
50pub mod indexed;
51
52#[derive(Debug)]
53pub struct DynamicStreams<InstrumentKey> {
54    pub trades:
55        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
56    pub l1s:
57        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
58    pub l2s: VecMap<
59        ExchangeId,
60        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
61    >,
62    pub liquidations:
63        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
64}
65
66impl<InstrumentKey> DynamicStreams<InstrumentKey> {
67    /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
68    ///
69    /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
70    /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
71    /// [`SubKind`], it will be further split under the hood for compile-time reasons.
72    ///
73    /// ## Examples
74    /// Please see rustrade-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
75    /// comprehensive example of how to use this market data stream initialiser.
76    #[allow(clippy::unwrap_used)] // Invariant: Channels::try_from creates entries for all exchanges in batches; lookups iterate over the same exchanges
77    pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
78        subscription_batches: SubBatchIter,
79    ) -> Result<Self, DataError>
80    where
81        SubBatchIter: IntoIterator<Item = SubIter>,
82        SubIter: IntoIterator<Item = Sub>,
83        Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
84        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
85        InstrumentKey: Debug + Clone + PartialEq + Send + Sync + 'static,
86        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
87        Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
88        Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
89        Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
90        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
91        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
92        Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
93        Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
94        Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
95        Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
96        Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
97        Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
98        Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
99        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
100        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
101        Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
102        Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
103        Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
104        Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
105        Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
106        Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
107        Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
108        Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
109        Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
110        Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
111    {
112        // Validate & dedup Subscription batches
113        let batches = validate_batches(subscription_batches)?;
114
115        // Generate required Channels from Subscription batches
116        let channels = Channels::try_from(&batches)?;
117
118        let futures =
119            batches.into_iter().map(|mut batch| {
120                batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
121                let by_exchange_by_sub_kind =
122                    batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
123
124                let batch_futures =
125                    by_exchange_by_sub_kind
126                        .into_iter()
127                        .map(|((exchange, sub_kind), subs)| {
128                            let subs = subs.into_iter().collect::<Vec<_>>();
129                            let txs = Arc::clone(&channels.txs);
130                            async move {
131                                match (exchange, sub_kind) {
132                                    (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
133                                        init_market_stream(
134                                            STREAM_RECONNECTION_POLICY,
135                                            WebSocketSubscriber,
136                                            subs.into_iter()
137                                                .map(|sub| {
138                                                    Subscription::new(
139                                                        BinanceSpot::default(),
140                                                        sub.instrument,
141                                                        PublicTrades,
142                                                    )
143                                                })
144                                                .collect(),
145                                        )
146                                        .await
147                                        .map(|stream| {
148                                            tokio::spawn(stream.forward_to(
149                                                txs.trades.get(&exchange).unwrap().clone(),
150                                            ))
151                                        })
152                                    }
153                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
154                                        init_market_stream(
155                                            STREAM_RECONNECTION_POLICY,
156                                            WebSocketSubscriber,
157                                            subs.into_iter()
158                                                .map(|sub| {
159                                                    Subscription::new(
160                                                        BinanceSpot::default(),
161                                                        sub.instrument,
162                                                        OrderBooksL1,
163                                                    )
164                                                })
165                                                .collect(),
166                                        )
167                                        .await
168                                        .map(|stream| {
169                                            tokio::spawn(stream.forward_to(
170                                                txs.l1s.get(&exchange).unwrap().clone(),
171                                            ))
172                                        })
173                                    }
174                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
175                                        init_market_stream(
176                                            STREAM_RECONNECTION_POLICY,
177                                            WebSocketSubscriber,
178                                            subs.into_iter()
179                                                .map(|sub| {
180                                                    Subscription::new(
181                                                        BinanceSpot::default(),
182                                                        sub.instrument,
183                                                        OrderBooksL2,
184                                                    )
185                                                })
186                                                .collect(),
187                                        )
188                                        .await
189                                        .map(|stream| {
190                                            tokio::spawn(stream.forward_to(
191                                                txs.l2s.get(&exchange).unwrap().clone(),
192                                            ))
193                                        })
194                                    }
195                                    (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
196                                        init_market_stream(
197                                            STREAM_RECONNECTION_POLICY,
198                                            WebSocketSubscriber,
199                                            subs.into_iter()
200                                                .map(|sub| {
201                                                    Subscription::new(
202                                                        BinanceFuturesUsd::default(),
203                                                        sub.instrument,
204                                                        PublicTrades,
205                                                    )
206                                                })
207                                                .collect(),
208                                        )
209                                        .await
210                                        .map(|stream| {
211                                            tokio::spawn(stream.forward_to(
212                                                txs.trades.get(&exchange).unwrap().clone(),
213                                            ))
214                                        })
215                                    }
216                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
217                                        init_market_stream(
218                                            STREAM_RECONNECTION_POLICY,
219                                            WebSocketSubscriber,
220                                            subs.into_iter()
221                                                .map(|sub| {
222                                                    Subscription::<_, Instrument, _>::new(
223                                                        BinanceFuturesUsd::default(),
224                                                        sub.instrument,
225                                                        OrderBooksL1,
226                                                    )
227                                                })
228                                                .collect(),
229                                        )
230                                        .await
231                                        .map(|stream| {
232                                            tokio::spawn(stream.forward_to(
233                                                txs.l1s.get(&exchange).unwrap().clone(),
234                                            ))
235                                        })
236                                    }
237                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
238                                        init_market_stream(
239                                            STREAM_RECONNECTION_POLICY,
240                                            WebSocketSubscriber,
241                                            subs.into_iter()
242                                                .map(|sub| {
243                                                    Subscription::<_, Instrument, _>::new(
244                                                        BinanceFuturesUsd::default(),
245                                                        sub.instrument,
246                                                        OrderBooksL2,
247                                                    )
248                                                })
249                                                .collect(),
250                                        )
251                                        .await
252                                        .map(|stream| {
253                                            tokio::spawn(stream.forward_to(
254                                                txs.l2s.get(&exchange).unwrap().clone(),
255                                            ))
256                                        })
257                                    }
258                                    (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
259                                        init_market_stream(
260                                            STREAM_RECONNECTION_POLICY,
261                                            WebSocketSubscriber,
262                                            subs.into_iter()
263                                                .map(|sub| {
264                                                    Subscription::<_, Instrument, _>::new(
265                                                        BinanceFuturesUsd::default(),
266                                                        sub.instrument,
267                                                        Liquidations,
268                                                    )
269                                                })
270                                                .collect(),
271                                        )
272                                        .await
273                                        .map(|stream| {
274                                            tokio::spawn(stream.forward_to(
275                                                txs.liquidations.get(&exchange).unwrap().clone(),
276                                            ))
277                                        })
278                                    }
279                                    (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
280                                        init_market_stream(
281                                            STREAM_RECONNECTION_POLICY,
282                                            WebSocketSubscriber,
283                                            subs.into_iter()
284                                                .map(|sub| {
285                                                    Subscription::new(
286                                                        Bitfinex,
287                                                        sub.instrument,
288                                                        PublicTrades,
289                                                    )
290                                                })
291                                                .collect(),
292                                        )
293                                        .await
294                                        .map(|stream| {
295                                            tokio::spawn(stream.forward_to(
296                                                txs.trades.get(&exchange).unwrap().clone(),
297                                            ))
298                                        })
299                                    }
300                                    (ExchangeId::Bitmex, SubKind::PublicTrades) => {
301                                        init_market_stream(
302                                            STREAM_RECONNECTION_POLICY,
303                                            WebSocketSubscriber,
304                                            subs.into_iter()
305                                                .map(|sub| {
306                                                    Subscription::new(
307                                                        Bitmex,
308                                                        sub.instrument,
309                                                        PublicTrades,
310                                                    )
311                                                })
312                                                .collect(),
313                                        )
314                                        .await
315                                        .map(|stream| {
316                                            tokio::spawn(stream.forward_to(
317                                                txs.trades.get(&exchange).unwrap().clone(),
318                                            ))
319                                        })
320                                    }
321                                    (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
322                                        init_market_stream(
323                                            STREAM_RECONNECTION_POLICY,
324                                            WebSocketSubscriber,
325                                            subs.into_iter()
326                                                .map(|sub| {
327                                                    Subscription::new(
328                                                        BybitSpot::default(),
329                                                        sub.instrument,
330                                                        PublicTrades,
331                                                    )
332                                                })
333                                                .collect(),
334                                        )
335                                        .await
336                                        .map(|stream| {
337                                            tokio::spawn(stream.forward_to(
338                                                txs.trades.get(&exchange).unwrap().clone(),
339                                            ))
340                                        })
341                                    }
342                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
343                                        init_market_stream(
344                                            STREAM_RECONNECTION_POLICY,
345                                            WebSocketSubscriber,
346                                            subs.into_iter()
347                                                .map(|sub| {
348                                                    Subscription::new(
349                                                        BybitSpot::default(),
350                                                        sub.instrument,
351                                                        OrderBooksL1,
352                                                    )
353                                                })
354                                                .collect(),
355                                        )
356                                        .await
357                                        .map(|stream| {
358                                            tokio::spawn(stream.forward_to(
359                                                txs.l1s.get(&exchange).unwrap().clone(),
360                                            ))
361                                        })
362                                    }
363                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
364                                        init_market_stream(
365                                            STREAM_RECONNECTION_POLICY,
366                                            WebSocketSubscriber,
367                                            subs.into_iter()
368                                                .map(|sub| {
369                                                    Subscription::new(
370                                                        BybitSpot::default(),
371                                                        sub.instrument,
372                                                        OrderBooksL2,
373                                                    )
374                                                })
375                                                .collect(),
376                                        )
377                                        .await
378                                        .map(|stream| {
379                                            tokio::spawn(stream.forward_to(
380                                                txs.l2s.get(&exchange).unwrap().clone(),
381                                            ))
382                                        })
383                                    }
384                                    (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
385                                        init_market_stream(
386                                            STREAM_RECONNECTION_POLICY,
387                                            WebSocketSubscriber,
388                                            subs.into_iter()
389                                                .map(|sub| {
390                                                    Subscription::new(
391                                                        BybitPerpetualsUsd::default(),
392                                                        sub.instrument,
393                                                        PublicTrades,
394                                                    )
395                                                })
396                                                .collect(),
397                                        )
398                                        .await
399                                        .map(|stream| {
400                                            tokio::spawn(stream.forward_to(
401                                                txs.trades.get(&exchange).unwrap().clone(),
402                                            ))
403                                        })
404                                    }
405                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
406                                        init_market_stream(
407                                            STREAM_RECONNECTION_POLICY,
408                                            WebSocketSubscriber,
409                                            subs.into_iter()
410                                                .map(|sub| {
411                                                    Subscription::new(
412                                                        BybitSpot::default(),
413                                                        sub.instrument,
414                                                        OrderBooksL1,
415                                                    )
416                                                })
417                                                .collect(),
418                                        )
419                                        .await
420                                        .map(|stream| {
421                                            tokio::spawn(stream.forward_to(
422                                                txs.l1s.get(&exchange).unwrap().clone(),
423                                            ))
424                                        })
425                                    }
426                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
427                                        init_market_stream(
428                                            STREAM_RECONNECTION_POLICY,
429                                            WebSocketSubscriber,
430                                            subs.into_iter()
431                                                .map(|sub| {
432                                                    Subscription::new(
433                                                        BybitSpot::default(),
434                                                        sub.instrument,
435                                                        OrderBooksL2,
436                                                    )
437                                                })
438                                                .collect(),
439                                        )
440                                        .await
441                                        .map(|stream| {
442                                            tokio::spawn(stream.forward_to(
443                                                txs.l2s.get(&exchange).unwrap().clone(),
444                                            ))
445                                        })
446                                    }
447                                    (ExchangeId::Coinbase, SubKind::PublicTrades) => {
448                                        init_market_stream(
449                                            STREAM_RECONNECTION_POLICY,
450                                            WebSocketSubscriber,
451                                            subs.into_iter()
452                                                .map(|sub| {
453                                                    Subscription::new(
454                                                        Coinbase,
455                                                        sub.instrument,
456                                                        PublicTrades,
457                                                    )
458                                                })
459                                                .collect(),
460                                        )
461                                        .await
462                                        .map(|stream| {
463                                            tokio::spawn(stream.forward_to(
464                                                txs.trades.get(&exchange).unwrap().clone(),
465                                            ))
466                                        })
467                                    }
468                                    (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
469                                        init_market_stream(
470                                            STREAM_RECONNECTION_POLICY,
471                                            WebSocketSubscriber,
472                                            subs.into_iter()
473                                                .map(|sub| {
474                                                    Subscription::new(
475                                                        GateioSpot::default(),
476                                                        sub.instrument,
477                                                        PublicTrades,
478                                                    )
479                                                })
480                                                .collect(),
481                                        )
482                                        .await
483                                        .map(|stream| {
484                                            tokio::spawn(stream.forward_to(
485                                                txs.trades.get(&exchange).unwrap().clone(),
486                                            ))
487                                        })
488                                    }
489                                    (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
490                                        init_market_stream(
491                                            STREAM_RECONNECTION_POLICY,
492                                            WebSocketSubscriber,
493                                            subs.into_iter()
494                                                .map(|sub| {
495                                                    Subscription::new(
496                                                        GateioFuturesUsd::default(),
497                                                        sub.instrument,
498                                                        PublicTrades,
499                                                    )
500                                                })
501                                                .collect(),
502                                        )
503                                        .await
504                                        .map(|stream| {
505                                            tokio::spawn(stream.forward_to(
506                                                txs.trades.get(&exchange).unwrap().clone(),
507                                            ))
508                                        })
509                                    }
510                                    (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
511                                        init_market_stream(
512                                            STREAM_RECONNECTION_POLICY,
513                                            WebSocketSubscriber,
514                                            subs.into_iter()
515                                                .map(|sub| {
516                                                    Subscription::new(
517                                                        GateioFuturesBtc::default(),
518                                                        sub.instrument,
519                                                        PublicTrades,
520                                                    )
521                                                })
522                                                .collect(),
523                                        )
524                                        .await
525                                        .map(|stream| {
526                                            tokio::spawn(stream.forward_to(
527                                                txs.trades.get(&exchange).unwrap().clone(),
528                                            ))
529                                        })
530                                    }
531                                    (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
532                                        init_market_stream(
533                                            STREAM_RECONNECTION_POLICY,
534                                            WebSocketSubscriber,
535                                            subs.into_iter()
536                                                .map(|sub| {
537                                                    Subscription::new(
538                                                        GateioPerpetualsUsd::default(),
539                                                        sub.instrument,
540                                                        PublicTrades,
541                                                    )
542                                                })
543                                                .collect(),
544                                        )
545                                        .await
546                                        .map(|stream| {
547                                            tokio::spawn(stream.forward_to(
548                                                txs.trades.get(&exchange).unwrap().clone(),
549                                            ))
550                                        })
551                                    }
552                                    (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
553                                        init_market_stream(
554                                            STREAM_RECONNECTION_POLICY,
555                                            WebSocketSubscriber,
556                                            subs.into_iter()
557                                                .map(|sub| {
558                                                    Subscription::new(
559                                                        GateioPerpetualsBtc::default(),
560                                                        sub.instrument,
561                                                        PublicTrades,
562                                                    )
563                                                })
564                                                .collect(),
565                                        )
566                                        .await
567                                        .map(|stream| {
568                                            tokio::spawn(stream.forward_to(
569                                                txs.trades.get(&exchange).unwrap().clone(),
570                                            ))
571                                        })
572                                    }
573                                    (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
574                                        init_market_stream(
575                                            STREAM_RECONNECTION_POLICY,
576                                            WebSocketSubscriber,
577                                            subs.into_iter()
578                                                .map(|sub| {
579                                                    Subscription::new(
580                                                        GateioOptions::default(),
581                                                        sub.instrument,
582                                                        PublicTrades,
583                                                    )
584                                                })
585                                                .collect(),
586                                        )
587                                        .await
588                                        .map(|stream| {
589                                            tokio::spawn(stream.forward_to(
590                                                txs.trades.get(&exchange).unwrap().clone(),
591                                            ))
592                                        })
593                                    }
594                                    (ExchangeId::Kraken, SubKind::PublicTrades) => {
595                                        init_market_stream(
596                                            STREAM_RECONNECTION_POLICY,
597                                            WebSocketSubscriber,
598                                            subs.into_iter()
599                                                .map(|sub| {
600                                                    Subscription::new(
601                                                        Kraken,
602                                                        sub.instrument,
603                                                        PublicTrades,
604                                                    )
605                                                })
606                                                .collect(),
607                                        )
608                                        .await
609                                        .map(|stream| {
610                                            tokio::spawn(stream.forward_to(
611                                                txs.trades.get(&exchange).unwrap().clone(),
612                                            ))
613                                        })
614                                    }
615                                    (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
616                                        init_market_stream(
617                                            STREAM_RECONNECTION_POLICY,
618                                            WebSocketSubscriber,
619                                            subs.into_iter()
620                                                .map(|sub| {
621                                                    Subscription::new(
622                                                        Kraken,
623                                                        sub.instrument,
624                                                        OrderBooksL1,
625                                                    )
626                                                })
627                                                .collect(),
628                                        )
629                                        .await
630                                        .map(|stream| {
631                                            tokio::spawn(stream.forward_to(
632                                                txs.l1s.get(&exchange).unwrap().clone(),
633                                            ))
634                                        })
635                                    }
636                                    (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
637                                        STREAM_RECONNECTION_POLICY,
638                                        WebSocketSubscriber,
639                                        subs.into_iter()
640                                            .map(|sub| {
641                                                Subscription::new(Okx, sub.instrument, PublicTrades)
642                                            })
643                                            .collect(),
644                                    )
645                                    .await
646                                    .map(|stream| {
647                                        tokio::spawn(
648                                            stream.forward_to(
649                                                txs.trades.get(&exchange).unwrap().clone(),
650                                            ),
651                                        )
652                                    }),
653                                    (exchange, sub_kind) => {
654                                        Err(DataError::Unsupported { exchange, sub_kind })
655                                    }
656                                }
657                            }
658                        });
659
660                try_join_all(batch_futures)
661            });
662
663        try_join_all(futures).await?;
664
665        Ok(Self {
666            trades: channels
667                .rxs
668                .trades
669                .into_iter()
670                .map(|(exchange, rx)| (exchange, rx.into_stream()))
671                .collect(),
672            l1s: channels
673                .rxs
674                .l1s
675                .into_iter()
676                .map(|(exchange, rx)| (exchange, rx.into_stream()))
677                .collect(),
678            l2s: channels
679                .rxs
680                .l2s
681                .into_iter()
682                .map(|(exchange, rx)| (exchange, rx.into_stream()))
683                .collect(),
684            liquidations: channels
685                .rxs
686                .liquidations
687                .into_iter()
688                .map(|(exchange, rx)| (exchange, rx.into_stream()))
689                .collect(),
690        })
691    }
692
693    /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
694    ///
695    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
696    pub fn select_trades(
697        &mut self,
698        exchange: ExchangeId,
699    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
700        self.trades.remove(&exchange)
701    }
702
703    /// Select and merge every exchange [`PublicTrade`] `Stream` using
704    /// [`SelectAll`](futures_util::stream::select_all::select_all).
705    pub fn select_all_trades(
706        &mut self,
707    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
708        futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
709    }
710
711    /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
712    ///
713    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
714    pub fn select_l1s(
715        &mut self,
716        exchange: ExchangeId,
717    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
718        self.l1s.remove(&exchange)
719    }
720
721    /// Select and merge every exchange [`OrderBookL1`] `Stream` using
722    /// [`SelectAll`](futures_util::stream::select_all::select_all).
723    pub fn select_all_l1s(
724        &mut self,
725    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
726        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
727    }
728
729    /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
730    ///
731    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
732    pub fn select_l2s(
733        &mut self,
734        exchange: ExchangeId,
735    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
736        self.l2s.remove(&exchange)
737    }
738
739    /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
740    /// [`SelectAll`](futures_util::stream::select_all::select_all).
741    pub fn select_all_l2s(
742        &mut self,
743    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
744        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
745    }
746
747    /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
748    ///
749    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
750    pub fn select_liquidations(
751        &mut self,
752        exchange: ExchangeId,
753    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
754        self.liquidations.remove(&exchange)
755    }
756
757    /// Select and merge every exchange [`Liquidation`] `Stream` using
758    /// [`SelectAll`](futures_util::stream::select_all::select_all).
759    pub fn select_all_liquidations(
760        &mut self,
761    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
762        futures_util::stream::select_all::select_all(
763            std::mem::take(&mut self.liquidations).into_values(),
764        )
765    }
766
767    /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
768    ///
769    /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
770    /// use cases.
771    pub fn select_all<Output>(self) -> impl Stream<Item = Output>
772    where
773        InstrumentKey: Send + 'static,
774        Output: 'static,
775        MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
776        MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
777        MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
778        MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
779    {
780        let Self {
781            trades,
782            l1s,
783            l2s,
784            liquidations,
785        } = self;
786
787        let trades = trades
788            .into_values()
789            .map(|stream| stream.map(MarketStreamResult::into).boxed());
790
791        let l1s = l1s
792            .into_values()
793            .map(|stream| stream.map(MarketStreamResult::into).boxed());
794
795        let l2s = l2s
796            .into_values()
797            .map(|stream| stream.map(MarketStreamResult::into).boxed());
798
799        let liquidations = liquidations
800            .into_values()
801            .map(|stream| stream.map(MarketStreamResult::into).boxed());
802
803        let all = trades.chain(l1s).chain(l2s).chain(liquidations);
804
805        futures_util::stream::select_all::select_all(all)
806    }
807}
808
809pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
810    batches: SubBatchIter,
811) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
812where
813    SubBatchIter: IntoIterator<Item = SubIter>,
814    SubIter: IntoIterator<Item = Sub>,
815    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
816    Instrument: InstrumentData + Ord,
817{
818    batches
819        .into_iter()
820        .map(validate_subscriptions::<SubIter, Sub, Instrument>)
821        .collect()
822}
823
824pub fn validate_subscriptions<SubIter, Sub, Instrument>(
825    batch: SubIter,
826) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
827where
828    SubIter: IntoIterator<Item = Sub>,
829    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
830    Instrument: InstrumentData + Ord,
831{
832    // Validate Subscriptions
833    let mut batch = batch
834        .into_iter()
835        .map(Sub::into)
836        .map(Validator::validate)
837        .collect::<Result<Vec<_>, SocketError>>()?;
838
839    // Remove duplicate Subscriptions
840    batch.sort();
841    batch.dedup();
842
843    Ok(batch)
844}
845
846struct Channels<InstrumentKey> {
847    txs: Arc<Txs<InstrumentKey>>,
848    rxs: Rxs<InstrumentKey>,
849}
850
851impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
852    for Channels<Instrument::Key>
853where
854    Instrument: InstrumentData,
855{
856    type Error = DataError;
857
858    fn try_from(
859        value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
860    ) -> Result<Self, Self::Error> {
861        let mut txs = Txs::default();
862        let mut rxs = Rxs::default();
863
864        for sub in value.iter().flatten() {
865            match sub.kind {
866                SubKind::PublicTrades => {
867                    if let (None, None) =
868                        (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
869                    {
870                        let (tx, rx) = mpsc_unbounded();
871                        txs.trades.insert(sub.exchange, tx);
872                        rxs.trades.insert(sub.exchange, rx);
873                    }
874                }
875                SubKind::OrderBooksL1 => {
876                    if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
877                        let (tx, rx) = mpsc_unbounded();
878                        txs.l1s.insert(sub.exchange, tx);
879                        rxs.l1s.insert(sub.exchange, rx);
880                    }
881                }
882                SubKind::OrderBooksL2 => {
883                    if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
884                        let (tx, rx) = mpsc_unbounded();
885                        txs.l2s.insert(sub.exchange, tx);
886                        rxs.l2s.insert(sub.exchange, rx);
887                    }
888                }
889                SubKind::Liquidations => {
890                    if let (None, None) = (
891                        txs.liquidations.get(&sub.exchange),
892                        rxs.liquidations.get(&sub.exchange),
893                    ) {
894                        let (tx, rx) = mpsc_unbounded();
895                        txs.liquidations.insert(sub.exchange, tx);
896                        rxs.liquidations.insert(sub.exchange, rx);
897                    }
898                }
899                unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
900            }
901        }
902
903        Ok(Channels {
904            txs: Arc::new(txs),
905            rxs,
906        })
907    }
908}
909
910struct Txs<InstrumentKey> {
911    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
912    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
913    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
914    liquidations:
915        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
916}
917
918impl<InstrumentKey> Default for Txs<InstrumentKey> {
919    fn default() -> Self {
920        Self {
921            trades: Default::default(),
922            l1s: Default::default(),
923            l2s: Default::default(),
924            liquidations: Default::default(),
925        }
926    }
927}
928
929struct Rxs<InstrumentKey> {
930    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
931    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
932    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
933    liquidations:
934        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
935}
936
937impl<InstrumentKey> Default for Rxs<InstrumentKey> {
938    fn default() -> Self {
939        Self {
940            trades: Default::default(),
941            l1s: Default::default(),
942            l2s: Default::default(),
943            liquidations: Default::default(),
944        }
945    }
946}