use super::Streams;
use crate::{
Identifier,
error::DataError,
exchange::StreamSelector,
instrument::InstrumentData,
streams::{
consumer::{MarketStreamResult, STREAM_RECONNECTION_POLICY, init_market_stream},
reconnect::stream::ReconnectingStream,
},
subscription::{Subscription, SubscriptionKind},
};
use barter_instrument::exchange::ExchangeId;
use barter_integration::{Validator, channel::Channel};
use std::{
collections::HashMap,
fmt::{Debug, Display},
future::Future,
pin::Pin,
};
pub mod multi;
pub mod dynamic;
/// Heap-allocated, pinned, type-erased future that resolves to `Ok(())` or a [`DataError`].
///
/// [`StreamBuilder`] stores one of these per `subscribe` call in its `futures` field.
/// Note that the trait object carries no `Send` bound.
pub type SubscribeFuture = Pin<Box<dyn Future<Output = Result<(), DataError>>>>;
/// Builder that accumulates per-exchange channels and pending subscription futures.
///
/// Each `subscribe` call registers (or reuses) the [`Channel`] keyed by the exchange's
/// [`ExchangeId`] and queues a [`SubscribeFuture`]; `init` then drives every queued future.
pub struct StreamBuilder<InstrumentKey, Kind>
where
    Kind: SubscriptionKind,
{
    /// One channel per exchange, carrying `MarketStreamResult<InstrumentKey, Kind::Event>` items.
    pub channels: HashMap<ExchangeId, Channel<MarketStreamResult<InstrumentKey, Kind::Event>>>,
    /// Subscription futures queued by `subscribe`, not yet awaited.
    pub futures: Vec<SubscribeFuture>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add `Default` bounds
// on the generic parameters, even though an empty builder needs none of them.
impl<InstrumentKey, Kind> Default for StreamBuilder<InstrumentKey, Kind>
where
    Kind: SubscriptionKind,
{
    /// Returns an empty builder with no channels and no queued futures.
    fn default() -> Self {
        Self {
            channels: HashMap::new(),
            futures: Vec::new(),
        }
    }
}
impl<InstrumentKey, Kind> Debug for StreamBuilder<InstrumentKey, Kind>
where
    InstrumentKey: Debug,
    Kind: SubscriptionKind,
{
    /// Formats the builder, printing the channel map in full but only the number of
    /// queued futures (the boxed futures themselves are opaque trait objects).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let queued = self.futures.len();

        let mut out = f.debug_struct("StreamBuilder<InstrumentKey, SubscriptionKind>");
        out.field("channels", &self.channels);
        out.field("num_futures", &queued);
        out.finish()
    }
}
impl<InstrumentKey, Kind> StreamBuilder<InstrumentKey, Kind>
where
    Kind: SubscriptionKind,
{
    /// Creates a builder with no channels and no queued subscription futures.
    pub fn new() -> Self {
        let channels = HashMap::new();
        let futures = Vec::new();
        Self { channels, futures }
    }

    /// Queues a batch of subscriptions for a single `Exchange` and returns the builder.
    ///
    /// The input items are converted into [`Subscription`]s immediately. The channel keyed
    /// by `Exchange::ID` is created if absent, so repeated calls for the same exchange
    /// send into the same channel. The remaining work is only *queued* as a future and
    /// runs when [`Self::init`] awaits it:
    /// validate every subscription, sort and de-duplicate them, initialise the market
    /// stream with `STREAM_RECONNECTION_POLICY`, and spawn a Tokio task that forwards the
    /// stream into the exchange's channel sender.
    pub fn subscribe<SubIter, Sub, Exchange, Instrument>(mut self, subscriptions: SubIter) -> Self
    where
        SubIter: IntoIterator<Item = Sub>,
        Sub: Into<Subscription<Exchange, Instrument, Kind>>,
        Exchange: StreamSelector<Instrument, Kind> + Ord + Send + Sync + 'static,
        Instrument: InstrumentData<Key = InstrumentKey> + Ord + Display + 'static,
        Instrument::Key: Debug + Clone + Send + 'static,
        Kind: Ord + Display + Send + Sync + 'static,
        Kind::Event: Clone + Send,
        Subscription<Exchange, Instrument, Kind>:
            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
    {
        // Eagerly convert so the queued future owns plain `Subscription`s.
        let requested: Vec<Subscription<Exchange, Instrument, Kind>> =
            subscriptions.into_iter().map(Into::into).collect();

        // Reuse the exchange's channel if one exists, otherwise create a default one.
        let channel = self.channels.entry(Exchange::ID).or_default();
        let exchange_tx = channel.tx.clone();

        // The async block stays inline so its output type is inferred from `SubscribeFuture`.
        self.futures.push(Box::pin(async move {
            // Fail the whole batch on the first invalid subscription.
            let mut validated = requested
                .into_iter()
                .map(Subscription::validate)
                .collect::<Result<Vec<_>, _>>()?;

            // Sorting first makes duplicates adjacent so `dedup` removes all of them.
            validated.sort();
            validated.dedup();

            let stream = init_market_stream(STREAM_RECONNECTION_POLICY, validated).await?;

            // The forwarding task is detached: its `JoinHandle` is intentionally dropped.
            tokio::spawn(stream.forward_to(exchange_tx));

            Ok(())
        }));

        self
    }

    /// Drives every queued subscription future and returns the receiving side of each
    /// exchange channel.
    ///
    /// # Errors
    ///
    /// Returns the [`DataError`] produced by `try_join_all` if any queued future fails.
    pub async fn init(
        self,
    ) -> Result<Streams<MarketStreamResult<InstrumentKey, Kind::Event>>, DataError> {
        let Self {
            channels,
            futures: pending,
        } = self;

        futures::future::try_join_all(pending).await?;

        Ok(Streams {
            streams: channels
                .into_iter()
                .map(|(exchange_id, exchange_channel)| (exchange_id, exchange_channel.rx))
                .collect(),
        })
    }
}