use self::builder::{StreamBuilder, multi::MultiStreamBuilder};
use crate::subscription::SubscriptionKind;
use barter_instrument::exchange::ExchangeId;
use barter_integration::channel::UnboundedRx;
use fnv::FnvHashMap;
use futures::Stream;
pub mod builder;
pub mod consumer;
pub mod reconnect;
/// Collection of channel receivers, one per exchange, each yielding items of type `T`.
///
/// Receivers are stored in an [`FnvHashMap`] keyed by [`ExchangeId`], so at most one
/// receiver exists per exchange.
#[derive(Debug)]
pub struct Streams<T> {
/// Map from [`ExchangeId`] to the [`UnboundedRx`] that delivers that exchange's items.
pub streams: FnvHashMap<ExchangeId, UnboundedRx<T>>,
}
impl<T> Streams<T> {
    /// Returns a new [`StreamBuilder`] parameterised by the provided `InstrumentKey`
    /// and [`SubscriptionKind`].
    pub fn builder<InstrumentKey, Kind>() -> StreamBuilder<InstrumentKey, Kind>
    where
        Kind: SubscriptionKind,
    {
        <StreamBuilder<InstrumentKey, Kind>>::new()
    }

    /// Returns a new [`MultiStreamBuilder`] producing items of type `T`.
    pub fn builder_multi() -> MultiStreamBuilder<T> {
        <MultiStreamBuilder<T>>::new()
    }

    /// Removes the receiver registered for `exchange` and returns it as a [`Stream`].
    ///
    /// Returns `None` when no receiver is stored under `exchange`. Because the entry is
    /// removed from the map, a later call with the same [`ExchangeId`] yields `None`
    /// unless a new receiver has been inserted in the meantime.
    ///
    /// Note: the signature bounds the returned stream by the `&mut self` borrow (`'_`).
    pub fn select(&mut self, exchange: ExchangeId) -> Option<impl Stream<Item = T> + '_> {
        // Early-return `None` if this exchange has no (remaining) receiver.
        let receiver = self.streams.remove(&exchange)?;
        Some(receiver.into_stream())
    }

    /// Consumes `self` and merges every stored receiver into a single [`Stream`].
    ///
    /// Each receiver is converted with [`UnboundedRx::into_stream`] and the resulting
    /// streams are combined using `futures_util`'s `select_all`.
    pub fn select_all(self) -> impl Stream<Item = T> {
        let Self { streams } = self;
        futures_util::stream::select_all::select_all(
            streams.into_values().map(UnboundedRx::into_stream),
        )
    }
}