barter_data_sniper/streams/builder/
mod.rs

1use super::Streams;
2use crate::{
3    error::DataError,
4    exchange::StreamSelector,
5    instrument::InstrumentData,
6    streams::{
7        consumer::{init_market_stream, MarketStreamResult, STREAM_RECONNECTION_POLICY},
8        reconnect::stream::ReconnectingStream,
9    },
10    subscription::{Subscription, SubscriptionKind},
11    Identifier,
12};
13use barter_instrument_copy::exchange::ExchangeId;
14use barter_integration_copy::{
15    channel::{mpsc_unbounded, UnboundedRx, UnboundedTx},
16    Validator,
17};
18use futures_util::StreamExt;
19use std::{collections::HashMap, fmt::Debug, future::Future, pin::Pin};
20
21/// Defines the [`MultiStreamBuilder`](multi::MultiStreamBuilder) API for ergonomically
22/// initialising a common [`Streams<Output>`](Streams) from multiple
23/// [`StreamBuilder<SubscriptionKind>`](StreamBuilder)s.
24pub mod multi;
25
26/// Defines the [`DynamicStreams`](dynamic::DynamicStreams) API for initialising an arbitrary number
27/// of [`MarketStream`]s from the [`ExchangeId`] and [`SubKind`] enums, rather than concrete
28/// types.
29pub mod dynamic;
30
/// Communicative type alias for the pinned, boxed [`Future`] that
/// [`StreamBuilder::subscribe`] queues for each collection of [`Subscription`]s.
///
/// As built by `subscribe`, the future validates the [`Subscription`]s, initialises the
/// associated market stream and spawns the task that forwards its items, resolving to
/// `Ok(())` on success or to a [`DataError`] if a step fails. It is only driven once
/// [`StreamBuilder::init`] awaits it.
///
/// Note the trait object carries no `Send` bound.
pub type SubscribeFuture = Pin<Box<dyn Future<Output = Result<(), DataError>>>>;
34
35/// Builder to configure and initialise a [`Streams<MarketEvent<SubscriptionKind::Event>`](Streams) instance
36/// for a specific [`SubscriptionKind`].
37#[derive(Default)]
38pub struct StreamBuilder<InstrumentKey, Kind>
39where
40    Kind: SubscriptionKind,
41{
42    pub channels:
43        HashMap<ExchangeId, ExchangeChannel<MarketStreamResult<InstrumentKey, Kind::Event>>>,
44    pub futures: Vec<SubscribeFuture>,
45}
46
47impl<InstrumentKey, Kind> Debug for StreamBuilder<InstrumentKey, Kind>
48where
49    InstrumentKey: Debug,
50    Kind: SubscriptionKind,
51{
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("StreamBuilder<InstrumentKey, SubscriptionKind>")
54            .field("channels", &self.channels)
55            .field("num_futures", &self.futures.len())
56            .finish()
57    }
58}
59
60impl<InstrumentKey, Kind> StreamBuilder<InstrumentKey, Kind>
61where
62    Kind: SubscriptionKind,
63{
64    /// Construct a new [`Self`].
65    pub fn new() -> Self {
66        Self {
67            channels: HashMap::new(),
68            futures: Vec::new(),
69        }
70    }
71
72    /// Add a collection of [`Subscription`]s to the [`StreamBuilder`] that will be actioned on
73    /// a distinct [`WebSocket`](barter_integration_copy::protocol::websocket::WebSocket) connection.
74    ///
75    /// Note that [`Subscription`]s are not actioned until the
76    /// [`init()`](StreamBuilder::init()) method is invoked.
77    pub fn subscribe<SubIter, Sub, Exchange, Instrument>(mut self, subscriptions: SubIter) -> Self
78    where
79        SubIter: IntoIterator<Item = Sub>,
80        Sub: Into<Subscription<Exchange, Instrument, Kind>>,
81        Exchange: StreamSelector<Instrument, Kind> + Ord + Send + Sync + 'static,
82        Instrument: InstrumentData<Key = InstrumentKey> + Ord + 'static,
83        Instrument::Key: Clone + Send + 'static,
84        Kind: Ord + Send + Sync + 'static,
85        Kind::Event: Clone + Send,
86        Subscription<Exchange, Instrument, Kind>:
87            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
88    {
89        // Construct Vec<Subscriptions> from input SubIter
90        let subscriptions = subscriptions.into_iter().map(Sub::into).collect::<Vec<_>>();
91
92        // Acquire channel Sender to send Market<Kind::Event> from consumer loop to user
93        // '--> Add ExchangeChannel Entry if this Exchange <--> SubscriptionKind combination is new
94        let exchange_tx = self.channels.entry(Exchange::ID).or_default().tx.clone();
95
96        // Add Future that once awaited will yield the Result<(), SocketError> of subscribing
97        self.futures.push(Box::pin(async move {
98            // Validate Subscriptions
99            let mut subscriptions = subscriptions
100                .into_iter()
101                .map(Subscription::validate)
102                .collect::<Result<Vec<_>, _>>()?;
103
104            // Remove duplicate Subscriptions
105            subscriptions.sort();
106            subscriptions.dedup();
107
108            // Initialise a MarketEvent `ReconnectingStream`
109            let stream = init_market_stream(STREAM_RECONNECTION_POLICY, subscriptions).await?;
110
111            // Forward MarketEvents to ExchangeTx
112            tokio::spawn(stream.boxed().forward_to(exchange_tx));
113
114            Ok(())
115        }));
116
117        self
118    }
119
120    /// Spawn a [`MarketEvent<SubscriptionKind::Event>`](MarketEvent) consumer loop for each collection of
121    /// [`Subscription`]s added to [`StreamBuilder`] via the
122    /// [`subscribe()`](StreamBuilder::subscribe()) method.
123    ///
124    /// Each consumer loop distributes consumed [`MarketEvent<SubscriptionKind::Event>s`](MarketEvent) to
125    /// the [`Streams`] `HashMap` returned by this method.
126    pub async fn init(
127        self,
128    ) -> Result<Streams<MarketStreamResult<InstrumentKey, Kind::Event>>, DataError> {
129        // Await Stream initialisation perpetual and ensure success
130        futures::future::try_join_all(self.futures).await?;
131
132        // Construct Streams using each ExchangeChannel receiver
133        Ok(Streams {
134            streams: self
135                .channels
136                .into_iter()
137                .map(|(exchange, channel)| (exchange, channel.rx))
138                .collect(),
139        })
140    }
141}
142
143/// Convenient type that holds the [`mpsc::UnboundedSender`] and [`mpsc::UnboundedReceiver`] for a
144/// [`MarketEvent<T>`](MarketEvent) channel.
145#[derive(Debug)]
146pub struct ExchangeChannel<T> {
147    tx: UnboundedTx<T>,
148    rx: UnboundedRx<T>,
149}
150
151impl<T> ExchangeChannel<T> {
152    /// Construct a new [`Self`].
153    pub fn new() -> Self {
154        let (tx, rx) = mpsc_unbounded();
155        Self { tx, rx }
156    }
157}
158
159impl<T> Default for ExchangeChannel<T> {
160    fn default() -> Self {
161        Self::new()
162    }
163}