1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::{Any, type_name},
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::{Debug, Display},
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{
64 self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
65 switchboard::{self, MessagingSwitchboard},
66 },
67 runner::get_data_cmd_sender,
68 timer::{TimeEvent, TimeEventCallback},
69};
70use nautilus_core::{
71 UUID4, WeakCell,
72 correctness::{
73 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
74 },
75 datetime::millis_to_nanos_unchecked,
76};
77#[cfg(feature = "defi")]
78use nautilus_model::defi::DefiData;
79use nautilus_model::{
80 data::{
81 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
82 InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
83 QuoteTick, TradeTick,
84 },
85 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
86 identifiers::{ClientId, InstrumentId, Venue},
87 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
88 orderbook::OrderBook,
89};
90#[cfg(feature = "streaming")]
91use nautilus_persistence::backend::catalog::ParquetDataCatalog;
92use ustr::Ustr;
93
94#[cfg(feature = "defi")]
95#[allow(unused_imports)] use crate::defi::engine as _;
97#[cfg(feature = "defi")]
98use crate::engine::pool::PoolUpdater;
99use crate::{
100 aggregation::{
101 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
102 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
103 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
104 VolumeRunsBarAggregator,
105 },
106 client::DataClientAdapter,
107};
108
/// A message bus subscription tracked for a bar aggregator.
///
/// Instances are stored in `DataEngine::bar_aggregator_handlers`, keyed by bar type.
/// Each variant pairs a topic with the typed handler for that variant's data type
/// ([`Bar`], [`TradeTick`] or [`QuoteTick`]).
#[derive(Clone)]
pub enum BarAggregatorSubscription {
    /// Subscription whose handler accepts [`Bar`] messages.
    Bar {
        /// Topic associated with the handler.
        topic: MStr<Topic>,
        /// Typed handler for `Bar` messages.
        handler: TypedHandler<Bar>,
    },
    /// Subscription whose handler accepts [`TradeTick`] messages.
    Trade {
        /// Topic associated with the handler.
        topic: MStr<Topic>,
        /// Typed handler for `TradeTick` messages.
        handler: TypedHandler<TradeTick>,
    },
    /// Subscription whose handler accepts [`QuoteTick`] messages.
    Quote {
        /// Topic associated with the handler.
        topic: MStr<Topic>,
        /// Typed handler for `QuoteTick` messages.
        handler: TypedHandler<QuoteTick>,
    },
}
128
129impl Debug for BarAggregatorSubscription {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 Self::Bar { topic, handler } => f
133 .debug_struct(stringify!(Bar))
134 .field("topic", topic)
135 .field("handler_id", &handler.id())
136 .finish(),
137 Self::Trade { topic, handler } => f
138 .debug_struct(stringify!(Trade))
139 .field("topic", topic)
140 .field("handler_id", &handler.id())
141 .finish(),
142 Self::Quote { topic, handler } => f
143 .debug_struct(stringify!(Quote))
144 .field("topic", topic)
145 .field("handler_id", &handler.id())
146 .finish(),
147 }
148 }
149}
150
/// Central data engine: owns the registered data clients, routes data commands to them,
/// writes received market data to the shared cache and publishes it on the message bus.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock, used for timestamps and for setting/cancelling timers.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that received instruments and market data are added to.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Client IDs treated as external: subscribe, unsubscribe and request commands
    /// addressed to them are skipped instead of being forwarded to a client.
    pub(crate) external_clients: AHashSet<ClientId>,
    /// Registered (non-default) clients keyed by client ID, in insertion order.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when neither a client ID nor a venue route selects a client.
    default_client: Option<DataClientAdapter>,
    /// Registered Parquet data catalogs keyed by name.
    #[cfg(feature = "streaming")]
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routing table.
    routing_map: IndexMap<Venue, ClientId>,
    /// Book snapshot interval (milliseconds) to the instruments subscribed at that interval.
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    /// Instruments with an engine-level order book deltas subscription.
    book_deltas_subs: AHashSet<InstrumentId>,
    /// Instruments with an engine-level order book depth10 subscription.
    book_depth10_subs: AHashSet<InstrumentId>,
    /// Book updaters keyed by instrument ID.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    /// Book snapshotters keyed by instrument ID.
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregators keyed by bar type.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    /// Message bus subscriptions held for each bar aggregator, keyed by bar type.
    bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
    /// Synthetic instruments per component instrument for quotes.
    /// NOTE(review): not read anywhere in the visible part of this file.
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Synthetic instruments per component instrument for trades.
    /// NOTE(review): not read anywhere in the visible part of this file.
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas buffered per instrument (when `config.buffer_deltas` is set) until a delta
    /// flagged `F_LAST` arrives.
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Message bus priority; initialised to 10 by [`DataEngine::new`].
    pub(crate) msgbus_priority: u8,
    /// Engine configuration.
    pub(crate) config: DataEngineConfig,
    /// Pool updaters keyed by instrument ID (DeFi only).
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
    /// Instruments with a pending pool updater (DeFi only) — presumably awaiting setup;
    /// TODO confirm against the `defi` module.
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
    /// Instruments with a pending pool snapshot (DeFi only) — TODO confirm against the
    /// `defi` module.
    #[cfg(feature = "defi")]
    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
    /// Buffered DeFi data per instrument (DeFi only).
    #[cfg(feature = "defi")]
    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
183
184impl DataEngine {
185 #[must_use]
187 pub fn new(
188 clock: Rc<RefCell<dyn Clock>>,
189 cache: Rc<RefCell<Cache>>,
190 config: Option<DataEngineConfig>,
191 ) -> Self {
192 let config = config.unwrap_or_default();
193
194 let external_clients: AHashSet<ClientId> = config
195 .external_clients
196 .clone()
197 .unwrap_or_default()
198 .into_iter()
199 .collect();
200
201 Self {
202 clock,
203 cache,
204 external_clients,
205 clients: IndexMap::new(),
206 default_client: None,
207 #[cfg(feature = "streaming")]
208 catalogs: AHashMap::new(),
209 routing_map: IndexMap::new(),
210 book_intervals: AHashMap::new(),
211 book_deltas_subs: AHashSet::new(),
212 book_depth10_subs: AHashSet::new(),
213 book_updaters: AHashMap::new(),
214 book_snapshotters: AHashMap::new(),
215 bar_aggregators: AHashMap::new(),
216 bar_aggregator_handlers: AHashMap::new(),
217 _synthetic_quote_feeds: AHashMap::new(),
218 _synthetic_trade_feeds: AHashMap::new(),
219 buffered_deltas_map: AHashMap::new(),
220 msgbus_priority: 10, config,
222 #[cfg(feature = "defi")]
223 pool_updaters: AHashMap::new(),
224 #[cfg(feature = "defi")]
225 pool_updaters_pending: AHashSet::new(),
226 #[cfg(feature = "defi")]
227 pool_snapshot_pending: AHashSet::new(),
228 #[cfg(feature = "defi")]
229 pool_event_buffers: AHashMap::new(),
230 }
231 }
232
233 pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
235 let weak = WeakCell::from(Rc::downgrade(&engine));
236
237 let weak1 = weak.clone();
238 msgbus::register_data_command_endpoint(
239 MessagingSwitchboard::data_engine_execute(),
240 TypedIntoHandler::from(move |cmd: DataCommand| {
241 if let Some(rc) = weak1.upgrade() {
242 rc.borrow_mut().execute(cmd);
243 }
244 }),
245 );
246
247 msgbus::register_data_command_endpoint(
248 MessagingSwitchboard::data_engine_queue_execute(),
249 TypedIntoHandler::from(move |cmd: DataCommand| {
250 get_data_cmd_sender().clone().execute(cmd);
251 }),
252 );
253
254 let weak2 = weak.clone();
256 msgbus::register_any(
257 MessagingSwitchboard::data_engine_process(),
258 ShareableMessageHandler::from_any(move |data: &dyn Any| {
259 if let Some(rc) = weak2.upgrade() {
260 rc.borrow_mut().process(data);
261 }
262 }),
263 );
264
265 let weak3 = weak.clone();
267 msgbus::register_data_endpoint(
268 MessagingSwitchboard::data_engine_process_data(),
269 TypedIntoHandler::from(move |data: Data| {
270 if let Some(rc) = weak3.upgrade() {
271 rc.borrow_mut().process_data(data);
272 }
273 }),
274 );
275
276 #[cfg(feature = "defi")]
278 {
279 let weak4 = weak.clone();
280 msgbus::register_defi_data_endpoint(
281 MessagingSwitchboard::data_engine_process_defi_data(),
282 TypedIntoHandler::from(move |data: DefiData| {
283 if let Some(rc) = weak4.upgrade() {
284 rc.borrow_mut().process_defi_data(data);
285 }
286 }),
287 );
288 }
289
290 let weak5 = weak;
291 msgbus::register_data_response_endpoint(
292 MessagingSwitchboard::data_engine_response(),
293 TypedIntoHandler::from(move |resp: DataResponse| {
294 if let Some(rc) = weak5.upgrade() {
295 rc.borrow_mut().response(resp);
296 }
297 }),
298 );
299 }
300
301 #[must_use]
303 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
304 self.clock.borrow()
305 }
306
307 #[must_use]
309 pub fn get_cache(&self) -> Ref<'_, Cache> {
310 self.cache.borrow()
311 }
312
313 #[must_use]
315 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
316 Rc::clone(&self.cache)
317 }
318
/// Registers `catalog` under `name`, or under `"catalog_0"` when no name is given.
///
/// # Panics
///
/// Panics if the `check_key_not_in_map` check on the catalog name fails.
#[cfg(feature = "streaming")]
pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
    let name = match name.as_deref() {
        Some(given) => Ustr::from(given),
        None => Ustr::from("catalog_0"),
    };

    check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);

    let _previous = self.catalogs.insert(name, catalog);
    log::info!("Registered catalog <{name}>");
}
333
334 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
341 let client_id = client.client_id();
342
343 if let Some(default_client) = &self.default_client {
344 check_predicate_false(
345 default_client.client_id() == client.client_id(),
346 "client_id already registered as default client",
347 )
348 .expect(FAILED);
349 }
350
351 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
352
353 if let Some(routing) = routing {
354 self.routing_map.insert(routing, client_id);
355 log::debug!("Set client {client_id} routing for {routing}");
356 }
357
358 if client.venue.is_none() && self.default_client.is_none() {
359 self.default_client = Some(client);
360 log::debug!("Registered client {client_id} for default routing");
361 } else {
362 self.clients.insert(client_id, client);
363 log::debug!("Registered client {client_id}");
364 }
365 }
366
367 pub fn deregister_client(&mut self, client_id: &ClientId) {
373 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
374
375 self.clients.shift_remove(client_id);
376 log::info!("Deregistered client {client_id}");
377 }
378
379 pub fn register_default_client(&mut self, client: DataClientAdapter) {
391 check_predicate_true(
392 self.default_client.is_none(),
393 "default client already registered",
394 )
395 .expect(FAILED);
396
397 let client_id = client.client_id();
398
399 self.default_client = Some(client);
400 log::debug!("Registered default client {client_id}");
401 }
402
403 pub fn start(&mut self) {
405 for client in self.get_clients_mut() {
406 if let Err(e) = client.start() {
407 log::error!("{e}");
408 }
409 }
410
411 for aggregator in self.bar_aggregators.values() {
412 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
413 aggregator
414 .borrow_mut()
415 .start_timer(Some(aggregator.clone()));
416 }
417 }
418 }
419
420 pub fn stop(&mut self) {
422 for client in self.get_clients_mut() {
423 if let Err(e) = client.stop() {
424 log::error!("{e}");
425 }
426 }
427
428 for aggregator in self.bar_aggregators.values() {
429 aggregator.borrow_mut().stop();
430 }
431 }
432
433 pub fn reset(&mut self) {
435 for client in self.get_clients_mut() {
436 if let Err(e) = client.reset() {
437 log::error!("{e}");
438 }
439 }
440
441 let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
442 for bar_type in bar_types {
443 if let Err(e) = self.stop_bar_aggregator(bar_type) {
444 log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
445 }
446 }
447 }
448
449 pub fn dispose(&mut self) {
451 for client in self.get_clients_mut() {
452 if let Err(e) = client.dispose() {
453 log::error!("{e}");
454 }
455 }
456
457 self.clock.borrow_mut().cancel_timers();
458 }
459
460 pub async fn connect(&mut self) {
464 let futures: Vec<_> = self
465 .get_clients_mut()
466 .into_iter()
467 .map(|client| client.connect())
468 .collect();
469
470 let results = join_all(futures).await;
471
472 for error in results.into_iter().filter_map(Result::err) {
473 log::error!("Failed to connect data client: {error}");
474 }
475 }
476
477 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
483 let futures: Vec<_> = self
484 .get_clients_mut()
485 .into_iter()
486 .map(|client| client.disconnect())
487 .collect();
488
489 let results = join_all(futures).await;
490 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
491
492 if errors.is_empty() {
493 Ok(())
494 } else {
495 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
496 anyhow::bail!(
497 "Failed to disconnect data clients: {}",
498 error_msgs.join("; ")
499 )
500 }
501 }
502
503 #[must_use]
505 pub fn check_connected(&self) -> bool {
506 self.get_clients()
507 .iter()
508 .all(|client| client.is_connected())
509 }
510
511 #[must_use]
513 pub fn check_disconnected(&self) -> bool {
514 self.get_clients()
515 .iter()
516 .all(|client| !client.is_connected())
517 }
518
519 #[must_use]
521 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
522 self.get_clients()
523 .into_iter()
524 .map(|client| (client.client_id(), client.is_connected()))
525 .collect()
526 }
527
528 #[must_use]
530 pub fn registered_clients(&self) -> Vec<ClientId> {
531 self.get_clients()
532 .into_iter()
533 .map(|client| client.client_id())
534 .collect()
535 }
536
537 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
540 where
541 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
542 T: Clone,
543 {
544 self.get_clients()
545 .into_iter()
546 .flat_map(get_subs)
547 .cloned()
548 .collect()
549 }
550
551 #[must_use]
552 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
553 let (default_opt, clients_map) = (&self.default_client, &self.clients);
554 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
555
556 if let Some(default) = default_opt {
557 clients.push(default);
558 }
559
560 clients
561 }
562
563 #[must_use]
564 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
565 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
566 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
567
568 if let Some(default) = default_opt {
569 clients.push(default);
570 }
571
572 clients
573 }
574
575 pub fn get_client(
576 &mut self,
577 client_id: Option<&ClientId>,
578 venue: Option<&Venue>,
579 ) -> Option<&mut DataClientAdapter> {
580 if let Some(client_id) = client_id {
581 if let Some(client) = self.clients.get_mut(client_id) {
583 return Some(client);
584 }
585
586 if let Some(default) = self.default_client.as_mut()
588 && default.client_id() == *client_id
589 {
590 return Some(default);
591 }
592
593 return None;
595 }
596
597 if let Some(v) = venue {
598 if let Some(client_id) = self.routing_map.get(v) {
600 return self.clients.get_mut(client_id);
601 }
602 }
603
604 self.get_default_client()
606 }
607
608 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
609 self.default_client.as_mut()
610 }
611
612 #[must_use]
614 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
615 self.collect_subscriptions(|client| &client.subscriptions_custom)
616 }
617
618 #[must_use]
620 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
621 self.collect_subscriptions(|client| &client.subscriptions_instrument)
622 }
623
624 #[must_use]
626 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
627 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
628 }
629
630 #[must_use]
632 pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
633 self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
634 }
635
636 #[must_use]
638 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
639 self.book_intervals
640 .values()
641 .flat_map(|set| set.iter().copied())
642 .collect()
643 }
644
645 #[must_use]
647 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
648 self.collect_subscriptions(|client| &client.subscriptions_quotes)
649 }
650
651 #[must_use]
653 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
654 self.collect_subscriptions(|client| &client.subscriptions_trades)
655 }
656
657 #[must_use]
659 pub fn subscribed_bars(&self) -> Vec<BarType> {
660 self.collect_subscriptions(|client| &client.subscriptions_bars)
661 }
662
663 #[must_use]
665 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
666 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
667 }
668
669 #[must_use]
671 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
672 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
673 }
674
675 #[must_use]
677 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
678 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
679 }
680
681 #[must_use]
683 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
684 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
685 }
686
687 #[must_use]
689 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
690 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
691 }
692
693 pub fn execute(&mut self, cmd: DataCommand) {
699 if let Err(e) = match cmd {
700 DataCommand::Subscribe(c) => self.execute_subscribe(&c),
701 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
702 DataCommand::Request(c) => self.execute_request(c),
703 #[cfg(feature = "defi")]
704 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
705 #[cfg(feature = "defi")]
706 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
707 #[cfg(feature = "defi")]
708 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
709 _ => {
710 log::warn!("Unhandled DataCommand variant");
711 Ok(())
712 }
713 } {
714 log::error!("{e}");
715 }
716 }
717
718 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
725 match &cmd {
727 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
728 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
729 SubscribeCommand::BookSnapshots(cmd) => {
730 return self.subscribe_book_snapshots(cmd);
732 }
733 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
734 _ => {} }
736
737 if let Some(client_id) = cmd.client_id()
738 && self.external_clients.contains(client_id)
739 {
740 if self.config.debug {
741 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
742 }
743 return Ok(());
744 }
745
746 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
747 client.execute_subscribe(cmd);
748 } else {
749 log::error!(
750 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
751 cmd.client_id(),
752 cmd.venue(),
753 );
754 }
755
756 Ok(())
757 }
758
759 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
765 match &cmd {
766 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
767 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
768 UnsubscribeCommand::BookSnapshots(cmd) => {
769 return self.unsubscribe_book_snapshots(cmd);
771 }
772 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
773 _ => {} }
775
776 if let Some(client_id) = cmd.client_id()
777 && self.external_clients.contains(client_id)
778 {
779 if self.config.debug {
780 log::debug!(
781 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
782 );
783 }
784 return Ok(());
785 }
786
787 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
788 client.execute_unsubscribe(cmd);
789 } else {
790 log::error!(
791 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
792 cmd.client_id(),
793 cmd.venue(),
794 );
795 }
796
797 Ok(())
798 }
799
800 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
807 if let Some(cid) = req.client_id()
809 && self.external_clients.contains(cid)
810 {
811 if self.config.debug {
812 log::debug!("Skipping data request for external client {cid}: {req:?}");
813 }
814 return Ok(());
815 }
816
817 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
818 match req {
819 RequestCommand::Data(req) => client.request_data(req),
820 RequestCommand::Instrument(req) => client.request_instrument(req),
821 RequestCommand::Instruments(req) => client.request_instruments(req),
822 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
823 RequestCommand::BookDepth(req) => client.request_book_depth(req),
824 RequestCommand::Quotes(req) => client.request_quotes(req),
825 RequestCommand::Trades(req) => client.request_trades(req),
826 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
827 RequestCommand::Bars(req) => client.request_bars(req),
828 }
829 } else {
830 anyhow::bail!(
831 "Cannot handle request: no client found for {:?} {:?}",
832 req.client_id(),
833 req.venue()
834 );
835 }
836 }
837
838 pub fn process(&mut self, data: &dyn Any) {
842 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
844 self.handle_instrument(instrument.clone());
845 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
846 self.handle_funding_rate(*funding_rate);
847 } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
848 self.handle_instrument_status(*status);
849 } else {
850 log::error!("Cannot process data {data:?}, type is unrecognized");
851 }
852
853 }
855
856 pub fn process_data(&mut self, data: Data) {
858 match data {
859 Data::Delta(delta) => self.handle_delta(delta),
860 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
861 Data::Depth10(depth) => self.handle_depth10(*depth),
862 Data::Quote(quote) => self.handle_quote(quote),
863 Data::Trade(trade) => self.handle_trade(trade),
864 Data::Bar(bar) => self.handle_bar(bar),
865 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
866 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
867 Data::InstrumentClose(close) => self.handle_instrument_close(close),
868 }
869 }
870
871 pub fn response(&self, resp: DataResponse) {
873 log::debug!("{RECV}{RES} {resp:?}");
874
875 let correlation_id = *resp.correlation_id();
876
877 match &resp {
878 DataResponse::Instrument(r) => {
879 self.handle_instrument_response(r.data.clone());
880 }
881 DataResponse::Instruments(r) => {
882 self.handle_instruments(&r.data);
883 }
884 DataResponse::Quotes(r) => {
885 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
886 self.handle_quotes(&r.data);
887 }
888 }
889 DataResponse::Trades(r) => {
890 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
891 self.handle_trades(&r.data);
892 }
893 }
894 DataResponse::FundingRates(r) => {
895 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
896 self.handle_funding_rates(&r.data);
897 }
898 }
899 DataResponse::Bars(r) => {
900 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
901 self.handle_bars(&r.data);
902 }
903 }
904 DataResponse::Book(r) => self.handle_book_response(&r.data),
905 _ => todo!("Handle other response types"),
906 }
907
908 msgbus::send_response(&correlation_id, resp);
909 }
910
911 fn handle_instrument(&mut self, instrument: InstrumentAny) {
914 log::debug!("Handling instrument: {}", instrument.id());
915
916 if let Err(e) = self
917 .cache
918 .as_ref()
919 .borrow_mut()
920 .add_instrument(instrument.clone())
921 {
922 log_error_on_cache_insert(&e);
923 }
924
925 let topic = switchboard::get_instrument_topic(instrument.id());
926 log::debug!("Publishing instrument to topic: {topic}");
927 msgbus::publish_any(topic, &instrument);
928 }
929
930 fn handle_delta(&mut self, delta: OrderBookDelta) {
931 let deltas = if self.config.buffer_deltas {
932 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
933 buffered_deltas.deltas.push(delta);
934 buffered_deltas.flags = delta.flags;
935 buffered_deltas.sequence = delta.sequence;
936 buffered_deltas.ts_event = delta.ts_event;
937 buffered_deltas.ts_init = delta.ts_init;
938 } else {
939 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
940 self.buffered_deltas_map
941 .insert(delta.instrument_id, buffered_deltas);
942 }
943
944 if !RecordFlag::F_LAST.matches(delta.flags) {
945 return; }
947
948 self.buffered_deltas_map
949 .remove(&delta.instrument_id)
950 .expect("buffered deltas exist")
951 } else {
952 OrderBookDeltas::new(delta.instrument_id, vec![delta])
953 };
954
955 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
956 msgbus::publish_deltas(topic, &deltas);
957 }
958
959 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
960 if self.config.buffer_deltas {
961 let instrument_id = deltas.instrument_id;
962
963 for delta in deltas.deltas {
964 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
965 buffered_deltas.deltas.push(delta);
966 buffered_deltas.flags = delta.flags;
967 buffered_deltas.sequence = delta.sequence;
968 buffered_deltas.ts_event = delta.ts_event;
969 buffered_deltas.ts_init = delta.ts_init;
970 } else {
971 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
972 self.buffered_deltas_map
973 .insert(instrument_id, buffered_deltas);
974 }
975
976 if RecordFlag::F_LAST.matches(delta.flags) {
977 let deltas_to_publish = self
978 .buffered_deltas_map
979 .remove(&instrument_id)
980 .expect("buffered deltas exist");
981 let topic = switchboard::get_book_deltas_topic(instrument_id);
982 msgbus::publish_deltas(topic, &deltas_to_publish);
983 }
984 }
985 } else {
986 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
987 msgbus::publish_deltas(topic, &deltas);
988 }
989 }
990
991 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
992 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
993 msgbus::publish_depth10(topic, &depth);
994 }
995
996 fn handle_quote(&mut self, quote: QuoteTick) {
997 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
998 log_error_on_cache_insert(&e);
999 }
1000
1001 let topic = switchboard::get_quotes_topic(quote.instrument_id);
1004 msgbus::publish_quote(topic, "e);
1005 }
1006
1007 fn handle_trade(&mut self, trade: TradeTick) {
1008 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1009 log_error_on_cache_insert(&e);
1010 }
1011
1012 let topic = switchboard::get_trades_topic(trade.instrument_id);
1015 msgbus::publish_trade(topic, &trade);
1016 }
1017
1018 fn handle_bar(&mut self, bar: Bar) {
1019 if self.config.validate_data_sequence
1021 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1022 {
1023 if bar.ts_event < last_bar.ts_event {
1024 log::warn!(
1025 "Bar {bar} was prior to last bar `ts_event` {}",
1026 last_bar.ts_event
1027 );
1028 return; }
1030
1031 if bar.ts_init < last_bar.ts_init {
1032 log::warn!(
1033 "Bar {bar} was prior to last bar `ts_init` {}",
1034 last_bar.ts_init
1035 );
1036 return; }
1038 }
1040
1041 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1042 log_error_on_cache_insert(&e);
1043 }
1044
1045 let topic = switchboard::get_bars_topic(bar.bar_type);
1046 msgbus::publish_bar(topic, &bar);
1047 }
1048
1049 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
1050 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1051 log_error_on_cache_insert(&e);
1052 }
1053
1054 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1055 msgbus::publish_mark_price(topic, &mark_price);
1056 }
1057
1058 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
1059 if let Err(e) = self
1060 .cache
1061 .as_ref()
1062 .borrow_mut()
1063 .add_index_price(index_price)
1064 {
1065 log_error_on_cache_insert(&e);
1066 }
1067
1068 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1069 msgbus::publish_index_price(topic, &index_price);
1070 }
1071
1072 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1074 if let Err(e) = self
1075 .cache
1076 .as_ref()
1077 .borrow_mut()
1078 .add_funding_rate(funding_rate)
1079 {
1080 log_error_on_cache_insert(&e);
1081 }
1082
1083 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1084 msgbus::publish_funding_rate(topic, &funding_rate);
1085 }
1086
1087 fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1088 let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1089 msgbus::publish_any(topic, &status);
1090 }
1091
1092 fn handle_instrument_close(&mut self, close: InstrumentClose) {
1093 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1094 msgbus::publish_any(topic, &close);
1095 }
1096
1097 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1100 if cmd.instrument_id.is_synthetic() {
1101 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1102 }
1103
1104 self.book_deltas_subs.insert(cmd.instrument_id);
1105 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1106
1107 Ok(())
1108 }
1109
1110 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1111 if cmd.instrument_id.is_synthetic() {
1112 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1113 }
1114
1115 self.book_depth10_subs.insert(cmd.instrument_id);
1116 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1117
1118 Ok(())
1119 }
1120
1121 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1122 if cmd.instrument_id.is_synthetic() {
1123 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1124 }
1125
1126 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1128 Entry::Vacant(e) => {
1129 let mut set = AHashSet::new();
1130 set.insert(cmd.instrument_id);
1131 e.insert(set);
1132 true
1133 }
1134 Entry::Occupied(mut e) => {
1135 e.get_mut().insert(cmd.instrument_id);
1136 false
1137 }
1138 };
1139
1140 if first_for_interval {
1141 let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1143 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1144
1145 let snap_info = BookSnapshotInfo {
1146 instrument_id: cmd.instrument_id,
1147 venue: cmd.instrument_id.venue,
1148 is_composite: cmd.instrument_id.symbol.is_composite(),
1149 root: Ustr::from(cmd.instrument_id.symbol.root()),
1150 topic,
1151 interval_ms: cmd.interval_ms,
1152 };
1153
1154 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1156 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1157
1158 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1159 self.book_snapshotters
1160 .insert(cmd.instrument_id, snapshotter.clone());
1161 let timer_name = snapshotter.timer_name;
1162
1163 let callback_fn: Rc<dyn Fn(TimeEvent)> =
1164 Rc::new(move |event| snapshotter.snapshot(event));
1165 let callback = TimeEventCallback::from(callback_fn);
1166
1167 self.clock
1168 .borrow_mut()
1169 .set_timer_ns(
1170 &timer_name,
1171 interval_ns,
1172 Some(start_time_ns.into()),
1173 None,
1174 Some(callback),
1175 None,
1176 None,
1177 )
1178 .expect(FAILED);
1179 }
1180
1181 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1183 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1184 }
1185
1186 if let Some(client_id) = cmd.client_id.as_ref()
1187 && self.external_clients.contains(client_id)
1188 {
1189 if self.config.debug {
1190 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1191 }
1192 return Ok(());
1193 }
1194
1195 log::debug!(
1196 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1197 cmd.instrument_id,
1198 cmd.client_id,
1199 cmd.venue,
1200 );
1201
1202 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1203 let deltas_cmd = SubscribeBookDeltas::new(
1204 cmd.instrument_id,
1205 cmd.book_type,
1206 cmd.client_id,
1207 cmd.venue,
1208 UUID4::new(),
1209 cmd.ts_init,
1210 cmd.depth,
1211 true, Some(cmd.command_id),
1213 cmd.params.clone(),
1214 );
1215 log::debug!(
1216 "Calling client.execute_subscribe for BookDeltas: {}",
1217 cmd.instrument_id
1218 );
1219 client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1220 } else {
1221 log::error!(
1222 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1223 cmd.client_id,
1224 cmd.venue,
1225 );
1226 }
1227
1228 Ok(())
1229 }
1230
1231 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1232 match cmd.bar_type.aggregation_source() {
1233 AggregationSource::Internal => {
1234 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1235 self.start_bar_aggregator(cmd.bar_type)?;
1236 }
1237 }
1238 AggregationSource::External => {
1239 if cmd.bar_type.instrument_id().is_synthetic() {
1240 anyhow::bail!(
1241 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1242 );
1243 }
1244 }
1245 }
1246
1247 Ok(())
1248 }
1249
1250 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1251 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1252 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1253 return Ok(());
1254 }
1255
1256 self.book_deltas_subs.remove(&cmd.instrument_id);
1257
1258 let topics = vec![
1259 switchboard::get_book_deltas_topic(cmd.instrument_id),
1260 switchboard::get_book_depth10_topic(cmd.instrument_id),
1261 ];
1263
1264 self.maintain_book_updater(&cmd.instrument_id, &topics);
1265 self.maintain_book_snapshotter(&cmd.instrument_id);
1266
1267 Ok(())
1268 }
1269
1270 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1271 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1272 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1273 return Ok(());
1274 }
1275
1276 self.book_depth10_subs.remove(&cmd.instrument_id);
1277
1278 let topics = vec![
1279 switchboard::get_book_deltas_topic(cmd.instrument_id),
1280 switchboard::get_book_depth10_topic(cmd.instrument_id),
1281 ];
1282
1283 self.maintain_book_updater(&cmd.instrument_id, &topics);
1284 self.maintain_book_snapshotter(&cmd.instrument_id);
1285
1286 Ok(())
1287 }
1288
1289 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1290 let is_subscribed = self
1291 .book_intervals
1292 .values()
1293 .any(|set| set.contains(&cmd.instrument_id));
1294
1295 if !is_subscribed {
1296 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1297 return Ok(());
1298 }
1299
1300 let mut to_remove = Vec::new();
1302 for (interval, set) in &mut self.book_intervals {
1303 if set.remove(&cmd.instrument_id) && set.is_empty() {
1304 to_remove.push(*interval);
1305 }
1306 }
1307
1308 for interval in to_remove {
1309 self.book_intervals.remove(&interval);
1310 }
1311
1312 let topics = vec![
1313 switchboard::get_book_deltas_topic(cmd.instrument_id),
1314 switchboard::get_book_depth10_topic(cmd.instrument_id),
1315 ];
1316
1317 self.maintain_book_updater(&cmd.instrument_id, &topics);
1318 self.maintain_book_snapshotter(&cmd.instrument_id);
1319
1320 let still_in_intervals = self
1321 .book_intervals
1322 .values()
1323 .any(|set| set.contains(&cmd.instrument_id));
1324
1325 if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1326 if let Some(client_id) = cmd.client_id.as_ref()
1327 && self.external_clients.contains(client_id)
1328 {
1329 return Ok(());
1330 }
1331
1332 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1333 let deltas_cmd = UnsubscribeBookDeltas::new(
1334 cmd.instrument_id,
1335 cmd.client_id,
1336 cmd.venue,
1337 UUID4::new(),
1338 cmd.ts_init,
1339 Some(cmd.command_id),
1340 cmd.params.clone(),
1341 );
1342 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1343 }
1344 }
1345
1346 Ok(())
1347 }
1348
1349 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1350 let bar_type = cmd.bar_type;
1351
1352 let topic = switchboard::get_bars_topic(bar_type.standard());
1354 if msgbus::exact_subscriber_count_bars(topic) > 0 {
1355 return Ok(());
1356 }
1357
1358 if self.bar_aggregators.contains_key(&bar_type.standard())
1359 && let Err(e) = self.stop_bar_aggregator(bar_type)
1360 {
1361 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1362 }
1363
1364 if bar_type.is_composite() {
1366 let source_type = bar_type.composite();
1367 let source_topic = switchboard::get_bars_topic(source_type);
1368 if msgbus::exact_subscriber_count_bars(source_topic) == 0
1369 && self.bar_aggregators.contains_key(&source_type)
1370 && let Err(e) = self.stop_bar_aggregator(source_type)
1371 {
1372 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1373 }
1374 }
1375
1376 Ok(())
1377 }
1378
1379 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1380 let Some(updater) = self.book_updaters.get(instrument_id) else {
1381 return;
1382 };
1383
1384 let has_deltas = self.book_deltas_subs.contains(instrument_id);
1386 let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1387
1388 let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1389 let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1390 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1391 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1392
1393 if !has_deltas {
1395 msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1396 }
1397
1398 if !has_depth10 {
1399 msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1400 }
1401
1402 if !has_deltas && !has_depth10 {
1404 self.book_updaters.remove(instrument_id);
1405 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1406 }
1407 }
1408
1409 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1410 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1411 let topic = switchboard::get_book_snapshots_topic(
1412 *instrument_id,
1413 snapshotter.snap_info.interval_ms,
1414 );
1415
1416 if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1418 let timer_name = snapshotter.timer_name;
1419 self.book_snapshotters.remove(instrument_id);
1420 let mut clock = self.clock.borrow_mut();
1421 if clock.timer_exists(&timer_name) {
1422 clock.cancel_timer(&timer_name);
1423 }
1424 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1425 }
1426 }
1427 }
1428
1429 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1432 let mut cache = self.cache.as_ref().borrow_mut();
1433 if let Err(e) = cache.add_instrument(instrument) {
1434 log_error_on_cache_insert(&e);
1435 }
1436 }
1437
1438 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1439 let mut cache = self.cache.as_ref().borrow_mut();
1441 for instrument in instruments {
1442 if let Err(e) = cache.add_instrument(instrument.clone()) {
1443 log_error_on_cache_insert(&e);
1444 }
1445 }
1446 }
1447
1448 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1449 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1450 log_error_on_cache_insert(&e);
1451 }
1452 }
1453
1454 fn handle_trades(&self, trades: &[TradeTick]) {
1455 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1456 log_error_on_cache_insert(&e);
1457 }
1458 }
1459
1460 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1461 if let Err(e) = self
1462 .cache
1463 .as_ref()
1464 .borrow_mut()
1465 .add_funding_rates(funding_rates)
1466 {
1467 log_error_on_cache_insert(&e);
1468 }
1469 }
1470
1471 fn handle_bars(&self, bars: &[Bar]) {
1472 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1473 log_error_on_cache_insert(&e);
1474 }
1475 }
1476
1477 fn handle_book_response(&self, book: &OrderBook) {
1478 log::debug!("Adding order book {} to cache", book.instrument_id);
1479
1480 if let Err(e) = self
1481 .cache
1482 .as_ref()
1483 .borrow_mut()
1484 .add_order_book(book.clone())
1485 {
1486 log_error_on_cache_insert(&e);
1487 }
1488 }
1489
1490 #[allow(clippy::too_many_arguments)]
1493 fn setup_book_updater(
1494 &mut self,
1495 instrument_id: &InstrumentId,
1496 book_type: BookType,
1497 only_deltas: bool,
1498 managed: bool,
1499 ) -> anyhow::Result<()> {
1500 let mut cache = self.cache.borrow_mut();
1501 if managed && !cache.has_order_book(instrument_id) {
1502 let book = OrderBook::new(*instrument_id, book_type);
1503 log::debug!("Created {book}");
1504 cache.add_order_book(book)?;
1505 }
1506
1507 let updater = self
1509 .book_updaters
1510 .entry(*instrument_id)
1511 .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1512 .clone();
1513
1514 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1516 let deltas_handler = TypedHandler::new(updater.clone());
1517 msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1518
1519 if !only_deltas {
1521 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1522 let depth_handler = TypedHandler::new(updater);
1523 msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1524 }
1525
1526 Ok(())
1527 }
1528
1529 fn create_bar_aggregator(
1530 &mut self,
1531 instrument: &InstrumentAny,
1532 bar_type: BarType,
1533 ) -> Box<dyn BarAggregator> {
1534 let cache = self.cache.clone();
1535
1536 let handler = move |bar: Bar| {
1537 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1538 log_error_on_cache_insert(&e);
1539 }
1540
1541 let topic = switchboard::get_bars_topic(bar.bar_type);
1542 msgbus::publish_bar(topic, &bar);
1543 };
1544
1545 let clock = self.clock.clone();
1546 let config = self.config.clone();
1547
1548 let price_precision = instrument.price_precision();
1549 let size_precision = instrument.size_precision();
1550
1551 if bar_type.spec().is_time_aggregated() {
1552 let time_bars_origin_offset = config
1554 .time_bars_origins
1555 .get(&bar_type.spec().aggregation)
1556 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1557
1558 Box::new(TimeBarAggregator::new(
1559 bar_type,
1560 price_precision,
1561 size_precision,
1562 clock,
1563 handler,
1564 config.time_bars_build_with_no_updates,
1565 config.time_bars_timestamp_on_close,
1566 config.time_bars_interval_type,
1567 time_bars_origin_offset,
1568 config.time_bars_build_delay,
1569 config.time_bars_skip_first_non_full_bar,
1570 ))
1571 } else {
1572 match bar_type.spec().aggregation {
1573 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1574 bar_type,
1575 price_precision,
1576 size_precision,
1577 handler,
1578 )) as Box<dyn BarAggregator>,
1579 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1580 bar_type,
1581 price_precision,
1582 size_precision,
1583 handler,
1584 )) as Box<dyn BarAggregator>,
1585 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1586 bar_type,
1587 price_precision,
1588 size_precision,
1589 handler,
1590 )) as Box<dyn BarAggregator>,
1591 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1592 bar_type,
1593 price_precision,
1594 size_precision,
1595 handler,
1596 )) as Box<dyn BarAggregator>,
1597 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1598 bar_type,
1599 price_precision,
1600 size_precision,
1601 handler,
1602 )) as Box<dyn BarAggregator>,
1603 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1604 bar_type,
1605 price_precision,
1606 size_precision,
1607 handler,
1608 )) as Box<dyn BarAggregator>,
1609 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1610 bar_type,
1611 price_precision,
1612 size_precision,
1613 handler,
1614 )) as Box<dyn BarAggregator>,
1615 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1616 bar_type,
1617 price_precision,
1618 size_precision,
1619 handler,
1620 )) as Box<dyn BarAggregator>,
1621 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1622 bar_type,
1623 price_precision,
1624 size_precision,
1625 handler,
1626 )) as Box<dyn BarAggregator>,
1627 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1628 bar_type,
1629 price_precision,
1630 size_precision,
1631 instrument.price_increment(),
1632 handler,
1633 )) as Box<dyn BarAggregator>,
1634 _ => panic!(
1635 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1636 bar_type.spec().aggregation
1637 ),
1638 }
1639 }
1640 }
1641
1642 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1643 let instrument = {
1645 let cache = self.cache.borrow();
1646 cache
1647 .instrument(&bar_type.instrument_id())
1648 .ok_or_else(|| {
1649 anyhow::anyhow!(
1650 "Cannot start bar aggregation: no instrument found for {}",
1651 bar_type.instrument_id(),
1652 )
1653 })?
1654 .clone()
1655 };
1656
1657 let bar_key = bar_type.standard();
1659
1660 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1662 rc.clone()
1663 } else {
1664 let agg = self.create_bar_aggregator(&instrument, bar_type);
1665 let rc = Rc::new(RefCell::new(agg));
1666 self.bar_aggregators.insert(bar_key, rc.clone());
1667 rc
1668 };
1669
1670 let mut subscriptions = Vec::new();
1672
1673 if bar_type.is_composite() {
1674 let topic = switchboard::get_bars_topic(bar_type.composite());
1675 let handler = TypedHandler::new(BarBarHandler::new(aggregator.clone(), bar_key));
1676 msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
1677 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
1678 } else if bar_type.spec().price_type == PriceType::Last {
1679 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1680 let handler = TypedHandler::new(BarTradeHandler::new(aggregator.clone(), bar_key));
1681 msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
1682 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
1683 } else {
1684 if matches!(
1686 bar_type.spec().aggregation,
1687 BarAggregation::TickImbalance
1688 | BarAggregation::VolumeImbalance
1689 | BarAggregation::ValueImbalance
1690 | BarAggregation::TickRuns
1691 | BarAggregation::VolumeRuns
1692 | BarAggregation::ValueRuns
1693 ) {
1694 log::warn!(
1695 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
1696 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
1697 quote data: bars will not emit correctly",
1698 );
1699 }
1700
1701 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1702 let handler = TypedHandler::new(BarQuoteHandler::new(aggregator.clone(), bar_key));
1703 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
1704 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
1705 }
1706
1707 self.bar_aggregator_handlers.insert(bar_key, subscriptions);
1708
1709 self.setup_bar_aggregator(bar_type, false)?;
1711
1712 aggregator.borrow_mut().set_is_running(true);
1713
1714 Ok(())
1715 }
1716
1717 fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1721 let bar_key = bar_type.standard();
1722 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1723 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1724 })?;
1725
1726 let handler: Box<dyn FnMut(Bar)> = if historical {
1728 let cache = self.cache.clone();
1730 Box::new(move |bar: Bar| {
1731 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1732 log_error_on_cache_insert(&e);
1733 }
1734 })
1736 } else {
1737 let cache = self.cache.clone();
1739 Box::new(move |bar: Bar| {
1740 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1741 log_error_on_cache_insert(&e);
1742 }
1743 let topic = switchboard::get_bars_topic(bar.bar_type);
1744 msgbus::publish_bar(topic, &bar);
1745 })
1746 };
1747
1748 aggregator
1749 .borrow_mut()
1750 .set_historical_mode(historical, handler);
1751
1752 if bar_type.spec().is_time_aggregated() {
1754 use nautilus_common::clock::TestClock;
1755
1756 if historical {
1757 let test_clock = Rc::new(RefCell::new(TestClock::new()));
1759 aggregator.borrow_mut().set_clock(test_clock);
1760 let aggregator_weak = Rc::downgrade(aggregator);
1763 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1764 } else {
1765 aggregator.borrow_mut().set_clock(self.clock.clone());
1766 aggregator
1767 .borrow_mut()
1768 .start_timer(Some(aggregator.clone()));
1769 }
1770 }
1771
1772 Ok(())
1773 }
1774
1775 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1776 let aggregator = self
1777 .bar_aggregators
1778 .remove(&bar_type.standard())
1779 .ok_or_else(|| {
1780 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1781 })?;
1782
1783 aggregator.borrow_mut().stop();
1784
1785 let bar_key = bar_type.standard();
1787 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1788 for sub in subs {
1789 match sub {
1790 BarAggregatorSubscription::Bar { topic, handler } => {
1791 msgbus::unsubscribe_bars(topic.into(), &handler);
1792 }
1793 BarAggregatorSubscription::Trade { topic, handler } => {
1794 msgbus::unsubscribe_trades(topic.into(), &handler);
1795 }
1796 BarAggregatorSubscription::Quote { topic, handler } => {
1797 msgbus::unsubscribe_quotes(topic.into(), &handler);
1798 }
1799 }
1800 }
1801 }
1802
1803 Ok(())
1804 }
1805}
1806
1807#[inline(always)]
1808fn log_error_on_cache_insert<T: Display>(e: &T) {
1809 log::error!("Error on cache insert: {e}");
1810}
1811
1812#[inline(always)]
1813fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
1814 if data.is_empty() {
1815 let name = type_name::<T>();
1816 let short_name = name.rsplit("::").next().unwrap_or(name);
1817 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
1818 return true;
1819 }
1820 false
1821}