barter_data/streams/builder/dynamic/
mod.rs

1use crate::{
2    Identifier,
3    error::DataError,
4    exchange::{
5        binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6        bitfinex::{Bitfinex, market::BitfinexMarket},
7        bitmex::{Bitmex, market::BitmexMarket},
8        bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9        coinbase::{Coinbase, market::CoinbaseMarket},
10        gateio::{
11            future::{GateioFuturesBtc, GateioFuturesUsd},
12            market::GateioMarket,
13            option::GateioOptions,
14            perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15            spot::GateioSpot,
16        },
17        kraken::{Kraken, market::KrakenMarket},
18        okx::{Okx, market::OkxMarket},
19    },
20    instrument::InstrumentData,
21    streams::{
22        consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23        reconnect::stream::ReconnectingStream,
24    },
25    subscription::{
26        SubKind, Subscription,
27        book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28        liquidation::{Liquidation, Liquidations},
29        trade::{PublicTrade, PublicTrades},
30    },
31};
32use barter_instrument::exchange::ExchangeId;
33use barter_integration::{
34    Validator,
35    channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
36    error::SocketError,
37};
38use fnv::FnvHashMap;
39use futures::{Stream, stream::SelectAll};
40use futures_util::{StreamExt, future::try_join_all};
41use itertools::Itertools;
42use std::{
43    fmt::{Debug, Display},
44    sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
/// Collection of market data streams grouped by data kind, then keyed by [`ExchangeId`].
///
/// Every field is a [`VecMap`] from an [`ExchangeId`] to an [`UnboundedReceiverStream`] whose
/// items are [`MarketStreamResult`]s parameterised by `InstrumentKey` and the field's data type.
///
/// NOTE(review): presumably each receiver is the consuming end of the per-exchange channel that
/// `init` forwards its market streams into - confirm against the `Channels` definition.
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
    /// Per-exchange streams of [`PublicTrade`] results.
53    pub trades:
54        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// Per-exchange streams of [`OrderBookL1`] results.
55    pub l1s:
56        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// Per-exchange streams of [`OrderBookEvent`] results (the `OrderBooksL2` subscription kind
    /// is routed to this field's channels in `init`).
57    pub l2s: VecMap<
58        ExchangeId,
59        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60    >,
    /// Per-exchange streams of [`Liquidation`] results.
61    pub liquidations:
62        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66    /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67    ///
68    /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69    /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70    /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71    ///
72    /// ## Examples
73    /// Please see barter-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74    /// comprehensive example of how to use this market data stream initialiser.
75    pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
76        subscription_batches: SubBatchIter,
77    ) -> Result<Self, DataError>
78    where
79        SubBatchIter: IntoIterator<Item = SubIter>,
80        SubIter: IntoIterator<Item = Sub>,
81        Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
82        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
83        InstrumentKey: Debug + Clone + Send + 'static,
84        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
85        Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
86        Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
87        Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
88        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
89        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
90        Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
91        Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
92        Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
93        Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
94        Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
95        Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
96        Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
97        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
98        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
99        Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
100        Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
101        Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
102        Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
103        Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
104        Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
105        Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
106        Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
107        Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
108        Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
109    {
110        // Validate & dedup Subscription batches
111        let batches = validate_batches(subscription_batches)?;
112
113        // Generate required Channels from Subscription batches
114        let channels = Channels::try_from(&batches)?;
115
116        let futures =
117            batches.into_iter().map(|mut batch| {
118                batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
119                let by_exchange_by_sub_kind =
120                    batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
121
122                let batch_futures =
123                    by_exchange_by_sub_kind
124                        .into_iter()
125                        .map(|((exchange, sub_kind), subs)| {
126                            let subs = subs.into_iter().collect::<Vec<_>>();
127                            let txs = Arc::clone(&channels.txs);
128                            async move {
129                                match (exchange, sub_kind) {
130                                    (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
131                                        init_market_stream(
132                                            STREAM_RECONNECTION_POLICY,
133                                            subs.into_iter()
134                                                .map(|sub| {
135                                                    Subscription::new(
136                                                        BinanceSpot::default(),
137                                                        sub.instrument,
138                                                        PublicTrades,
139                                                    )
140                                                })
141                                                .collect(),
142                                        )
143                                        .await
144                                        .map(|stream| {
145                                            tokio::spawn(stream.forward_to(
146                                                txs.trades.get(&exchange).unwrap().clone(),
147                                            ))
148                                        })
149                                    }
150                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
151                                        init_market_stream(
152                                            STREAM_RECONNECTION_POLICY,
153                                            subs.into_iter()
154                                                .map(|sub| {
155                                                    Subscription::new(
156                                                        BinanceSpot::default(),
157                                                        sub.instrument,
158                                                        OrderBooksL1,
159                                                    )
160                                                })
161                                                .collect(),
162                                        )
163                                        .await
164                                        .map(|stream| {
165                                            tokio::spawn(stream.forward_to(
166                                                txs.l1s.get(&exchange).unwrap().clone(),
167                                            ))
168                                        })
169                                    }
170                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
171                                        init_market_stream(
172                                            STREAM_RECONNECTION_POLICY,
173                                            subs.into_iter()
174                                                .map(|sub| {
175                                                    Subscription::new(
176                                                        BinanceSpot::default(),
177                                                        sub.instrument,
178                                                        OrderBooksL2,
179                                                    )
180                                                })
181                                                .collect(),
182                                        )
183                                        .await
184                                        .map(|stream| {
185                                            tokio::spawn(stream.forward_to(
186                                                txs.l2s.get(&exchange).unwrap().clone(),
187                                            ))
188                                        })
189                                    }
190                                    (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
191                                        init_market_stream(
192                                            STREAM_RECONNECTION_POLICY,
193                                            subs.into_iter()
194                                                .map(|sub| {
195                                                    Subscription::new(
196                                                        BinanceFuturesUsd::default(),
197                                                        sub.instrument,
198                                                        PublicTrades,
199                                                    )
200                                                })
201                                                .collect(),
202                                        )
203                                        .await
204                                        .map(|stream| {
205                                            tokio::spawn(stream.forward_to(
206                                                txs.trades.get(&exchange).unwrap().clone(),
207                                            ))
208                                        })
209                                    }
210                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
211                                        init_market_stream(
212                                            STREAM_RECONNECTION_POLICY,
213                                            subs.into_iter()
214                                                .map(|sub| {
215                                                    Subscription::<_, Instrument, _>::new(
216                                                        BinanceFuturesUsd::default(),
217                                                        sub.instrument,
218                                                        OrderBooksL1,
219                                                    )
220                                                })
221                                                .collect(),
222                                        )
223                                        .await
224                                        .map(|stream| {
225                                            tokio::spawn(stream.forward_to(
226                                                txs.l1s.get(&exchange).unwrap().clone(),
227                                            ))
228                                        })
229                                    }
230                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
231                                        init_market_stream(
232                                            STREAM_RECONNECTION_POLICY,
233                                            subs.into_iter()
234                                                .map(|sub| {
235                                                    Subscription::<_, Instrument, _>::new(
236                                                        BinanceFuturesUsd::default(),
237                                                        sub.instrument,
238                                                        OrderBooksL2,
239                                                    )
240                                                })
241                                                .collect(),
242                                        )
243                                        .await
244                                        .map(|stream| {
245                                            tokio::spawn(stream.forward_to(
246                                                txs.l2s.get(&exchange).unwrap().clone(),
247                                            ))
248                                        })
249                                    }
250                                    (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
251                                        init_market_stream(
252                                            STREAM_RECONNECTION_POLICY,
253                                            subs.into_iter()
254                                                .map(|sub| {
255                                                    Subscription::<_, Instrument, _>::new(
256                                                        BinanceFuturesUsd::default(),
257                                                        sub.instrument,
258                                                        Liquidations,
259                                                    )
260                                                })
261                                                .collect(),
262                                        )
263                                        .await
264                                        .map(|stream| {
265                                            tokio::spawn(stream.forward_to(
266                                                txs.liquidations.get(&exchange).unwrap().clone(),
267                                            ))
268                                        })
269                                    }
270                                    (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
271                                        init_market_stream(
272                                            STREAM_RECONNECTION_POLICY,
273                                            subs.into_iter()
274                                                .map(|sub| {
275                                                    Subscription::new(
276                                                        Bitfinex,
277                                                        sub.instrument,
278                                                        PublicTrades,
279                                                    )
280                                                })
281                                                .collect(),
282                                        )
283                                        .await
284                                        .map(|stream| {
285                                            tokio::spawn(stream.forward_to(
286                                                txs.trades.get(&exchange).unwrap().clone(),
287                                            ))
288                                        })
289                                    }
290                                    (ExchangeId::Bitmex, SubKind::PublicTrades) => {
291                                        init_market_stream(
292                                            STREAM_RECONNECTION_POLICY,
293                                            subs.into_iter()
294                                                .map(|sub| {
295                                                    Subscription::new(
296                                                        Bitmex,
297                                                        sub.instrument,
298                                                        PublicTrades,
299                                                    )
300                                                })
301                                                .collect(),
302                                        )
303                                        .await
304                                        .map(|stream| {
305                                            tokio::spawn(stream.forward_to(
306                                                txs.trades.get(&exchange).unwrap().clone(),
307                                            ))
308                                        })
309                                    }
310                                    (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
311                                        init_market_stream(
312                                            STREAM_RECONNECTION_POLICY,
313                                            subs.into_iter()
314                                                .map(|sub| {
315                                                    Subscription::new(
316                                                        BybitSpot::default(),
317                                                        sub.instrument,
318                                                        PublicTrades,
319                                                    )
320                                                })
321                                                .collect(),
322                                        )
323                                        .await
324                                        .map(|stream| {
325                                            tokio::spawn(stream.forward_to(
326                                                txs.trades.get(&exchange).unwrap().clone(),
327                                            ))
328                                        })
329                                    }
330                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
331                                        init_market_stream(
332                                            STREAM_RECONNECTION_POLICY,
333                                            subs.into_iter()
334                                                .map(|sub| {
335                                                    Subscription::new(
336                                                        BybitSpot::default(),
337                                                        sub.instrument,
338                                                        OrderBooksL1,
339                                                    )
340                                                })
341                                                .collect(),
342                                        )
343                                        .await
344                                        .map(|stream| {
345                                            tokio::spawn(stream.forward_to(
346                                                txs.l1s.get(&exchange).unwrap().clone(),
347                                            ))
348                                        })
349                                    }
350                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
351                                        init_market_stream(
352                                            STREAM_RECONNECTION_POLICY,
353                                            subs.into_iter()
354                                                .map(|sub| {
355                                                    Subscription::new(
356                                                        BybitSpot::default(),
357                                                        sub.instrument,
358                                                        OrderBooksL2,
359                                                    )
360                                                })
361                                                .collect(),
362                                        )
363                                        .await
364                                        .map(|stream| {
365                                            tokio::spawn(stream.forward_to(
366                                                txs.l2s.get(&exchange).unwrap().clone(),
367                                            ))
368                                        })
369                                    }
370                                    (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
371                                        init_market_stream(
372                                            STREAM_RECONNECTION_POLICY,
373                                            subs.into_iter()
374                                                .map(|sub| {
375                                                    Subscription::new(
376                                                        BybitPerpetualsUsd::default(),
377                                                        sub.instrument,
378                                                        PublicTrades,
379                                                    )
380                                                })
381                                                .collect(),
382                                        )
383                                        .await
384                                        .map(|stream| {
385                                            tokio::spawn(stream.forward_to(
386                                                txs.trades.get(&exchange).unwrap().clone(),
387                                            ))
388                                        })
389                                    }
390                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
391                                        init_market_stream(
392                                            STREAM_RECONNECTION_POLICY,
393                                            subs.into_iter()
394                                                .map(|sub| {
395                                                    Subscription::new(
396                                                        BybitSpot::default(),
397                                                        sub.instrument,
398                                                        OrderBooksL1,
399                                                    )
400                                                })
401                                                .collect(),
402                                        )
403                                        .await
404                                        .map(|stream| {
405                                            tokio::spawn(stream.forward_to(
406                                                txs.l1s.get(&exchange).unwrap().clone(),
407                                            ))
408                                        })
409                                    }
410                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
411                                        init_market_stream(
412                                            STREAM_RECONNECTION_POLICY,
413                                            subs.into_iter()
414                                                .map(|sub| {
415                                                    Subscription::new(
416                                                        BybitSpot::default(),
417                                                        sub.instrument,
418                                                        OrderBooksL2,
419                                                    )
420                                                })
421                                                .collect(),
422                                        )
423                                        .await
424                                        .map(|stream| {
425                                            tokio::spawn(stream.forward_to(
426                                                txs.l2s.get(&exchange).unwrap().clone(),
427                                            ))
428                                        })
429                                    }
430                                    (ExchangeId::Coinbase, SubKind::PublicTrades) => {
431                                        init_market_stream(
432                                            STREAM_RECONNECTION_POLICY,
433                                            subs.into_iter()
434                                                .map(|sub| {
435                                                    Subscription::new(
436                                                        Coinbase,
437                                                        sub.instrument,
438                                                        PublicTrades,
439                                                    )
440                                                })
441                                                .collect(),
442                                        )
443                                        .await
444                                        .map(|stream| {
445                                            tokio::spawn(stream.forward_to(
446                                                txs.trades.get(&exchange).unwrap().clone(),
447                                            ))
448                                        })
449                                    }
450                                    (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
451                                        init_market_stream(
452                                            STREAM_RECONNECTION_POLICY,
453                                            subs.into_iter()
454                                                .map(|sub| {
455                                                    Subscription::new(
456                                                        GateioSpot::default(),
457                                                        sub.instrument,
458                                                        PublicTrades,
459                                                    )
460                                                })
461                                                .collect(),
462                                        )
463                                        .await
464                                        .map(|stream| {
465                                            tokio::spawn(stream.forward_to(
466                                                txs.trades.get(&exchange).unwrap().clone(),
467                                            ))
468                                        })
469                                    }
470                                    (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
471                                        init_market_stream(
472                                            STREAM_RECONNECTION_POLICY,
473                                            subs.into_iter()
474                                                .map(|sub| {
475                                                    Subscription::new(
476                                                        GateioFuturesUsd::default(),
477                                                        sub.instrument,
478                                                        PublicTrades,
479                                                    )
480                                                })
481                                                .collect(),
482                                        )
483                                        .await
484                                        .map(|stream| {
485                                            tokio::spawn(stream.forward_to(
486                                                txs.trades.get(&exchange).unwrap().clone(),
487                                            ))
488                                        })
489                                    }
490                                    (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
491                                        init_market_stream(
492                                            STREAM_RECONNECTION_POLICY,
493                                            subs.into_iter()
494                                                .map(|sub| {
495                                                    Subscription::new(
496                                                        GateioFuturesBtc::default(),
497                                                        sub.instrument,
498                                                        PublicTrades,
499                                                    )
500                                                })
501                                                .collect(),
502                                        )
503                                        .await
504                                        .map(|stream| {
505                                            tokio::spawn(stream.forward_to(
506                                                txs.trades.get(&exchange).unwrap().clone(),
507                                            ))
508                                        })
509                                    }
510                                    (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
511                                        init_market_stream(
512                                            STREAM_RECONNECTION_POLICY,
513                                            subs.into_iter()
514                                                .map(|sub| {
515                                                    Subscription::new(
516                                                        GateioPerpetualsUsd::default(),
517                                                        sub.instrument,
518                                                        PublicTrades,
519                                                    )
520                                                })
521                                                .collect(),
522                                        )
523                                        .await
524                                        .map(|stream| {
525                                            tokio::spawn(stream.forward_to(
526                                                txs.trades.get(&exchange).unwrap().clone(),
527                                            ))
528                                        })
529                                    }
530                                    (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
531                                        init_market_stream(
532                                            STREAM_RECONNECTION_POLICY,
533                                            subs.into_iter()
534                                                .map(|sub| {
535                                                    Subscription::new(
536                                                        GateioPerpetualsBtc::default(),
537                                                        sub.instrument,
538                                                        PublicTrades,
539                                                    )
540                                                })
541                                                .collect(),
542                                        )
543                                        .await
544                                        .map(|stream| {
545                                            tokio::spawn(stream.forward_to(
546                                                txs.trades.get(&exchange).unwrap().clone(),
547                                            ))
548                                        })
549                                    }
550                                    (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
551                                        init_market_stream(
552                                            STREAM_RECONNECTION_POLICY,
553                                            subs.into_iter()
554                                                .map(|sub| {
555                                                    Subscription::new(
556                                                        GateioOptions::default(),
557                                                        sub.instrument,
558                                                        PublicTrades,
559                                                    )
560                                                })
561                                                .collect(),
562                                        )
563                                        .await
564                                        .map(|stream| {
565                                            tokio::spawn(stream.forward_to(
566                                                txs.trades.get(&exchange).unwrap().clone(),
567                                            ))
568                                        })
569                                    }
570                                    (ExchangeId::Kraken, SubKind::PublicTrades) => {
571                                        init_market_stream(
572                                            STREAM_RECONNECTION_POLICY,
573                                            subs.into_iter()
574                                                .map(|sub| {
575                                                    Subscription::new(
576                                                        Kraken,
577                                                        sub.instrument,
578                                                        PublicTrades,
579                                                    )
580                                                })
581                                                .collect(),
582                                        )
583                                        .await
584                                        .map(|stream| {
585                                            tokio::spawn(stream.forward_to(
586                                                txs.trades.get(&exchange).unwrap().clone(),
587                                            ))
588                                        })
589                                    }
590                                    (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
591                                        init_market_stream(
592                                            STREAM_RECONNECTION_POLICY,
593                                            subs.into_iter()
594                                                .map(|sub| {
595                                                    Subscription::new(
596                                                        Kraken,
597                                                        sub.instrument,
598                                                        OrderBooksL1,
599                                                    )
600                                                })
601                                                .collect(),
602                                        )
603                                        .await
604                                        .map(|stream| {
605                                            tokio::spawn(stream.forward_to(
606                                                txs.l1s.get(&exchange).unwrap().clone(),
607                                            ))
608                                        })
609                                    }
610                                    (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
611                                        STREAM_RECONNECTION_POLICY,
612                                        subs.into_iter()
613                                            .map(|sub| {
614                                                Subscription::new(Okx, sub.instrument, PublicTrades)
615                                            })
616                                            .collect(),
617                                    )
618                                    .await
619                                    .map(|stream| {
620                                        tokio::spawn(
621                                            stream.forward_to(
622                                                txs.trades.get(&exchange).unwrap().clone(),
623                                            ),
624                                        )
625                                    }),
626                                    (exchange, sub_kind) => {
627                                        Err(DataError::Unsupported { exchange, sub_kind })
628                                    }
629                                }
630                            }
631                        });
632
633                try_join_all(batch_futures)
634            });
635
636        try_join_all(futures).await?;
637
638        Ok(Self {
639            trades: channels
640                .rxs
641                .trades
642                .into_iter()
643                .map(|(exchange, rx)| (exchange, rx.into_stream()))
644                .collect(),
645            l1s: channels
646                .rxs
647                .l1s
648                .into_iter()
649                .map(|(exchange, rx)| (exchange, rx.into_stream()))
650                .collect(),
651            l2s: channels
652                .rxs
653                .l2s
654                .into_iter()
655                .map(|(exchange, rx)| (exchange, rx.into_stream()))
656                .collect(),
657            liquidations: channels
658                .rxs
659                .liquidations
660                .into_iter()
661                .map(|(exchange, rx)| (exchange, rx.into_stream()))
662                .collect(),
663        })
664    }
665
666    /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
667    ///
668    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
669    pub fn select_trades(
670        &mut self,
671        exchange: ExchangeId,
672    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
673        self.trades.remove(&exchange)
674    }
675
676    /// Select and merge every exchange [`PublicTrade`] `Stream` using
677    /// [`SelectAll`](futures_util::stream::select_all::select_all).
678    pub fn select_all_trades(
679        &mut self,
680    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
681        futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
682    }
683
684    /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
685    ///
686    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
687    pub fn select_l1s(
688        &mut self,
689        exchange: ExchangeId,
690    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
691        self.l1s.remove(&exchange)
692    }
693
694    /// Select and merge every exchange [`OrderBookL1`] `Stream` using
695    /// [`SelectAll`](futures_util::stream::select_all::select_all).
696    pub fn select_all_l1s(
697        &mut self,
698    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
699        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
700    }
701
702    /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
703    ///
704    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
705    pub fn select_l2s(
706        &mut self,
707        exchange: ExchangeId,
708    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
709        self.l2s.remove(&exchange)
710    }
711
712    /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
713    /// [`SelectAll`](futures_util::stream::select_all::select_all).
714    pub fn select_all_l2s(
715        &mut self,
716    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
717        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
718    }
719
720    /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
721    ///
722    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
723    pub fn select_liquidations(
724        &mut self,
725        exchange: ExchangeId,
726    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
727        self.liquidations.remove(&exchange)
728    }
729
730    /// Select and merge every exchange [`Liquidation`] `Stream` using
731    /// [`SelectAll`](futures_util::stream::select_all::select_all).
732    pub fn select_all_liquidations(
733        &mut self,
734    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
735        futures_util::stream::select_all::select_all(
736            std::mem::take(&mut self.liquidations).into_values(),
737        )
738    }
739
740    /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
741    ///
742    /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
743    /// use cases.
744    pub fn select_all<Output>(self) -> impl Stream<Item = Output>
745    where
746        InstrumentKey: Send + 'static,
747        Output: 'static,
748        MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
749        MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
750        MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
751        MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
752    {
753        let Self {
754            trades,
755            l1s,
756            l2s,
757            liquidations,
758        } = self;
759
760        let trades = trades
761            .into_values()
762            .map(|stream| stream.map(MarketStreamResult::into).boxed());
763
764        let l1s = l1s
765            .into_values()
766            .map(|stream| stream.map(MarketStreamResult::into).boxed());
767
768        let l2s = l2s
769            .into_values()
770            .map(|stream| stream.map(MarketStreamResult::into).boxed());
771
772        let liquidations = liquidations
773            .into_values()
774            .map(|stream| stream.map(MarketStreamResult::into).boxed());
775
776        let all = trades.chain(l1s).chain(l2s).chain(liquidations);
777
778        futures_util::stream::select_all::select_all(all)
779    }
780}
781
782pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
783    batches: SubBatchIter,
784) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
785where
786    SubBatchIter: IntoIterator<Item = SubIter>,
787    SubIter: IntoIterator<Item = Sub>,
788    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
789    Instrument: InstrumentData + Ord,
790{
791    batches
792        .into_iter()
793        .map(validate_subscriptions::<SubIter, Sub, Instrument>)
794        .collect()
795}
796
797pub fn validate_subscriptions<SubIter, Sub, Instrument>(
798    batch: SubIter,
799) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
800where
801    SubIter: IntoIterator<Item = Sub>,
802    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
803    Instrument: InstrumentData + Ord,
804{
805    // Validate Subscriptions
806    let mut batch = batch
807        .into_iter()
808        .map(Sub::into)
809        .map(Validator::validate)
810        .collect::<Result<Vec<_>, SocketError>>()?;
811
812    // Remove duplicate Subscriptions
813    batch.sort();
814    batch.dedup();
815
816    Ok(batch)
817}
818
819struct Channels<InstrumentKey> {
820    txs: Arc<Txs<InstrumentKey>>,
821    rxs: Rxs<InstrumentKey>,
822}
823
824impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
825    for Channels<Instrument::Key>
826where
827    Instrument: InstrumentData,
828{
829    type Error = DataError;
830
831    fn try_from(
832        value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
833    ) -> Result<Self, Self::Error> {
834        let mut txs = Txs::default();
835        let mut rxs = Rxs::default();
836
837        for sub in value.iter().flatten() {
838            match sub.kind {
839                SubKind::PublicTrades => {
840                    if let (None, None) =
841                        (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
842                    {
843                        let (tx, rx) = mpsc_unbounded();
844                        txs.trades.insert(sub.exchange, tx);
845                        rxs.trades.insert(sub.exchange, rx);
846                    }
847                }
848                SubKind::OrderBooksL1 => {
849                    if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
850                        let (tx, rx) = mpsc_unbounded();
851                        txs.l1s.insert(sub.exchange, tx);
852                        rxs.l1s.insert(sub.exchange, rx);
853                    }
854                }
855                SubKind::OrderBooksL2 => {
856                    if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
857                        let (tx, rx) = mpsc_unbounded();
858                        txs.l2s.insert(sub.exchange, tx);
859                        rxs.l2s.insert(sub.exchange, rx);
860                    }
861                }
862                SubKind::Liquidations => {
863                    if let (None, None) = (
864                        txs.liquidations.get(&sub.exchange),
865                        rxs.liquidations.get(&sub.exchange),
866                    ) {
867                        let (tx, rx) = mpsc_unbounded();
868                        txs.liquidations.insert(sub.exchange, tx);
869                        rxs.liquidations.insert(sub.exchange, rx);
870                    }
871                }
872                unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
873            }
874        }
875
876        Ok(Channels {
877            txs: Arc::new(txs),
878            rxs,
879        })
880    }
881}
882
883struct Txs<InstrumentKey> {
884    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
885    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
886    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
887    liquidations:
888        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
889}
890
891impl<InstrumentKey> Default for Txs<InstrumentKey> {
892    fn default() -> Self {
893        Self {
894            trades: Default::default(),
895            l1s: Default::default(),
896            l2s: Default::default(),
897            liquidations: Default::default(),
898        }
899    }
900}
901
902struct Rxs<InstrumentKey> {
903    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
904    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
905    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
906    liquidations:
907        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
908}
909
910impl<InstrumentKey> Default for Rxs<InstrumentKey> {
911    fn default() -> Self {
912        Self {
913            trades: Default::default(),
914            l1s: Default::default(),
915            l2s: Default::default(),
916            liquidations: Default::default(),
917        }
918    }
919}