Skip to main content

rustrade_data/books/
manager.rs

1use crate::{
2    Identifier,
3    books::{
4        OrderBook,
5        map::{OrderBookMap, OrderBookMapMulti},
6    },
7    error::DataError,
8    exchange::StreamSelector,
9    instrument::InstrumentData,
10    streams::{Streams, consumer::MarketStreamEvent, reconnect::stream::ReconnectingStream},
11    subscriber::WebSocketSubscriber,
12    subscription::{
13        Subscription,
14        book::{OrderBookEvent, OrderBooksL2},
15    },
16};
17use fnv::FnvHashMap;
18use futures::Stream;
19use futures_util::StreamExt;
20use parking_lot::RwLock;
21use std::{
22    fmt::{Debug, Display},
23    hash::Hash,
24    sync::Arc,
25};
26use tracing::warn;
27
28/// Maintains a set of local L2 [`OrderBook`]s by applying streamed [`OrderBookEvent`]s to the
29/// associated [`OrderBook`] in the [`OrderBookMap`].
30#[derive(Debug)]
31pub struct OrderBookL2Manager<St, BookMap> {
32    pub stream: St,
33    pub books: BookMap,
34}
35
36impl<St, BookMap> OrderBookL2Manager<St, BookMap>
37where
38    St: Stream<Item = MarketStreamEvent<BookMap::Key, OrderBookEvent>> + Unpin,
39    BookMap: OrderBookMap,
40    BookMap::Key: Debug,
41{
42    /// Manage local L2 [`OrderBook`]s.
43    pub async fn run(mut self) {
44        while let Some(stream_event) = self.stream.next().await {
45            // Extract MarketEvent<InstrumentKey, OrderBookEvent>
46            let event = match stream_event {
47                MarketStreamEvent::Reconnecting(exchange) => {
48                    warn!(%exchange, "OrderBook manager input stream disconnected");
49                    continue;
50                }
51                MarketStreamEvent::Item(event) => event,
52            };
53
54            // Find OrderBook associated with the MarketEvent InstrumentKey
55            let Some(book) = self.books.find(&event.instrument) else {
56                warn!(
57                    instrument = ?event.instrument,
58                    "consumed MarketStreamEvent<_, OrderBookEvent> for non-configured instrument"
59                );
60                continue;
61            };
62
63            let mut book_lock = book.write();
64            book_lock.update(&event.kind);
65        }
66    }
67}
68
69/// Initialise a [`OrderBookL2Manager`] using the provided batches of [`OrderBooksL2`]
70/// [`Subscription`]s.
71///
72/// See `examples/order_books_l2_manager` for how to use this initialisation paradigm.
73pub async fn init_multi_order_book_l2_manager<SubBatchIter, SubIter, Sub, Exchange, Instrument>(
74    subscription_batches: SubBatchIter,
75) -> Result<
76    OrderBookL2Manager<
77        impl Stream<Item = MarketStreamEvent<Instrument::Key, OrderBookEvent>>,
78        impl OrderBookMap<Key = Instrument::Key>,
79    >,
80    DataError,
81>
82where
83    SubBatchIter: IntoIterator<Item = SubIter>,
84    SubIter: IntoIterator<Item = Sub>,
85    Sub: Into<Subscription<Exchange, Instrument, OrderBooksL2>>,
86    Exchange: StreamSelector<Instrument, OrderBooksL2, Subscriber = WebSocketSubscriber>
87        + Ord
88        + Display
89        + Send
90        + Sync
91        + 'static,
92    Instrument: InstrumentData + Ord + Display + 'static,
93    Instrument::Key: Eq + Hash + Send + 'static,
94    Subscription<Exchange, Instrument, OrderBooksL2>:
95        Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
96{
97    // Generate Streams from provided OrderBooksL2 Subscription batches
98    let (stream_builder, books) = subscription_batches.into_iter().fold(
99        (Streams::<OrderBooksL2>::builder(), FnvHashMap::default()),
100        |(builder, mut books), batch| {
101            // Insert OrderBook Entry for each unique Subscription (duplicates upserted)
102            let batch = batch.into_iter().map(|sub| {
103                let subscription = sub.into();
104                books.insert(
105                    subscription.instrument.key().clone(),
106                    Arc::new(RwLock::new(OrderBook::default())),
107                );
108                subscription
109            });
110
111            let builder = builder.subscribe(WebSocketSubscriber, batch);
112            (builder, books)
113        },
114    );
115
116    // Initialise merged OrderBookL2 Stream
117    let stream = stream_builder
118        .init()
119        .await?
120        .select_all()
121        .with_error_handler(|error| {
122            warn!(
123                ?error,
124                "OrderBookL2Manager consumed recoverable MarketStream error"
125            )
126        });
127
128    Ok(OrderBookL2Manager {
129        stream,
130        books: OrderBookMapMulti::new(books),
131    })
132}