barter_data/streams/
mod.rs1use self::builder::{StreamBuilder, multi::MultiStreamBuilder};
2use crate::subscription::SubscriptionKind;
3use barter_instrument::exchange::ExchangeId;
4use barter_integration::channel::UnboundedRx;
5use fnv::FnvHashMap;
6use futures::Stream;
7
8pub mod builder;
11
12pub mod consumer;
15
16pub mod reconnect;
19
20#[derive(Debug)]
22pub struct Streams<T> {
23 pub streams: FnvHashMap<ExchangeId, UnboundedRx<T>>,
24}
25
26impl<T> Streams<T> {
27 pub fn builder<InstrumentKey, Kind>() -> StreamBuilder<InstrumentKey, Kind>
29 where
30 Kind: SubscriptionKind,
31 {
32 StreamBuilder::<InstrumentKey, Kind>::new()
33 }
34
35 pub fn builder_multi() -> MultiStreamBuilder<T> {
38 MultiStreamBuilder::<T>::new()
39 }
40
41 pub fn select(&mut self, exchange: ExchangeId) -> Option<impl Stream<Item = T> + '_> {
43 self.streams.remove(&exchange).map(UnboundedRx::into_stream)
44 }
45
46 pub fn select_all(self) -> impl Stream<Item = T> {
49 let all = self.streams.into_values().map(UnboundedRx::into_stream);
50 futures_util::stream::select_all::select_all(all)
51 }
52}