rustrade_data/books/
manager.rs1use crate::{
2 Identifier,
3 books::{
4 OrderBook,
5 map::{OrderBookMap, OrderBookMapMulti},
6 },
7 error::DataError,
8 exchange::StreamSelector,
9 instrument::InstrumentData,
10 streams::{Streams, consumer::MarketStreamEvent, reconnect::stream::ReconnectingStream},
11 subscriber::WebSocketSubscriber,
12 subscription::{
13 Subscription,
14 book::{OrderBookEvent, OrderBooksL2},
15 },
16};
17use fnv::FnvHashMap;
18use futures::Stream;
19use futures_util::StreamExt;
20use parking_lot::RwLock;
21use std::{
22 fmt::{Debug, Display},
23 hash::Hash,
24 sync::Arc,
25};
26use tracing::warn;
27
/// Couples an input event stream with the collection of order books it feeds.
///
/// The manager itself holds no logic beyond its two parts; the `run` method
/// drives `stream` and applies each consumed book event to the matching entry
/// in `books`.
#[derive(Debug)]
pub struct OrderBookL2Manager<St, BookMap> {
    /// Source of the events that `run` consumes.
    pub stream: St,
    /// Order books, looked up by the instrument key carried on each event.
    pub books: BookMap,
}
35
36impl<St, BookMap> OrderBookL2Manager<St, BookMap>
37where
38 St: Stream<Item = MarketStreamEvent<BookMap::Key, OrderBookEvent>> + Unpin,
39 BookMap: OrderBookMap,
40 BookMap::Key: Debug,
41{
42 pub async fn run(mut self) {
44 while let Some(stream_event) = self.stream.next().await {
45 let event = match stream_event {
47 MarketStreamEvent::Reconnecting(exchange) => {
48 warn!(%exchange, "OrderBook manager input stream disconnected");
49 continue;
50 }
51 MarketStreamEvent::Item(event) => event,
52 };
53
54 let Some(book) = self.books.find(&event.instrument) else {
56 warn!(
57 instrument = ?event.instrument,
58 "consumed MarketStreamEvent<_, OrderBookEvent> for non-configured instrument"
59 );
60 continue;
61 };
62
63 let mut book_lock = book.write();
64 book_lock.update(&event.kind);
65 }
66 }
67}
68
69pub async fn init_multi_order_book_l2_manager<SubBatchIter, SubIter, Sub, Exchange, Instrument>(
74 subscription_batches: SubBatchIter,
75) -> Result<
76 OrderBookL2Manager<
77 impl Stream<Item = MarketStreamEvent<Instrument::Key, OrderBookEvent>>,
78 impl OrderBookMap<Key = Instrument::Key>,
79 >,
80 DataError,
81>
82where
83 SubBatchIter: IntoIterator<Item = SubIter>,
84 SubIter: IntoIterator<Item = Sub>,
85 Sub: Into<Subscription<Exchange, Instrument, OrderBooksL2>>,
86 Exchange: StreamSelector<Instrument, OrderBooksL2, Subscriber = WebSocketSubscriber>
87 + Ord
88 + Display
89 + Send
90 + Sync
91 + 'static,
92 Instrument: InstrumentData + Ord + Display + 'static,
93 Instrument::Key: Eq + Hash + Send + 'static,
94 Subscription<Exchange, Instrument, OrderBooksL2>:
95 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
96{
97 let (stream_builder, books) = subscription_batches.into_iter().fold(
99 (Streams::<OrderBooksL2>::builder(), FnvHashMap::default()),
100 |(builder, mut books), batch| {
101 let batch = batch.into_iter().map(|sub| {
103 let subscription = sub.into();
104 books.insert(
105 subscription.instrument.key().clone(),
106 Arc::new(RwLock::new(OrderBook::default())),
107 );
108 subscription
109 });
110
111 let builder = builder.subscribe(WebSocketSubscriber, batch);
112 (builder, books)
113 },
114 );
115
116 let stream = stream_builder
118 .init()
119 .await?
120 .select_all()
121 .with_error_handler(|error| {
122 warn!(
123 ?error,
124 "OrderBookL2Manager consumed recoverable MarketStream error"
125 )
126 });
127
128 Ok(OrderBookL2Manager {
129 stream,
130 books: OrderBookMapMulti::new(books),
131 })
132}