use crate::{
Identifier,
books::{
OrderBook,
map::{OrderBookMap, OrderBookMapMulti},
},
error::DataError,
exchange::StreamSelector,
instrument::InstrumentData,
streams::{Streams, consumer::MarketStreamEvent, reconnect::stream::ReconnectingStream},
subscription::{
Subscription,
book::{OrderBookEvent, OrderBooksL2},
},
};
use fnv::FnvHashMap;
use futures::Stream;
use futures_util::StreamExt;
use parking_lot::RwLock;
use std::{
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
};
use tracing::warn;
#[derive(Debug)]
pub struct OrderBookL2Manager<St, BookMap> {
pub stream: St,
pub books: BookMap,
}
impl<St, BookMap> OrderBookL2Manager<St, BookMap>
where
St: Stream<Item = MarketStreamEvent<BookMap::Key, OrderBookEvent>> + Unpin,
BookMap: OrderBookMap,
BookMap::Key: Debug,
{
pub async fn run(mut self) {
while let Some(stream_event) = self.stream.next().await {
let event = match stream_event {
MarketStreamEvent::Reconnecting(exchange) => {
warn!(%exchange, "OrderBook manager input stream disconnected");
continue;
}
MarketStreamEvent::Item(event) => event,
};
let Some(book) = self.books.find(&event.instrument) else {
warn!(
instrument = ?event.instrument,
"consumed MarketStreamEvent<_, OrderBookEvent> for non-configured instrument"
);
continue;
};
let mut book_lock = book.write();
book_lock.update(&event.kind);
}
}
}
pub async fn init_multi_order_book_l2_manager<SubBatchIter, SubIter, Sub, Exchange, Instrument>(
subscription_batches: SubBatchIter,
) -> Result<
OrderBookL2Manager<
impl Stream<Item = MarketStreamEvent<Instrument::Key, OrderBookEvent>>,
impl OrderBookMap<Key = Instrument::Key>,
>,
DataError,
>
where
SubBatchIter: IntoIterator<Item = SubIter>,
SubIter: IntoIterator<Item = Sub>,
Sub: Into<Subscription<Exchange, Instrument, OrderBooksL2>>,
Exchange: StreamSelector<Instrument, OrderBooksL2> + Ord + Display + Send + Sync + 'static,
Instrument: InstrumentData + Ord + Display + 'static,
Instrument::Key: Eq + Hash + Send + 'static,
Subscription<Exchange, Instrument, OrderBooksL2>:
Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
{
let (stream_builder, books) = subscription_batches.into_iter().fold(
(Streams::<OrderBooksL2>::builder(), FnvHashMap::default()),
|(builder, mut books), batch| {
let batch = batch.into_iter().map(|sub| {
let subscription = sub.into();
books.insert(
subscription.instrument.key().clone(),
Arc::new(RwLock::new(OrderBook::default())),
);
subscription
});
let builder = builder.subscribe(batch);
(builder, books)
},
);
let stream = stream_builder
.init()
.await?
.select_all()
.with_error_handler(|error| {
warn!(
?error,
"OrderBookL2Manager consumed recoverable MarketStream error"
)
});
Ok(OrderBookL2Manager {
stream,
books: OrderBookMapMulti::new(books),
})
}