1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::{Any, type_name},
40 cell::{Ref, RefCell},
41 collections::{VecDeque, hash_map::Entry},
42 fmt::{Debug, Display},
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
59 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
60 SubscribeCommand, SubscribeOptionChain, UnsubscribeBars, UnsubscribeBookDeltas,
61 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
62 UnsubscribeInstrumentStatus, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
63 UnsubscribeQuotes,
64 },
65 msgbus::{
66 self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
67 switchboard::{self, MessagingSwitchboard},
68 },
69 runner::get_data_cmd_sender,
70 timer::{TimeEvent, TimeEventCallback},
71};
72use nautilus_core::{
73 UUID4, WeakCell,
74 correctness::{
75 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
76 },
77 datetime::millis_to_nanos_unchecked,
78};
79#[cfg(feature = "defi")]
80use nautilus_model::defi::DefiData;
81use nautilus_model::{
82 data::{
83 Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
84 InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
85 OrderBookDepth10, QuoteTick, TradeTick,
86 option_chain::{OptionGreeks, StrikeRange},
87 },
88 enums::{
89 AggregationSource, BarAggregation, BookType, MarketStatusAction, PriceType, RecordFlag,
90 },
91 identifiers::{ClientId, InstrumentId, OptionSeriesId, Venue},
92 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
93 orderbook::OrderBook,
94 types::Price,
95};
96#[cfg(feature = "streaming")]
97use nautilus_persistence::backend::catalog::ParquetDataCatalog;
98use ustr::Ustr;
99
100#[cfg(feature = "defi")]
101#[allow(unused_imports)] use crate::defi::engine as _;
103#[cfg(feature = "defi")]
104use crate::engine::pool::PoolUpdater;
105use crate::{
106 aggregation::{
107 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
108 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
109 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
110 VolumeRunsBarAggregator,
111 },
112 client::DataClientAdapter,
113 option_chains::OptionChainManager,
114};
115
116#[derive(Debug, Clone)]
121pub(crate) enum DeferredCommand {
122 Subscribe(SubscribeCommand),
123 Unsubscribe(UnsubscribeCommand),
124 ExpireSeries(OptionSeriesId),
125}
126
127pub(crate) type DeferredCommandQueue = Rc<RefCell<VecDeque<DeferredCommand>>>;
129
130#[derive(Clone)]
135pub enum BarAggregatorSubscription {
136 Bar {
137 topic: MStr<Topic>,
138 handler: TypedHandler<Bar>,
139 },
140 Trade {
141 topic: MStr<Topic>,
142 handler: TypedHandler<TradeTick>,
143 },
144 Quote {
145 topic: MStr<Topic>,
146 handler: TypedHandler<QuoteTick>,
147 },
148}
149
150impl Debug for BarAggregatorSubscription {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 match self {
153 Self::Bar { topic, handler } => f
154 .debug_struct(stringify!(Bar))
155 .field("topic", topic)
156 .field("handler_id", &handler.id())
157 .finish(),
158 Self::Trade { topic, handler } => f
159 .debug_struct(stringify!(Trade))
160 .field("topic", topic)
161 .field("handler_id", &handler.id())
162 .finish(),
163 Self::Quote { topic, handler } => f
164 .debug_struct(stringify!(Quote))
165 .field("topic", topic)
166 .field("handler_id", &handler.id())
167 .finish(),
168 }
169 }
170}
171
172#[derive(Debug)]
174pub struct DataEngine {
175 pub(crate) clock: Rc<RefCell<dyn Clock>>,
176 pub(crate) cache: Rc<RefCell<Cache>>,
177 pub(crate) external_clients: AHashSet<ClientId>,
178 clients: IndexMap<ClientId, DataClientAdapter>,
179 default_client: Option<DataClientAdapter>,
180 #[cfg(feature = "streaming")]
181 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
182 routing_map: IndexMap<Venue, ClientId>,
183 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
184 book_deltas_subs: AHashSet<InstrumentId>,
185 book_depth10_subs: AHashSet<InstrumentId>,
186 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
187 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
188 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
189 bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
190 option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
191 option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
192 deferred_cmd_queue: DeferredCommandQueue,
193 pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
194 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
195 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
196 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
197 pub(crate) msgbus_priority: u8,
198 pub(crate) config: DataEngineConfig,
199 #[cfg(feature = "defi")]
200 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
201 #[cfg(feature = "defi")]
202 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
203 #[cfg(feature = "defi")]
204 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
205 #[cfg(feature = "defi")]
206 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
207}
208
209impl DataEngine {
210 #[must_use]
212 pub fn new(
213 clock: Rc<RefCell<dyn Clock>>,
214 cache: Rc<RefCell<Cache>>,
215 config: Option<DataEngineConfig>,
216 ) -> Self {
217 let config = config.unwrap_or_default();
218
219 let external_clients: AHashSet<ClientId> = config
220 .external_clients
221 .clone()
222 .unwrap_or_default()
223 .into_iter()
224 .collect();
225
226 Self {
227 clock,
228 cache,
229 external_clients,
230 clients: IndexMap::new(),
231 default_client: None,
232 #[cfg(feature = "streaming")]
233 catalogs: AHashMap::new(),
234 routing_map: IndexMap::new(),
235 book_intervals: AHashMap::new(),
236 book_deltas_subs: AHashSet::new(),
237 book_depth10_subs: AHashSet::new(),
238 book_updaters: AHashMap::new(),
239 book_snapshotters: AHashMap::new(),
240 bar_aggregators: AHashMap::new(),
241 bar_aggregator_handlers: AHashMap::new(),
242 option_chain_managers: AHashMap::new(),
243 option_chain_instrument_index: AHashMap::new(),
244 deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
245 pending_option_chain_requests: AHashMap::new(),
246 _synthetic_quote_feeds: AHashMap::new(),
247 _synthetic_trade_feeds: AHashMap::new(),
248 buffered_deltas_map: AHashMap::new(),
249 msgbus_priority: 10, config,
251 #[cfg(feature = "defi")]
252 pool_updaters: AHashMap::new(),
253 #[cfg(feature = "defi")]
254 pool_updaters_pending: AHashSet::new(),
255 #[cfg(feature = "defi")]
256 pool_snapshot_pending: AHashSet::new(),
257 #[cfg(feature = "defi")]
258 pool_event_buffers: AHashMap::new(),
259 }
260 }
261
262 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
264 let weak = WeakCell::from(Rc::downgrade(engine));
265
266 let weak1 = weak.clone();
267 msgbus::register_data_command_endpoint(
268 MessagingSwitchboard::data_engine_execute(),
269 TypedIntoHandler::from(move |cmd: DataCommand| {
270 if let Some(rc) = weak1.upgrade() {
271 rc.borrow_mut().execute(cmd);
272 }
273 }),
274 );
275
276 msgbus::register_data_command_endpoint(
277 MessagingSwitchboard::data_engine_queue_execute(),
278 TypedIntoHandler::from(move |cmd: DataCommand| {
279 get_data_cmd_sender().clone().execute(cmd);
280 }),
281 );
282
283 let weak2 = weak.clone();
285 msgbus::register_any(
286 MessagingSwitchboard::data_engine_process(),
287 ShareableMessageHandler::from_any(move |data: &dyn Any| {
288 if let Some(rc) = weak2.upgrade() {
289 rc.borrow_mut().process(data);
290 }
291 }),
292 );
293
294 let weak3 = weak.clone();
296 msgbus::register_data_endpoint(
297 MessagingSwitchboard::data_engine_process_data(),
298 TypedIntoHandler::from(move |data: Data| {
299 if let Some(rc) = weak3.upgrade() {
300 rc.borrow_mut().process_data(data);
301 }
302 }),
303 );
304
305 #[cfg(feature = "defi")]
307 {
308 let weak4 = weak.clone();
309 msgbus::register_defi_data_endpoint(
310 MessagingSwitchboard::data_engine_process_defi_data(),
311 TypedIntoHandler::from(move |data: DefiData| {
312 if let Some(rc) = weak4.upgrade() {
313 rc.borrow_mut().process_defi_data(data);
314 }
315 }),
316 );
317 }
318
319 let weak5 = weak;
320 msgbus::register_data_response_endpoint(
321 MessagingSwitchboard::data_engine_response(),
322 TypedIntoHandler::from(move |resp: DataResponse| {
323 if let Some(rc) = weak5.upgrade() {
324 rc.borrow_mut().response(resp);
325 }
326 }),
327 );
328 }
329
330 #[must_use]
332 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
333 self.clock.borrow()
334 }
335
336 #[must_use]
338 pub fn get_cache(&self) -> Ref<'_, Cache> {
339 self.cache.borrow()
340 }
341
342 #[must_use]
344 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
345 Rc::clone(&self.cache)
346 }
347
348 #[cfg(feature = "streaming")]
354 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
355 let name = Ustr::from(name.unwrap_or("catalog_0"));
356
357 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
358
359 self.catalogs.insert(name, catalog);
360 log::info!("Registered catalog <{name}>");
361 }
362
363 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
370 let client_id = client.client_id();
371
372 if let Some(default_client) = &self.default_client {
373 check_predicate_false(
374 default_client.client_id() == client.client_id(),
375 "client_id already registered as default client",
376 )
377 .expect(FAILED);
378 }
379
380 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
381
382 if let Some(routing) = routing {
383 self.routing_map.insert(routing, client_id);
384 log::debug!("Set client {client_id} routing for {routing}");
385 }
386
387 if client.venue.is_none() && self.default_client.is_none() {
388 self.default_client = Some(client);
389 log::debug!("Registered client {client_id} for default routing");
390 } else {
391 self.clients.insert(client_id, client);
392 log::debug!("Registered client {client_id}");
393 }
394 }
395
396 pub fn deregister_client(&mut self, client_id: &ClientId) {
402 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
403
404 self.clients.shift_remove(client_id);
405 log::info!("Deregistered client {client_id}");
406 }
407
408 pub fn register_default_client(&mut self, client: DataClientAdapter) {
420 check_predicate_true(
421 self.default_client.is_none(),
422 "default client already registered",
423 )
424 .expect(FAILED);
425
426 let client_id = client.client_id();
427
428 self.default_client = Some(client);
429 log::debug!("Registered default client {client_id}");
430 }
431
432 pub fn start(&mut self) {
434 for client in self.get_clients_mut() {
435 if let Err(e) = client.start() {
436 log::error!("{e}");
437 }
438 }
439
440 for aggregator in self.bar_aggregators.values() {
441 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
442 aggregator
443 .borrow_mut()
444 .start_timer(Some(aggregator.clone()));
445 }
446 }
447 }
448
449 pub fn stop(&mut self) {
451 for client in self.get_clients_mut() {
452 if let Err(e) = client.stop() {
453 log::error!("{e}");
454 }
455 }
456
457 for aggregator in self.bar_aggregators.values() {
458 aggregator.borrow_mut().stop();
459 }
460 }
461
462 pub fn reset(&mut self) {
464 for client in self.get_clients_mut() {
465 if let Err(e) = client.reset() {
466 log::error!("{e}");
467 }
468 }
469
470 let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
471 for bar_type in bar_types {
472 if let Err(e) = self.stop_bar_aggregator(bar_type) {
473 log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
474 }
475 }
476 }
477
478 pub fn dispose(&mut self) {
480 for client in self.get_clients_mut() {
481 if let Err(e) = client.dispose() {
482 log::error!("{e}");
483 }
484 }
485
486 self.clock.borrow_mut().cancel_timers();
487 }
488
489 pub async fn connect(&mut self) {
493 let futures: Vec<_> = self
494 .get_clients_mut()
495 .into_iter()
496 .map(|client| client.connect())
497 .collect();
498
499 let results = join_all(futures).await;
500
501 for error in results.into_iter().filter_map(Result::err) {
502 log::error!("Failed to connect data client: {error}");
503 }
504 }
505
506 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
512 let futures: Vec<_> = self
513 .get_clients_mut()
514 .into_iter()
515 .map(|client| client.disconnect())
516 .collect();
517
518 let results = join_all(futures).await;
519 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
520
521 if errors.is_empty() {
522 Ok(())
523 } else {
524 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
525 anyhow::bail!(
526 "Failed to disconnect data clients: {}",
527 error_msgs.join("; ")
528 )
529 }
530 }
531
532 #[must_use]
534 pub fn check_connected(&self) -> bool {
535 self.get_clients()
536 .iter()
537 .all(|client| client.is_connected())
538 }
539
540 #[must_use]
542 pub fn check_disconnected(&self) -> bool {
543 self.get_clients()
544 .iter()
545 .all(|client| !client.is_connected())
546 }
547
548 #[must_use]
550 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
551 self.get_clients()
552 .into_iter()
553 .map(|client| (client.client_id(), client.is_connected()))
554 .collect()
555 }
556
557 #[must_use]
559 pub fn registered_clients(&self) -> Vec<ClientId> {
560 self.get_clients()
561 .into_iter()
562 .map(|client| client.client_id())
563 .collect()
564 }
565
566 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
569 where
570 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
571 T: Clone,
572 {
573 self.get_clients()
574 .into_iter()
575 .flat_map(get_subs)
576 .cloned()
577 .collect()
578 }
579
580 #[must_use]
581 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
582 let (default_opt, clients_map) = (&self.default_client, &self.clients);
583 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
584
585 if let Some(default) = default_opt {
586 clients.push(default);
587 }
588
589 clients
590 }
591
592 #[must_use]
593 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
594 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
595 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
596
597 if let Some(default) = default_opt {
598 clients.push(default);
599 }
600
601 clients
602 }
603
604 pub fn get_client(
605 &mut self,
606 client_id: Option<&ClientId>,
607 venue: Option<&Venue>,
608 ) -> Option<&mut DataClientAdapter> {
609 if let Some(client_id) = client_id {
610 if let Some(client) = self.clients.get_mut(client_id) {
612 return Some(client);
613 }
614
615 if let Some(default) = self.default_client.as_mut()
617 && default.client_id() == *client_id
618 {
619 return Some(default);
620 }
621
622 return None;
624 }
625
626 if let Some(v) = venue {
627 if let Some(client_id) = self.routing_map.get(v) {
629 return self.clients.get_mut(client_id);
630 }
631 }
632
633 self.get_default_client()
635 }
636
637 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
638 self.default_client.as_mut()
639 }
640
641 #[must_use]
643 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
644 self.collect_subscriptions(|client| &client.subscriptions_custom)
645 }
646
647 #[must_use]
649 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
650 self.collect_subscriptions(|client| &client.subscriptions_instrument)
651 }
652
653 #[must_use]
655 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
656 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
657 }
658
659 #[must_use]
661 pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
662 self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
663 }
664
665 #[must_use]
667 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
668 self.book_intervals
669 .values()
670 .flat_map(|set| set.iter().copied())
671 .collect()
672 }
673
674 #[must_use]
676 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
677 self.collect_subscriptions(|client| &client.subscriptions_quotes)
678 }
679
680 #[must_use]
682 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
683 self.collect_subscriptions(|client| &client.subscriptions_trades)
684 }
685
686 #[must_use]
688 pub fn subscribed_bars(&self) -> Vec<BarType> {
689 self.collect_subscriptions(|client| &client.subscriptions_bars)
690 }
691
692 #[must_use]
694 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
695 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
696 }
697
698 #[must_use]
700 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
701 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
702 }
703
704 #[must_use]
706 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
707 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
708 }
709
710 #[must_use]
712 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
713 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
714 }
715
716 #[must_use]
718 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
719 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
720 }
721
722 pub fn execute(&mut self, cmd: DataCommand) {
728 if let Err(e) = match cmd {
729 DataCommand::Subscribe(c) => self.execute_subscribe(&c),
730 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
731 DataCommand::Request(c) => self.execute_request(c),
732 #[cfg(feature = "defi")]
733 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
734 #[cfg(feature = "defi")]
735 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
736 #[cfg(feature = "defi")]
737 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
738 _ => {
739 log::warn!("Unhandled DataCommand variant");
740 Ok(())
741 }
742 } {
743 log::error!("{e}");
744 }
745 }
746
747 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
754 match &cmd {
756 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
757 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
758 SubscribeCommand::BookSnapshots(cmd) => {
759 return self.subscribe_book_snapshots(cmd);
761 }
762 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
763 SubscribeCommand::OptionChain(cmd) => {
764 self.subscribe_option_chain(cmd);
765 return Ok(());
766 }
767 _ => {} }
769
770 if let Some(client_id) = cmd.client_id()
771 && self.external_clients.contains(client_id)
772 {
773 if self.config.debug {
774 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
775 }
776 return Ok(());
777 }
778
779 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
780 client.execute_subscribe(cmd);
781 } else {
782 log::error!(
783 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
784 cmd.client_id(),
785 cmd.venue(),
786 );
787 }
788
789 Ok(())
790 }
791
792 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
798 match &cmd {
799 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
800 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
801 UnsubscribeCommand::BookSnapshots(cmd) => {
802 self.unsubscribe_book_snapshots(cmd);
804 return Ok(());
805 }
806 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
807 UnsubscribeCommand::OptionChain(cmd) => {
808 self.unsubscribe_option_chain(cmd);
809 return Ok(());
810 }
811 _ => {} }
813
814 if let Some(client_id) = cmd.client_id()
815 && self.external_clients.contains(client_id)
816 {
817 if self.config.debug {
818 log::debug!(
819 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
820 );
821 }
822 return Ok(());
823 }
824
825 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
826 client.execute_unsubscribe(cmd);
827 } else {
828 log::error!(
829 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
830 cmd.client_id(),
831 cmd.venue(),
832 );
833 }
834
835 Ok(())
836 }
837
838 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
845 if let Some(cid) = req.client_id()
847 && self.external_clients.contains(cid)
848 {
849 if self.config.debug {
850 log::debug!("Skipping data request for external client {cid}: {req:?}");
851 }
852 return Ok(());
853 }
854
855 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
856 match req {
857 RequestCommand::Data(req) => client.request_data(req),
858 RequestCommand::Instrument(req) => client.request_instrument(req),
859 RequestCommand::Instruments(req) => client.request_instruments(req),
860 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
861 RequestCommand::BookDepth(req) => client.request_book_depth(req),
862 RequestCommand::Quotes(req) => client.request_quotes(req),
863 RequestCommand::Trades(req) => client.request_trades(req),
864 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
865 RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
866 RequestCommand::Bars(req) => client.request_bars(req),
867 }
868 } else {
869 anyhow::bail!(
870 "Cannot handle request: no client found for {:?} {:?}",
871 req.client_id(),
872 req.venue()
873 );
874 }
875 }
876
877 pub fn process(&mut self, data: &dyn Any) {
881 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
883 self.handle_instrument(instrument);
884 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
885 self.handle_funding_rate(*funding_rate);
886 } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
887 self.handle_instrument_status(*status);
888 } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
889 self.cache.borrow_mut().add_option_greeks(*option_greeks);
890 let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
891 msgbus::publish_option_greeks(topic, option_greeks);
892 self.drain_deferred_commands();
893 } else {
894 log::error!("Cannot process data {data:?}, type is unrecognized");
895 }
896
897 }
899
900 pub fn process_data(&mut self, data: Data) {
902 match data {
903 Data::Delta(delta) => self.handle_delta(delta),
904 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
905 Data::Depth10(depth) => self.handle_depth10(*depth),
906 Data::Quote(quote) => {
907 self.handle_quote(quote);
908 self.drain_deferred_commands();
909 }
910 Data::Trade(trade) => self.handle_trade(trade),
911 Data::Bar(bar) => self.handle_bar(bar),
912 Data::MarkPriceUpdate(mark_price) => {
913 self.handle_mark_price(mark_price);
914 self.drain_deferred_commands();
915 }
916 Data::IndexPriceUpdate(index_price) => {
917 self.handle_index_price(index_price);
918 self.drain_deferred_commands();
919 }
920 Data::InstrumentClose(close) => self.handle_instrument_close(close),
921 Data::Custom(custom) => self.handle_custom_data(&custom),
922 }
923 }
924
925 #[allow(clippy::needless_pass_by_value)] pub fn response(&mut self, resp: DataResponse) {
928 log::debug!("{RECV}{RES} {resp:?}");
929
930 let correlation_id = *resp.correlation_id();
931
932 match &resp {
933 DataResponse::Instrument(r) => {
934 self.handle_instrument_response(r.data.clone());
935 }
936 DataResponse::Instruments(r) => {
937 self.handle_instruments(&r.data);
938 }
939 DataResponse::Quotes(r) => {
940 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
941 self.handle_quotes(&r.data);
942 }
943 }
944 DataResponse::Trades(r) => {
945 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
946 self.handle_trades(&r.data);
947 }
948 }
949 DataResponse::FundingRates(r) => {
950 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
951 self.handle_funding_rates(&r.data);
952 }
953 }
954 DataResponse::Bars(r) => {
955 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
956 self.handle_bars(&r.data);
957 }
958 }
959 DataResponse::Book(r) => self.handle_book_response(&r.data),
960 DataResponse::ForwardPrices(r) => {
961 return self.handle_forward_prices_response(&correlation_id, r);
962 }
963 _ => todo!("Handle other response types"),
964 }
965
966 msgbus::send_response(&correlation_id, &resp);
967 }
968
969 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
972 log::debug!("Handling instrument: {}", instrument.id());
973
974 if let Err(e) = self
975 .cache
976 .as_ref()
977 .borrow_mut()
978 .add_instrument(instrument.clone())
979 {
980 log_error_on_cache_insert(&e);
981 }
982
983 let topic = switchboard::get_instrument_topic(instrument.id());
984 log::debug!("Publishing instrument to topic: {topic}");
985 msgbus::publish_any(topic, instrument);
986
987 self.update_option_chains(instrument);
988 }
989
990 fn update_option_chains(&mut self, instrument: &InstrumentAny) {
991 let Some(underlying) = instrument.underlying() else {
992 return;
993 };
994 let Some(expiration_ns) = instrument.expiration_ns() else {
995 return;
996 };
997 let Some(strike) = instrument.strike_price() else {
998 return;
999 };
1000 let Some(kind) = instrument.option_kind() else {
1001 return;
1002 };
1003
1004 let venue = instrument.id().venue;
1005 let settlement = instrument.settlement_currency().code;
1006 let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1007
1008 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1010 return;
1011 };
1012
1013 let clock = self.clock.clone();
1014 let client = self.get_client(None, Some(&venue));
1015
1016 if manager_rc
1017 .borrow_mut()
1018 .add_instrument(instrument.id(), strike, kind, client, &clock)
1019 {
1020 self.option_chain_instrument_index
1021 .insert(instrument.id(), series_id);
1022 }
1023 }
1024
1025 fn handle_delta(&mut self, delta: OrderBookDelta) {
1026 let deltas = if self.config.buffer_deltas {
1027 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1028 buffered_deltas.deltas.push(delta);
1029 buffered_deltas.flags = delta.flags;
1030 buffered_deltas.sequence = delta.sequence;
1031 buffered_deltas.ts_event = delta.ts_event;
1032 buffered_deltas.ts_init = delta.ts_init;
1033 } else {
1034 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1035 self.buffered_deltas_map
1036 .insert(delta.instrument_id, buffered_deltas);
1037 }
1038
1039 if !RecordFlag::F_LAST.matches(delta.flags) {
1040 return; }
1042
1043 self.buffered_deltas_map
1044 .remove(&delta.instrument_id)
1045 .expect("buffered deltas exist")
1046 } else {
1047 OrderBookDeltas::new(delta.instrument_id, vec![delta])
1048 };
1049
1050 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1051 msgbus::publish_deltas(topic, &deltas);
1052 }
1053
1054 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1055 if self.config.buffer_deltas {
1056 let instrument_id = deltas.instrument_id;
1057
1058 for delta in deltas.deltas {
1059 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1060 buffered_deltas.deltas.push(delta);
1061 buffered_deltas.flags = delta.flags;
1062 buffered_deltas.sequence = delta.sequence;
1063 buffered_deltas.ts_event = delta.ts_event;
1064 buffered_deltas.ts_init = delta.ts_init;
1065 } else {
1066 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1067 self.buffered_deltas_map
1068 .insert(instrument_id, buffered_deltas);
1069 }
1070
1071 if RecordFlag::F_LAST.matches(delta.flags) {
1072 let deltas_to_publish = self
1073 .buffered_deltas_map
1074 .remove(&instrument_id)
1075 .expect("buffered deltas exist");
1076 let topic = switchboard::get_book_deltas_topic(instrument_id);
1077 msgbus::publish_deltas(topic, &deltas_to_publish);
1078 }
1079 }
1080 } else {
1081 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1082 msgbus::publish_deltas(topic, &deltas);
1083 }
1084 }
1085
1086 fn handle_depth10(&self, depth: OrderBookDepth10) {
1087 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1088 msgbus::publish_depth10(topic, &depth);
1089 }
1090
1091 fn handle_quote(&self, quote: QuoteTick) {
1092 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1093 log_error_on_cache_insert(&e);
1094 }
1095
1096 let topic = switchboard::get_quotes_topic(quote.instrument_id);
1099 msgbus::publish_quote(topic, "e);
1100 }
1101
1102 fn handle_trade(&self, trade: TradeTick) {
1103 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1104 log_error_on_cache_insert(&e);
1105 }
1106
1107 let topic = switchboard::get_trades_topic(trade.instrument_id);
1110 msgbus::publish_trade(topic, &trade);
1111 }
1112
1113 fn handle_bar(&self, bar: Bar) {
1114 if self.config.validate_data_sequence
1116 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1117 {
1118 if bar.ts_event < last_bar.ts_event {
1119 log::warn!(
1120 "Bar {bar} was prior to last bar `ts_event` {}",
1121 last_bar.ts_event
1122 );
1123 return; }
1125
1126 if bar.ts_init < last_bar.ts_init {
1127 log::warn!(
1128 "Bar {bar} was prior to last bar `ts_init` {}",
1129 last_bar.ts_init
1130 );
1131 return; }
1133 }
1135
1136 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1137 log_error_on_cache_insert(&e);
1138 }
1139
1140 let topic = switchboard::get_bars_topic(bar.bar_type);
1141 msgbus::publish_bar(topic, &bar);
1142 }
1143
1144 fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1145 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1146 log_error_on_cache_insert(&e);
1147 }
1148
1149 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1150 msgbus::publish_mark_price(topic, &mark_price);
1151 }
1152
1153 fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1154 if let Err(e) = self
1155 .cache
1156 .as_ref()
1157 .borrow_mut()
1158 .add_index_price(index_price)
1159 {
1160 log_error_on_cache_insert(&e);
1161 }
1162
1163 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1164 msgbus::publish_index_price(topic, &index_price);
1165 }
1166
1167 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1169 if let Err(e) = self
1170 .cache
1171 .as_ref()
1172 .borrow_mut()
1173 .add_funding_rate(funding_rate)
1174 {
1175 log_error_on_cache_insert(&e);
1176 }
1177
1178 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1179 msgbus::publish_funding_rate(topic, &funding_rate);
1180 }
1181
1182 fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1183 let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1184 msgbus::publish_any(topic, &status);
1185
1186 if self
1188 .option_chain_instrument_index
1189 .contains_key(&status.instrument_id)
1190 && matches!(
1191 status.action,
1192 MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1193 )
1194 {
1195 self.expire_option_chain_instrument(status.instrument_id);
1196 }
1197 }
1198
1199 fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
1206 let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
1207 return;
1208 };
1209
1210 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1211 return;
1212 };
1213
1214 let series_empty = manager_rc
1215 .borrow_mut()
1216 .handle_instrument_expired(&instrument_id);
1217
1218 self.drain_deferred_commands();
1220
1221 log::info!(
1222 "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
1223 );
1224
1225 if series_empty {
1226 manager_rc.borrow_mut().teardown(&self.clock);
1227 self.option_chain_managers.remove(&series_id);
1228
1229 log::info!("Torn down empty option chain manager for {series_id}");
1230 }
1231 }
1232
1233 fn handle_instrument_close(&self, close: InstrumentClose) {
1234 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1235 msgbus::publish_any(topic, &close);
1236 }
1237
1238 fn handle_custom_data(&self, custom: &CustomData) {
1239 log::debug!("Processing custom data: {}", custom.data.type_name());
1240 let topic = switchboard::get_custom_topic(&custom.data_type);
1241 msgbus::publish_any(topic, custom);
1242 }
1243
1244 fn drain_deferred_commands(&mut self) {
1248 loop {
1250 let commands: VecDeque<DeferredCommand> =
1251 std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1252
1253 if commands.is_empty() {
1254 break;
1255 }
1256
1257 for cmd in commands {
1258 match cmd {
1259 DeferredCommand::Subscribe(sub) => {
1260 let client = self.get_client(sub.client_id(), sub.venue());
1261 if let Some(client) = client {
1262 client.execute_subscribe(&sub);
1263 }
1264 }
1265 DeferredCommand::Unsubscribe(unsub) => {
1266 let client = self.get_client(unsub.client_id(), unsub.venue());
1267 if let Some(client) = client {
1268 client.execute_unsubscribe(&unsub);
1269 }
1270 }
1271 DeferredCommand::ExpireSeries(series_id) => {
1272 self.expire_series(series_id);
1273 }
1274 }
1275 }
1276 }
1277 }
1278
1279 fn expire_series(&mut self, series_id: OptionSeriesId) {
1285 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1286 return;
1287 };
1288
1289 let instrument_ids: Vec<InstrumentId> = self
1290 .option_chain_instrument_index
1291 .iter()
1292 .filter(|(_, sid)| **sid == series_id)
1293 .map(|(id, _)| *id)
1294 .collect();
1295
1296 for id in &instrument_ids {
1297 self.option_chain_instrument_index.remove(id);
1298 manager_rc.borrow_mut().handle_instrument_expired(id);
1299 }
1300
1301 manager_rc.borrow_mut().teardown(&self.clock);
1302 self.option_chain_managers.remove(&series_id);
1303
1304 log::info!("Proactively torn down expired option chain {series_id}");
1305 }
1306
1307 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1310 if cmd.instrument_id.is_synthetic() {
1311 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1312 }
1313
1314 self.book_deltas_subs.insert(cmd.instrument_id);
1315 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1316
1317 Ok(())
1318 }
1319
1320 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1321 if cmd.instrument_id.is_synthetic() {
1322 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1323 }
1324
1325 self.book_depth10_subs.insert(cmd.instrument_id);
1326 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1327
1328 Ok(())
1329 }
1330
1331 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1332 if cmd.instrument_id.is_synthetic() {
1333 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1334 }
1335
1336 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1338 Entry::Vacant(e) => {
1339 let mut set = AHashSet::new();
1340 set.insert(cmd.instrument_id);
1341 e.insert(set);
1342 true
1343 }
1344 Entry::Occupied(mut e) => {
1345 e.get_mut().insert(cmd.instrument_id);
1346 false
1347 }
1348 };
1349
1350 if first_for_interval {
1351 let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1353 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1354
1355 let snap_info = BookSnapshotInfo {
1356 instrument_id: cmd.instrument_id,
1357 venue: cmd.instrument_id.venue,
1358 is_composite: cmd.instrument_id.symbol.is_composite(),
1359 root: Ustr::from(cmd.instrument_id.symbol.root()),
1360 topic,
1361 interval_ms: cmd.interval_ms,
1362 };
1363
1364 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1366 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1367
1368 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1369 self.book_snapshotters
1370 .insert(cmd.instrument_id, snapshotter.clone());
1371 let timer_name = snapshotter.timer_name;
1372
1373 let callback_fn: Rc<dyn Fn(TimeEvent)> =
1374 Rc::new(move |event| snapshotter.snapshot(event));
1375 let callback = TimeEventCallback::from(callback_fn);
1376
1377 self.clock
1378 .borrow_mut()
1379 .set_timer_ns(
1380 &timer_name,
1381 interval_ns,
1382 Some(start_time_ns.into()),
1383 None,
1384 Some(callback),
1385 None,
1386 None,
1387 )
1388 .expect(FAILED);
1389 }
1390
1391 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1393 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1394 }
1395
1396 if let Some(client_id) = cmd.client_id.as_ref()
1397 && self.external_clients.contains(client_id)
1398 {
1399 if self.config.debug {
1400 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1401 }
1402 return Ok(());
1403 }
1404
1405 log::debug!(
1406 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1407 cmd.instrument_id,
1408 cmd.client_id,
1409 cmd.venue,
1410 );
1411
1412 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1413 let deltas_cmd = SubscribeBookDeltas::new(
1414 cmd.instrument_id,
1415 cmd.book_type,
1416 cmd.client_id,
1417 cmd.venue,
1418 UUID4::new(),
1419 cmd.ts_init,
1420 cmd.depth,
1421 true, Some(cmd.command_id),
1423 cmd.params.clone(),
1424 );
1425 log::debug!(
1426 "Calling client.execute_subscribe for BookDeltas: {}",
1427 cmd.instrument_id
1428 );
1429 client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1430 } else {
1431 log::error!(
1432 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1433 cmd.client_id,
1434 cmd.venue,
1435 );
1436 }
1437
1438 Ok(())
1439 }
1440
1441 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1442 match cmd.bar_type.aggregation_source() {
1443 AggregationSource::Internal => {
1444 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1445 self.start_bar_aggregator(cmd.bar_type)?;
1446 }
1447 }
1448 AggregationSource::External => {
1449 if cmd.bar_type.instrument_id().is_synthetic() {
1450 anyhow::bail!(
1451 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1452 );
1453 }
1454 }
1455 }
1456
1457 Ok(())
1458 }
1459
1460 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) {
1461 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1462 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1463 return;
1464 }
1465
1466 self.book_deltas_subs.remove(&cmd.instrument_id);
1467
1468 let topics = vec![
1469 switchboard::get_book_deltas_topic(cmd.instrument_id),
1470 switchboard::get_book_depth10_topic(cmd.instrument_id),
1471 ];
1473
1474 self.maintain_book_updater(&cmd.instrument_id, &topics);
1475 self.maintain_book_snapshotter(&cmd.instrument_id);
1476 }
1477
1478 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) {
1479 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1480 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1481 return;
1482 }
1483
1484 self.book_depth10_subs.remove(&cmd.instrument_id);
1485
1486 let topics = vec![
1487 switchboard::get_book_deltas_topic(cmd.instrument_id),
1488 switchboard::get_book_depth10_topic(cmd.instrument_id),
1489 ];
1490
1491 self.maintain_book_updater(&cmd.instrument_id, &topics);
1492 self.maintain_book_snapshotter(&cmd.instrument_id);
1493 }
1494
1495 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
1496 let is_subscribed = self
1497 .book_intervals
1498 .values()
1499 .any(|set| set.contains(&cmd.instrument_id));
1500
1501 if !is_subscribed {
1502 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1503 return;
1504 }
1505
1506 let mut to_remove = Vec::new();
1508 for (interval, set) in &mut self.book_intervals {
1509 if set.remove(&cmd.instrument_id) && set.is_empty() {
1510 to_remove.push(*interval);
1511 }
1512 }
1513
1514 for interval in to_remove {
1515 self.book_intervals.remove(&interval);
1516 }
1517
1518 let topics = vec![
1519 switchboard::get_book_deltas_topic(cmd.instrument_id),
1520 switchboard::get_book_depth10_topic(cmd.instrument_id),
1521 ];
1522
1523 self.maintain_book_updater(&cmd.instrument_id, &topics);
1524 self.maintain_book_snapshotter(&cmd.instrument_id);
1525
1526 let still_in_intervals = self
1527 .book_intervals
1528 .values()
1529 .any(|set| set.contains(&cmd.instrument_id));
1530
1531 if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1532 if let Some(client_id) = cmd.client_id.as_ref()
1533 && self.external_clients.contains(client_id)
1534 {
1535 return;
1536 }
1537
1538 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1539 let deltas_cmd = UnsubscribeBookDeltas::new(
1540 cmd.instrument_id,
1541 cmd.client_id,
1542 cmd.venue,
1543 UUID4::new(),
1544 cmd.ts_init,
1545 Some(cmd.command_id),
1546 cmd.params.clone(),
1547 );
1548 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1549 }
1550 }
1551 }
1552
1553 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
1554 let bar_type = cmd.bar_type;
1555
1556 let topic = switchboard::get_bars_topic(bar_type.standard());
1558 if msgbus::exact_subscriber_count_bars(topic) > 0 {
1559 return;
1560 }
1561
1562 if self.bar_aggregators.contains_key(&bar_type.standard())
1563 && let Err(e) = self.stop_bar_aggregator(bar_type)
1564 {
1565 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1566 }
1567
1568 if bar_type.is_composite() {
1570 let source_type = bar_type.composite();
1571 let source_topic = switchboard::get_bars_topic(source_type);
1572 if msgbus::exact_subscriber_count_bars(source_topic) == 0
1573 && self.bar_aggregators.contains_key(&source_type)
1574 && let Err(e) = self.stop_bar_aggregator(source_type)
1575 {
1576 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1577 }
1578 }
1579 }
1580
1581 fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
1582 let series_id = cmd.series_id;
1583
1584 if let Some(old) = self.option_chain_managers.remove(&series_id) {
1586 log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
1587 let all_ids = old.borrow().all_instrument_ids();
1588 let old_venue = old.borrow().venue();
1589 old.borrow_mut().teardown(&self.clock);
1590 self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
1591 }
1592
1593 self.pending_option_chain_requests
1595 .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
1596
1597 if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
1600 let resolved_client_id = self
1602 .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
1603 .map(|c| c.client_id);
1604
1605 if let Some(client_id) = resolved_client_id {
1606 let request_id = UUID4::new();
1607 let ts_init = self.clock.borrow().timestamp_ns();
1608
1609 let sample_instrument_id = {
1612 let cache = self.cache.borrow();
1613 cache
1614 .instruments(&series_id.venue, Some(&series_id.underlying))
1615 .iter()
1616 .find(|i| {
1617 i.expiration_ns() == Some(series_id.expiration_ns)
1618 && i.settlement_currency().code == series_id.settlement_currency
1619 })
1620 .map(|i| i.id())
1621 };
1622
1623 let request = RequestForwardPrices::new(
1624 series_id.venue,
1625 series_id.underlying,
1626 sample_instrument_id,
1627 Some(client_id),
1628 request_id,
1629 ts_init,
1630 None,
1631 );
1632
1633 self.pending_option_chain_requests
1634 .insert(request_id, cmd.clone());
1635
1636 let req_cmd = RequestCommand::ForwardPrices(request);
1637 if let Err(e) = self.execute_request(req_cmd) {
1638 log::warn!("Failed to request forward prices for {series_id}: {e}");
1639 let cmd = self
1640 .pending_option_chain_requests
1641 .remove(&request_id)
1642 .expect("just inserted");
1643 self.create_option_chain_manager(&cmd, None);
1644 }
1645
1646 return;
1647 }
1648 }
1649
1650 self.create_option_chain_manager(cmd, None);
1651 }
1652
1653 fn create_option_chain_manager(
1655 &mut self,
1656 cmd: &SubscribeOptionChain,
1657 initial_atm_price: Option<Price>,
1658 ) {
1659 let series_id = cmd.series_id;
1660 let cache = self.cache.clone();
1661 let clock = self.clock.clone();
1662 let priority = self.msgbus_priority;
1663 let deferred_cmd_queue = self.deferred_cmd_queue.clone();
1664
1665 let manager_rc = {
1666 let client = self.get_client(cmd.client_id.as_ref(), Some(&series_id.venue));
1667 OptionChainManager::create_and_setup(
1668 series_id,
1669 &cache,
1670 cmd,
1671 &clock,
1672 priority,
1673 client,
1674 initial_atm_price,
1675 deferred_cmd_queue,
1676 )
1677 };
1678
1679 for id in manager_rc.borrow().all_instrument_ids() {
1681 self.option_chain_instrument_index.insert(id, series_id);
1682 }
1683
1684 self.option_chain_managers.insert(series_id, manager_rc);
1685 }
1686
1687 fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
1688 let series_id = cmd.series_id;
1689
1690 let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
1691 log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
1692 return;
1693 };
1694
1695 let all_ids = manager_rc.borrow().all_instrument_ids();
1697 let venue = manager_rc.borrow().venue();
1698
1699 for id in &all_ids {
1701 self.option_chain_instrument_index.remove(id);
1702 }
1703
1704 manager_rc.borrow_mut().teardown(&self.clock);
1705
1706 self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
1708
1709 log::info!("Unsubscribed option chain for {series_id}");
1710 }
1711
1712 fn forward_option_chain_unsubscribes(
1714 &mut self,
1715 instrument_ids: &[InstrumentId],
1716 venue: Venue,
1717 client_id: Option<ClientId>,
1718 ) {
1719 let ts_init = self.clock.borrow().timestamp_ns();
1720
1721 let Some(client) = self.get_client(client_id.as_ref(), Some(&venue)) else {
1722 log::error!(
1723 "Cannot forward option chain unsubscribes: no client found for venue={venue}",
1724 );
1725 return;
1726 };
1727
1728 for instrument_id in instrument_ids {
1729 client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
1730 *instrument_id,
1731 client_id,
1732 Some(venue),
1733 UUID4::new(),
1734 ts_init,
1735 None,
1736 None,
1737 )));
1738 client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
1739 UnsubscribeOptionGreeks::new(
1740 *instrument_id,
1741 client_id,
1742 Some(venue),
1743 UUID4::new(),
1744 ts_init,
1745 None,
1746 None,
1747 ),
1748 ));
1749 client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
1750 UnsubscribeInstrumentStatus::new(
1751 *instrument_id,
1752 client_id,
1753 Some(venue),
1754 UUID4::new(),
1755 ts_init,
1756 None,
1757 None,
1758 ),
1759 ));
1760 }
1761 }
1762
1763 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1764 let Some(updater) = self.book_updaters.get(instrument_id) else {
1765 return;
1766 };
1767
1768 let has_deltas = self.book_deltas_subs.contains(instrument_id);
1770 let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1771
1772 let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1773 let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1774 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1775 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1776
1777 if !has_deltas {
1779 msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1780 }
1781
1782 if !has_depth10 {
1783 msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1784 }
1785
1786 if !has_deltas && !has_depth10 {
1788 self.book_updaters.remove(instrument_id);
1789 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1790 }
1791 }
1792
1793 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1794 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1795 let topic = switchboard::get_book_snapshots_topic(
1796 *instrument_id,
1797 snapshotter.snap_info.interval_ms,
1798 );
1799
1800 if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1802 let timer_name = snapshotter.timer_name;
1803 self.book_snapshotters.remove(instrument_id);
1804 let mut clock = self.clock.borrow_mut();
1805 if clock.timer_exists(&timer_name) {
1806 clock.cancel_timer(&timer_name);
1807 }
1808 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1809 }
1810 }
1811 }
1812
1813 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1816 let mut cache = self.cache.as_ref().borrow_mut();
1817 if let Err(e) = cache.add_instrument(instrument) {
1818 log_error_on_cache_insert(&e);
1819 }
1820 }
1821
1822 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1823 let mut cache = self.cache.as_ref().borrow_mut();
1825 for instrument in instruments {
1826 if let Err(e) = cache.add_instrument(instrument.clone()) {
1827 log_error_on_cache_insert(&e);
1828 }
1829 }
1830 }
1831
1832 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1833 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1834 log_error_on_cache_insert(&e);
1835 }
1836 }
1837
1838 fn handle_trades(&self, trades: &[TradeTick]) {
1839 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1840 log_error_on_cache_insert(&e);
1841 }
1842 }
1843
1844 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1845 if let Err(e) = self
1846 .cache
1847 .as_ref()
1848 .borrow_mut()
1849 .add_funding_rates(funding_rates)
1850 {
1851 log_error_on_cache_insert(&e);
1852 }
1853 }
1854
1855 fn handle_bars(&self, bars: &[Bar]) {
1856 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1857 log_error_on_cache_insert(&e);
1858 }
1859 }
1860
1861 fn handle_book_response(&self, book: &OrderBook) {
1862 log::debug!("Adding order book {} to cache", book.instrument_id);
1863
1864 if let Err(e) = self
1865 .cache
1866 .as_ref()
1867 .borrow_mut()
1868 .add_order_book(book.clone())
1869 {
1870 log_error_on_cache_insert(&e);
1871 }
1872 }
1873
1874 fn handle_forward_prices_response(
1877 &mut self,
1878 correlation_id: &UUID4,
1879 resp: &ForwardPricesResponse,
1880 ) {
1881 let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
1882 log::debug!(
1883 "No pending option chain request for correlation_id={correlation_id}, ignoring"
1884 );
1885 return;
1886 };
1887
1888 let series_id = cmd.series_id;
1889
1890 let cache = self.cache.borrow();
1893 let mut best_price: Option<Price> = None;
1894
1895 for fp in &resp.data {
1896 if let Some(instrument) = cache.instrument(&fp.instrument_id)
1898 && let Some(expiration) = instrument.expiration_ns()
1899 && expiration == series_id.expiration_ns
1900 && instrument.settlement_currency().code == series_id.settlement_currency
1901 {
1902 match Price::from_decimal(fp.forward_price) {
1903 Ok(price) => best_price = Some(price),
1904 Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
1905 }
1906 break;
1907 }
1908 }
1909 drop(cache);
1910
1911 if let Some(price) = best_price {
1912 log::info!("Forward price for {series_id}: {price} (instant bootstrap)",);
1913 } else {
1914 log::info!(
1915 "No matching forward price found for {series_id}, will bootstrap from live data",
1916 );
1917 }
1918
1919 self.create_option_chain_manager(&cmd, best_price);
1920 }
1921
1922 #[allow(clippy::too_many_arguments)]
1925 fn setup_book_updater(
1926 &mut self,
1927 instrument_id: &InstrumentId,
1928 book_type: BookType,
1929 only_deltas: bool,
1930 managed: bool,
1931 ) -> anyhow::Result<()> {
1932 let mut cache = self.cache.borrow_mut();
1933 if managed && !cache.has_order_book(instrument_id) {
1934 let book = OrderBook::new(*instrument_id, book_type);
1935 log::debug!("Created {book}");
1936 cache.add_order_book(book)?;
1937 }
1938
1939 let updater = self
1941 .book_updaters
1942 .entry(*instrument_id)
1943 .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1944 .clone();
1945
1946 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1948 let deltas_handler = TypedHandler::new(updater.clone());
1949 msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1950
1951 if !only_deltas {
1953 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1954 let depth_handler = TypedHandler::new(updater);
1955 msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1956 }
1957
1958 Ok(())
1959 }
1960
1961 fn create_bar_aggregator(
1962 &self,
1963 instrument: &InstrumentAny,
1964 bar_type: BarType,
1965 ) -> Box<dyn BarAggregator> {
1966 let cache = self.cache.clone();
1967
1968 let handler = move |bar: Bar| {
1969 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1970 log_error_on_cache_insert(&e);
1971 }
1972
1973 let topic = switchboard::get_bars_topic(bar.bar_type);
1974 msgbus::publish_bar(topic, &bar);
1975 };
1976
1977 let clock = self.clock.clone();
1978 let config = self.config.clone();
1979
1980 let price_precision = instrument.price_precision();
1981 let size_precision = instrument.size_precision();
1982
1983 if bar_type.spec().is_time_aggregated() {
1984 let time_bars_origin_offset = config
1986 .time_bars_origins
1987 .get(&bar_type.spec().aggregation)
1988 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1989
1990 Box::new(TimeBarAggregator::new(
1991 bar_type,
1992 price_precision,
1993 size_precision,
1994 clock,
1995 handler,
1996 config.time_bars_build_with_no_updates,
1997 config.time_bars_timestamp_on_close,
1998 config.time_bars_interval_type,
1999 time_bars_origin_offset,
2000 config.time_bars_build_delay,
2001 config.time_bars_skip_first_non_full_bar,
2002 ))
2003 } else {
2004 match bar_type.spec().aggregation {
2005 BarAggregation::Tick => Box::new(TickBarAggregator::new(
2006 bar_type,
2007 price_precision,
2008 size_precision,
2009 handler,
2010 )) as Box<dyn BarAggregator>,
2011 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2012 bar_type,
2013 price_precision,
2014 size_precision,
2015 handler,
2016 )) as Box<dyn BarAggregator>,
2017 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2018 bar_type,
2019 price_precision,
2020 size_precision,
2021 handler,
2022 )) as Box<dyn BarAggregator>,
2023 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2024 bar_type,
2025 price_precision,
2026 size_precision,
2027 handler,
2028 )) as Box<dyn BarAggregator>,
2029 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2030 bar_type,
2031 price_precision,
2032 size_precision,
2033 handler,
2034 )) as Box<dyn BarAggregator>,
2035 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2036 bar_type,
2037 price_precision,
2038 size_precision,
2039 handler,
2040 )) as Box<dyn BarAggregator>,
2041 BarAggregation::Value => Box::new(ValueBarAggregator::new(
2042 bar_type,
2043 price_precision,
2044 size_precision,
2045 handler,
2046 )) as Box<dyn BarAggregator>,
2047 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2048 bar_type,
2049 price_precision,
2050 size_precision,
2051 handler,
2052 )) as Box<dyn BarAggregator>,
2053 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2054 bar_type,
2055 price_precision,
2056 size_precision,
2057 handler,
2058 )) as Box<dyn BarAggregator>,
2059 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2060 bar_type,
2061 price_precision,
2062 size_precision,
2063 instrument.price_increment(),
2064 handler,
2065 )) as Box<dyn BarAggregator>,
2066 _ => panic!(
2067 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
2068 bar_type.spec().aggregation
2069 ),
2070 }
2071 }
2072 }
2073
2074 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2075 let instrument = {
2077 let cache = self.cache.borrow();
2078 cache
2079 .instrument(&bar_type.instrument_id())
2080 .ok_or_else(|| {
2081 anyhow::anyhow!(
2082 "Cannot start bar aggregation: no instrument found for {}",
2083 bar_type.instrument_id(),
2084 )
2085 })?
2086 .clone()
2087 };
2088
2089 let bar_key = bar_type.standard();
2091
2092 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
2094 rc.clone()
2095 } else {
2096 let agg = self.create_bar_aggregator(&instrument, bar_type);
2097 let rc = Rc::new(RefCell::new(agg));
2098 self.bar_aggregators.insert(bar_key, rc.clone());
2099 rc
2100 };
2101
2102 let mut subscriptions = Vec::new();
2104
2105 if bar_type.is_composite() {
2106 let topic = switchboard::get_bars_topic(bar_type.composite());
2107 let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_key));
2108 msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
2109 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
2110 } else if bar_type.spec().price_type == PriceType::Last {
2111 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
2112 let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_key));
2113 msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
2114 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
2115 } else {
2116 if matches!(
2118 bar_type.spec().aggregation,
2119 BarAggregation::TickImbalance
2120 | BarAggregation::VolumeImbalance
2121 | BarAggregation::ValueImbalance
2122 | BarAggregation::TickRuns
2123 | BarAggregation::VolumeRuns
2124 | BarAggregation::ValueRuns
2125 ) {
2126 log::warn!(
2127 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
2128 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
2129 quote data: bars will not emit correctly",
2130 );
2131 }
2132
2133 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
2134 let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_key));
2135 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
2136 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
2137 }
2138
2139 self.bar_aggregator_handlers.insert(bar_key, subscriptions);
2140
2141 self.setup_bar_aggregator(bar_type, false)?;
2143
2144 aggregator.borrow_mut().set_is_running(true);
2145
2146 Ok(())
2147 }
2148
2149 fn setup_bar_aggregator(&self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
2153 let bar_key = bar_type.standard();
2154 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
2155 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
2156 })?;
2157
2158 let handler: Box<dyn FnMut(Bar)> = if historical {
2160 let cache = self.cache.clone();
2162 Box::new(move |bar: Bar| {
2163 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2164 log_error_on_cache_insert(&e);
2165 }
2166 })
2168 } else {
2169 let cache = self.cache.clone();
2171 Box::new(move |bar: Bar| {
2172 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2173 log_error_on_cache_insert(&e);
2174 }
2175 let topic = switchboard::get_bars_topic(bar.bar_type);
2176 msgbus::publish_bar(topic, &bar);
2177 })
2178 };
2179
2180 aggregator
2181 .borrow_mut()
2182 .set_historical_mode(historical, handler);
2183
2184 if bar_type.spec().is_time_aggregated() {
2186 use nautilus_common::clock::TestClock;
2187
2188 if historical {
2189 let test_clock = Rc::new(RefCell::new(TestClock::new()));
2191 aggregator.borrow_mut().set_clock(test_clock);
2192 let aggregator_weak = Rc::downgrade(aggregator);
2195 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
2196 } else {
2197 aggregator.borrow_mut().set_clock(self.clock.clone());
2198 aggregator
2199 .borrow_mut()
2200 .start_timer(Some(aggregator.clone()));
2201 }
2202 }
2203
2204 Ok(())
2205 }
2206
2207 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2208 let aggregator = self
2209 .bar_aggregators
2210 .remove(&bar_type.standard())
2211 .ok_or_else(|| {
2212 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
2213 })?;
2214
2215 aggregator.borrow_mut().stop();
2216
2217 let bar_key = bar_type.standard();
2219 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
2220 for sub in subs {
2221 match sub {
2222 BarAggregatorSubscription::Bar { topic, handler } => {
2223 msgbus::unsubscribe_bars(topic.into(), &handler);
2224 }
2225 BarAggregatorSubscription::Trade { topic, handler } => {
2226 msgbus::unsubscribe_trades(topic.into(), &handler);
2227 }
2228 BarAggregatorSubscription::Quote { topic, handler } => {
2229 msgbus::unsubscribe_quotes(topic.into(), &handler);
2230 }
2231 }
2232 }
2233 }
2234
2235 Ok(())
2236 }
2237}
2238
2239#[inline(always)]
2240fn log_error_on_cache_insert<T: Display>(e: &T) {
2241 log::error!("Error on cache insert: {e}");
2242}
2243
2244#[inline(always)]
2245fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
2246 if data.is_empty() {
2247 let name = type_name::<T>();
2248 let short_name = name.rsplit("::").next().unwrap_or(name);
2249 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
2250 return true;
2251 }
2252 false
2253}