barter_data_sniper/streams/builder/
mod.rs1use super::Streams;
2use crate::{
3 error::DataError,
4 exchange::StreamSelector,
5 instrument::InstrumentData,
6 streams::{
7 consumer::{init_market_stream, MarketStreamResult, STREAM_RECONNECTION_POLICY},
8 reconnect::stream::ReconnectingStream,
9 },
10 subscription::{Subscription, SubscriptionKind},
11 Identifier,
12};
13use barter_instrument_copy::exchange::ExchangeId;
14use barter_integration_copy::{
15 channel::{mpsc_unbounded, UnboundedRx, UnboundedTx},
16 Validator,
17};
18use futures_util::StreamExt;
19use std::{collections::HashMap, fmt::Debug, future::Future, pin::Pin};
20
21pub mod multi;
25
26pub mod dynamic;
30
31pub type SubscribeFuture = Pin<Box<dyn Future<Output = Result<(), DataError>>>>;
34
35#[derive(Default)]
38pub struct StreamBuilder<InstrumentKey, Kind>
39where
40 Kind: SubscriptionKind,
41{
42 pub channels:
43 HashMap<ExchangeId, ExchangeChannel<MarketStreamResult<InstrumentKey, Kind::Event>>>,
44 pub futures: Vec<SubscribeFuture>,
45}
46
47impl<InstrumentKey, Kind> Debug for StreamBuilder<InstrumentKey, Kind>
48where
49 InstrumentKey: Debug,
50 Kind: SubscriptionKind,
51{
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_struct("StreamBuilder<InstrumentKey, SubscriptionKind>")
54 .field("channels", &self.channels)
55 .field("num_futures", &self.futures.len())
56 .finish()
57 }
58}
59
60impl<InstrumentKey, Kind> StreamBuilder<InstrumentKey, Kind>
61where
62 Kind: SubscriptionKind,
63{
64 pub fn new() -> Self {
66 Self {
67 channels: HashMap::new(),
68 futures: Vec::new(),
69 }
70 }
71
72 pub fn subscribe<SubIter, Sub, Exchange, Instrument>(mut self, subscriptions: SubIter) -> Self
78 where
79 SubIter: IntoIterator<Item = Sub>,
80 Sub: Into<Subscription<Exchange, Instrument, Kind>>,
81 Exchange: StreamSelector<Instrument, Kind> + Ord + Send + Sync + 'static,
82 Instrument: InstrumentData<Key = InstrumentKey> + Ord + 'static,
83 Instrument::Key: Clone + Send + 'static,
84 Kind: Ord + Send + Sync + 'static,
85 Kind::Event: Clone + Send,
86 Subscription<Exchange, Instrument, Kind>:
87 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
88 {
89 let subscriptions = subscriptions.into_iter().map(Sub::into).collect::<Vec<_>>();
91
92 let exchange_tx = self.channels.entry(Exchange::ID).or_default().tx.clone();
95
96 self.futures.push(Box::pin(async move {
98 let mut subscriptions = subscriptions
100 .into_iter()
101 .map(Subscription::validate)
102 .collect::<Result<Vec<_>, _>>()?;
103
104 subscriptions.sort();
106 subscriptions.dedup();
107
108 let stream = init_market_stream(STREAM_RECONNECTION_POLICY, subscriptions).await?;
110
111 tokio::spawn(stream.boxed().forward_to(exchange_tx));
113
114 Ok(())
115 }));
116
117 self
118 }
119
120 pub async fn init(
127 self,
128 ) -> Result<Streams<MarketStreamResult<InstrumentKey, Kind::Event>>, DataError> {
129 futures::future::try_join_all(self.futures).await?;
131
132 Ok(Streams {
134 streams: self
135 .channels
136 .into_iter()
137 .map(|(exchange, channel)| (exchange, channel.rx))
138 .collect(),
139 })
140 }
141}
142
143#[derive(Debug)]
146pub struct ExchangeChannel<T> {
147 tx: UnboundedTx<T>,
148 rx: UnboundedRx<T>,
149}
150
151impl<T> ExchangeChannel<T> {
152 pub fn new() -> Self {
154 let (tx, rx) = mpsc_unbounded();
155 Self { tx, rx }
156 }
157}
158
159impl<T> Default for ExchangeChannel<T> {
160 fn default() -> Self {
161 Self::new()
162 }
163}