rustrade_data/streams/builder/dynamic/
indexed.rs1use crate::{
2 error::DataError,
3 event::DataKind,
4 instrument::MarketInstrumentData,
5 streams::{
6 builder::dynamic::DynamicStreams,
7 consumer::{MarketStreamEvent, MarketStreamResult},
8 reconnect::stream::ReconnectingStream,
9 },
10 subscription::{SubKind, Subscription},
11};
12use futures::Stream;
13use itertools::Itertools;
14use rustrade_instrument::{
15 Keyed,
16 exchange::ExchangeId,
17 index::{IndexedInstruments, error::IndexError},
18 instrument::{InstrumentIndex, market_data::MarketDataInstrument},
19};
20use tracing::warn;
21
22pub async fn init_indexed_multi_exchange_market_stream(
38 instruments: &IndexedInstruments,
39 sub_kinds: &[SubKind],
40) -> Result<impl Stream<Item = MarketStreamEvent<InstrumentIndex, DataKind>> + use<>, DataError> {
41 let subscriptions = generate_indexed_market_data_subscription_batches(instruments, sub_kinds);
43
44 let stream = DynamicStreams::init(subscriptions)
46 .await?
47 .select_all::<MarketStreamResult<InstrumentIndex, DataKind>>()
48 .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
49
50 Ok(stream)
51}
52
53pub fn generate_indexed_market_data_subscription_batches(
65 instruments: &IndexedInstruments,
66 sub_kinds: &[SubKind],
67) -> Vec<Vec<Subscription<ExchangeId, MarketInstrumentData<InstrumentIndex>, SubKind>>> {
68 let instruments = instruments.instruments().iter().map(|keyed| {
70 let exchange = keyed.value.exchange.value;
71 let instrument = MarketInstrumentData::from(keyed);
72 Keyed::new(exchange, instrument)
73 });
74
75 let instruments = instruments.sorted_unstable_by_key(|exchange| exchange.key);
77
78 instruments
80 .chunk_by(|exchange| exchange.key)
81 .into_iter()
82 .map(|(_exchange, instruments)| {
83 instruments
84 .into_iter()
85 .flat_map(
86 |Keyed {
87 key: exchange,
88 value: instrument,
89 }| {
90 sub_kinds
91 .iter()
92 .map(move |kind| Subscription::new(exchange, instrument.clone(), *kind))
93 },
94 )
95 .collect::<Vec<_>>()
96 })
97 .collect()
98}
99
100pub fn index_market_data_subscription_batches<SubBatchIter, SubIter, Sub>(
113 instruments: &IndexedInstruments,
114 batches: SubBatchIter,
115) -> Result<
116 Vec<Vec<Subscription<ExchangeId, Keyed<InstrumentIndex, MarketDataInstrument>>>>,
117 DataError,
118>
119where
120 SubBatchIter: IntoIterator<Item = SubIter>,
121 SubIter: IntoIterator<Item = Sub>,
122 Sub: Into<Subscription<ExchangeId, MarketDataInstrument, SubKind>>,
123{
124 batches
125 .into_iter()
126 .map(|batch| batch
127 .into_iter()
128 .map(|sub| {
129 let sub = sub.into();
130
131 let base_index = instruments.find_asset_index(sub.exchange, &sub.instrument.base)?;
132 let quote_index = instruments.find_asset_index(sub.exchange, &sub.instrument.quote)?;
133
134 let find_instrument = |exchange, kind, base, quote| {
135 instruments
136 .instruments()
137 .iter()
138 .find_map(|indexed| {
139 (
140 indexed.value.exchange.value == exchange
141 && indexed.value.kind.eq_market_data_instrument_kind(kind)
142 && indexed.value.underlying.base == base
143 && indexed.value.underlying.quote == quote
144 ).then_some(indexed.key)
145 })
146 .ok_or(IndexError::InstrumentIndex(format!(
147 "Instrument: ({}, {}, {}, {}) must be present in indexed instruments: {:?}",
148 exchange, kind, base, quote, instruments.instruments()
149 )))
150 };
151
152 let instrument_index = find_instrument(sub.exchange, &sub.instrument.kind, base_index, quote_index)?;
153
154 Ok(Subscription {
155 exchange: sub.exchange,
156 instrument: Keyed::new(instrument_index, sub.instrument),
157 kind: sub.kind,
158 })
159 })
160 .collect()
161 )
162 .collect()
163}