barter_data/books/
manager.rs1use crate::{
2 Identifier,
3 books::{
4 OrderBook,
5 map::{OrderBookMap, OrderBookMapMulti},
6 },
7 error::DataError,
8 exchange::StreamSelector,
9 instrument::InstrumentData,
10 streams::{Streams, consumer::MarketStreamEvent, reconnect::stream::ReconnectingStream},
11 subscription::{
12 Subscription,
13 book::{OrderBookEvent, OrderBooksL2},
14 },
15};
16use fnv::FnvHashMap;
17use futures::Stream;
18use futures_util::StreamExt;
19use parking_lot::RwLock;
20use std::{
21 fmt::{Debug, Display},
22 hash::Hash,
23 sync::Arc,
24};
25use tracing::warn;
26
/// Keeps a collection of local L2 [`OrderBook`]s up to date from a stream of
/// [`OrderBookEvent`]s.
///
/// The manager is a plain pairing of an input stream with a book collection; all of the
/// behaviour lives in [`OrderBookL2Manager::run`], which consumes the manager.
#[derive(Debug)]
pub struct OrderBookL2Manager<St, BookMap> {
    /// Input stream of market stream events that `run` drains until it ends.
    pub stream: St,
    /// Order book collection that `run` queries with each event's instrument key.
    pub books: BookMap,
}
34
35impl<St, BookMap> OrderBookL2Manager<St, BookMap>
36where
37 St: Stream<Item = MarketStreamEvent<BookMap::Key, OrderBookEvent>> + Unpin,
38 BookMap: OrderBookMap,
39 BookMap::Key: Debug,
40{
41 pub async fn run(mut self) {
43 while let Some(stream_event) = self.stream.next().await {
44 let event = match stream_event {
46 MarketStreamEvent::Reconnecting(exchange) => {
47 warn!(%exchange, "OrderBook manager input stream disconnected");
48 continue;
49 }
50 MarketStreamEvent::Item(event) => event,
51 };
52
53 let Some(book) = self.books.find(&event.instrument) else {
55 warn!(
56 instrument = ?event.instrument,
57 "consumed MarketStreamEvent<_, OrderBookEvent> for non-configured instrument"
58 );
59 continue;
60 };
61
62 let mut book_lock = book.write();
63 book_lock.update(&event.kind);
64 }
65 }
66}
67
68pub async fn init_multi_order_book_l2_manager<SubBatchIter, SubIter, Sub, Exchange, Instrument>(
73 subscription_batches: SubBatchIter,
74) -> Result<
75 OrderBookL2Manager<
76 impl Stream<Item = MarketStreamEvent<Instrument::Key, OrderBookEvent>>,
77 impl OrderBookMap<Key = Instrument::Key>,
78 >,
79 DataError,
80>
81where
82 SubBatchIter: IntoIterator<Item = SubIter>,
83 SubIter: IntoIterator<Item = Sub>,
84 Sub: Into<Subscription<Exchange, Instrument, OrderBooksL2>>,
85 Exchange: StreamSelector<Instrument, OrderBooksL2> + Ord + Display + Send + Sync + 'static,
86 Instrument: InstrumentData + Ord + Display + 'static,
87 Instrument::Key: Eq + Hash + Send + 'static,
88 Subscription<Exchange, Instrument, OrderBooksL2>:
89 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
90{
91 let (stream_builder, books) = subscription_batches.into_iter().fold(
93 (Streams::<OrderBooksL2>::builder(), FnvHashMap::default()),
94 |(builder, mut books), batch| {
95 let batch = batch.into_iter().map(|sub| {
97 let subscription = sub.into();
98 books.insert(
99 subscription.instrument.key().clone(),
100 Arc::new(RwLock::new(OrderBook::default())),
101 );
102 subscription
103 });
104
105 let builder = builder.subscribe(batch);
106 (builder, books)
107 },
108 );
109
110 let stream = stream_builder
112 .init()
113 .await?
114 .select_all()
115 .with_error_handler(|error| {
116 warn!(
117 ?error,
118 "OrderBookL2Manager consumed recoverable MarketStream error"
119 )
120 });
121
122 Ok(OrderBookL2Manager {
123 stream,
124 books: OrderBookMapMulti::new(books),
125 })
126}