nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, MStr, Topic, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::Data,
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46#[derive(Debug)]
47pub struct BookUpdater {
48 pub id: Ustr,
49 pub instrument_id: InstrumentId,
50 pub cache: Rc<RefCell<Cache>>,
51}
52
53impl BookUpdater {
54 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
56 Self {
57 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
58 instrument_id: *instrument_id,
59 cache,
60 }
61 }
62}
63
64impl MessageHandler for BookUpdater {
65 fn id(&self) -> Ustr {
66 self.id
67 }
68
69 fn handle(&self, message: &dyn Any) {
70 if let Some(data) = message.downcast_ref::<Data>() {
72 if let Some(book) = self
73 .cache
74 .borrow_mut()
75 .order_book_mut(&data.instrument_id())
76 {
77 match data {
78 Data::Delta(delta) => book.apply_delta(delta),
79 Data::Deltas(deltas) => book.apply_deltas(deltas),
80 Data::Depth10(depth) => book.apply_depth(depth),
81 _ => log::error!("Invalid data type for book update, was {data:?}"),
82 }
83 }
84 }
85 }
86
87 fn as_any(&self) -> &dyn Any {
88 self
89 }
90}
91
92#[derive(Debug)]
93pub struct BookSnapshotter {
94 pub id: Ustr,
95 pub timer_name: Ustr,
96 pub snap_info: BookSnapshotInfo,
97 pub cache: Rc<RefCell<Cache>>,
98}
99
100impl BookSnapshotter {
101 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
103 let id_str = format!(
104 "{}-{}",
105 stringify!(BookSnapshotter),
106 snap_info.instrument_id
107 );
108 let timer_name = format!(
109 "OrderBook|{}|{}",
110 snap_info.instrument_id, snap_info.interval_ms
111 );
112
113 Self {
114 id: Ustr::from(&id_str),
115 timer_name: Ustr::from(&timer_name),
116 snap_info,
117 cache,
118 }
119 }
120
121 pub fn snapshot(&self, _event: TimeEvent) {
122 let cache = self.cache.borrow();
123
124 if self.snap_info.is_composite {
125 let topic = self.snap_info.topic;
126 let underlying = self.snap_info.root;
127 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
128 self.publish_order_book(&instrument.id(), topic, &cache);
129 }
130 } else {
131 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
132 }
133 }
134
135 fn publish_order_book(
136 &self,
137 instrument_id: &InstrumentId,
138 topic: MStr<Topic>,
139 cache: &Ref<Cache>,
140 ) {
141 let book = cache
142 .order_book(instrument_id)
143 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
144
145 if book.update_count == 0 {
146 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
147 return;
148 }
149
150 msgbus::publish(topic, book as &dyn Any);
151 }
152}