use crate::{
error::DataError,
event::DataKind,
instrument::MarketInstrumentData,
streams::{
builder::dynamic::DynamicStreams,
consumer::{MarketStreamEvent, MarketStreamResult},
reconnect::stream::ReconnectingStream,
},
subscription::{SubKind, Subscription},
};
use barter_instrument::{
Keyed,
exchange::ExchangeId,
index::{IndexedInstruments, error::IndexError},
instrument::{InstrumentIndex, market_data::MarketDataInstrument},
};
use futures::Stream;
use itertools::Itertools;
use tracing::warn;
pub async fn init_indexed_multi_exchange_market_stream(
instruments: &IndexedInstruments,
sub_kinds: &[SubKind],
) -> Result<impl Stream<Item = MarketStreamEvent<InstrumentIndex, DataKind>> + use<>, DataError> {
let subscriptions = generate_indexed_market_data_subscription_batches(instruments, sub_kinds);
let stream = DynamicStreams::init(subscriptions)
.await?
.select_all::<MarketStreamResult<InstrumentIndex, DataKind>>()
.with_error_handler(|error| warn!(?error, "MarketStream generated error"));
Ok(stream)
}
/// Generates one batch of [`Subscription`]s per exchange found in the provided
/// [`IndexedInstruments`].
///
/// Instruments are sorted by their exchange, grouped into runs that share the same exchange,
/// and every instrument in a group is paired with every entry of `sub_kinds`. A group therefore
/// contributes `instruments_in_group * sub_kinds.len()` subscriptions; if `sub_kinds` is empty
/// each exchange still yields an (empty) batch.
pub fn generate_indexed_market_data_subscription_batches(
    instruments: &IndexedInstruments,
    sub_kinds: &[SubKind],
) -> Vec<Vec<Subscription<ExchangeId, MarketInstrumentData<InstrumentIndex>, SubKind>>> {
    // Pair each instrument with the exchange it trades on so it can be ordered & grouped
    let mut by_exchange = instruments
        .instruments()
        .iter()
        .map(|keyed| Keyed::new(keyed.value.exchange.value, MarketInstrumentData::from(keyed)))
        .collect::<Vec<_>>();

    // Grouping below only merges adjacent entries, so equal exchanges must be contiguous
    by_exchange.sort_unstable_by_key(|entry| entry.key);

    by_exchange
        .as_slice()
        .chunk_by(|left, right| left.key == right.key)
        .map(|group| {
            group
                .iter()
                .flat_map(|entry| {
                    let exchange = entry.key;
                    sub_kinds
                        .iter()
                        .map(move |kind| Subscription::new(exchange, entry.value.clone(), *kind))
                })
                .collect::<Vec<_>>()
        })
        .collect()
}
/// Converts batches of market data [`Subscription`]s into batches whose instruments are keyed
/// with their [`InstrumentIndex`].
///
/// For every subscription the base and quote assets are resolved with
/// `IndexedInstruments::find_asset_index`, and the first indexed instrument matching the
/// subscription's exchange, instrument kind, base and quote provides the key. Batch structure
/// and ordering of the input are preserved.
///
/// # Errors
/// Returns a [`DataError`] for the first subscription where:
/// - the base or quote asset lookup via `find_asset_index` fails, or
/// - no indexed instrument matches the exchange, kind, base and quote
///   ([`IndexError::InstrumentIndex`]).
pub fn index_market_data_subscription_batches<SubBatchIter, SubIter, Sub>(
    instruments: &IndexedInstruments,
    batches: SubBatchIter,
) -> Result<
    Vec<Vec<Subscription<ExchangeId, Keyed<InstrumentIndex, MarketDataInstrument>>>>,
    DataError,
>
where
    SubBatchIter: IntoIterator<Item = SubIter>,
    SubIter: IntoIterator<Item = Sub>,
    Sub: Into<Subscription<ExchangeId, MarketDataInstrument, SubKind>>,
{
    batches
        .into_iter()
        .map(|batch| {
            batch
                .into_iter()
                .map(|sub| {
                    let sub: Subscription<ExchangeId, MarketDataInstrument, SubKind> = sub.into();

                    let base_index =
                        instruments.find_asset_index(sub.exchange, &sub.instrument.base)?;
                    let quote_index =
                        instruments.find_asset_index(sub.exchange, &sub.instrument.quote)?;

                    let instrument_index = instruments
                        .instruments()
                        .iter()
                        .find_map(|indexed| {
                            (indexed.value.exchange.value == sub.exchange
                                && indexed
                                    .value
                                    .kind
                                    .eq_market_data_instrument_kind(&sub.instrument.kind)
                                && indexed.value.underlying.base == base_index
                                && indexed.value.underlying.quote == quote_index)
                                .then_some(indexed.key)
                        })
                        // Build the error lazily: the message Debug-formats every indexed
                        // instrument, which must not happen for each successful lookup
                        .ok_or_else(|| {
                            IndexError::InstrumentIndex(format!(
                                "Instrument: ({}, {}, {}, {}) must be present in indexed instruments: {:?}",
                                sub.exchange,
                                sub.instrument.kind,
                                base_index,
                                quote_index,
                                instruments.instruments()
                            ))
                        })?;

                    Ok(Subscription {
                        exchange: sub.exchange,
                        instrument: Keyed::new(instrument_index, sub.instrument),
                        kind: sub.kind,
                    })
                })
                .collect()
        })
        .collect()
}