Skip to main content

nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{Ref, RefCell},
18    num::NonZeroUsize,
19    rc::Rc,
20};
21
22use indexmap::IndexMap;
23use nautilus_common::{
24    cache::Cache,
25    msgbus::{self, Handler, MStr, Topic, switchboard},
26    timer::TimeEvent,
27};
28use nautilus_model::{
29    data::{OrderBookDeltas, OrderBookDepth10, QuoteTick},
30    enums::InstrumentClass,
31    identifiers::{InstrumentId, Venue},
32    instruments::Instrument,
33    orderbook::OrderBook,
34};
35use ustr::Ustr;
36
/// Contains information for creating snapshots of specific order books.
///
/// Descriptors are stored in a map keyed by [`InstrumentId`] and read by
/// [`BookSnapshotter`] each time it publishes snapshots.
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
    /// Instrument whose book is published when `parent` is `None`.
    pub instrument_id: InstrumentId,
    /// Venue passed to the cache lookup of child instruments when `parent` is set.
    pub venue: Venue,
    /// Parent expansion components `(root, class)` when this snapshot subscription
    /// targets a parent symbol. `None` for concrete (exact-instrument) subscriptions.
    pub parent: Option<(Ustr, InstrumentClass)>,
    /// Message bus topic on which the book snapshot is published.
    pub topic: MStr<Topic>,
    /// Snapshot interval; presumably in milliseconds, going by the field name.
    pub interval_ms: NonZeroUsize,
}
48
/// Reference-counted map of per-instrument book snapshot descriptors.
///
/// Shared between the engine (which populates it on subscribe) and the
/// [`BookSnapshotter`] timer callback (which iterates it on each tick).
///
/// The map sits behind `Rc<RefCell<..>>`, so borrows are checked at runtime;
/// the snapshotter clones the values out before publishing rather than
/// holding a borrow on the map.
pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
54
/// Reference count key for a book snapshot subscription.
///
/// Pairs the instrument ID with a non-zero value, presumably the snapshot
/// interval in milliseconds (same type as `BookSnapshotInfo::interval_ms`);
/// confirm against the code that builds the key.
pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
57
/// Outcome of decrementing a book snapshot subscription.
///
/// The enum is a plain fieldless tag, so it derives the common value traits:
/// `Debug` for logging, `PartialEq`/`Eq` for comparisons in assertions, and
/// `Clone`/`Copy` so a result can be passed around without moving it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum BookSnapshotUnsubscribeResult {
    /// No matching subscription was found.
    NotSubscribed,
    /// The reference count was decremented but other consumers remain.
    Decremented,
    /// The last consumer was removed; tear down associated state.
    Removed,
}
67
/// Handles order book updates and delta processing for a specific instrument.
///
/// The `BookUpdater` processes incoming order book deltas and maintains
/// the current state of an order book. It can handle both incremental
/// updates and full snapshots for the instrument it's assigned to.
///
/// Books are looked up in the shared cache using the instrument ID carried by
/// each incoming message, and are mutated in place there.
#[derive(Debug)]
pub struct BookUpdater {
    /// Handler identifier returned from `Handler::id`, built in [`BookUpdater::new`].
    pub id: Ustr,
    /// Instrument this updater was created for.
    pub instrument_id: InstrumentId,
    /// Shared cache holding the order books that incoming updates are applied to.
    pub cache: Rc<RefCell<Cache>>,
    /// When `true`, a quote derived from the top of the book is published after
    /// each successfully applied update, provided it differs from the cached quote.
    pub emit_quotes_from_book: bool,
}
80
81impl BookUpdater {
82    /// Creates a new [`BookUpdater`] instance.
83    pub fn new(
84        instrument_id: &InstrumentId,
85        cache: Rc<RefCell<Cache>>,
86        emit_quotes_from_book: bool,
87    ) -> Self {
88        Self {
89            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
90            instrument_id: *instrument_id,
91            cache,
92            emit_quotes_from_book,
93        }
94    }
95}
96
97impl Handler<OrderBookDeltas> for BookUpdater {
98    fn id(&self) -> Ustr {
99        self.id
100    }
101
102    fn handle(&self, deltas: &OrderBookDeltas) {
103        let mut emit: Option<QuoteTick> = None;
104        {
105            let mut cache = self.cache.borrow_mut();
106            if let Some(book) = cache.order_book_mut(&deltas.instrument_id) {
107                if let Err(e) = book.apply_deltas(deltas) {
108                    log::error!("Failed to apply deltas: {e}");
109                    return;
110                }
111
112                if self.emit_quotes_from_book {
113                    emit = derive_quote_from_book(book);
114                }
115            }
116        }
117
118        if let Some(quote) = emit {
119            publish_quote_if_changed(&self.cache, quote);
120        }
121    }
122}
123
124impl Handler<OrderBookDepth10> for BookUpdater {
125    fn id(&self) -> Ustr {
126        self.id
127    }
128
129    fn handle(&self, depth: &OrderBookDepth10) {
130        let mut emit: Option<QuoteTick> = None;
131        {
132            let mut cache = self.cache.borrow_mut();
133            if let Some(book) = cache.order_book_mut(&depth.instrument_id) {
134                if let Err(e) = book.apply_depth(depth) {
135                    log::error!("Failed to apply depth: {e}");
136                    return;
137                }
138
139                if self.emit_quotes_from_book {
140                    emit = derive_quote_from_book(book);
141                }
142            }
143        }
144
145        if let Some(quote) = emit {
146            publish_quote_if_changed(&self.cache, quote);
147        }
148    }
149}
150
151fn derive_quote_from_book(book: &OrderBook) -> Option<QuoteTick> {
152    let bid_price = book.best_bid_price()?;
153    let ask_price = book.best_ask_price()?;
154    let bid_size = book.best_bid_size()?;
155    let ask_size = book.best_ask_size()?;
156
157    if bid_size.raw == 0 || ask_size.raw == 0 {
158        return None;
159    }
160
161    Some(QuoteTick::new(
162        book.instrument_id,
163        bid_price,
164        ask_price,
165        bid_size,
166        ask_size,
167        book.ts_last,
168        book.ts_last,
169    ))
170}
171
172/// Publishes the derived `QuoteTick` if top-of-book changed.
173///
174/// Writes to cache and republishes only when bid/ask price or size differs
175/// from the cached quote.
176pub(crate) fn publish_quote_if_changed(cache: &Rc<RefCell<Cache>>, quote: QuoteTick) {
177    let publish = {
178        let cache_ref = cache.borrow();
179        match cache_ref.quote(&quote.instrument_id) {
180            None => true,
181            Some(last) => {
182                last.bid_price != quote.bid_price
183                    || last.ask_price != quote.ask_price
184                    || last.bid_size != quote.bid_size
185                    || last.ask_size != quote.ask_size
186            }
187        }
188    };
189
190    if !publish {
191        return;
192    }
193
194    if let Err(e) = cache.borrow_mut().add_quote(quote) {
195        log::error!("Error on cache insert: {e}");
196    }
197
198    let topic = switchboard::get_quotes_topic(quote.instrument_id);
199    msgbus::publish_quote(topic, &quote);
200}
201
/// Creates periodic snapshots of order books at configured intervals.
///
/// The `BookSnapshotter` generates order book snapshots on timer events,
/// publishing them as market data. This is useful for providing periodic
/// full order book state updates in addition to incremental delta updates.
#[derive(Debug)]
pub struct BookSnapshotter {
    /// Timer name, built in [`BookSnapshotter::new`] from a fixed prefix and the interval.
    pub timer_name: Ustr,
    /// Snapshot interval; logged with an `ms` suffix on each snapshot call.
    pub interval_ms: NonZeroUsize,
    /// Shared map of snapshot descriptors; its values are cloned out on every snapshot call.
    pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
    /// Shared cache from which order books (and child instruments of a parent) are read.
    pub cache: Rc<RefCell<Cache>>,
}
214
215impl BookSnapshotter {
216    /// Creates a new [`BookSnapshotter`] instance.
217    pub fn new(
218        interval_ms: NonZeroUsize,
219        snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
220        cache: Rc<RefCell<Cache>>,
221    ) -> Self {
222        let timer_name = format!("OrderBookSnapshots|{interval_ms}");
223
224        Self {
225            timer_name: Ustr::from(&timer_name),
226            interval_ms,
227            snapshot_infos,
228            cache,
229        }
230    }
231
232    pub fn snapshot(&self, _event: TimeEvent) {
233        let snapshot_infos: Vec<BookSnapshotInfo> =
234            self.snapshot_infos.borrow().values().cloned().collect();
235
236        log::debug!(
237            "BookSnapshotter.snapshot called for {} subscriptions at {}ms",
238            snapshot_infos.len(),
239            self.interval_ms,
240        );
241
242        let cache = self.cache.borrow();
243
244        for snap_info in snapshot_infos {
245            self.publish_snapshot(&snap_info, &cache);
246        }
247    }
248
249    fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
250        if let Some((root, class)) = snap_info.parent {
251            let topic = snap_info.topic;
252            for instrument in cache.instruments_by_parent(&snap_info.venue, &root, class) {
253                self.publish_order_book(&instrument.id(), topic, cache);
254            }
255        } else {
256            self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
257        }
258    }
259
260    fn publish_order_book(
261        &self,
262        instrument_id: &InstrumentId,
263        topic: MStr<Topic>,
264        cache: &Ref<Cache>,
265    ) {
266        let book = cache
267            .order_book(instrument_id)
268            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
269
270        if book.update_count == 0 {
271            log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
272            return;
273        }
274        log::debug!(
275            "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
276            book.update_count
277        );
278
279        msgbus::publish_book(topic, book);
280    }
281}