Skip to main content

rustrade_data/streams/builder/dynamic/
indexed.rs

1use crate::{
2    error::DataError,
3    event::DataKind,
4    instrument::MarketInstrumentData,
5    streams::{
6        builder::dynamic::DynamicStreams,
7        consumer::{MarketStreamEvent, MarketStreamResult},
8        reconnect::stream::ReconnectingStream,
9    },
10    subscription::{SubKind, Subscription},
11};
12use futures::Stream;
13use itertools::Itertools;
14use rustrade_instrument::{
15    Keyed,
16    exchange::ExchangeId,
17    index::{IndexedInstruments, error::IndexError},
18    instrument::{InstrumentIndex, market_data::MarketDataInstrument},
19};
20use tracing::warn;
21
22/// Initialise an indexed [`DynamicStreams`] using batches of indexed [`Subscription`] batches.
23///
24/// This function:
25/// 1. Generates indexed market data Subscriptions from all Instrument-SubKind combinations found
26///    in the provided `IndexedInstruments` and `SubKind` slice.
27/// 2. Initialise an indexed [`DynamicStreams`] .
28/// 3. Combines all market streams into a single `Stream` via
29///    [`select_all`](futures_util::stream::select_all::select_all)
30/// 4. Handles recoverable errors by logging them at `warn` level.
31///
32/// See [`generate_indexed_market_data_subscription_batches`] for how indexed `Subscriptions` can
33/// be conveniently generated from an [`IndexedInstruments`] collection.
34///
35/// See [`index_market_data_subscription_batches`] for how unindexed `Subscriptions` can be
36/// indexed using an [`IndexedInstruments`] collection.
37pub async fn init_indexed_multi_exchange_market_stream(
38    instruments: &IndexedInstruments,
39    sub_kinds: &[SubKind],
40) -> Result<impl Stream<Item = MarketStreamEvent<InstrumentIndex, DataKind>> + use<>, DataError> {
41    // Generate indexed market data Subscriptions
42    let subscriptions = generate_indexed_market_data_subscription_batches(instruments, sub_kinds);
43
44    // Initialise an indexed MarketStream via DynamicStreams
45    let stream = DynamicStreams::init(subscriptions)
46        .await?
47        .select_all::<MarketStreamResult<InstrumentIndex, DataKind>>()
48        .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
49
50    Ok(stream)
51}
52
53/// Generates batches of indexed market data `Subscriptions` from a collection of
54/// `IndexedInstruments`.
55///
56/// This function:
57/// 1. Groups instruments by [`ExchangeId`].
58/// 2. Generates indexed `Subscriptions` for each Instrument-SubKind combination.
59/// 4. Returns the `Subscriptions` grouped by [`ExchangeId`].
60///
61/// # Arguments
62/// * `instruments` - Collection of `IndexedInstruments` to generate `Subscriptions` for
63/// * `sub_kinds` - Slice of `SubKinds` to generate for each instrument
64pub fn generate_indexed_market_data_subscription_batches(
65    instruments: &IndexedInstruments,
66    sub_kinds: &[SubKind],
67) -> Vec<Vec<Subscription<ExchangeId, MarketInstrumentData<InstrumentIndex>, SubKind>>> {
68    // Generate Iterator<Item = Keyed<ExchangeId, MarketInstrumentData<InstrumentIndex>>>
69    let instruments = instruments.instruments().iter().map(|keyed| {
70        let exchange = keyed.value.exchange.value;
71        let instrument = MarketInstrumentData::from(keyed);
72        Keyed::new(exchange, instrument)
73    });
74
75    // Chunk instruments by ExchangeId
76    let instruments = instruments.sorted_unstable_by_key(|exchange| exchange.key);
77
78    // Generate Subscriptions
79    instruments
80        .chunk_by(|exchange| exchange.key)
81        .into_iter()
82        .map(|(_exchange, instruments)| {
83            instruments
84                .into_iter()
85                .flat_map(
86                    |Keyed {
87                         key: exchange,
88                         value: instrument,
89                     }| {
90                        sub_kinds
91                            .iter()
92                            .map(move |kind| Subscription::new(exchange, instrument.clone(), *kind))
93                    },
94                )
95                .collect::<Vec<_>>()
96        })
97        .collect()
98}
99
100/// Indexes batches of market data `Subscriptions` using a collection of `IndexedInstruments`.
101///
102/// This function maps unindexed market data `Subscriptions` to indexed ones by:
103/// 1. Finding the `AssetIndex` for the base and quote assets.
104/// 2. Finding the `InstrumentIndex` associated with the `Subscription` `ExchangeId`, `SubKind` and
105///    assets.
106/// 3. Creating new `Subscriptions` with indexed instruments.
107///
108///
109/// # Arguments
110/// * `instruments` - Collection of `IndexedInstruments` used for indexing
111/// * `batches` - Iterator of `Subscription` batches to be indexed
112pub fn index_market_data_subscription_batches<SubBatchIter, SubIter, Sub>(
113    instruments: &IndexedInstruments,
114    batches: SubBatchIter,
115) -> Result<
116    Vec<Vec<Subscription<ExchangeId, Keyed<InstrumentIndex, MarketDataInstrument>>>>,
117    DataError,
118>
119where
120    SubBatchIter: IntoIterator<Item = SubIter>,
121    SubIter: IntoIterator<Item = Sub>,
122    Sub: Into<Subscription<ExchangeId, MarketDataInstrument, SubKind>>,
123{
124    batches
125        .into_iter()
126        .map(|batch| batch
127            .into_iter()
128            .map(|sub| {
129                let sub = sub.into();
130
131                let base_index = instruments.find_asset_index(sub.exchange, &sub.instrument.base)?;
132                let quote_index = instruments.find_asset_index(sub.exchange, &sub.instrument.quote)?;
133
134                let find_instrument = |exchange, kind, base, quote| {
135                    instruments
136                        .instruments()
137                        .iter()
138                        .find_map(|indexed| {
139                            (
140                                indexed.value.exchange.value == exchange
141                                    && indexed.value.kind.eq_market_data_instrument_kind(kind)
142                                    && indexed.value.underlying.base == base
143                                    && indexed.value.underlying.quote == quote
144                            ).then_some(indexed.key)
145                        })
146                        .ok_or(IndexError::InstrumentIndex(format!(
147                            "Instrument: ({}, {}, {}, {}) must be present in indexed instruments: {:?}",
148                            exchange, kind, base, quote, instruments.instruments()
149                        )))
150                };
151
152                let instrument_index = find_instrument(sub.exchange, &sub.instrument.kind, base_index, quote_index)?;
153
154                Ok(Subscription {
155                    exchange: sub.exchange,
156                    instrument: Keyed::new(instrument_index, sub.instrument),
157                    kind: sub.kind,
158                })
159            })
160            .collect()
161        )
162        .collect()
163}