barter_data/streams/
mod.rs

1use self::builder::{StreamBuilder, multi::MultiStreamBuilder};
2use crate::subscription::SubscriptionKind;
3use barter_instrument::exchange::ExchangeId;
4use barter_integration::channel::UnboundedRx;
5use fnv::FnvHashMap;
6use futures::Stream;
7
8/// Defines the [`StreamBuilder`] and [`MultiStreamBuilder`] APIs for ergonomically initialising
9/// [`MarketStream`](super::MarketStream) [`Streams`].
10pub mod builder;
11
12/// Central consumer loop functionality used by the [`StreamBuilder`] to
13/// drive a re-connecting [`MarketStream`](super::MarketStream).
14pub mod consumer;
15
16/// Defines a [`ReconnectingStream`](reconnect::stream::ReconnectingStream) and associated logic
17/// for generating an auto reconnecting `Stream`.
18pub mod reconnect;
19
20/// Ergonomic collection of exchange market event receivers.
21#[derive(Debug)]
22pub struct Streams<T> {
23    pub streams: FnvHashMap<ExchangeId, UnboundedRx<T>>,
24}
25
26impl<T> Streams<T> {
27    /// Construct a [`StreamBuilder`] for configuring new market event [`Streams`].
28    pub fn builder<InstrumentKey, Kind>() -> StreamBuilder<InstrumentKey, Kind>
29    where
30        Kind: SubscriptionKind,
31    {
32        StreamBuilder::<InstrumentKey, Kind>::new()
33    }
34
35    /// Construct a [`MultiStreamBuilder`] for configuring new
36    /// [`MarketEvent<T>`](crate::event::MarketEvent) [`Streams`].
37    pub fn builder_multi() -> MultiStreamBuilder<T> {
38        MultiStreamBuilder::<T>::new()
39    }
40
41    /// Remove an exchange market event [`Stream`] from the [`Streams`] `HashMap`.
42    pub fn select(&mut self, exchange: ExchangeId) -> Option<impl Stream<Item = T> + '_> {
43        self.streams.remove(&exchange).map(UnboundedRx::into_stream)
44    }
45
46    /// Select and merge every exchange `Stream` using
47    /// [`select_all`](futures_util::stream::select_all::select_all).
48    pub fn select_all(self) -> impl Stream<Item = T> {
49        let all = self.streams.into_values().map(UnboundedRx::into_stream);
50        futures_util::stream::select_all::select_all(all)
51    }
52}