Skip to main content

rustrade_data/streams/builder/dynamic/
mod.rs

1use crate::{
2    Identifier,
3    error::DataError,
4    exchange::{
5        binance::{futures::BinanceFuturesUsd, market::BinanceMarket, spot::BinanceSpot},
6        bitfinex::{Bitfinex, market::BitfinexMarket},
7        bitmex::{Bitmex, market::BitmexMarket},
8        bybit::{futures::BybitPerpetualsUsd, market::BybitMarket, spot::BybitSpot},
9        coinbase::{Coinbase, market::CoinbaseMarket},
10        gateio::{
11            future::{GateioFuturesBtc, GateioFuturesUsd},
12            market::GateioMarket,
13            option::GateioOptions,
14            perpetual::{GateioPerpetualsBtc, GateioPerpetualsUsd},
15            spot::GateioSpot,
16        },
17        kraken::{Kraken, market::KrakenMarket},
18        okx::{Okx, market::OkxMarket},
19    },
20    instrument::InstrumentData,
21    streams::{
22        consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
23        reconnect::stream::ReconnectingStream,
24    },
25    subscription::{
26        SubKind, Subscription,
27        book::{OrderBookEvent, OrderBookL1, OrderBooksL1, OrderBooksL2},
28        liquidation::{Liquidation, Liquidations},
29        trade::{PublicTrade, PublicTrades},
30    },
31};
32use fnv::FnvHashMap;
33use futures::{Stream, stream::SelectAll};
34use futures_util::{StreamExt, future::try_join_all};
35use itertools::Itertools;
36use rustrade_instrument::exchange::ExchangeId;
37use rustrade_integration::{
38    Validator,
39    channel::{UnboundedRx, UnboundedTx, mpsc_unbounded},
40    error::SocketError,
41};
42use std::{
43    fmt::{Debug, Display},
44    sync::Arc,
45};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47use vecmap::VecMap;
48
49pub mod indexed;
50
/// Market data streams grouped by event kind, each group keyed by [`ExchangeId`].
///
/// This is the value produced on success by [`DynamicStreams::init`]. Every field maps an
/// [`ExchangeId`] to an [`UnboundedReceiverStream`] whose items are [`MarketStreamResult`]s
/// parameterised by `InstrumentKey` and a single market event type.
///
/// NOTE(review): presumably each receiver is the consuming half of the channel that the
/// `forward_to` tasks spawned in `init` write into - the channel construction is not visible
/// from this struct definition; confirm against `Channels`.
51#[derive(Debug)]
52pub struct DynamicStreams<InstrumentKey> {
    /// Streams of [`PublicTrade`] results, keyed by exchange.
53    pub trades:
54        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>>,
    /// Streams of [`OrderBookL1`] results, keyed by exchange.
55    pub l1s:
56        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
    /// Streams of [`OrderBookEvent`] results (the L2 order book feed), keyed by exchange.
57    pub l2s: VecMap<
58        ExchangeId,
59        UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>,
60    >,
    /// Streams of [`Liquidation`] results, keyed by exchange.
61    pub liquidations:
62        VecMap<ExchangeId, UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>>,
63}
64
65impl<InstrumentKey> DynamicStreams<InstrumentKey> {
66    /// Initialise a set of `Streams` by providing one or more [`Subscription`] batches.
67    ///
68    /// Each batch (ie/ `impl Iterator<Item = Subscription>`) will initialise at-least-one
69    /// WebSocket `Stream` under the hood. If the batch contains more-than-one [`ExchangeId`] and/or
70    /// [`SubKind`], it will be further split under the hood for compile-time reasons.
71    ///
72    /// ## Examples
73    /// Please see rustrade-data-rs/examples/dynamic_multi_stream_multi_exchange.rs for a
74    /// comprehensive example of how to use this market data stream initialiser.
75    #[allow(clippy::unwrap_used)] // Invariant: Channels::try_from creates entries for all exchanges in batches; lookups iterate over the same exchanges
76    pub async fn init<SubBatchIter, SubIter, Sub, Instrument>(
77        subscription_batches: SubBatchIter,
78    ) -> Result<Self, DataError>
79    where
80        SubBatchIter: IntoIterator<Item = SubIter>,
81        SubIter: IntoIterator<Item = Sub>,
82        Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
83        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
84        InstrumentKey: Debug + Clone + Send + 'static,
85        Subscription<BinanceSpot, Instrument, PublicTrades>: Identifier<BinanceMarket>,
86        Subscription<BinanceSpot, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
87        Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
88        Subscription<BinanceFuturesUsd, Instrument, PublicTrades>: Identifier<BinanceMarket>,
89        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL1>: Identifier<BinanceMarket>,
90        Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
91        Subscription<BinanceFuturesUsd, Instrument, Liquidations>: Identifier<BinanceMarket>,
92        Subscription<Bitfinex, Instrument, PublicTrades>: Identifier<BitfinexMarket>,
93        Subscription<Bitmex, Instrument, PublicTrades>: Identifier<BitmexMarket>,
94        Subscription<BybitSpot, Instrument, PublicTrades>: Identifier<BybitMarket>,
95        Subscription<BybitSpot, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
96        Subscription<BybitSpot, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
97        Subscription<BybitPerpetualsUsd, Instrument, PublicTrades>: Identifier<BybitMarket>,
98        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL1>: Identifier<BybitMarket>,
99        Subscription<BybitPerpetualsUsd, Instrument, OrderBooksL2>: Identifier<BybitMarket>,
100        Subscription<Coinbase, Instrument, PublicTrades>: Identifier<CoinbaseMarket>,
101        Subscription<GateioSpot, Instrument, PublicTrades>: Identifier<GateioMarket>,
102        Subscription<GateioFuturesUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
103        Subscription<GateioFuturesBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
104        Subscription<GateioPerpetualsUsd, Instrument, PublicTrades>: Identifier<GateioMarket>,
105        Subscription<GateioPerpetualsBtc, Instrument, PublicTrades>: Identifier<GateioMarket>,
106        Subscription<GateioOptions, Instrument, PublicTrades>: Identifier<GateioMarket>,
107        Subscription<Kraken, Instrument, PublicTrades>: Identifier<KrakenMarket>,
108        Subscription<Kraken, Instrument, OrderBooksL1>: Identifier<KrakenMarket>,
109        Subscription<Okx, Instrument, PublicTrades>: Identifier<OkxMarket>,
110    {
111        // Validate & dedup Subscription batches
112        let batches = validate_batches(subscription_batches)?;
113
114        // Generate required Channels from Subscription batches
115        let channels = Channels::try_from(&batches)?;
116
117        let futures =
118            batches.into_iter().map(|mut batch| {
119                batch.sort_unstable_by_key(|sub| (sub.exchange, sub.kind));
120                let by_exchange_by_sub_kind =
121                    batch.into_iter().chunk_by(|sub| (sub.exchange, sub.kind));
122
123                let batch_futures =
124                    by_exchange_by_sub_kind
125                        .into_iter()
126                        .map(|((exchange, sub_kind), subs)| {
127                            let subs = subs.into_iter().collect::<Vec<_>>();
128                            let txs = Arc::clone(&channels.txs);
129                            async move {
130                                match (exchange, sub_kind) {
131                                    (ExchangeId::BinanceSpot, SubKind::PublicTrades) => {
132                                        init_market_stream(
133                                            STREAM_RECONNECTION_POLICY,
134                                            subs.into_iter()
135                                                .map(|sub| {
136                                                    Subscription::new(
137                                                        BinanceSpot::default(),
138                                                        sub.instrument,
139                                                        PublicTrades,
140                                                    )
141                                                })
142                                                .collect(),
143                                        )
144                                        .await
145                                        .map(|stream| {
146                                            tokio::spawn(stream.forward_to(
147                                                txs.trades.get(&exchange).unwrap().clone(),
148                                            ))
149                                        })
150                                    }
151                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL1) => {
152                                        init_market_stream(
153                                            STREAM_RECONNECTION_POLICY,
154                                            subs.into_iter()
155                                                .map(|sub| {
156                                                    Subscription::new(
157                                                        BinanceSpot::default(),
158                                                        sub.instrument,
159                                                        OrderBooksL1,
160                                                    )
161                                                })
162                                                .collect(),
163                                        )
164                                        .await
165                                        .map(|stream| {
166                                            tokio::spawn(stream.forward_to(
167                                                txs.l1s.get(&exchange).unwrap().clone(),
168                                            ))
169                                        })
170                                    }
171                                    (ExchangeId::BinanceSpot, SubKind::OrderBooksL2) => {
172                                        init_market_stream(
173                                            STREAM_RECONNECTION_POLICY,
174                                            subs.into_iter()
175                                                .map(|sub| {
176                                                    Subscription::new(
177                                                        BinanceSpot::default(),
178                                                        sub.instrument,
179                                                        OrderBooksL2,
180                                                    )
181                                                })
182                                                .collect(),
183                                        )
184                                        .await
185                                        .map(|stream| {
186                                            tokio::spawn(stream.forward_to(
187                                                txs.l2s.get(&exchange).unwrap().clone(),
188                                            ))
189                                        })
190                                    }
191                                    (ExchangeId::BinanceFuturesUsd, SubKind::PublicTrades) => {
192                                        init_market_stream(
193                                            STREAM_RECONNECTION_POLICY,
194                                            subs.into_iter()
195                                                .map(|sub| {
196                                                    Subscription::new(
197                                                        BinanceFuturesUsd::default(),
198                                                        sub.instrument,
199                                                        PublicTrades,
200                                                    )
201                                                })
202                                                .collect(),
203                                        )
204                                        .await
205                                        .map(|stream| {
206                                            tokio::spawn(stream.forward_to(
207                                                txs.trades.get(&exchange).unwrap().clone(),
208                                            ))
209                                        })
210                                    }
211                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL1) => {
212                                        init_market_stream(
213                                            STREAM_RECONNECTION_POLICY,
214                                            subs.into_iter()
215                                                .map(|sub| {
216                                                    Subscription::<_, Instrument, _>::new(
217                                                        BinanceFuturesUsd::default(),
218                                                        sub.instrument,
219                                                        OrderBooksL1,
220                                                    )
221                                                })
222                                                .collect(),
223                                        )
224                                        .await
225                                        .map(|stream| {
226                                            tokio::spawn(stream.forward_to(
227                                                txs.l1s.get(&exchange).unwrap().clone(),
228                                            ))
229                                        })
230                                    }
231                                    (ExchangeId::BinanceFuturesUsd, SubKind::OrderBooksL2) => {
232                                        init_market_stream(
233                                            STREAM_RECONNECTION_POLICY,
234                                            subs.into_iter()
235                                                .map(|sub| {
236                                                    Subscription::<_, Instrument, _>::new(
237                                                        BinanceFuturesUsd::default(),
238                                                        sub.instrument,
239                                                        OrderBooksL2,
240                                                    )
241                                                })
242                                                .collect(),
243                                        )
244                                        .await
245                                        .map(|stream| {
246                                            tokio::spawn(stream.forward_to(
247                                                txs.l2s.get(&exchange).unwrap().clone(),
248                                            ))
249                                        })
250                                    }
251                                    (ExchangeId::BinanceFuturesUsd, SubKind::Liquidations) => {
252                                        init_market_stream(
253                                            STREAM_RECONNECTION_POLICY,
254                                            subs.into_iter()
255                                                .map(|sub| {
256                                                    Subscription::<_, Instrument, _>::new(
257                                                        BinanceFuturesUsd::default(),
258                                                        sub.instrument,
259                                                        Liquidations,
260                                                    )
261                                                })
262                                                .collect(),
263                                        )
264                                        .await
265                                        .map(|stream| {
266                                            tokio::spawn(stream.forward_to(
267                                                txs.liquidations.get(&exchange).unwrap().clone(),
268                                            ))
269                                        })
270                                    }
271                                    (ExchangeId::Bitfinex, SubKind::PublicTrades) => {
272                                        init_market_stream(
273                                            STREAM_RECONNECTION_POLICY,
274                                            subs.into_iter()
275                                                .map(|sub| {
276                                                    Subscription::new(
277                                                        Bitfinex,
278                                                        sub.instrument,
279                                                        PublicTrades,
280                                                    )
281                                                })
282                                                .collect(),
283                                        )
284                                        .await
285                                        .map(|stream| {
286                                            tokio::spawn(stream.forward_to(
287                                                txs.trades.get(&exchange).unwrap().clone(),
288                                            ))
289                                        })
290                                    }
291                                    (ExchangeId::Bitmex, SubKind::PublicTrades) => {
292                                        init_market_stream(
293                                            STREAM_RECONNECTION_POLICY,
294                                            subs.into_iter()
295                                                .map(|sub| {
296                                                    Subscription::new(
297                                                        Bitmex,
298                                                        sub.instrument,
299                                                        PublicTrades,
300                                                    )
301                                                })
302                                                .collect(),
303                                        )
304                                        .await
305                                        .map(|stream| {
306                                            tokio::spawn(stream.forward_to(
307                                                txs.trades.get(&exchange).unwrap().clone(),
308                                            ))
309                                        })
310                                    }
311                                    (ExchangeId::BybitSpot, SubKind::PublicTrades) => {
312                                        init_market_stream(
313                                            STREAM_RECONNECTION_POLICY,
314                                            subs.into_iter()
315                                                .map(|sub| {
316                                                    Subscription::new(
317                                                        BybitSpot::default(),
318                                                        sub.instrument,
319                                                        PublicTrades,
320                                                    )
321                                                })
322                                                .collect(),
323                                        )
324                                        .await
325                                        .map(|stream| {
326                                            tokio::spawn(stream.forward_to(
327                                                txs.trades.get(&exchange).unwrap().clone(),
328                                            ))
329                                        })
330                                    }
331                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL1) => {
332                                        init_market_stream(
333                                            STREAM_RECONNECTION_POLICY,
334                                            subs.into_iter()
335                                                .map(|sub| {
336                                                    Subscription::new(
337                                                        BybitSpot::default(),
338                                                        sub.instrument,
339                                                        OrderBooksL1,
340                                                    )
341                                                })
342                                                .collect(),
343                                        )
344                                        .await
345                                        .map(|stream| {
346                                            tokio::spawn(stream.forward_to(
347                                                txs.l1s.get(&exchange).unwrap().clone(),
348                                            ))
349                                        })
350                                    }
351                                    (ExchangeId::BybitSpot, SubKind::OrderBooksL2) => {
352                                        init_market_stream(
353                                            STREAM_RECONNECTION_POLICY,
354                                            subs.into_iter()
355                                                .map(|sub| {
356                                                    Subscription::new(
357                                                        BybitSpot::default(),
358                                                        sub.instrument,
359                                                        OrderBooksL2,
360                                                    )
361                                                })
362                                                .collect(),
363                                        )
364                                        .await
365                                        .map(|stream| {
366                                            tokio::spawn(stream.forward_to(
367                                                txs.l2s.get(&exchange).unwrap().clone(),
368                                            ))
369                                        })
370                                    }
371                                    (ExchangeId::BybitPerpetualsUsd, SubKind::PublicTrades) => {
372                                        init_market_stream(
373                                            STREAM_RECONNECTION_POLICY,
374                                            subs.into_iter()
375                                                .map(|sub| {
376                                                    Subscription::new(
377                                                        BybitPerpetualsUsd::default(),
378                                                        sub.instrument,
379                                                        PublicTrades,
380                                                    )
381                                                })
382                                                .collect(),
383                                        )
384                                        .await
385                                        .map(|stream| {
386                                            tokio::spawn(stream.forward_to(
387                                                txs.trades.get(&exchange).unwrap().clone(),
388                                            ))
389                                        })
390                                    }
391                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL1) => {
392                                        init_market_stream(
393                                            STREAM_RECONNECTION_POLICY,
394                                            subs.into_iter()
395                                                .map(|sub| {
396                                                    Subscription::new(
397                                                        BybitSpot::default(),
398                                                        sub.instrument,
399                                                        OrderBooksL1,
400                                                    )
401                                                })
402                                                .collect(),
403                                        )
404                                        .await
405                                        .map(|stream| {
406                                            tokio::spawn(stream.forward_to(
407                                                txs.l1s.get(&exchange).unwrap().clone(),
408                                            ))
409                                        })
410                                    }
411                                    (ExchangeId::BybitPerpetualsUsd, SubKind::OrderBooksL2) => {
412                                        init_market_stream(
413                                            STREAM_RECONNECTION_POLICY,
414                                            subs.into_iter()
415                                                .map(|sub| {
416                                                    Subscription::new(
417                                                        BybitSpot::default(),
418                                                        sub.instrument,
419                                                        OrderBooksL2,
420                                                    )
421                                                })
422                                                .collect(),
423                                        )
424                                        .await
425                                        .map(|stream| {
426                                            tokio::spawn(stream.forward_to(
427                                                txs.l2s.get(&exchange).unwrap().clone(),
428                                            ))
429                                        })
430                                    }
431                                    (ExchangeId::Coinbase, SubKind::PublicTrades) => {
432                                        init_market_stream(
433                                            STREAM_RECONNECTION_POLICY,
434                                            subs.into_iter()
435                                                .map(|sub| {
436                                                    Subscription::new(
437                                                        Coinbase,
438                                                        sub.instrument,
439                                                        PublicTrades,
440                                                    )
441                                                })
442                                                .collect(),
443                                        )
444                                        .await
445                                        .map(|stream| {
446                                            tokio::spawn(stream.forward_to(
447                                                txs.trades.get(&exchange).unwrap().clone(),
448                                            ))
449                                        })
450                                    }
451                                    (ExchangeId::GateioSpot, SubKind::PublicTrades) => {
452                                        init_market_stream(
453                                            STREAM_RECONNECTION_POLICY,
454                                            subs.into_iter()
455                                                .map(|sub| {
456                                                    Subscription::new(
457                                                        GateioSpot::default(),
458                                                        sub.instrument,
459                                                        PublicTrades,
460                                                    )
461                                                })
462                                                .collect(),
463                                        )
464                                        .await
465                                        .map(|stream| {
466                                            tokio::spawn(stream.forward_to(
467                                                txs.trades.get(&exchange).unwrap().clone(),
468                                            ))
469                                        })
470                                    }
471                                    (ExchangeId::GateioFuturesUsd, SubKind::PublicTrades) => {
472                                        init_market_stream(
473                                            STREAM_RECONNECTION_POLICY,
474                                            subs.into_iter()
475                                                .map(|sub| {
476                                                    Subscription::new(
477                                                        GateioFuturesUsd::default(),
478                                                        sub.instrument,
479                                                        PublicTrades,
480                                                    )
481                                                })
482                                                .collect(),
483                                        )
484                                        .await
485                                        .map(|stream| {
486                                            tokio::spawn(stream.forward_to(
487                                                txs.trades.get(&exchange).unwrap().clone(),
488                                            ))
489                                        })
490                                    }
491                                    (ExchangeId::GateioFuturesBtc, SubKind::PublicTrades) => {
492                                        init_market_stream(
493                                            STREAM_RECONNECTION_POLICY,
494                                            subs.into_iter()
495                                                .map(|sub| {
496                                                    Subscription::new(
497                                                        GateioFuturesBtc::default(),
498                                                        sub.instrument,
499                                                        PublicTrades,
500                                                    )
501                                                })
502                                                .collect(),
503                                        )
504                                        .await
505                                        .map(|stream| {
506                                            tokio::spawn(stream.forward_to(
507                                                txs.trades.get(&exchange).unwrap().clone(),
508                                            ))
509                                        })
510                                    }
511                                    (ExchangeId::GateioPerpetualsUsd, SubKind::PublicTrades) => {
512                                        init_market_stream(
513                                            STREAM_RECONNECTION_POLICY,
514                                            subs.into_iter()
515                                                .map(|sub| {
516                                                    Subscription::new(
517                                                        GateioPerpetualsUsd::default(),
518                                                        sub.instrument,
519                                                        PublicTrades,
520                                                    )
521                                                })
522                                                .collect(),
523                                        )
524                                        .await
525                                        .map(|stream| {
526                                            tokio::spawn(stream.forward_to(
527                                                txs.trades.get(&exchange).unwrap().clone(),
528                                            ))
529                                        })
530                                    }
531                                    (ExchangeId::GateioPerpetualsBtc, SubKind::PublicTrades) => {
532                                        init_market_stream(
533                                            STREAM_RECONNECTION_POLICY,
534                                            subs.into_iter()
535                                                .map(|sub| {
536                                                    Subscription::new(
537                                                        GateioPerpetualsBtc::default(),
538                                                        sub.instrument,
539                                                        PublicTrades,
540                                                    )
541                                                })
542                                                .collect(),
543                                        )
544                                        .await
545                                        .map(|stream| {
546                                            tokio::spawn(stream.forward_to(
547                                                txs.trades.get(&exchange).unwrap().clone(),
548                                            ))
549                                        })
550                                    }
551                                    (ExchangeId::GateioOptions, SubKind::PublicTrades) => {
552                                        init_market_stream(
553                                            STREAM_RECONNECTION_POLICY,
554                                            subs.into_iter()
555                                                .map(|sub| {
556                                                    Subscription::new(
557                                                        GateioOptions::default(),
558                                                        sub.instrument,
559                                                        PublicTrades,
560                                                    )
561                                                })
562                                                .collect(),
563                                        )
564                                        .await
565                                        .map(|stream| {
566                                            tokio::spawn(stream.forward_to(
567                                                txs.trades.get(&exchange).unwrap().clone(),
568                                            ))
569                                        })
570                                    }
571                                    (ExchangeId::Kraken, SubKind::PublicTrades) => {
572                                        init_market_stream(
573                                            STREAM_RECONNECTION_POLICY,
574                                            subs.into_iter()
575                                                .map(|sub| {
576                                                    Subscription::new(
577                                                        Kraken,
578                                                        sub.instrument,
579                                                        PublicTrades,
580                                                    )
581                                                })
582                                                .collect(),
583                                        )
584                                        .await
585                                        .map(|stream| {
586                                            tokio::spawn(stream.forward_to(
587                                                txs.trades.get(&exchange).unwrap().clone(),
588                                            ))
589                                        })
590                                    }
591                                    (ExchangeId::Kraken, SubKind::OrderBooksL1) => {
592                                        init_market_stream(
593                                            STREAM_RECONNECTION_POLICY,
594                                            subs.into_iter()
595                                                .map(|sub| {
596                                                    Subscription::new(
597                                                        Kraken,
598                                                        sub.instrument,
599                                                        OrderBooksL1,
600                                                    )
601                                                })
602                                                .collect(),
603                                        )
604                                        .await
605                                        .map(|stream| {
606                                            tokio::spawn(stream.forward_to(
607                                                txs.l1s.get(&exchange).unwrap().clone(),
608                                            ))
609                                        })
610                                    }
611                                    (ExchangeId::Okx, SubKind::PublicTrades) => init_market_stream(
612                                        STREAM_RECONNECTION_POLICY,
613                                        subs.into_iter()
614                                            .map(|sub| {
615                                                Subscription::new(Okx, sub.instrument, PublicTrades)
616                                            })
617                                            .collect(),
618                                    )
619                                    .await
620                                    .map(|stream| {
621                                        tokio::spawn(
622                                            stream.forward_to(
623                                                txs.trades.get(&exchange).unwrap().clone(),
624                                            ),
625                                        )
626                                    }),
627                                    (exchange, sub_kind) => {
628                                        Err(DataError::Unsupported { exchange, sub_kind })
629                                    }
630                                }
631                            }
632                        });
633
634                try_join_all(batch_futures)
635            });
636
637        try_join_all(futures).await?;
638
639        Ok(Self {
640            trades: channels
641                .rxs
642                .trades
643                .into_iter()
644                .map(|(exchange, rx)| (exchange, rx.into_stream()))
645                .collect(),
646            l1s: channels
647                .rxs
648                .l1s
649                .into_iter()
650                .map(|(exchange, rx)| (exchange, rx.into_stream()))
651                .collect(),
652            l2s: channels
653                .rxs
654                .l2s
655                .into_iter()
656                .map(|(exchange, rx)| (exchange, rx.into_stream()))
657                .collect(),
658            liquidations: channels
659                .rxs
660                .liquidations
661                .into_iter()
662                .map(|(exchange, rx)| (exchange, rx.into_stream()))
663                .collect(),
664        })
665    }
666
667    /// Remove an exchange [`PublicTrade`] `Stream` from the [`DynamicStreams`] collection.
668    ///
669    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
670    pub fn select_trades(
671        &mut self,
672        exchange: ExchangeId,
673    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
674        self.trades.remove(&exchange)
675    }
676
677    /// Select and merge every exchange [`PublicTrade`] `Stream` using
678    /// [`SelectAll`](futures_util::stream::select_all::select_all).
679    pub fn select_all_trades(
680        &mut self,
681    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, PublicTrade>>> {
682        futures_util::stream::select_all::select_all(std::mem::take(&mut self.trades).into_values())
683    }
684
685    /// Remove an exchange [`OrderBookL1`] `Stream` from the [`DynamicStreams`] collection.
686    ///
687    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
688    pub fn select_l1s(
689        &mut self,
690        exchange: ExchangeId,
691    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
692        self.l1s.remove(&exchange)
693    }
694
695    /// Select and merge every exchange [`OrderBookL1`] `Stream` using
696    /// [`SelectAll`](futures_util::stream::select_all::select_all).
697    pub fn select_all_l1s(
698        &mut self,
699    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookL1>>> {
700        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l1s).into_values())
701    }
702
703    /// Remove an exchange [`OrderBookEvent`] `Stream` from the [`DynamicStreams`] collection.
704    ///
705    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
706    pub fn select_l2s(
707        &mut self,
708        exchange: ExchangeId,
709    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
710        self.l2s.remove(&exchange)
711    }
712
713    /// Select and merge every exchange [`OrderBookEvent`] `Stream` using
714    /// [`SelectAll`](futures_util::stream::select_all::select_all).
715    pub fn select_all_l2s(
716        &mut self,
717    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, OrderBookEvent>>> {
718        futures_util::stream::select_all::select_all(std::mem::take(&mut self.l2s).into_values())
719    }
720
721    /// Remove an exchange [`Liquidation`] `Stream` from the [`DynamicStreams`] collection.
722    ///
723    /// Note that calling this method will permanently remove this `Stream` from [`Self`].
724    pub fn select_liquidations(
725        &mut self,
726        exchange: ExchangeId,
727    ) -> Option<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
728        self.liquidations.remove(&exchange)
729    }
730
731    /// Select and merge every exchange [`Liquidation`] `Stream` using
732    /// [`SelectAll`](futures_util::stream::select_all::select_all).
733    pub fn select_all_liquidations(
734        &mut self,
735    ) -> SelectAll<UnboundedReceiverStream<MarketStreamResult<InstrumentKey, Liquidation>>> {
736        futures_util::stream::select_all::select_all(
737            std::mem::take(&mut self.liquidations).into_values(),
738        )
739    }
740
741    /// Select and merge every exchange `Stream` for every data type using [`select_all`](futures_util::stream::select_all::select_all)
742    ///
743    /// Note that using [`MarketStreamResult<Instrument, DataKind>`] as the `Output` is suitable for most
744    /// use cases.
745    pub fn select_all<Output>(self) -> impl Stream<Item = Output>
746    where
747        InstrumentKey: Send + 'static,
748        Output: 'static,
749        MarketStreamResult<InstrumentKey, PublicTrade>: Into<Output>,
750        MarketStreamResult<InstrumentKey, OrderBookL1>: Into<Output>,
751        MarketStreamResult<InstrumentKey, OrderBookEvent>: Into<Output>,
752        MarketStreamResult<InstrumentKey, Liquidation>: Into<Output>,
753    {
754        let Self {
755            trades,
756            l1s,
757            l2s,
758            liquidations,
759        } = self;
760
761        let trades = trades
762            .into_values()
763            .map(|stream| stream.map(MarketStreamResult::into).boxed());
764
765        let l1s = l1s
766            .into_values()
767            .map(|stream| stream.map(MarketStreamResult::into).boxed());
768
769        let l2s = l2s
770            .into_values()
771            .map(|stream| stream.map(MarketStreamResult::into).boxed());
772
773        let liquidations = liquidations
774            .into_values()
775            .map(|stream| stream.map(MarketStreamResult::into).boxed());
776
777        let all = trades.chain(l1s).chain(l2s).chain(liquidations);
778
779        futures_util::stream::select_all::select_all(all)
780    }
781}
782
783pub fn validate_batches<SubBatchIter, SubIter, Sub, Instrument>(
784    batches: SubBatchIter,
785) -> Result<Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>, DataError>
786where
787    SubBatchIter: IntoIterator<Item = SubIter>,
788    SubIter: IntoIterator<Item = Sub>,
789    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
790    Instrument: InstrumentData + Ord,
791{
792    batches
793        .into_iter()
794        .map(validate_subscriptions::<SubIter, Sub, Instrument>)
795        .collect()
796}
797
798pub fn validate_subscriptions<SubIter, Sub, Instrument>(
799    batch: SubIter,
800) -> Result<Vec<Subscription<ExchangeId, Instrument, SubKind>>, DataError>
801where
802    SubIter: IntoIterator<Item = Sub>,
803    Sub: Into<Subscription<ExchangeId, Instrument, SubKind>>,
804    Instrument: InstrumentData + Ord,
805{
806    // Validate Subscriptions
807    let mut batch = batch
808        .into_iter()
809        .map(Sub::into)
810        .map(Validator::validate)
811        .collect::<Result<Vec<_>, SocketError>>()?;
812
813    // Remove duplicate Subscriptions
814    batch.sort();
815    batch.dedup();
816
817    Ok(batch)
818}
819
820struct Channels<InstrumentKey> {
821    txs: Arc<Txs<InstrumentKey>>,
822    rxs: Rxs<InstrumentKey>,
823}
824
825impl<'a, Instrument> TryFrom<&'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>>
826    for Channels<Instrument::Key>
827where
828    Instrument: InstrumentData,
829{
830    type Error = DataError;
831
832    fn try_from(
833        value: &'a Vec<Vec<Subscription<ExchangeId, Instrument, SubKind>>>,
834    ) -> Result<Self, Self::Error> {
835        let mut txs = Txs::default();
836        let mut rxs = Rxs::default();
837
838        for sub in value.iter().flatten() {
839            match sub.kind {
840                SubKind::PublicTrades => {
841                    if let (None, None) =
842                        (txs.trades.get(&sub.exchange), rxs.trades.get(&sub.exchange))
843                    {
844                        let (tx, rx) = mpsc_unbounded();
845                        txs.trades.insert(sub.exchange, tx);
846                        rxs.trades.insert(sub.exchange, rx);
847                    }
848                }
849                SubKind::OrderBooksL1 => {
850                    if let (None, None) = (txs.l1s.get(&sub.exchange), rxs.l1s.get(&sub.exchange)) {
851                        let (tx, rx) = mpsc_unbounded();
852                        txs.l1s.insert(sub.exchange, tx);
853                        rxs.l1s.insert(sub.exchange, rx);
854                    }
855                }
856                SubKind::OrderBooksL2 => {
857                    if let (None, None) = (txs.l2s.get(&sub.exchange), rxs.l2s.get(&sub.exchange)) {
858                        let (tx, rx) = mpsc_unbounded();
859                        txs.l2s.insert(sub.exchange, tx);
860                        rxs.l2s.insert(sub.exchange, rx);
861                    }
862                }
863                SubKind::Liquidations => {
864                    if let (None, None) = (
865                        txs.liquidations.get(&sub.exchange),
866                        rxs.liquidations.get(&sub.exchange),
867                    ) {
868                        let (tx, rx) = mpsc_unbounded();
869                        txs.liquidations.insert(sub.exchange, tx);
870                        rxs.liquidations.insert(sub.exchange, rx);
871                    }
872                }
873                unsupported => return Err(DataError::UnsupportedSubKind(unsupported)),
874            }
875        }
876
877        Ok(Channels {
878            txs: Arc::new(txs),
879            rxs,
880        })
881    }
882}
883
884struct Txs<InstrumentKey> {
885    trades: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
886    l1s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
887    l2s: FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
888    liquidations:
889        FnvHashMap<ExchangeId, UnboundedTx<MarketStreamResult<InstrumentKey, Liquidation>>>,
890}
891
892impl<InstrumentKey> Default for Txs<InstrumentKey> {
893    fn default() -> Self {
894        Self {
895            trades: Default::default(),
896            l1s: Default::default(),
897            l2s: Default::default(),
898            liquidations: Default::default(),
899        }
900    }
901}
902
903struct Rxs<InstrumentKey> {
904    trades: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, PublicTrade>>>,
905    l1s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookL1>>>,
906    l2s: FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, OrderBookEvent>>>,
907    liquidations:
908        FnvHashMap<ExchangeId, UnboundedRx<MarketStreamResult<InstrumentKey, Liquidation>>>,
909}
910
911impl<InstrumentKey> Default for Rxs<InstrumentKey> {
912    fn default() -> Self {
913        Self {
914            trades: Default::default(),
915            l1s: Default::default(),
916            l2s: Default::default(),
917            liquidations: Default::default(),
918        }
919    }
920}