1use std::{
17 cell::{Ref, RefCell},
18 num::NonZeroUsize,
19 rc::Rc,
20};
21
22use indexmap::IndexMap;
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, Handler, MStr, Topic, switchboard},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::{OrderBookDeltas, OrderBookDepth10, QuoteTick},
30 enums::InstrumentClass,
31 identifiers::{InstrumentId, Venue},
32 instruments::Instrument,
33 orderbook::OrderBook,
34};
35use ustr::Ustr;
36
37#[derive(Clone, Debug)]
39pub struct BookSnapshotInfo {
40 pub instrument_id: InstrumentId,
41 pub venue: Venue,
42 pub parent: Option<(Ustr, InstrumentClass)>,
45 pub topic: MStr<Topic>,
46 pub interval_ms: NonZeroUsize,
47}
48
49pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
54
55pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
57
/// Outcome of a book snapshot unsubscribe request.
///
/// NOTE(review): the variant descriptions below are inferred from the variant names only;
/// the code that produces them is not visible here — confirm against the caller.
pub(crate) enum BookSnapshotUnsubscribeResult {
    /// Presumably: there was no matching subscription to remove.
    NotSubscribed,
    /// Presumably: a subscription count was reduced but subscribers remain.
    Decremented,
    /// Presumably: the last subscriber left and the subscription was dropped.
    Removed,
}
67
68#[derive(Debug)]
74pub struct BookUpdater {
75 pub id: Ustr,
76 pub instrument_id: InstrumentId,
77 pub cache: Rc<RefCell<Cache>>,
78 pub emit_quotes_from_book: bool,
79}
80
81impl BookUpdater {
82 pub fn new(
84 instrument_id: &InstrumentId,
85 cache: Rc<RefCell<Cache>>,
86 emit_quotes_from_book: bool,
87 ) -> Self {
88 Self {
89 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
90 instrument_id: *instrument_id,
91 cache,
92 emit_quotes_from_book,
93 }
94 }
95}
96
97impl Handler<OrderBookDeltas> for BookUpdater {
98 fn id(&self) -> Ustr {
99 self.id
100 }
101
102 fn handle(&self, deltas: &OrderBookDeltas) {
103 let mut emit: Option<QuoteTick> = None;
104 {
105 let mut cache = self.cache.borrow_mut();
106 if let Some(book) = cache.order_book_mut(&deltas.instrument_id) {
107 if let Err(e) = book.apply_deltas(deltas) {
108 log::error!("Failed to apply deltas: {e}");
109 return;
110 }
111
112 if self.emit_quotes_from_book {
113 emit = derive_quote_from_book(book);
114 }
115 }
116 }
117
118 if let Some(quote) = emit {
119 publish_quote_if_changed(&self.cache, quote);
120 }
121 }
122}
123
124impl Handler<OrderBookDepth10> for BookUpdater {
125 fn id(&self) -> Ustr {
126 self.id
127 }
128
129 fn handle(&self, depth: &OrderBookDepth10) {
130 let mut emit: Option<QuoteTick> = None;
131 {
132 let mut cache = self.cache.borrow_mut();
133 if let Some(book) = cache.order_book_mut(&depth.instrument_id) {
134 if let Err(e) = book.apply_depth(depth) {
135 log::error!("Failed to apply depth: {e}");
136 return;
137 }
138
139 if self.emit_quotes_from_book {
140 emit = derive_quote_from_book(book);
141 }
142 }
143 }
144
145 if let Some(quote) = emit {
146 publish_quote_if_changed(&self.cache, quote);
147 }
148 }
149}
150
151fn derive_quote_from_book(book: &OrderBook) -> Option<QuoteTick> {
152 let bid_price = book.best_bid_price()?;
153 let ask_price = book.best_ask_price()?;
154 let bid_size = book.best_bid_size()?;
155 let ask_size = book.best_ask_size()?;
156
157 if bid_size.raw == 0 || ask_size.raw == 0 {
158 return None;
159 }
160
161 Some(QuoteTick::new(
162 book.instrument_id,
163 bid_price,
164 ask_price,
165 bid_size,
166 ask_size,
167 book.ts_last,
168 book.ts_last,
169 ))
170}
171
172pub(crate) fn publish_quote_if_changed(cache: &Rc<RefCell<Cache>>, quote: QuoteTick) {
177 let publish = {
178 let cache_ref = cache.borrow();
179 match cache_ref.quote("e.instrument_id) {
180 None => true,
181 Some(last) => {
182 last.bid_price != quote.bid_price
183 || last.ask_price != quote.ask_price
184 || last.bid_size != quote.bid_size
185 || last.ask_size != quote.ask_size
186 }
187 }
188 };
189
190 if !publish {
191 return;
192 }
193
194 if let Err(e) = cache.borrow_mut().add_quote(quote) {
195 log::error!("Error on cache insert: {e}");
196 }
197
198 let topic = switchboard::get_quotes_topic(quote.instrument_id);
199 msgbus::publish_quote(topic, "e);
200}
201
202#[derive(Debug)]
208pub struct BookSnapshotter {
209 pub timer_name: Ustr,
210 pub interval_ms: NonZeroUsize,
211 pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
212 pub cache: Rc<RefCell<Cache>>,
213}
214
215impl BookSnapshotter {
216 pub fn new(
218 interval_ms: NonZeroUsize,
219 snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
220 cache: Rc<RefCell<Cache>>,
221 ) -> Self {
222 let timer_name = format!("OrderBookSnapshots|{interval_ms}");
223
224 Self {
225 timer_name: Ustr::from(&timer_name),
226 interval_ms,
227 snapshot_infos,
228 cache,
229 }
230 }
231
232 pub fn snapshot(&self, _event: TimeEvent) {
233 let snapshot_infos: Vec<BookSnapshotInfo> =
234 self.snapshot_infos.borrow().values().cloned().collect();
235
236 log::debug!(
237 "BookSnapshotter.snapshot called for {} subscriptions at {}ms",
238 snapshot_infos.len(),
239 self.interval_ms,
240 );
241
242 let cache = self.cache.borrow();
243
244 for snap_info in snapshot_infos {
245 self.publish_snapshot(&snap_info, &cache);
246 }
247 }
248
249 fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
250 if let Some((root, class)) = snap_info.parent {
251 let topic = snap_info.topic;
252 for instrument in cache.instruments_by_parent(&snap_info.venue, &root, class) {
253 self.publish_order_book(&instrument.id(), topic, cache);
254 }
255 } else {
256 self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
257 }
258 }
259
260 fn publish_order_book(
261 &self,
262 instrument_id: &InstrumentId,
263 topic: MStr<Topic>,
264 cache: &Ref<Cache>,
265 ) {
266 let book = cache
267 .order_book(instrument_id)
268 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
269
270 if book.update_count == 0 {
271 log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
272 return;
273 }
274 log::debug!(
275 "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
276 book.update_count
277 );
278
279 msgbus::publish_book(topic, book);
280 }
281}