1pub mod book;
32pub mod config;
33mod handlers;
34
35use std::{
36 any::Any,
37 cell::{Ref, RefCell},
38 collections::hash_map::Entry,
39 fmt::Display,
40 num::NonZeroUsize,
41 rc::Rc,
42};
43
44use ahash::{AHashMap, AHashSet};
45use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
46use config::DataEngineConfig;
47use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
48use indexmap::IndexMap;
49use nautilus_common::{
50 cache::Cache,
51 clock::Clock,
52 logging::{RECV, RES},
53 messages::data::{
54 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
55 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
56 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
57 UnsubscribeCommand,
58 },
59 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
60 timer::TimeEventCallback,
61};
62use nautilus_core::{
63 correctness::{
64 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
65 },
66 datetime::millis_to_nanos,
67};
68use nautilus_model::{
69 data::{
70 Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
71 TradeTick,
72 close::InstrumentClose,
73 prices::{IndexPriceUpdate, MarkPriceUpdate},
74 },
75 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
76 identifiers::{ClientId, InstrumentId, Venue},
77 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
78 orderbook::OrderBook,
79};
80use nautilus_persistence::backend::catalog::ParquetDataCatalog;
81use ustr::Ustr;
82
83use crate::{
84 aggregation::{
85 BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
86 VolumeBarAggregator,
87 },
88 client::DataClientAdapter,
89};
90
91#[derive(Debug)]
93pub struct DataEngine {
94 clock: Rc<RefCell<dyn Clock>>,
95 cache: Rc<RefCell<Cache>>,
96 clients: IndexMap<ClientId, DataClientAdapter>,
97 default_client: Option<DataClientAdapter>,
98 external_clients: AHashSet<ClientId>,
99 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
100 routing_map: IndexMap<Venue, ClientId>,
101 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
102 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
103 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
104 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
105 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
106 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
107 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
108 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
109 msgbus_priority: u8,
110 config: DataEngineConfig,
111}
112
113impl DataEngine {
114 #[must_use]
116 pub fn new(
117 clock: Rc<RefCell<dyn Clock>>,
118 cache: Rc<RefCell<Cache>>,
119 config: Option<DataEngineConfig>,
120 ) -> Self {
121 let config = config.unwrap_or_default();
122
123 let external_clients: AHashSet<ClientId> = config
124 .external_clients
125 .clone()
126 .unwrap_or_default()
127 .into_iter()
128 .collect();
129
130 Self {
131 clock,
132 cache,
133 clients: IndexMap::new(),
134 default_client: None,
135 external_clients,
136 catalogs: AHashMap::new(),
137 routing_map: IndexMap::new(),
138 book_intervals: AHashMap::new(),
139 book_updaters: AHashMap::new(),
140 book_snapshotters: AHashMap::new(),
141 bar_aggregators: AHashMap::new(),
142 bar_aggregator_handlers: AHashMap::new(),
143 _synthetic_quote_feeds: AHashMap::new(),
144 _synthetic_trade_feeds: AHashMap::new(),
145 buffered_deltas_map: AHashMap::new(),
146 msgbus_priority: 10, config,
148 }
149 }
150
151 #[must_use]
153 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
154 self.clock.borrow()
155 }
156
157 #[must_use]
159 pub fn get_cache(&self) -> Ref<'_, Cache> {
160 self.cache.borrow()
161 }
162
163 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
169 let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
170
171 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
172
173 self.catalogs.insert(name, catalog);
174 log::info!("Registered catalog <{name}>");
175 }
176
177 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
184 let client_id = client.client_id();
185
186 if let Some(default_client) = &self.default_client {
187 check_predicate_false(
188 default_client.client_id() == client.client_id(),
189 "client_id already registered as default client",
190 )
191 .expect(FAILED);
192 }
193
194 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
195
196 if let Some(routing) = routing {
197 self.routing_map.insert(routing, client_id);
198 log::info!("Set client {client_id} routing for {routing}");
199 }
200
201 if client.venue.is_none() && self.default_client.is_none() {
202 self.default_client = Some(client);
203 log::info!("Registered client {client_id} for default routing");
204 } else {
205 self.clients.insert(client_id, client);
206 log::info!("Registered client {client_id}");
207 }
208 }
209
210 pub fn deregister_client(&mut self, client_id: &ClientId) {
216 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
217
218 self.clients.shift_remove(client_id);
219 log::info!("Deregistered client {client_id}");
220 }
221
222 pub fn register_default_client(&mut self, client: DataClientAdapter) {
234 check_predicate_true(
235 self.default_client.is_none(),
236 "default client already registered",
237 )
238 .expect(FAILED);
239
240 let client_id = client.client_id();
241
242 self.default_client = Some(client);
243 log::info!("Registered default client {client_id}");
244 }
245
246 pub fn start(&self) {
248 for client in self.get_clients() {
249 if let Err(e) = client.start() {
250 log::error!("{e}");
251 }
252 }
253 }
254
255 pub fn stop(&self) {
257 for client in self.get_clients() {
258 if let Err(e) = client.stop() {
259 log::error!("{e}");
260 }
261 }
262 }
263
264 pub fn reset(&self) {
266 for client in self.get_clients() {
267 if let Err(e) = client.reset() {
268 log::error!("{e}");
269 }
270 }
271 }
272
273 pub fn dispose(&self) {
275 for client in self.get_clients() {
276 if let Err(e) = client.dispose() {
277 log::error!("{e}");
278 }
279 }
280
281 self.clock.borrow_mut().cancel_timers();
282 }
283
284 #[must_use]
286 pub fn check_connected(&self) -> bool {
287 self.get_clients()
288 .iter()
289 .all(|client| client.is_connected())
290 }
291
292 #[must_use]
294 pub fn check_disconnected(&self) -> bool {
295 self.get_clients()
296 .iter()
297 .all(|client| !client.is_connected())
298 }
299
300 #[must_use]
302 pub fn registered_clients(&self) -> Vec<ClientId> {
303 self.get_clients()
304 .into_iter()
305 .map(|client| client.client_id())
306 .collect()
307 }
308
309 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
312 where
313 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
314 T: Clone,
315 {
316 self.get_clients()
317 .into_iter()
318 .flat_map(get_subs)
319 .cloned()
320 .collect()
321 }
322
323 #[must_use]
324 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
325 let (default_opt, clients_map) = (&self.default_client, &self.clients);
326 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
327
328 if let Some(default) = default_opt {
329 clients.push(default);
330 }
331
332 clients
333 }
334
335 #[must_use]
336 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
337 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
338 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
339
340 if let Some(default) = default_opt {
341 clients.push(default);
342 }
343
344 clients
345 }
346
347 pub fn get_client(
348 &mut self,
349 client_id: Option<&ClientId>,
350 venue: Option<&Venue>,
351 ) -> Option<&mut DataClientAdapter> {
352 if let Some(client_id) = client_id {
353 if let Some(client) = self.clients.get_mut(client_id) {
355 return Some(client);
356 }
357
358 if let Some(default) = self.default_client.as_mut() {
360 if default.client_id() == *client_id {
361 return Some(default);
362 }
363 }
364
365 return None;
367 }
368
369 if let Some(v) = venue {
370 if let Some(client_id) = self.routing_map.get(v) {
372 return self.clients.get_mut(client_id);
373 }
374 }
375
376 self.get_default_client()
378 }
379
380 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
381 self.default_client.as_mut()
382 }
383
384 #[must_use]
386 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
387 self.collect_subscriptions(|client| &client.subscriptions_custom)
388 }
389
390 #[must_use]
392 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
393 self.collect_subscriptions(|client| &client.subscriptions_instrument)
394 }
395
396 #[must_use]
398 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
399 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
400 }
401
402 #[must_use]
404 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
405 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
406 }
407
408 #[must_use]
410 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
411 self.collect_subscriptions(|client| &client.subscriptions_quotes)
412 }
413
414 #[must_use]
416 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
417 self.collect_subscriptions(|client| &client.subscriptions_trades)
418 }
419
420 #[must_use]
422 pub fn subscribed_bars(&self) -> Vec<BarType> {
423 self.collect_subscriptions(|client| &client.subscriptions_bars)
424 }
425
426 #[must_use]
428 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
429 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
430 }
431
432 #[must_use]
434 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
435 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
436 }
437
438 #[must_use]
440 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
441 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
442 }
443
444 #[must_use]
446 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
447 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
448 }
449
450 pub fn execute(&mut self, cmd: &DataCommand) {
456 if let Err(e) = match cmd {
457 DataCommand::Subscribe(c) => self.execute_subscribe(c),
458 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
459 DataCommand::Request(c) => self.execute_request(c),
460 } {
461 log::error!("{e}");
462 }
463 }
464
465 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
472 match &cmd {
474 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
475 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
476 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
477 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
478 _ => {} }
480
481 if let Some(client_id) = cmd.client_id() {
483 if self.external_clients.contains(client_id) {
484 return Ok(());
485 }
486 }
487
488 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
490 client.execute_subscribe(cmd);
491 } else {
492 log::error!(
493 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
494 cmd.client_id(),
495 cmd.venue(),
496 );
497 }
498
499 Ok(())
500 }
501
502 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
508 match &cmd {
509 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
510 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
511 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
512 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
513 _ => {} }
515
516 if let Some(client_id) = cmd.client_id() {
518 if self.external_clients.contains(client_id) {
519 return Ok(());
520 }
521 }
522
523 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
525 client.execute_unsubscribe(cmd);
526 } else {
527 log::error!(
528 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
529 cmd.client_id(),
530 cmd.venue(),
531 );
532 }
533
534 Ok(())
535 }
536
537 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
544 if let Some(cid) = req.client_id() {
546 if self.external_clients.contains(cid) {
547 return Ok(());
548 }
549 }
550 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
551 match req {
552 RequestCommand::Data(req) => client.request_data(req),
553 RequestCommand::Instrument(req) => client.request_instrument(req),
554 RequestCommand::Instruments(req) => client.request_instruments(req),
555 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
556 RequestCommand::Quotes(req) => client.request_quotes(req),
557 RequestCommand::Trades(req) => client.request_trades(req),
558 RequestCommand::Bars(req) => client.request_bars(req),
559 }
560 } else {
561 anyhow::bail!(
562 "Cannot handle request: no client found for {:?} {:?}",
563 req.client_id(),
564 req.venue()
565 );
566 }
567 }
568
569 pub fn process(&mut self, data: &dyn Any) {
573 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
575 self.handle_instrument(instrument.clone());
576 } else {
577 log::error!("Cannot process data {data:?}, type is unrecognized");
578 }
579 }
580
581 pub fn process_data(&mut self, data: Data) {
583 match data {
584 Data::Delta(delta) => self.handle_delta(delta),
585 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
586 Data::Depth10(depth) => self.handle_depth10(*depth),
587 Data::Quote(quote) => self.handle_quote(quote),
588 Data::Trade(trade) => self.handle_trade(trade),
589 Data::Bar(bar) => self.handle_bar(bar),
590 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
591 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
592 Data::InstrumentClose(close) => self.handle_instrument_close(close),
593 }
594 }
595
596 pub fn response(&self, resp: DataResponse) {
598 log::debug!("{RECV}{RES} {resp:?}");
599
600 match &resp {
601 DataResponse::Instrument(resp) => {
602 self.handle_instrument_response(resp.data.clone());
603 }
604 DataResponse::Instruments(resp) => {
605 self.handle_instruments(&resp.data);
606 }
607 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
608 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
609 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
610 _ => todo!(),
611 }
612
613 msgbus::send_response(resp.correlation_id(), &resp);
614 }
615
616 fn handle_instrument(&mut self, instrument: InstrumentAny) {
619 if let Err(e) = self
620 .cache
621 .as_ref()
622 .borrow_mut()
623 .add_instrument(instrument.clone())
624 {
625 log_error_on_cache_insert(&e);
626 }
627
628 let topic = switchboard::get_instrument_topic(instrument.id());
629 msgbus::publish(topic, &instrument as &dyn Any);
630 }
631
632 fn handle_delta(&mut self, delta: OrderBookDelta) {
633 let deltas = if self.config.buffer_deltas {
634 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
635 buffered_deltas.deltas.push(delta);
636 } else {
637 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
638 self.buffered_deltas_map
639 .insert(delta.instrument_id, buffered_deltas);
640 }
641
642 if !RecordFlag::F_LAST.matches(delta.flags) {
643 return; }
645
646 self.buffered_deltas_map
648 .remove(&delta.instrument_id)
649 .unwrap()
650 } else {
651 OrderBookDeltas::new(delta.instrument_id, vec![delta])
652 };
653
654 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
655 msgbus::publish(topic, &deltas as &dyn Any);
656 }
657
658 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
659 let deltas = if self.config.buffer_deltas {
660 let mut is_last_delta = false;
661 for delta in &deltas.deltas {
662 if RecordFlag::F_LAST.matches(delta.flags) {
663 is_last_delta = true;
664 break;
665 }
666 }
667
668 let instrument_id = deltas.instrument_id;
669
670 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
671 buffered_deltas.deltas.extend(deltas.deltas);
672 } else {
673 self.buffered_deltas_map.insert(instrument_id, deltas);
674 }
675
676 if !is_last_delta {
677 return;
678 }
679
680 self.buffered_deltas_map.remove(&instrument_id).unwrap()
682 } else {
683 deltas
684 };
685
686 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
687 msgbus::publish(topic, &deltas as &dyn Any);
688 }
689
690 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
691 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
692 msgbus::publish(topic, &depth as &dyn Any);
693 }
694
695 fn handle_quote(&mut self, quote: QuoteTick) {
696 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
697 log_error_on_cache_insert(&e);
698 }
699
700 let topic = switchboard::get_quotes_topic(quote.instrument_id);
703 msgbus::publish(topic, "e as &dyn Any);
704 }
705
706 fn handle_trade(&mut self, trade: TradeTick) {
707 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
708 log_error_on_cache_insert(&e);
709 }
710
711 let topic = switchboard::get_trades_topic(trade.instrument_id);
714 msgbus::publish(topic, &trade as &dyn Any);
715 }
716
717 fn handle_bar(&mut self, bar: Bar) {
718 if self.config.validate_data_sequence {
720 if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
721 if bar.ts_event < last_bar.ts_event {
722 log::warn!(
723 "Bar {bar} was prior to last bar `ts_event` {}",
724 last_bar.ts_event
725 );
726 return; }
728 if bar.ts_init < last_bar.ts_init {
729 log::warn!(
730 "Bar {bar} was prior to last bar `ts_init` {}",
731 last_bar.ts_init
732 );
733 return; }
735 }
737 }
738
739 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
740 log_error_on_cache_insert(&e);
741 }
742
743 let topic = switchboard::get_bars_topic(bar.bar_type);
744 msgbus::publish(topic, &bar as &dyn Any);
745 }
746
747 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
748 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
749 log_error_on_cache_insert(&e);
750 }
751
752 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
753 msgbus::publish(topic, &mark_price as &dyn Any);
754 }
755
756 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
757 if let Err(e) = self
758 .cache
759 .as_ref()
760 .borrow_mut()
761 .add_index_price(index_price)
762 {
763 log_error_on_cache_insert(&e);
764 }
765
766 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
767 msgbus::publish(topic, &index_price as &dyn Any);
768 }
769
770 fn handle_instrument_close(&mut self, close: InstrumentClose) {
771 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
772 msgbus::publish(topic, &close as &dyn Any);
773 }
774
775 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
778 if cmd.instrument_id.is_synthetic() {
779 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
780 }
781
782 self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
783
784 Ok(())
785 }
786
787 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
788 if cmd.instrument_id.is_synthetic() {
789 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
790 }
791
792 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
793
794 Ok(())
795 }
796
797 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
798 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
799 return Ok(());
800 }
801
802 if cmd.instrument_id.is_synthetic() {
803 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
804 }
805
806 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
808 Entry::Vacant(e) => {
809 let mut set = AHashSet::new();
810 set.insert(cmd.instrument_id);
811 e.insert(set);
812 true
813 }
814 Entry::Occupied(mut e) => {
815 e.get_mut().insert(cmd.instrument_id);
816 false
817 }
818 };
819
820 if first_for_interval {
821 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
823 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
824
825 let snap_info = BookSnapshotInfo {
826 instrument_id: cmd.instrument_id,
827 venue: cmd.instrument_id.venue,
828 is_composite: cmd.instrument_id.symbol.is_composite(),
829 root: Ustr::from(cmd.instrument_id.symbol.root()),
830 topic,
831 interval_ms: cmd.interval_ms,
832 };
833
834 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
836 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
837
838 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
839 self.book_snapshotters
840 .insert(cmd.instrument_id, snapshotter.clone());
841 let timer_name = snapshotter.timer_name;
842
843 let callback =
844 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
845
846 self.clock
847 .borrow_mut()
848 .set_timer_ns(
849 &timer_name,
850 interval_ns,
851 start_time_ns.into(),
852 None,
853 Some(callback),
854 None,
855 )
856 .expect(FAILED);
857 }
858
859 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
860
861 Ok(())
862 }
863
864 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
865 match cmd.bar_type.aggregation_source() {
866 AggregationSource::Internal => {
867 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
868 self.start_bar_aggregator(cmd.bar_type)?;
869 }
870 }
871 AggregationSource::External => {
872 if cmd.bar_type.instrument_id().is_synthetic() {
873 anyhow::bail!(
874 "Cannot subscribe for externally aggregated synthetic instrument bar data"
875 );
876 }
877 }
878 }
879
880 Ok(())
881 }
882
883 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
884 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
885 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
886 return Ok(());
887 }
888
889 let topics = vec![
890 switchboard::get_book_deltas_topic(cmd.instrument_id),
891 switchboard::get_book_depth10_topic(cmd.instrument_id),
892 switchboard::get_book_snapshots_topic(cmd.instrument_id),
893 ];
894
895 self.maintain_book_updater(&cmd.instrument_id, &topics);
896 self.maintain_book_snapshotter(&cmd.instrument_id);
897
898 Ok(())
899 }
900
901 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
902 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
903 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
904 return Ok(());
905 }
906
907 let topics = vec![
908 switchboard::get_book_deltas_topic(cmd.instrument_id),
909 switchboard::get_book_depth10_topic(cmd.instrument_id),
910 switchboard::get_book_snapshots_topic(cmd.instrument_id),
911 ];
912
913 self.maintain_book_updater(&cmd.instrument_id, &topics);
914 self.maintain_book_snapshotter(&cmd.instrument_id);
915
916 Ok(())
917 }
918
919 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
920 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
921 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
922 return Ok(());
923 }
924
925 let mut to_remove = Vec::new();
927 for (interval, set) in &mut self.book_intervals {
928 if set.remove(&cmd.instrument_id) && set.is_empty() {
929 to_remove.push(*interval);
930 }
931 }
932
933 for interval in to_remove {
934 self.book_intervals.remove(&interval);
935 }
936
937 let topics = vec![
938 switchboard::get_book_deltas_topic(cmd.instrument_id),
939 switchboard::get_book_depth10_topic(cmd.instrument_id),
940 switchboard::get_book_snapshots_topic(cmd.instrument_id),
941 ];
942
943 self.maintain_book_updater(&cmd.instrument_id, &topics);
944 self.maintain_book_snapshotter(&cmd.instrument_id);
945
946 Ok(())
947 }
948
949 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
951 let bar_type = cmd.bar_type;
953 if self.bar_aggregators.contains_key(&bar_type.standard()) {
954 if let Err(err) = self.stop_bar_aggregator(bar_type) {
955 log::error!("Error stopping bar aggregator for {bar_type}: {err}");
956 }
957 self.bar_aggregators.remove(&bar_type.standard());
958 log::debug!("Removed bar aggregator for {bar_type}");
959 }
960 Ok(())
961 }
962
963 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
964 if let Some(updater) = self.book_updaters.get(instrument_id) {
965 let handler = ShareableMessageHandler(updater.clone());
966
967 for topic in topics {
969 if msgbus::subscriptions_count(topic.as_str()) == 1
970 && msgbus::is_subscribed(topic.as_str(), handler.clone())
971 {
972 log::debug!("Unsubscribing BookUpdater from {topic}");
973 msgbus::unsubscribe_topic(*topic, handler.clone());
974 }
975 }
976
977 let still_subscribed = topics
979 .iter()
980 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
981 if !still_subscribed {
982 self.book_updaters.remove(instrument_id);
983 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
984 }
985 }
986 }
987
988 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
989 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
990 let topic = switchboard::get_book_snapshots_topic(*instrument_id);
991
992 if msgbus::subscriptions_count(topic.as_str()) == 0 {
994 let timer_name = snapshotter.timer_name;
995 self.book_snapshotters.remove(instrument_id);
996 let mut clock = self.clock.borrow_mut();
997 if clock.timer_names().contains(&timer_name.as_str()) {
998 clock.cancel_timer(&timer_name);
999 }
1000 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1001 }
1002 }
1003 }
1004
1005 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1008 let mut cache = self.cache.as_ref().borrow_mut();
1009 if let Err(e) = cache.add_instrument(instrument) {
1010 log_error_on_cache_insert(&e);
1011 }
1012 }
1013
1014 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1015 let mut cache = self.cache.as_ref().borrow_mut();
1017 for instrument in instruments {
1018 if let Err(e) = cache.add_instrument(instrument.clone()) {
1019 log_error_on_cache_insert(&e);
1020 }
1021 }
1022 }
1023
1024 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1025 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1026 log_error_on_cache_insert(&e);
1027 }
1028 }
1029
1030 fn handle_trades(&self, trades: &[TradeTick]) {
1031 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1032 log_error_on_cache_insert(&e);
1033 }
1034 }
1035
1036 fn handle_bars(&self, bars: &[Bar]) {
1037 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1038 log_error_on_cache_insert(&e);
1039 }
1040 }
1041
1042 #[allow(clippy::too_many_arguments)]
1045 fn setup_order_book(
1046 &mut self,
1047 instrument_id: &InstrumentId,
1048 book_type: BookType,
1049 only_deltas: bool,
1050 managed: bool,
1051 ) -> anyhow::Result<()> {
1052 let mut cache = self.cache.borrow_mut();
1053 if managed && !cache.has_order_book(instrument_id) {
1054 let book = OrderBook::new(*instrument_id, book_type);
1055 log::debug!("Created {book}");
1056 cache.add_order_book(book)?;
1057 }
1058
1059 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1061 self.book_updaters.insert(*instrument_id, updater.clone());
1062
1063 let handler = ShareableMessageHandler(updater);
1064
1065 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1066 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1067 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1068 }
1069
1070 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1071 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1072 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1073 }
1074
1075 Ok(())
1076 }
1077
1078 fn create_bar_aggregator(
1079 &mut self,
1080 instrument: &InstrumentAny,
1081 bar_type: BarType,
1082 ) -> Box<dyn BarAggregator> {
1083 let cache = self.cache.clone();
1084
1085 let handler = move |bar: Bar| {
1086 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1087 log_error_on_cache_insert(&e);
1088 }
1089
1090 let topic = switchboard::get_bars_topic(bar.bar_type);
1091 msgbus::publish(topic, &bar as &dyn Any);
1092 };
1093
1094 let clock = self.clock.clone();
1095 let config = self.config.clone();
1096
1097 let price_precision = instrument.price_precision();
1098 let size_precision = instrument.size_precision();
1099
1100 if bar_type.spec().is_time_aggregated() {
1101 Box::new(TimeBarAggregator::new(
1102 bar_type,
1103 price_precision,
1104 size_precision,
1105 clock,
1106 handler,
1107 false, config.time_bars_build_with_no_updates,
1109 config.time_bars_timestamp_on_close,
1110 config.time_bars_interval_type,
1111 None, 20, false, ))
1115 } else {
1116 match bar_type.spec().aggregation {
1117 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1118 bar_type,
1119 price_precision,
1120 size_precision,
1121 handler,
1122 false,
1123 )) as Box<dyn BarAggregator>,
1124 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1125 bar_type,
1126 price_precision,
1127 size_precision,
1128 handler,
1129 false,
1130 )) as Box<dyn BarAggregator>,
1131 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1132 bar_type,
1133 price_precision,
1134 size_precision,
1135 handler,
1136 false,
1137 )) as Box<dyn BarAggregator>,
1138 _ => panic!(
1139 "Cannot create aggregator: {} aggregation not currently supported",
1140 bar_type.spec().aggregation
1141 ),
1142 }
1143 }
1144 }
1145
1146 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1147 let instrument = {
1149 let cache = self.cache.borrow();
1150 cache
1151 .instrument(&bar_type.instrument_id())
1152 .ok_or_else(|| {
1153 anyhow::anyhow!(
1154 "Cannot start bar aggregation: no instrument found for {}",
1155 bar_type.instrument_id(),
1156 )
1157 })?
1158 .clone()
1159 };
1160
1161 let bar_key = bar_type.standard();
1163
1164 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1166 rc.clone()
1167 } else {
1168 let agg = self.create_bar_aggregator(&instrument, bar_type);
1169 let rc = Rc::new(RefCell::new(agg));
1170 self.bar_aggregators.insert(bar_key, rc.clone());
1171 rc
1172 };
1173
1174 let mut handlers = Vec::new();
1176
1177 if bar_type.is_composite() {
1178 let topic = switchboard::get_bars_topic(bar_type.composite());
1179 let handler =
1180 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1181
1182 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1183 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1184 }
1185
1186 handlers.push((topic, handler));
1187 } else if bar_type.spec().price_type == PriceType::Last {
1188 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1189 let handler =
1190 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1191
1192 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1193 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1194 }
1195
1196 handlers.push((topic, handler));
1197 } else {
1198 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1199 let handler =
1200 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1201
1202 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1203 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1204 }
1205
1206 handlers.push((topic, handler));
1207 }
1208
1209 self.bar_aggregator_handlers.insert(bar_key, handlers);
1210 aggregator.borrow_mut().set_is_running(true);
1211
1212 Ok(())
1213 }
1214
1215 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1216 let aggregator = self
1217 .bar_aggregators
1218 .remove(&bar_type.standard())
1219 .ok_or_else(|| {
1220 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1221 })?;
1222
1223 aggregator.borrow_mut().stop();
1224
1225 let bar_key = bar_type.standard();
1227 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1228 for (topic, handler) in subs {
1229 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1230 msgbus::unsubscribe_topic(topic, handler);
1231 }
1232 }
1233 }
1234
1235 Ok(())
1236 }
1237}
1238
1239#[inline(always)]
1240fn log_error_on_cache_insert<T: Display>(e: &T) {
1241 log::error!("Error on cache insert: {e}");
1242}