barter_data/books/
manager.rs

1use crate::{
2    Identifier,
3    books::{
4        OrderBook,
5        map::{OrderBookMap, OrderBookMapMulti},
6    },
7    error::DataError,
8    exchange::StreamSelector,
9    instrument::InstrumentData,
10    streams::{Streams, consumer::MarketStreamEvent, reconnect::stream::ReconnectingStream},
11    subscription::{
12        Subscription,
13        book::{OrderBookEvent, OrderBooksL2},
14    },
15};
16use fnv::FnvHashMap;
17use futures::Stream;
18use futures_util::StreamExt;
19use parking_lot::RwLock;
20use std::{
21    fmt::{Debug, Display},
22    hash::Hash,
23    sync::Arc,
24};
25use tracing::warn;
26
/// Keeps a collection of local L2 [`OrderBook`]s up to date.
///
/// Each [`OrderBookEvent`] consumed from `stream` is applied to the [`OrderBook`] that the
/// [`OrderBookMap`] in `books` associates with the event's instrument key.
#[derive(Debug)]
pub struct OrderBookL2Manager<St, BookMap> {
    /// Input stream of market events; drained by [`OrderBookL2Manager::run`].
    pub stream: St,
    /// Lookup from instrument key to the local [`OrderBook`] that receives the updates.
    pub books: BookMap,
}
34
35impl<St, BookMap> OrderBookL2Manager<St, BookMap>
36where
37    St: Stream<Item = MarketStreamEvent<BookMap::Key, OrderBookEvent>> + Unpin,
38    BookMap: OrderBookMap,
39    BookMap::Key: Debug,
40{
41    /// Manage local L2 [`OrderBook`]s.
42    pub async fn run(mut self) {
43        while let Some(stream_event) = self.stream.next().await {
44            // Extract MarketEvent<InstrumentKey, OrderBookEvent>
45            let event = match stream_event {
46                MarketStreamEvent::Reconnecting(exchange) => {
47                    warn!(%exchange, "OrderBook manager input stream disconnected");
48                    continue;
49                }
50                MarketStreamEvent::Item(event) => event,
51            };
52
53            // Find OrderBook associated with the MarketEvent InstrumentKey
54            let Some(book) = self.books.find(&event.instrument) else {
55                warn!(
56                    instrument = ?event.instrument,
57                    "consumed MarketStreamEvent<_, OrderBookEvent> for non-configured instrument"
58                );
59                continue;
60            };
61
62            let mut book_lock = book.write();
63            book_lock.update(&event.kind);
64        }
65    }
66}
67
68/// Initialise a [`OrderBookL2Manager`] using the provided batches of [`OrderBooksL2`]
69/// [`Subscription`]s.
70///
71/// See `examples/order_books_l2_manager` for how to use this initialisation paradigm.
72pub async fn init_multi_order_book_l2_manager<SubBatchIter, SubIter, Sub, Exchange, Instrument>(
73    subscription_batches: SubBatchIter,
74) -> Result<
75    OrderBookL2Manager<
76        impl Stream<Item = MarketStreamEvent<Instrument::Key, OrderBookEvent>>,
77        impl OrderBookMap<Key = Instrument::Key>,
78    >,
79    DataError,
80>
81where
82    SubBatchIter: IntoIterator<Item = SubIter>,
83    SubIter: IntoIterator<Item = Sub>,
84    Sub: Into<Subscription<Exchange, Instrument, OrderBooksL2>>,
85    Exchange: StreamSelector<Instrument, OrderBooksL2> + Ord + Display + Send + Sync + 'static,
86    Instrument: InstrumentData + Ord + Display + 'static,
87    Instrument::Key: Eq + Hash + Send + 'static,
88    Subscription<Exchange, Instrument, OrderBooksL2>:
89        Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
90{
91    // Generate Streams from provided OrderBooksL2 Subscription batches
92    let (stream_builder, books) = subscription_batches.into_iter().fold(
93        (Streams::<OrderBooksL2>::builder(), FnvHashMap::default()),
94        |(builder, mut books), batch| {
95            // Insert OrderBook Entry for each unique Subscription (duplicates upserted)
96            let batch = batch.into_iter().map(|sub| {
97                let subscription = sub.into();
98                books.insert(
99                    subscription.instrument.key().clone(),
100                    Arc::new(RwLock::new(OrderBook::default())),
101                );
102                subscription
103            });
104
105            let builder = builder.subscribe(batch);
106            (builder, books)
107        },
108    );
109
110    // Initialise merged OrderBookL2 Stream
111    let stream = stream_builder
112        .init()
113        .await?
114        .select_all()
115        .with_error_handler(|error| {
116            warn!(
117                ?error,
118                "OrderBookL2Manager consumed recoverable MarketStream error"
119            )
120        });
121
122    Ok(OrderBookL2Manager {
123        stream,
124        books: OrderBookMapMulti::new(books),
125    })
126}