1pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36
37#[cfg(feature = "defi")]
38pub mod pool;
39
40#[cfg(feature = "streaming")]
41mod streaming;
42
43use std::{
44 any::{Any, type_name},
45 cell::{Ref, RefCell},
46 collections::VecDeque,
47 fmt::{Debug, Display},
48 num::NonZeroUsize,
49 rc::Rc,
50};
51
52use ahash::{AHashMap, AHashSet};
53pub use bar::BarAggregatorSubscription;
54use bar::{BarAggregatorKey, bar_aggregator_key};
55use book::{
56 BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
57 BookSnapshotter, BookUpdater,
58};
59pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
60use config::DataEngineConfig;
61use futures::future::join_all;
62use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler, SpreadQuoteHandler};
63use indexmap::IndexMap;
64use nautilus_common::{
65 cache::Cache,
66 clock::Clock,
67 logging::{RECV, RES},
68 messages::data::{
69 DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
70 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
71 SubscribeCommand, SubscribeOptionChain, SubscribeQuotes, UnsubscribeBars,
72 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
73 UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionChain,
74 UnsubscribeOptionGreeks, UnsubscribeQuotes, is_parent_subscription,
75 },
76 msgbus::{
77 self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
78 switchboard::{self, MessagingSwitchboard},
79 },
80 runner::get_data_cmd_sender,
81 timer::{TimeEvent, TimeEventCallback},
82};
83use nautilus_core::{
84 Params, UUID4, WeakCell,
85 correctness::{
86 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
87 },
88 datetime::millis_to_nanos_unchecked,
89};
90#[cfg(feature = "defi")]
91use nautilus_model::defi::DefiData;
92use nautilus_model::{
93 data::{
94 Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
95 InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
96 OrderBookDepth10, QuoteTick, TradeTick,
97 option_chain::{OptionGreeks, StrikeRange},
98 },
99 enums::{
100 AggregationSource, BarAggregation, BookType, InstrumentClass, MarketStatusAction,
101 OrderSide, PriceType, RecordFlag,
102 },
103 identifiers::{ClientId, InstrumentId, OptionSeriesId, Symbol, Venue},
104 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
105 orderbook::OrderBook,
106 types::{Price, Quantity},
107};
108#[cfg(feature = "streaming")]
109use streaming::CatalogMap;
110use ustr::Ustr;
111
112#[cfg(feature = "defi")]
113#[allow(unused_imports)] use crate::defi::engine as _;
115#[cfg(feature = "defi")]
116use crate::engine::pool::PoolUpdater;
117use crate::{
118 aggregation::{
119 BarAggregator, RenkoBarAggregator, SpreadQuoteAggregator, TickBarAggregator,
120 TickImbalanceBarAggregator, TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator,
121 ValueImbalanceBarAggregator, ValueRunsBarAggregator, VolumeBarAggregator,
122 VolumeImbalanceBarAggregator, VolumeRunsBarAggregator,
123 },
124 client::DataClientAdapter,
125 option_chains::OptionChainManager,
126};
127
// Priority value associated with bar aggregators.
// NOTE(review): not referenced in this part of the file; presumably the msgbus
// handler priority used when aggregators subscribe to their input data. Confirm at the use site.
const BAR_AGGREGATOR_PRIORITY: u32 = 5;
// Separator string related to generic spread identifiers.
// NOTE(review): meaning inferred from the name only; confirm where spread IDs are parsed/built.
const GENERIC_SPREAD_ID_SEPARATOR: &str = "___";
131
/// Central data engine: owns the registered data clients, routes subscribe,
/// unsubscribe and request commands to them, and dispatches incoming data to
/// the matching `handle_*` methods.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock; its timers are cancelled by `reset` and `dispose`.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache (for example, `process` stores option greeks in it).
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Client IDs treated as external: subscribe, unsubscribe and request
    /// commands addressed to them are skipped by the engine.
    pub(crate) external_clients: AHashSet<ClientId>,
    /// Registered non-default clients keyed by client ID.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when no explicit client ID or venue route applies.
    default_client: Option<DataClientAdapter>,
    // NOTE(review): catalog storage for the `streaming` feature; usage is not visible here.
    #[cfg(feature = "streaming")]
    catalogs: CatalogMap,
    /// Venue to client ID routing, populated by `register_client`.
    routing_map: IndexMap<Venue, ClientId>,
    // Order book subscription state; all of the `book_*` collections below and
    // `buffered_deltas_map` are emptied by `reset`.
    book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
    /// Keys of active book snapshot subscriptions (see `subscribed_book_snapshots`).
    book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
    book_deltas_subs: AHashSet<InstrumentId>,
    book_depth10_subs: AHashSet<InstrumentId>,
    /// Book updaters per instrument; `reset` unsubscribes them from the
    /// book deltas and depth10 topics.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    book_deltas_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
    book_depth10_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
    book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
    /// Bar aggregators by key; time-aggregated ones get their timer started in `start`.
    bar_aggregators: IndexMap<BarAggregatorKey, Rc<RefCell<Box<dyn BarAggregator>>>>,
    bar_aggregator_handlers: AHashMap<BarAggregatorKey, Vec<BarAggregatorSubscription>>,
    /// Spread quote aggregators by spread instrument ID; timers are started in
    /// `start` and stopped in `stop`.
    spread_quote_aggregators: AHashMap<InstrumentId, Rc<RefCell<SpreadQuoteAggregator>>>,
    spread_quote_handlers: AHashMap<InstrumentId, Vec<(InstrumentId, TypedHandler<QuoteTick>)>>,
    /// Option chain managers by series ID; torn down and removed by `reset`.
    option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
    option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
    /// Queue of deferred commands; cleared by `reset`.
    deferred_cmd_queue: DeferredCommandQueue,
    /// Option chain subscriptions keyed by a request ID; cleared by `reset`.
    pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
    // Synthetic instrument state; all four collections are cleared by `reset`.
    synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    subscribed_synthetic_quotes: AHashSet<InstrumentId>,
    subscribed_synthetic_trades: AHashSet<InstrumentId>,
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Number of subscribe/unsubscribe commands passed to `execute`.
    command_count: u64,
    /// Number of data items passed to the `process*` methods.
    data_count: u64,
    /// Number of request commands passed to `execute`.
    request_count: u64,
    /// Number of responses passed to `response`.
    response_count: u64,
    /// Message bus priority; initialized to 10 by `new`.
    pub(crate) msgbus_priority: u32,
    /// Engine configuration (defaults are used when `new` receives `None`).
    pub(crate) config: DataEngineConfig,
    // The remaining fields exist only with the `defi` feature.
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
179
180impl DataEngine {
181 #[must_use]
183 pub fn new(
184 clock: Rc<RefCell<dyn Clock>>,
185 cache: Rc<RefCell<Cache>>,
186 config: Option<DataEngineConfig>,
187 ) -> Self {
188 let config = config.unwrap_or_default();
189
190 let external_clients: AHashSet<ClientId> = config
191 .external_clients
192 .clone()
193 .unwrap_or_default()
194 .into_iter()
195 .collect();
196
197 Self {
198 clock,
199 cache,
200 external_clients,
201 clients: IndexMap::new(),
202 default_client: None,
203 #[cfg(feature = "streaming")]
204 catalogs: CatalogMap::new(),
205 routing_map: IndexMap::new(),
206 book_intervals: AHashMap::new(),
207 book_snapshot_counts: IndexMap::new(),
208 book_deltas_subs: AHashSet::new(),
209 book_depth10_subs: AHashSet::new(),
210 book_updaters: AHashMap::new(),
211 book_deltas_parent_expansions: AHashMap::new(),
212 book_depth10_parent_expansions: AHashMap::new(),
213 book_snapshotters: AHashMap::new(),
214 bar_aggregators: IndexMap::new(),
215 bar_aggregator_handlers: AHashMap::new(),
216 spread_quote_aggregators: AHashMap::new(),
217 spread_quote_handlers: AHashMap::new(),
218 option_chain_managers: AHashMap::new(),
219 option_chain_instrument_index: AHashMap::new(),
220 deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
221 pending_option_chain_requests: AHashMap::new(),
222 synthetic_quote_feeds: AHashMap::new(),
223 synthetic_trade_feeds: AHashMap::new(),
224 subscribed_synthetic_quotes: AHashSet::new(),
225 subscribed_synthetic_trades: AHashSet::new(),
226 buffered_deltas_map: AHashMap::new(),
227 command_count: 0,
228 data_count: 0,
229 request_count: 0,
230 response_count: 0,
231 msgbus_priority: 10, config,
233 #[cfg(feature = "defi")]
234 pool_updaters: AHashMap::new(),
235 #[cfg(feature = "defi")]
236 pool_updaters_pending: AHashSet::new(),
237 #[cfg(feature = "defi")]
238 pool_snapshot_pending: AHashSet::new(),
239 #[cfg(feature = "defi")]
240 pool_event_buffers: AHashMap::new(),
241 }
242 }
243
244 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
246 let weak = WeakCell::from(Rc::downgrade(engine));
247
248 let weak1 = weak.clone();
249 msgbus::register_data_command_endpoint(
250 MessagingSwitchboard::data_engine_execute(),
251 TypedIntoHandler::from(move |cmd: DataCommand| {
252 if let Some(rc) = weak1.upgrade() {
253 rc.borrow_mut().execute(cmd);
254 }
255 }),
256 );
257
258 msgbus::register_data_command_endpoint(
259 MessagingSwitchboard::data_engine_queue_execute(),
260 TypedIntoHandler::from(move |cmd: DataCommand| {
261 get_data_cmd_sender().clone().execute(cmd);
262 }),
263 );
264
265 let weak2 = weak.clone();
267 msgbus::register_any(
268 MessagingSwitchboard::data_engine_process(),
269 ShareableMessageHandler::from_any(move |data: &dyn Any| {
270 if let Some(rc) = weak2.upgrade() {
271 rc.borrow_mut().process(data);
272 }
273 }),
274 );
275
276 let weak3 = weak.clone();
278 msgbus::register_data_endpoint(
279 MessagingSwitchboard::data_engine_process_data(),
280 TypedIntoHandler::from(move |data: Data| {
281 if let Some(rc) = weak3.upgrade() {
282 rc.borrow_mut().process_data(data);
283 }
284 }),
285 );
286
287 #[cfg(feature = "defi")]
289 {
290 let weak4 = weak.clone();
291 msgbus::register_defi_data_endpoint(
292 MessagingSwitchboard::data_engine_process_defi_data(),
293 TypedIntoHandler::from(move |data: DefiData| {
294 if let Some(rc) = weak4.upgrade() {
295 rc.borrow_mut().process_defi_data(data);
296 }
297 }),
298 );
299 }
300
301 let weak5 = weak;
302 msgbus::register_data_response_endpoint(
303 MessagingSwitchboard::data_engine_response(),
304 TypedIntoHandler::from(move |resp: DataResponse| {
305 if let Some(rc) = weak5.upgrade() {
306 rc.borrow_mut().response(resp);
307 }
308 }),
309 );
310 }
311
    /// Returns the number of subscribe/unsubscribe commands counted by `execute`
    /// since construction or the last `reset`.
    #[must_use]
    pub const fn command_count(&self) -> u64 {
        self.command_count
    }
317
    /// Returns the number of data items passed to the `process*` methods
    /// since construction or the last `reset`.
    #[must_use]
    pub const fn data_count(&self) -> u64 {
        self.data_count
    }
323
    /// Increments the processed-data counter by one (available with the `defi` feature).
    #[cfg(feature = "defi")]
    pub(crate) const fn increment_data_count(&mut self) {
        self.data_count += 1;
    }
328
    /// Returns the number of request commands counted by `execute`
    /// since construction or the last `reset`.
    #[must_use]
    pub const fn request_count(&self) -> u64 {
        self.request_count
    }
334
    /// Returns the number of responses counted by `response`
    /// since construction or the last `reset`.
    #[must_use]
    pub const fn response_count(&self) -> u64 {
        self.response_count
    }
340
    /// Returns `true` if an option chain manager is held for `series_id`.
    #[must_use]
    pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
        self.option_chain_managers.contains_key(series_id)
    }
346
    /// Returns the number of entries in the pending option chain request map.
    #[must_use]
    pub fn pending_option_chain_request_count(&self) -> usize {
        self.pending_option_chain_requests.len()
    }
352
    /// Returns a shared borrow of the engine's clock.
    ///
    /// # Panics
    ///
    /// Panics if the clock is currently mutably borrowed (`RefCell::borrow`).
    #[must_use]
    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
        self.clock.borrow()
    }
358
    /// Returns a shared borrow of the engine's cache.
    ///
    /// # Panics
    ///
    /// Panics if the cache is currently mutably borrowed (`RefCell::borrow`).
    #[must_use]
    pub fn get_cache(&self) -> Ref<'_, Cache> {
        self.cache.borrow()
    }
364
    /// Returns a new reference-counted handle to the engine's cache.
    #[must_use]
    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
        Rc::clone(&self.cache)
    }
370
371 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
378 let client_id = client.client_id();
379
380 if let Some(default_client) = &self.default_client {
381 check_predicate_false(
382 default_client.client_id() == client.client_id(),
383 "client_id already registered as default client",
384 )
385 .expect(FAILED);
386 }
387
388 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
389
390 if let Some(routing) = routing {
391 self.routing_map.insert(routing, client_id);
392 log::debug!("Set client {client_id} routing for {routing}");
393 }
394
395 if client.venue.is_none() && self.default_client.is_none() {
396 self.default_client = Some(client);
397 log::debug!("Registered client {client_id} for default routing");
398 } else {
399 self.clients.insert(client_id, client);
400 log::debug!("Registered client {client_id}");
401 }
402 }
403
404 pub fn deregister_client(&mut self, client_id: &ClientId) {
410 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
411
412 self.clients.shift_remove(client_id);
413 log::info!("Deregistered client {client_id}");
414 }
415
416 pub fn register_default_client(&mut self, client: DataClientAdapter) {
428 check_predicate_true(
429 self.default_client.is_none(),
430 "default client already registered",
431 )
432 .expect(FAILED);
433
434 let client_id = client.client_id();
435
436 self.default_client = Some(client);
437 log::debug!("Registered default client {client_id}");
438 }
439
440 pub fn start(&mut self) {
442 for client in self.get_clients_mut() {
443 if let Err(e) = client.start() {
444 log::error!("{e}");
445 }
446 }
447
448 for aggregator in self.bar_aggregators.values() {
449 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
450 aggregator
451 .borrow_mut()
452 .start_timer(Some(aggregator.clone()));
453 }
454 }
455
456 for aggregator in self.spread_quote_aggregators.values() {
457 aggregator
458 .borrow_mut()
459 .start_timer(Some(aggregator.clone()));
460 }
461 }
462
463 pub fn stop(&mut self) {
465 for client in self.get_clients_mut() {
466 if let Err(e) = client.stop() {
467 log::error!("{e}");
468 }
469 }
470
471 for aggregator in self.bar_aggregators.values() {
472 aggregator.borrow_mut().stop();
473 }
474
475 for aggregator in self.spread_quote_aggregators.values() {
476 aggregator.borrow_mut().stop_timer();
477 }
478 }
479
    /// Resets the engine: resets every client, stops and removes aggregators
    /// and option chain managers, clears subscription state, cancels all clock
    /// timers and zeroes the counters.
    ///
    /// Failures from individual clients or bar aggregators are logged and do
    /// not abort the reset.
    pub fn reset(&mut self) {
        for client in self.get_clients_mut() {
            if let Err(e) = client.reset() {
                log::error!("{e}");
            }
        }

        // Keys are copied out first so the map is not borrowed while
        // `stop_bar_aggregator` takes `&mut self`.
        let keys: Vec<BarAggregatorKey> = self.bar_aggregators.keys().copied().collect();
        for (bar_type, request_id) in keys {
            if let Err(e) = self.stop_bar_aggregator(bar_type, request_id) {
                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
            }
        }

        // Same pattern for the spread quote aggregators.
        let spread_ids: Vec<InstrumentId> = self.spread_quote_aggregators.keys().copied().collect();
        for spread_id in spread_ids {
            self.stop_spread_quote_aggregator(spread_id);
        }

        // Remove every option chain manager from the map and tear it down.
        let managers: Vec<_> = self.option_chain_managers.drain().collect();
        for (_, manager) in managers {
            manager.borrow_mut().teardown(&self.clock);
        }
        self.option_chain_instrument_index.clear();
        self.pending_option_chain_requests.clear();

        // Unsubscribe each book updater from both the deltas and the depth10
        // topic of its instrument.
        let book_updaters: Vec<(InstrumentId, Rc<BookUpdater>)> =
            self.book_updaters.drain().collect();
        for (instrument_id, updater) in book_updaters {
            let deltas_topic = switchboard::get_book_deltas_topic(instrument_id);
            let depth_topic = switchboard::get_book_depth10_topic(instrument_id);
            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
        }
        self.book_deltas_parent_expansions.clear();
        self.book_depth10_parent_expansions.clear();

        // Remaining order book bookkeeping.
        self.book_deltas_subs.clear();
        self.book_depth10_subs.clear();
        self.book_intervals.clear();
        self.book_snapshot_counts.clear();
        self.book_snapshotters.clear();
        self.buffered_deltas_map.clear();

        // Synthetic instrument state.
        self.synthetic_quote_feeds.clear();
        self.synthetic_trade_feeds.clear();
        self.subscribed_synthetic_quotes.clear();
        self.subscribed_synthetic_trades.clear();

        self.deferred_cmd_queue.borrow_mut().clear();

        self.clock.borrow_mut().cancel_timers();

        self.command_count = 0;
        self.data_count = 0;
        self.request_count = 0;
        self.response_count = 0;
    }
546
547 pub fn dispose(&mut self) {
549 for client in self.get_clients_mut() {
550 if let Err(e) = client.dispose() {
551 log::error!("{e}");
552 }
553 }
554
555 self.clock.borrow_mut().cancel_timers();
556 }
557
558 pub async fn connect(&mut self) {
562 let futures: Vec<_> = self
563 .get_clients_mut()
564 .into_iter()
565 .map(DataClientAdapter::connect)
566 .collect();
567
568 let results = join_all(futures).await;
569
570 for error in results.into_iter().filter_map(Result::err) {
571 log::error!("Failed to connect data client: {error}");
572 }
573 }
574
575 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
581 let futures: Vec<_> = self
582 .get_clients_mut()
583 .into_iter()
584 .map(DataClientAdapter::disconnect)
585 .collect();
586
587 let results = join_all(futures).await;
588 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
589
590 if errors.is_empty() {
591 Ok(())
592 } else {
593 let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
594 anyhow::bail!(
595 "Failed to disconnect data clients: {}",
596 error_msgs.join("; ")
597 )
598 }
599 }
600
601 #[must_use]
603 pub fn check_connected(&self) -> bool {
604 self.get_clients()
605 .iter()
606 .all(|client| client.is_connected())
607 }
608
609 #[must_use]
611 pub fn check_disconnected(&self) -> bool {
612 self.get_clients()
613 .iter()
614 .all(|client| !client.is_connected())
615 }
616
617 #[must_use]
619 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
620 self.get_clients()
621 .into_iter()
622 .map(|client| (client.client_id(), client.is_connected()))
623 .collect()
624 }
625
626 #[must_use]
628 pub fn registered_clients(&self) -> Vec<ClientId> {
629 self.get_clients()
630 .into_iter()
631 .map(|client| client.client_id())
632 .collect()
633 }
634
635 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
636 where
637 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
638 T: Clone,
639 {
640 self.get_clients()
641 .into_iter()
642 .flat_map(get_subs)
643 .cloned()
644 .collect()
645 }
646
647 #[must_use]
648 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
649 let (default_opt, clients_map) = (&self.default_client, &self.clients);
650 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
651
652 if let Some(default) = default_opt {
653 clients.push(default);
654 }
655
656 clients
657 }
658
659 #[must_use]
660 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
661 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
662 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
663
664 if let Some(default) = default_opt {
665 clients.push(default);
666 }
667
668 clients
669 }
670
671 pub fn get_client(
672 &mut self,
673 client_id: Option<&ClientId>,
674 venue: Option<&Venue>,
675 ) -> Option<&mut DataClientAdapter> {
676 if let Some(client_id) = client_id {
677 if let Some(client) = self.clients.get_mut(client_id) {
679 return Some(client);
680 }
681
682 if let Some(default) = self.default_client.as_mut()
684 && default.client_id() == *client_id
685 {
686 return Some(default);
687 }
688
689 return None;
691 }
692
693 if let Some(v) = venue {
694 if let Some(client_id) = self.routing_map.get(v) {
696 return self.clients.get_mut(client_id);
697 }
698 }
699
700 self.get_default_client()
702 }
703
704 fn get_command_client(
709 &mut self,
710 client_id: Option<&ClientId>,
711 venue: Option<&Venue>,
712 ) -> Option<&mut DataClientAdapter> {
713 let backtest_id = ClientId::new("BACKTEST");
714 if self.clients.contains_key(&backtest_id) {
717 return self.clients.get_mut(&backtest_id);
718 }
719 let default_is_backtest = self
720 .default_client
721 .as_ref()
722 .is_some_and(|c| c.client_id() == backtest_id);
723 if default_is_backtest {
724 return self.default_client.as_mut();
725 }
726 self.get_client(client_id, venue)
727 }
728
    /// Returns a mutable reference to the default client, if one is registered.
    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
        self.default_client.as_mut()
    }
732
    /// Returns the custom data types subscribed across all registered clients.
    #[must_use]
    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
        self.collect_subscriptions(|client| &client.subscriptions_custom)
    }
738
    /// Returns the instrument IDs with instrument subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_instrument)
    }
744
    /// Returns the instrument IDs with book deltas subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
    }
750
    /// Returns the instrument IDs with book depth10 subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
    }
756
757 #[must_use]
759 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
760 self.book_snapshot_counts
761 .keys()
762 .map(|(instrument_id, _)| *instrument_id)
763 .collect()
764 }
765
    /// Returns the instrument IDs with quote subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_quotes)
    }
771
772 #[must_use]
774 pub fn subscribed_synthetic_quotes(&self) -> Vec<InstrumentId> {
775 self.subscribed_synthetic_quotes.iter().copied().collect()
776 }
777
    /// Returns the instrument IDs with trade subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_trades)
    }
783
784 #[must_use]
786 pub fn subscribed_synthetic_trades(&self) -> Vec<InstrumentId> {
787 self.subscribed_synthetic_trades.iter().copied().collect()
788 }
789
    /// Returns the bar types subscribed across all registered clients.
    #[must_use]
    pub fn subscribed_bars(&self) -> Vec<BarType> {
        self.collect_subscriptions(|client| &client.subscriptions_bars)
    }
795
    /// Returns the instrument IDs with mark price subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
    }
801
    /// Returns the instrument IDs with index price subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
    }
807
    /// Returns the instrument IDs with funding rate subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
    }
813
    /// Returns the instrument IDs with instrument status subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
    }
819
    /// Returns the instrument IDs with instrument close subscriptions across all registered clients.
    #[must_use]
    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
    }
825
826 pub fn execute(&mut self, cmd: DataCommand) {
830 match &cmd {
831 DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
832 DataCommand::Request(_) => self.request_count += 1,
833 #[cfg(feature = "defi")]
834 DataCommand::DefiRequest(_) => self.request_count += 1,
835 #[cfg(feature = "defi")]
836 DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
837 self.command_count += 1;
838 }
839 _ => {}
840 }
841
842 if let Err(e) = match cmd {
843 DataCommand::Subscribe(c) => self.execute_subscribe(c),
844 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
845 DataCommand::Request(c) => self.execute_request(c),
846 #[cfg(feature = "defi")]
847 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
848 #[cfg(feature = "defi")]
849 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
850 #[cfg(feature = "defi")]
851 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
852 _ => {
853 log::warn!("Unhandled DataCommand variant");
854 Ok(())
855 }
856 } {
857 log::error!("{e}");
858 }
859 }
860
    /// Handles a subscribe command.
    ///
    /// Engine-side handling runs first. Some commands are fully handled by the
    /// engine and return early; the rest are forwarded to the resolved client
    /// unless they target an external client.
    ///
    /// # Errors
    ///
    /// Returns an error if an engine-side subscription step fails, or if the
    /// command requests `Instrument`, `InstrumentStatus`, `InstrumentClose` or
    /// `OptionGreeks` data for a synthetic instrument.
    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
        // Arm order matters: guarded arms (synthetic / spread) must precede the catch-all.
        match &cmd {
            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
            SubscribeCommand::BookSnapshots(cmd) => {
                // Fully handled by the engine; nothing is forwarded from here.
                return self.subscribe_book_snapshots(cmd);
            }
            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
            SubscribeCommand::OptionChain(cmd) => {
                self.subscribe_option_chain(cmd);
                return Ok(());
            }
            SubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
                self.subscribe_synthetic_quotes(cmd.instrument_id);
                return Ok(());
            }
            SubscribeCommand::Quotes(cmd)
                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
            {
                self.subscribe_spread_quotes(cmd);
                return Ok(());
            }
            SubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
                self.subscribe_synthetic_trades(cmd.instrument_id);
                return Ok(());
            }
            SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
            }
            SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
            }
            SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
            }
            SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
            }
            // No engine-side handling; the command is only forwarded below.
            _ => {}
        }

        // Commands addressed to an external client are not forwarded.
        if let Some(client_id) = cmd.client_id()
            && self.external_clients.contains(client_id)
        {
            if self.config.debug {
                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
            }
            return Ok(());
        }

        // With the `streaming` feature the command is replaced by the result of
        // `subscribe_command_with_prefilled_start_ns` before being forwarded.
        #[cfg(feature = "streaming")]
        let cmd = self.subscribe_command_with_prefilled_start_ns(cmd)?;

        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
            client.execute_subscribe(cmd);
        } else {
            // A missing client is logged, not returned as an error.
            log::error!(
                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
                cmd.client_id(),
                cmd.venue(),
            );
        }

        Ok(())
    }
934
    /// Handles an unsubscribe command.
    ///
    /// Engine-side handling runs first. Some commands are fully handled by the
    /// engine and return early. The rest are forwarded to the resolved client
    /// unless they target an external client or the exact topic still has
    /// subscribers on the message bus.
    ///
    /// # Errors
    ///
    /// Returns an error if the command targets `Instrument`,
    /// `InstrumentStatus`, `InstrumentClose` or `OptionGreeks` data for a
    /// synthetic instrument.
    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
        // Arm order matters, and the book guards have side effects: the
        // engine-side unsubscribe runs inside the guard, and a `false` result
        // stops processing without forwarding to the client.
        match &cmd {
            UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
                return Ok(());
            }
            UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
                return Ok(());
            }
            UnsubscribeCommand::BookSnapshots(cmd) => {
                self.unsubscribe_book_snapshots(cmd);
                // Fully handled by the engine; nothing is forwarded from here.
                return Ok(());
            }
            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
            UnsubscribeCommand::OptionChain(cmd) => {
                self.unsubscribe_option_chain(cmd);
                return Ok(());
            }
            UnsubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
                self.unsubscribe_synthetic_quotes(cmd.instrument_id);
                return Ok(());
            }
            UnsubscribeCommand::Quotes(cmd)
                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
            {
                self.unsubscribe_spread_quotes(cmd);
                return Ok(());
            }
            UnsubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
                self.unsubscribe_synthetic_trades(cmd.instrument_id);
                return Ok(());
            }
            UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
            }
            UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!(
                    "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
                );
            }
            UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!(
                    "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
                );
            }
            UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
            }
            // No engine-side handling; the command is only forwarded below.
            _ => {}
        }

        // Commands addressed to an external client are not forwarded.
        if let Some(client_id) = cmd.client_id()
            && self.external_clients.contains(client_id)
        {
            if self.config.debug {
                log::debug!(
                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
                );
            }
            return Ok(());
        }

        // Keep the client subscription alive while the topic still has subscribers.
        if Self::topic_has_remaining_subscribers(cmd) {
            return Ok(());
        }

        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
            client.execute_unsubscribe(cmd);
        } else {
            // A missing client is logged, not returned as an error.
            log::error!(
                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
                cmd.client_id(),
                cmd.venue(),
            );
        }

        Ok(())
    }
1019
1020 fn topic_has_remaining_subscribers(cmd: &UnsubscribeCommand) -> bool {
1021 match cmd {
1025 UnsubscribeCommand::Quotes(c) => {
1026 let topic = switchboard::get_quotes_topic(c.instrument_id);
1027 msgbus::exact_subscriber_count_quotes(topic) > 0
1028 }
1029 UnsubscribeCommand::Trades(c) => {
1030 let topic = switchboard::get_trades_topic(c.instrument_id);
1031 msgbus::exact_subscriber_count_trades(topic) > 0
1032 }
1033 UnsubscribeCommand::MarkPrices(c) => {
1034 let topic = switchboard::get_mark_price_topic(c.instrument_id);
1035 msgbus::exact_subscriber_count_mark_prices(topic) > 0
1036 }
1037 UnsubscribeCommand::IndexPrices(c) => {
1038 let topic = switchboard::get_index_price_topic(c.instrument_id);
1039 msgbus::exact_subscriber_count_index_prices(topic) > 0
1040 }
1041 UnsubscribeCommand::FundingRates(c) => {
1042 let topic = switchboard::get_funding_rate_topic(c.instrument_id);
1043 msgbus::exact_subscriber_count_funding_rates(topic) > 0
1044 }
1045 UnsubscribeCommand::OptionGreeks(c) => {
1046 let topic = switchboard::get_option_greeks_topic(c.instrument_id);
1047 msgbus::exact_subscriber_count_option_greeks(topic) > 0
1048 }
1049 _ => false,
1050 }
1051 }
1052
1053 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1060 if let Some(cid) = req.client_id()
1062 && self.external_clients.contains(cid)
1063 {
1064 if self.config.debug {
1065 log::debug!("Skipping data request for external client {cid}: {req:?}");
1066 }
1067 return Ok(());
1068 }
1069
1070 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
1071 match req {
1072 RequestCommand::Data(req) => client.request_data(req),
1073 RequestCommand::Instrument(req) => client.request_instrument(req),
1074 RequestCommand::Instruments(req) => client.request_instruments(req),
1075 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
1076 RequestCommand::BookDepth(req) => client.request_book_depth(req),
1077 RequestCommand::Quotes(req) => client.request_quotes(req),
1078 RequestCommand::Trades(req) => client.request_trades(req),
1079 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
1080 RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
1081 RequestCommand::Bars(req) => client.request_bars(req),
1082 }
1083 } else {
1084 anyhow::bail!(
1085 "Cannot handle request: no client found for {:?} {:?}",
1086 req.client_id(),
1087 req.venue()
1088 );
1089 }
1090 }
1091
1092 pub fn process(&mut self, data: &dyn Any) {
1097 self.data_count += 1;
1098 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
1100 self.handle_instrument(instrument);
1101 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
1102 self.handle_funding_rate(*funding_rate);
1103 } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
1104 self.handle_instrument_status(*status);
1105 } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
1106 self.cache.borrow_mut().add_option_greeks(*option_greeks);
1107 let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
1108 msgbus::publish_option_greeks(topic, option_greeks);
1109 self.drain_deferred_commands();
1110 } else if let Some(custom) = data.downcast_ref::<CustomData>() {
1111 self.handle_custom_data(custom);
1112 } else {
1113 log::error!("Cannot process data {data:?}, type is unrecognized");
1114 }
1115 }
1116
1117 pub fn process_data(&mut self, data: Data) {
1119 self.data_count += 1;
1120
1121 match data {
1122 Data::Delta(delta) => self.handle_delta(delta),
1123 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
1124 Data::Depth10(depth) => self.handle_depth10(*depth),
1125 Data::Quote(quote) => {
1126 self.handle_quote(quote);
1127 self.drain_deferred_commands();
1128 }
1129 Data::Trade(trade) => self.handle_trade(trade),
1130 Data::Bar(bar) => self.handle_bar(bar),
1131 Data::MarkPriceUpdate(mark_price) => {
1132 self.handle_mark_price(mark_price);
1133 self.drain_deferred_commands();
1134 }
1135 Data::IndexPriceUpdate(index_price) => {
1136 self.handle_index_price(index_price);
1137 self.drain_deferred_commands();
1138 }
1139 Data::InstrumentStatus(status) => {
1140 self.handle_instrument_status(status);
1141 self.drain_deferred_commands();
1142 }
1143 Data::InstrumentClose(close) => self.handle_instrument_close(close),
1144 Data::Custom(custom) => self.handle_custom_data(&custom),
1145 }
1146 }
1147
1148 pub fn process_historical(&mut self, data: Data) {
1156 self.data_count += 1;
1157
1158 match data {
1159 Data::Delta(delta) => self.handle_delta_pipeline(delta),
1160 Data::Deltas(deltas) => self.handle_deltas_pipeline(&deltas.into_inner()),
1161 Data::Depth10(depth) => self.handle_depth10_pipeline(*depth),
1162 Data::Quote(quote) => self.handle_quote_pipeline(quote),
1163 Data::Trade(trade) => self.handle_trade_pipeline(trade),
1164 Data::Bar(bar) => self.handle_bar_pipeline(bar),
1165 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price_pipeline(mark_price),
1166 Data::IndexPriceUpdate(index_price) => self.handle_index_price_pipeline(index_price),
1167 Data::InstrumentStatus(status) => self.handle_instrument_status_pipeline(status),
1168 Data::InstrumentClose(close) => self.handle_instrument_close_pipeline(close),
1169 Data::Custom(custom) => self.handle_custom_data_pipeline(&custom),
1170 }
1171 }
1172
1173 #[expect(clippy::needless_pass_by_value)] pub fn response(&mut self, resp: DataResponse) {
1176 if log::log_enabled!(log::Level::Debug) {
1177 let correlation_id = resp.correlation_id();
1178 match resp.record_count() {
1179 Some(count) => log::debug!(
1180 "{RECV}{RES} {} correlation_id={correlation_id} records={count}",
1181 resp.kind(),
1182 ),
1183 None => log::debug!(
1184 "{RECV}{RES} {} correlation_id={correlation_id}",
1185 resp.kind(),
1186 ),
1187 }
1188 }
1189 log::trace!("{RECV}{RES} {resp:?}");
1190
1191 self.response_count += 1;
1192 let correlation_id = *resp.correlation_id();
1193
1194 match &resp {
1195 DataResponse::Instrument(r) => {
1196 self.handle_instrument_response(r.data.clone());
1197 }
1198 DataResponse::Instruments(r) => {
1199 self.handle_instruments(&r.data);
1200 }
1201 DataResponse::Quotes(r) => {
1202 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1203 self.handle_quotes(&r.data);
1204 }
1205 }
1206 DataResponse::Trades(r) => {
1207 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1208 self.handle_trades(&r.data);
1209 }
1210 }
1211 DataResponse::FundingRates(r) => {
1212 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1213 self.handle_funding_rates(&r.data);
1214 }
1215 }
1216 DataResponse::Bars(r) => {
1217 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1218 self.handle_bars(&r.data);
1219 }
1220 }
1221 DataResponse::Book(r) => self.handle_book_response(&r.data),
1222 DataResponse::ForwardPrices(r) => {
1223 return self.handle_forward_prices_response(&correlation_id, r);
1224 }
1225 DataResponse::Data(_) => {}
1226 }
1227
1228 msgbus::send_response(&correlation_id, &resp);
1229 }
1230
1231 #[inline]
1232 fn pipeline_cache_writes_allowed(&self) -> bool {
1233 !self.config.disable_historical_cache
1234 }
1235
1236 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1237 log::debug!("Handling instrument: {}", instrument.id());
1238
1239 if let Err(e) = self
1240 .cache
1241 .as_ref()
1242 .borrow_mut()
1243 .add_instrument(instrument.clone())
1244 {
1245 log_error_on_cache_insert(&e);
1246 }
1247
1248 let topic = switchboard::get_instrument_topic(instrument.id());
1249 log::debug!("Publishing instrument to topic: {topic}");
1250 msgbus::publish_instrument(topic, instrument);
1251
1252 self.update_option_chains(instrument);
1253 }
1254
1255 fn update_option_chains(&mut self, instrument: &InstrumentAny) {
1256 let Some(underlying) = instrument.underlying() else {
1257 return;
1258 };
1259 let Some(expiration_ns) = instrument.expiration_ns() else {
1260 return;
1261 };
1262 let Some(strike) = instrument.strike_price() else {
1263 return;
1264 };
1265 let Some(kind) = instrument.option_kind() else {
1266 return;
1267 };
1268
1269 let venue = instrument.id().venue;
1270 let settlement = instrument.settlement_currency().code;
1271 let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1272
1273 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1275 return;
1276 };
1277
1278 let clock = self.clock.clone();
1279 let client = self.get_command_client(None, Some(&venue));
1280
1281 if manager_rc
1282 .borrow_mut()
1283 .add_instrument(instrument.id(), strike, kind, client, &clock)
1284 {
1285 self.option_chain_instrument_index
1286 .insert(instrument.id(), series_id);
1287 }
1288 }
1289
1290 fn handle_delta(&mut self, delta: OrderBookDelta) {
1291 let deltas = if self.config.buffer_deltas {
1292 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1293 buffered_deltas.deltas.push(delta);
1294 buffered_deltas.flags = delta.flags;
1295 buffered_deltas.sequence = delta.sequence;
1296 buffered_deltas.ts_event = delta.ts_event;
1297 buffered_deltas.ts_init = delta.ts_init;
1298 } else {
1299 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1300 self.buffered_deltas_map
1301 .insert(delta.instrument_id, buffered_deltas);
1302 }
1303
1304 if !RecordFlag::F_LAST.matches(delta.flags) {
1305 return; }
1307
1308 self.buffered_deltas_map
1309 .remove(&delta.instrument_id)
1310 .expect("buffered deltas exist")
1311 } else {
1312 OrderBookDeltas::new(delta.instrument_id, vec![delta])
1313 };
1314
1315 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1316 msgbus::publish_deltas(topic, &deltas);
1317 }
1318
1319 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1320 if self.config.buffer_deltas {
1321 let instrument_id = deltas.instrument_id;
1322
1323 for delta in deltas.deltas {
1324 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1325 buffered_deltas.deltas.push(delta);
1326 buffered_deltas.flags = delta.flags;
1327 buffered_deltas.sequence = delta.sequence;
1328 buffered_deltas.ts_event = delta.ts_event;
1329 buffered_deltas.ts_init = delta.ts_init;
1330 } else {
1331 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1332 self.buffered_deltas_map
1333 .insert(instrument_id, buffered_deltas);
1334 }
1335
1336 if RecordFlag::F_LAST.matches(delta.flags) {
1337 let deltas_to_publish = self
1338 .buffered_deltas_map
1339 .remove(&instrument_id)
1340 .expect("buffered deltas exist");
1341 let topic = switchboard::get_book_deltas_topic(instrument_id);
1342 msgbus::publish_deltas(topic, &deltas_to_publish);
1343 }
1344 }
1345 } else {
1346 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1347 msgbus::publish_deltas(topic, &deltas);
1348 }
1349 }
1350
1351 fn handle_depth10(&self, depth: OrderBookDepth10) {
1352 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1353 msgbus::publish_depth10(topic, &depth);
1354
1355 if self.config.emit_quotes_from_book_depths
1356 && let Some(quote) = derive_quote_from_depth(&depth)
1357 {
1358 book::publish_quote_if_changed(&self.cache, quote);
1359 }
1360 }
1361
1362 fn handle_quote(&self, quote: QuoteTick) {
1363 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1364 log_error_on_cache_insert(&e);
1365 }
1366
1367 for synthetic_quote in self.synthetic_quotes_from_quote(quote) {
1368 let topic = switchboard::get_quotes_topic(synthetic_quote.instrument_id);
1369 msgbus::publish_quote(topic, &synthetic_quote);
1370 }
1371
1372 let topic = switchboard::get_quotes_topic(quote.instrument_id);
1373 msgbus::publish_quote(topic, "e);
1374 }
1375
1376 fn handle_trade(&self, trade: TradeTick) {
1377 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1378 log_error_on_cache_insert(&e);
1379 }
1380
1381 for synthetic_trade in self.synthetic_trades_from_trade(trade) {
1382 let topic = switchboard::get_trades_topic(synthetic_trade.instrument_id);
1383 msgbus::publish_trade(topic, &synthetic_trade);
1384 }
1385
1386 let topic = switchboard::get_trades_topic(trade.instrument_id);
1387 msgbus::publish_trade(topic, &trade);
1388 }
1389
1390 fn synthetic_quotes_from_quote(&self, update: QuoteTick) -> Vec<QuoteTick> {
1391 let Some(synthetics) = self.synthetic_quote_feeds.get(&update.instrument_id) else {
1392 return Vec::new();
1393 };
1394
1395 synthetics
1396 .iter()
1397 .filter_map(|synthetic| self.synthetic_quote_from_update(synthetic, update))
1398 .collect()
1399 }
1400
1401 fn synthetic_quote_from_update(
1402 &self,
1403 synthetic: &SyntheticInstrument,
1404 update: QuoteTick,
1405 ) -> Option<QuoteTick> {
1406 let cache = self.cache.borrow();
1407 let mut bid_inputs = Vec::with_capacity(synthetic.components.len());
1408 let mut ask_inputs = Vec::with_capacity(synthetic.components.len());
1409
1410 for instrument_id in &synthetic.components {
1411 let (bid_price, ask_price) = if *instrument_id == update.instrument_id {
1412 (update.bid_price, update.ask_price)
1413 } else {
1414 let Some(component_quote) = cache.quote(instrument_id) else {
1415 log::warn!(
1416 "Cannot calculate synthetic instrument {} price, no quotes for {} yet",
1417 synthetic.id,
1418 instrument_id,
1419 );
1420 return None;
1421 };
1422 (component_quote.bid_price, component_quote.ask_price)
1423 };
1424
1425 bid_inputs.push(bid_price.as_f64());
1426 ask_inputs.push(ask_price.as_f64());
1427 }
1428 drop(cache);
1429
1430 let bid_price = match synthetic.calculate(&bid_inputs) {
1431 Ok(price) => price,
1432 Err(e) => {
1433 log::error!(
1434 "Cannot calculate synthetic instrument {} bid price: {e}",
1435 synthetic.id
1436 );
1437 return None;
1438 }
1439 };
1440 let ask_price = match synthetic.calculate(&ask_inputs) {
1441 Ok(price) => price,
1442 Err(e) => {
1443 log::error!(
1444 "Cannot calculate synthetic instrument {} ask price: {e}",
1445 synthetic.id
1446 );
1447 return None;
1448 }
1449 };
1450 let size_one = Quantity::from(1);
1451
1452 Some(QuoteTick::new(
1453 synthetic.id,
1454 bid_price,
1455 ask_price,
1456 size_one,
1457 size_one,
1458 update.ts_event,
1459 self.clock.borrow().timestamp_ns(),
1460 ))
1461 }
1462
1463 fn synthetic_trades_from_trade(&self, update: TradeTick) -> Vec<TradeTick> {
1464 let Some(synthetics) = self.synthetic_trade_feeds.get(&update.instrument_id) else {
1465 return Vec::new();
1466 };
1467
1468 synthetics
1469 .iter()
1470 .filter_map(|synthetic| self.synthetic_trade_from_update(synthetic, update))
1471 .collect()
1472 }
1473
1474 fn synthetic_trade_from_update(
1475 &self,
1476 synthetic: &SyntheticInstrument,
1477 update: TradeTick,
1478 ) -> Option<TradeTick> {
1479 let cache = self.cache.borrow();
1480 let mut inputs = Vec::with_capacity(synthetic.components.len());
1481
1482 for instrument_id in &synthetic.components {
1483 let price = if *instrument_id == update.instrument_id {
1484 update.price
1485 } else {
1486 let Some(component_trade) = cache.trade(instrument_id) else {
1487 log::warn!(
1488 "Cannot calculate synthetic instrument {} price, no trades for {} yet",
1489 synthetic.id,
1490 instrument_id,
1491 );
1492 return None;
1493 };
1494 component_trade.price
1495 };
1496
1497 inputs.push(price.as_f64());
1498 }
1499 drop(cache);
1500
1501 let price = match synthetic.calculate(&inputs) {
1502 Ok(price) => price,
1503 Err(e) => {
1504 log::error!(
1505 "Cannot calculate synthetic instrument {} trade price: {e}",
1506 synthetic.id
1507 );
1508 return None;
1509 }
1510 };
1511
1512 Some(TradeTick::new(
1513 synthetic.id,
1514 price,
1515 Quantity::from(1),
1516 update.aggressor_side,
1517 update.trade_id,
1518 update.ts_event,
1519 self.clock.borrow().timestamp_ns(),
1520 ))
1521 }
1522
1523 fn handle_bar(&self, bar: Bar) {
1524 process_engine_bar(&self.cache, self.config.validate_data_sequence, true, bar);
1525 }
1526
1527 fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1528 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1529 log_error_on_cache_insert(&e);
1530 }
1531
1532 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1533 msgbus::publish_mark_price(topic, &mark_price);
1534 }
1535
1536 fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1537 if let Err(e) = self
1538 .cache
1539 .as_ref()
1540 .borrow_mut()
1541 .add_index_price(index_price)
1542 {
1543 log_error_on_cache_insert(&e);
1544 }
1545
1546 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1547 msgbus::publish_index_price(topic, &index_price);
1548 }
1549
1550 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1552 if let Err(e) = self
1553 .cache
1554 .as_ref()
1555 .borrow_mut()
1556 .add_funding_rate(funding_rate)
1557 {
1558 log_error_on_cache_insert(&e);
1559 }
1560
1561 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1562 msgbus::publish_funding_rate(topic, &funding_rate);
1563 }
1564
1565 fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1566 if let Err(e) = self
1567 .cache
1568 .as_ref()
1569 .borrow_mut()
1570 .add_instrument_status(status)
1571 {
1572 log_error_on_cache_insert(&e);
1573 }
1574
1575 let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1576 msgbus::publish_any(topic, &status);
1577
1578 if self
1579 .option_chain_instrument_index
1580 .contains_key(&status.instrument_id)
1581 && matches!(
1582 status.action,
1583 MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1584 )
1585 {
1586 self.expire_option_chain_instrument(status.instrument_id);
1587 }
1588 }
1589
/// Removes an expired instrument from its option chain.
///
/// Does nothing when the instrument is not present in
/// `option_chain_instrument_index` or when no manager is registered for its
/// series. Otherwise the manager is told the instrument expired, deferred
/// commands are drained, and, if the manager reported the series as empty,
/// the manager is torn down and removed.
fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
    // Removing the index entry first makes a repeated call for the same instrument a no-op
    let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
        return;
    };

    let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
        return;
    };

    // The mutable borrow of the manager ends with this statement, before draining
    let series_empty = manager_rc
        .borrow_mut()
        .handle_instrument_expired(&instrument_id);

    // Draining can re-enter this method through `DeferredCommand::ExpireInstrument`
    self.drain_deferred_commands();

    log::info!(
        "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
    );

    if series_empty {
        manager_rc.borrow_mut().teardown(&self.clock);
        self.option_chain_managers.remove(&series_id);

        log::info!("Torn down empty option chain manager for {series_id}");
    }
}
1623
1624 fn handle_instrument_close(&self, close: InstrumentClose) {
1625 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1626 msgbus::publish_any(topic, &close);
1627 }
1628
1629 fn handle_custom_data(&self, custom: &CustomData) {
1630 log::debug!("Processing custom data: {}", custom.data.type_name());
1631 let topic = switchboard::get_custom_topic(&custom.data_type);
1632 msgbus::publish_any(topic, custom);
1633 }
1634
1635 fn handle_delta_pipeline(&self, delta: OrderBookDelta) {
1636 let deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1638 let topic = historical_topic_of(switchboard::get_book_deltas_topic(deltas.instrument_id));
1639 msgbus::publish_deltas(topic, &deltas);
1640 }
1641
1642 fn handle_deltas_pipeline(&self, deltas: &OrderBookDeltas) {
1643 let topic = historical_topic_of(switchboard::get_book_deltas_topic(deltas.instrument_id));
1644 msgbus::publish_deltas(topic, deltas);
1645 }
1646
1647 fn handle_depth10_pipeline(&self, depth: OrderBookDepth10) {
1648 let topic = historical_topic_of(switchboard::get_book_depth10_topic(depth.instrument_id));
1649 msgbus::publish_depth10(topic, &depth);
1650 }
1651
1652 fn handle_quote_pipeline(&self, quote: QuoteTick) {
1653 if self.pipeline_cache_writes_allowed()
1654 && let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote)
1655 {
1656 log_error_on_cache_insert(&e);
1657 }
1658
1659 let topic = historical_topic_of(switchboard::get_quotes_topic(quote.instrument_id));
1660 msgbus::publish_quote(topic, "e);
1661 }
1662
1663 fn handle_trade_pipeline(&self, trade: TradeTick) {
1664 if self.pipeline_cache_writes_allowed()
1665 && let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade)
1666 {
1667 log_error_on_cache_insert(&e);
1668 }
1669
1670 let topic = historical_topic_of(switchboard::get_trades_topic(trade.instrument_id));
1671 msgbus::publish_trade(topic, &trade);
1672 }
1673
1674 fn handle_bar_pipeline(&self, bar: Bar) {
1675 if !validate_bar_sequence(&self.cache, self.config.validate_data_sequence, &bar) {
1676 return;
1677 }
1678
1679 if self.pipeline_cache_writes_allowed()
1680 && let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar)
1681 {
1682 log_error_on_cache_insert(&e);
1683 }
1684
1685 let topic = historical_topic_of(switchboard::get_bars_topic(bar.bar_type));
1686 msgbus::publish_bar(topic, &bar);
1687 }
1688
1689 fn handle_mark_price_pipeline(&self, mark_price: MarkPriceUpdate) {
1690 if self.pipeline_cache_writes_allowed()
1691 && let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price)
1692 {
1693 log_error_on_cache_insert(&e);
1694 }
1695
1696 let topic =
1697 historical_topic_of(switchboard::get_mark_price_topic(mark_price.instrument_id));
1698 msgbus::publish_mark_price(topic, &mark_price);
1699 }
1700
1701 fn handle_index_price_pipeline(&self, index_price: IndexPriceUpdate) {
1702 if self.pipeline_cache_writes_allowed()
1703 && let Err(e) = self
1704 .cache
1705 .as_ref()
1706 .borrow_mut()
1707 .add_index_price(index_price)
1708 {
1709 log_error_on_cache_insert(&e);
1710 }
1711
1712 let topic = historical_topic_of(switchboard::get_index_price_topic(
1713 index_price.instrument_id,
1714 ));
1715 msgbus::publish_index_price(topic, &index_price);
1716 }
1717
1718 fn handle_instrument_status_pipeline(&self, status: InstrumentStatus) {
1719 if self.pipeline_cache_writes_allowed()
1720 && let Err(e) = self
1721 .cache
1722 .as_ref()
1723 .borrow_mut()
1724 .add_instrument_status(status)
1725 {
1726 log_error_on_cache_insert(&e);
1727 }
1728
1729 let topic = historical_topic_of(switchboard::get_instrument_status_topic(
1730 status.instrument_id,
1731 ));
1732 msgbus::publish_any(topic, &status);
1733 }
1734
1735 fn handle_instrument_close_pipeline(&self, close: InstrumentClose) {
1736 let topic =
1737 historical_topic_of(switchboard::get_instrument_close_topic(close.instrument_id));
1738 msgbus::publish_any(topic, &close);
1739 }
1740
1741 fn handle_custom_data_pipeline(&self, custom: &CustomData) {
1742 log::debug!("Pipeline custom data: {}", custom.data.type_name());
1743 let topic = historical_topic_of(switchboard::get_custom_topic(&custom.data_type));
1744 msgbus::publish_any(topic, custom);
1745 }
1746
1747 fn drain_deferred_commands(&mut self) {
1751 loop {
1753 let commands: VecDeque<DeferredCommand> =
1754 std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1755
1756 if commands.is_empty() {
1757 break;
1758 }
1759
1760 for cmd in commands {
1761 match cmd {
1762 DeferredCommand::Subscribe(sub) => {
1763 let client = self.get_command_client(sub.client_id(), sub.venue());
1764 if let Some(client) = client {
1765 client.execute_subscribe(sub);
1766 }
1767 }
1768 DeferredCommand::Unsubscribe(unsub) => {
1769 let client = self.get_command_client(unsub.client_id(), unsub.venue());
1770 if let Some(client) = client {
1771 client.execute_unsubscribe(&unsub);
1772 }
1773 }
1774 DeferredCommand::ExpireInstrument(instrument_id) => {
1775 self.expire_option_chain_instrument(instrument_id);
1776 }
1777 DeferredCommand::ExpireSeries(series_id) => {
1778 self.expire_series(series_id);
1779 }
1780 }
1781 }
1782 }
1783 }
1784
1785 fn expire_series(&mut self, series_id: OptionSeriesId) {
1791 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1792 return;
1793 };
1794
1795 let instrument_ids: Vec<InstrumentId> = self
1796 .option_chain_instrument_index
1797 .iter()
1798 .filter(|(_, sid)| **sid == series_id)
1799 .map(|(id, _)| *id)
1800 .collect();
1801
1802 for id in &instrument_ids {
1803 self.option_chain_instrument_index.remove(id);
1804 manager_rc.borrow_mut().handle_instrument_expired(id);
1805 }
1806
1807 manager_rc.borrow_mut().teardown(&self.clock);
1808 self.option_chain_managers.remove(&series_id);
1809
1810 log::info!("Proactively torn down expired option chain {series_id}");
1811 }
1812
1813 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1814 if cmd.instrument_id.is_synthetic() {
1815 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1816 }
1817
1818 let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
1821
1822 self.book_deltas_subs.insert(cmd.instrument_id);
1823 if cmd.managed {
1824 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, parent)?;
1825 }
1826
1827 Ok(())
1828 }
1829
1830 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1831 if cmd.instrument_id.is_synthetic() {
1832 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1833 }
1834
1835 let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
1836
1837 self.book_depth10_subs.insert(cmd.instrument_id);
1838 if cmd.managed {
1839 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
1840 }
1841
1842 Ok(())
1843 }
1844
/// Registers a book snapshot subscription and, when needed, forwards it to the
/// data client as a book deltas subscription.
///
/// The deltas subscription is only forwarded when the instrument had no
/// snapshot subscriptions before this call, has no book deltas subscription,
/// and the command does not target an external client.
///
/// # Errors
///
/// Returns an error if the instrument is synthetic, if
/// `resolve_parent_components` fails, or if setting up the book updater fails.
/// A missing data client is logged and does not produce an error.
fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
    // NOTE(review): the message names `OrderBookDelta` although this is the snapshots
    // path; possibly intentional since snapshots are fed from deltas, confirm
    if cmd.instrument_id.is_synthetic() {
        anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
    }

    let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;

    // Capture the prior state before the subscription count is incremented
    let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
    let inserted = self.increment_book_snapshot_subscription(cmd, parent);

    // Only the first snapshot subscription for an instrument sets up the updater
    if inserted && !had_snapshots {
        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
    }

    // Nothing to forward when snapshots or deltas were already subscribed
    if had_snapshots || self.book_deltas_subs.contains(&cmd.instrument_id) {
        return Ok(());
    }

    if let Some(client_id) = cmd.client_id.as_ref()
        && self.external_clients.contains(client_id)
    {
        if self.config.debug {
            log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
        }
        return Ok(());
    }

    log::debug!(
        "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
        cmd.instrument_id,
        cmd.client_id,
        cmd.venue,
    );

    if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
        // The forwarded command gets a fresh ID and passes the original command ID along
        let deltas_cmd = SubscribeBookDeltas::new(
            cmd.instrument_id,
            cmd.book_type,
            cmd.client_id,
            cmd.venue,
            UUID4::new(),
            cmd.ts_init,
            cmd.depth,
            // presumably the `managed` flag; confirm against `SubscribeBookDeltas::new`
            true,
            Some(cmd.command_id),
            cmd.params.clone(),
        );
        log::debug!(
            "Calling client.execute_subscribe for BookDeltas: {}",
            cmd.instrument_id
        );
        client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
    } else {
        log::error!(
            "Cannot handle command: no client found for client_id={:?}, venue={:?}",
            cmd.client_id,
            cmd.venue,
        );
    }

    Ok(())
}
1911
1912 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1913 match cmd.bar_type.aggregation_source() {
1914 AggregationSource::Internal => {
1915 if !self
1916 .bar_aggregators
1917 .contains_key(&bar_aggregator_key(cmd.bar_type, None))
1918 {
1919 self.start_bar_aggregator(cmd.bar_type, None)?;
1920 }
1921 }
1922 AggregationSource::External => {
1923 if cmd.bar_type.instrument_id().is_synthetic() {
1924 anyhow::bail!(
1925 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1926 );
1927 }
1928 }
1929 }
1930
1931 Ok(())
1932 }
1933
1934 fn subscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
1935 let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
1936 log::error!(
1937 "Cannot subscribe to `QuoteTick` data for synthetic instrument {instrument_id}, not found",
1938 );
1939 return;
1940 };
1941
1942 if !self.subscribed_synthetic_quotes.insert(instrument_id) {
1943 return;
1944 }
1945
1946 for component_id in &synthetic.components {
1947 let synthetics = self.synthetic_quote_feeds.entry(*component_id).or_default();
1948 if !synthetics
1949 .iter()
1950 .any(|registered| registered.id == synthetic.id)
1951 {
1952 synthetics.push(synthetic.clone());
1953 }
1954 }
1955 }
1956
1957 fn subscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
1958 let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
1959 log::error!(
1960 "Cannot subscribe to `TradeTick` data for synthetic instrument {instrument_id}, not found",
1961 );
1962 return;
1963 };
1964
1965 if !self.subscribed_synthetic_trades.insert(instrument_id) {
1966 return;
1967 }
1968
1969 for component_id in &synthetic.components {
1970 let synthetics = self.synthetic_trade_feeds.entry(*component_id).or_default();
1971 if !synthetics
1972 .iter()
1973 .any(|registered| registered.id == synthetic.id)
1974 {
1975 synthetics.push(synthetic.clone());
1976 }
1977 }
1978 }
1979
1980 fn is_spread_quote_command(
1981 &self,
1982 instrument_id: InstrumentId,
1983 params: Option<&Params>,
1984 ) -> bool {
1985 if !params
1986 .and_then(|params| params.get_bool("aggregate_spread_quotes"))
1987 .unwrap_or(false)
1988 {
1989 return false;
1990 }
1991
1992 self.cache
1993 .borrow()
1994 .instrument(&instrument_id)
1995 .is_some_and(InstrumentAny::is_spread)
1996 }
1997
1998 fn subscribe_spread_quotes(&mut self, cmd: &SubscribeQuotes) {
1999 if self
2000 .spread_quote_aggregators
2001 .contains_key(&cmd.instrument_id)
2002 {
2003 log::warn!(
2004 "SpreadQuoteAggregator for {} is currently in use, subscription can't be started",
2005 cmd.instrument_id,
2006 );
2007 return;
2008 }
2009
2010 let Some(instrument) = self.cache.borrow().instrument(&cmd.instrument_id).cloned() else {
2011 log::error!(
2012 "Cannot create spread quote aggregator: no instrument found for {}",
2013 cmd.instrument_id,
2014 );
2015 return;
2016 };
2017 let Some(legs) = spread_instrument_legs(&instrument) else {
2018 log::error!(
2019 "Cannot create spread quote aggregator: invalid spread legs for {}",
2020 cmd.instrument_id,
2021 );
2022 return;
2023 };
2024
2025 if legs.len() <= 1 {
2026 log::error!(
2027 "Cannot create spread quote aggregator: spread instrument {} should have more than one leg",
2028 cmd.instrument_id,
2029 );
2030 return;
2031 }
2032
2033 let cache = self.cache.clone();
2034 let handler = Box::new(move |quote: QuoteTick| {
2035 if let Err(e) = cache.borrow_mut().add_quote(quote) {
2036 log_error_on_cache_insert(&e);
2037 }
2038 let topic = switchboard::get_quotes_topic(quote.instrument_id);
2039 msgbus::publish_quote(topic, "e);
2040 });
2041 let aggregator = Rc::new(RefCell::new(SpreadQuoteAggregator::new(
2042 cmd.instrument_id,
2043 &legs,
2044 matches!(instrument, InstrumentAny::FuturesSpread(_)),
2045 instrument.price_precision(),
2046 instrument.size_precision(),
2047 handler,
2048 self.clock.clone(),
2049 false,
2050 spread_quote_update_interval_seconds(cmd.params.as_ref()),
2051 cmd.params
2052 .as_ref()
2053 .and_then(|params| params.get_u64("quote_build_delay"))
2054 .unwrap_or(0),
2055 None,
2056 None,
2057 )));
2058
2059 let mut handlers = Vec::with_capacity(legs.len());
2060 for (leg_id, _) in &legs {
2061 let topic = switchboard::get_quotes_topic(*leg_id);
2062 let handler = TypedHandler::new(SpreadQuoteHandler::new(
2063 &aggregator,
2064 cmd.instrument_id,
2065 *leg_id,
2066 ));
2067 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
2068 handlers.push((*leg_id, handler));
2069 }
2070
2071 aggregator
2072 .borrow_mut()
2073 .start_timer(Some(aggregator.clone()));
2074 aggregator.borrow_mut().set_running(true);
2075 self.spread_quote_aggregators
2076 .insert(cmd.instrument_id, aggregator);
2077 self.spread_quote_handlers
2078 .insert(cmd.instrument_id, handlers);
2079
2080 for (leg_id, _) in legs {
2081 let subscribe = SubscribeQuotes::new(
2082 leg_id,
2083 cmd.client_id,
2084 cmd.venue,
2085 UUID4::new(),
2086 cmd.ts_init,
2087 Some(cmd.command_id),
2088 cmd.params.clone(),
2089 );
2090 self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
2091 }
2092 }
2093
2094 fn unsubscribe_spread_quotes(&mut self, cmd: &UnsubscribeQuotes) {
2095 let Some(leg_ids) = self.stop_spread_quote_aggregator(cmd.instrument_id) else {
2096 return;
2097 };
2098
2099 for leg_id in leg_ids {
2100 let unsubscribe = UnsubscribeQuotes::new(
2101 leg_id,
2102 cmd.client_id,
2103 cmd.venue,
2104 UUID4::new(),
2105 cmd.ts_init,
2106 Some(cmd.command_id),
2107 cmd.params.clone(),
2108 );
2109 self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
2110 unsubscribe,
2111 )));
2112 }
2113 }
2114
2115 fn stop_spread_quote_aggregator(
2116 &mut self,
2117 spread_instrument_id: InstrumentId,
2118 ) -> Option<Vec<InstrumentId>> {
2119 let Some(aggregator) = self.spread_quote_aggregators.remove(&spread_instrument_id) else {
2120 log::warn!(
2121 "Cannot stop spread quote aggregator: no aggregator to stop for {spread_instrument_id}",
2122 );
2123 return None;
2124 };
2125
2126 aggregator.borrow_mut().stop_timer();
2127 aggregator.borrow_mut().set_running(false);
2128
2129 let handlers = self
2130 .spread_quote_handlers
2131 .remove(&spread_instrument_id)
2132 .unwrap_or_default();
2133 let mut leg_ids = Vec::with_capacity(handlers.len());
2134 for (leg_id, handler) in handlers {
2135 let topic = switchboard::get_quotes_topic(leg_id);
2136 msgbus::unsubscribe_quotes(topic.into(), &handler);
2137 leg_ids.push(leg_id);
2138 }
2139
2140 Some(leg_ids)
2141 }
2142
2143 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
2144 if !self.book_deltas_subs.contains(&cmd.instrument_id) {
2145 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
2146 return false;
2147 }
2148
2149 self.book_deltas_subs.remove(&cmd.instrument_id);
2150 self.maintain_book_updater(&cmd.instrument_id);
2151
2152 !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
2155 }
2156
2157 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
2158 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
2159 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
2160 return false;
2161 }
2162
2163 self.book_depth10_subs.remove(&cmd.instrument_id);
2164 self.maintain_book_updater(&cmd.instrument_id);
2165
2166 true
2167 }
2168
2169 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
2170 match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
2171 BookSnapshotUnsubscribeResult::NotSubscribed => {
2172 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
2173 return;
2174 }
2175 BookSnapshotUnsubscribeResult::Decremented => return,
2176 BookSnapshotUnsubscribeResult::Removed => {}
2177 }
2178
2179 if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
2180 return;
2181 }
2182
2183 self.maintain_book_updater(&cmd.instrument_id);
2184
2185 if self.book_deltas_subs.contains(&cmd.instrument_id) {
2186 return;
2187 }
2188
2189 if let Some(client_id) = cmd.client_id.as_ref()
2190 && self.external_clients.contains(client_id)
2191 {
2192 return;
2193 }
2194
2195 if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
2196 let deltas_cmd = UnsubscribeBookDeltas::new(
2197 cmd.instrument_id,
2198 cmd.client_id,
2199 cmd.venue,
2200 UUID4::new(),
2201 cmd.ts_init,
2202 Some(cmd.command_id),
2203 cmd.params.clone(),
2204 );
2205 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
2206 }
2207 }
2208
2209 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
2210 let bar_type = cmd.bar_type;
2211
2212 let topic = switchboard::get_bars_topic(bar_type.standard());
2214 if msgbus::exact_subscriber_count_bars(topic) > 0 {
2215 return;
2216 }
2217
2218 if self
2219 .bar_aggregators
2220 .contains_key(&bar_aggregator_key(bar_type, None))
2221 && let Err(e) = self.stop_bar_aggregator(bar_type, None)
2222 {
2223 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
2224 }
2225
2226 if bar_type.is_composite() {
2228 let source_type = bar_type.composite();
2229 let source_topic = switchboard::get_bars_topic(source_type);
2230 if msgbus::exact_subscriber_count_bars(source_topic) == 0
2231 && self
2232 .bar_aggregators
2233 .contains_key(&bar_aggregator_key(source_type, None))
2234 && let Err(e) = self.stop_bar_aggregator(source_type, None)
2235 {
2236 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
2237 }
2238 }
2239 }
2240
2241 fn unsubscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
2242 if !self.subscribed_synthetic_quotes.remove(&instrument_id) {
2243 log::warn!("Cannot unsubscribe from synthetic `QuoteTick` data: not subscribed");
2244 return;
2245 }
2246
2247 self.synthetic_quote_feeds.retain(|_, synthetics| {
2248 synthetics.retain(|synthetic| synthetic.id != instrument_id);
2249 !synthetics.is_empty()
2250 });
2251 }
2252
2253 fn unsubscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
2254 if !self.subscribed_synthetic_trades.remove(&instrument_id) {
2255 log::warn!("Cannot unsubscribe from synthetic `TradeTick` data: not subscribed");
2256 return;
2257 }
2258
2259 self.synthetic_trade_feeds.retain(|_, synthetics| {
2260 synthetics.retain(|synthetic| synthetic.id != instrument_id);
2261 !synthetics.is_empty()
2262 });
2263 }
2264
2265 fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
2266 let series_id = cmd.series_id;
2267
2268 if let Some(old) = self.option_chain_managers.remove(&series_id) {
2270 log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
2271 let all_ids = old.borrow().all_instrument_ids();
2272 let old_venue = old.borrow().venue();
2273 old.borrow_mut().teardown(&self.clock);
2274 self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
2275 }
2276
2277 self.pending_option_chain_requests
2279 .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
2280
2281 if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
2284 let resolved_client_id = self
2286 .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
2287 .map(|c| c.client_id);
2288
2289 if let Some(client_id) = resolved_client_id {
2290 let request_id = UUID4::new();
2291 let ts_init = self.clock.borrow().timestamp_ns();
2292
2293 let sample_instrument_id = {
2296 let cache = self.cache.borrow();
2297 cache
2298 .instruments(&series_id.venue, Some(&series_id.underlying))
2299 .iter()
2300 .find(|i| {
2301 i.expiration_ns() == Some(series_id.expiration_ns)
2302 && i.settlement_currency().code == series_id.settlement_currency
2303 })
2304 .map(|i| i.id())
2305 };
2306
2307 let request = RequestForwardPrices::new(
2308 series_id.venue,
2309 series_id.underlying,
2310 sample_instrument_id,
2311 Some(client_id),
2312 request_id,
2313 ts_init,
2314 None,
2315 );
2316
2317 self.pending_option_chain_requests
2318 .insert(request_id, cmd.clone());
2319
2320 let req_cmd = RequestCommand::ForwardPrices(request);
2321 if let Err(e) = self.execute_request(req_cmd) {
2322 log::warn!("Failed to request forward prices for {series_id}: {e}");
2323 let cmd = self
2324 .pending_option_chain_requests
2325 .remove(&request_id)
2326 .expect("just inserted");
2327 self.create_option_chain_manager(&cmd, None);
2328 }
2329
2330 return;
2331 }
2332 }
2333
2334 self.create_option_chain_manager(cmd, None);
2335 }
2336
2337 fn create_option_chain_manager(
2339 &mut self,
2340 cmd: &SubscribeOptionChain,
2341 initial_atm_price: Option<Price>,
2342 ) {
2343 let series_id = cmd.series_id;
2344 let cache = self.cache.clone();
2345 let clock = self.clock.clone();
2346 let priority = self.msgbus_priority;
2347 let deferred_cmd_queue = self.deferred_cmd_queue.clone();
2348
2349 let manager_rc = {
2350 let client = self.get_command_client(cmd.client_id.as_ref(), Some(&series_id.venue));
2351 OptionChainManager::create_and_setup(
2352 series_id,
2353 &cache,
2354 cmd,
2355 &clock,
2356 priority,
2357 client,
2358 initial_atm_price,
2359 deferred_cmd_queue,
2360 )
2361 };
2362
2363 for id in manager_rc.borrow().all_instrument_ids() {
2365 self.option_chain_instrument_index.insert(id, series_id);
2366 }
2367
2368 self.option_chain_managers.insert(series_id, manager_rc);
2369 }
2370
2371 fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
2372 let series_id = cmd.series_id;
2373
2374 let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
2375 log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
2376 return;
2377 };
2378
2379 let all_ids = manager_rc.borrow().all_instrument_ids();
2381 let venue = manager_rc.borrow().venue();
2382
2383 for id in &all_ids {
2385 self.option_chain_instrument_index.remove(id);
2386 }
2387
2388 manager_rc.borrow_mut().teardown(&self.clock);
2389
2390 self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
2392
2393 log::info!("Unsubscribed option chain for {series_id}");
2394 }
2395
2396 fn forward_option_chain_unsubscribes(
2398 &mut self,
2399 instrument_ids: &[InstrumentId],
2400 venue: Venue,
2401 client_id: Option<ClientId>,
2402 ) {
2403 let ts_init = self.clock.borrow().timestamp_ns();
2404
2405 let Some(client) = self.get_command_client(client_id.as_ref(), Some(&venue)) else {
2406 log::error!(
2407 "Cannot forward option chain unsubscribes: no client found for venue={venue}",
2408 );
2409 return;
2410 };
2411
2412 for instrument_id in instrument_ids {
2413 client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
2414 *instrument_id,
2415 client_id,
2416 Some(venue),
2417 UUID4::new(),
2418 ts_init,
2419 None,
2420 None,
2421 )));
2422 client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
2423 UnsubscribeOptionGreeks::new(
2424 *instrument_id,
2425 client_id,
2426 Some(venue),
2427 UUID4::new(),
2428 ts_init,
2429 None,
2430 None,
2431 ),
2432 ));
2433 client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
2434 UnsubscribeInstrumentStatus::new(
2435 *instrument_id,
2436 client_id,
2437 Some(venue),
2438 UUID4::new(),
2439 ts_init,
2440 None,
2441 None,
2442 ),
2443 ));
2444 }
2445 }
2446
    /// Re-evaluates the book updater wiring after a subscription for `instrument_id` changed.
    ///
    /// The affected targets are either `instrument_id` itself or, when it has an entry in
    /// one of the parent expansion maps, the union of the expanded instrument IDs. Parent
    /// expansion entries that are no longer needed are dropped first. Then, for every
    /// target that has a `BookUpdater`, the updater is unsubscribed from the deltas and/or
    /// depth10 topics it is no longer wanted on, and removed entirely once neither is wanted.
    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
        // An ID is treated as a parent when either expansion map holds an entry for it
        let is_parent = self
            .book_deltas_parent_expansions
            .contains_key(instrument_id)
            || self
                .book_depth10_parent_expansions
                .contains_key(instrument_id);
        let target_ids: Vec<InstrumentId> = if is_parent {
            // Union of both expansions, de-duplicated through a set
            let mut set: AHashSet<InstrumentId> = AHashSet::new();

            if let Some(expansion) = self.book_deltas_parent_expansions.get(instrument_id) {
                set.extend(expansion.iter().copied());
            }

            if let Some(expansion) = self.book_depth10_parent_expansions.get(instrument_id) {
                set.extend(expansion.iter().copied());
            }

            // Nothing was expanded, so there are no targets to maintain
            if set.is_empty() {
                return;
            }

            set.into_iter().collect()
        } else {
            vec![*instrument_id]
        };

        if is_parent {
            // The targets were captured above, so the expansion entries can be pruned before
            // the per-target checks below consult the expansion maps
            let parent_still_needs_deltas = self.book_deltas_subs.contains(instrument_id)
                || self.book_depth10_subs.contains(instrument_id)
                || self.has_book_snapshot_subscriptions(instrument_id);
            let parent_still_needs_depth10 = self.book_depth10_subs.contains(instrument_id)
                || self.has_book_snapshot_subscriptions(instrument_id);

            if !parent_still_needs_deltas {
                self.book_deltas_parent_expansions.remove(instrument_id);
            }

            if !parent_still_needs_depth10 {
                self.book_depth10_parent_expansions.remove(instrument_id);
            }
        }

        for target_id in &target_ids {
            let wants_deltas = self.is_underlying_wanted_for_deltas(target_id);
            let wants_depth10 = self.is_underlying_wanted_for_depth10(target_id);

            // Targets without an updater have nothing to unsubscribe
            let Some(updater) = self.book_updaters.get(target_id).cloned() else {
                continue;
            };

            // Handlers are rebuilt from the stored updater to address the msgbus subscriptions
            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);

            if !wants_deltas {
                let topic = switchboard::get_book_deltas_topic(*target_id);
                msgbus::unsubscribe_book_deltas(topic.into(), &deltas_handler);
            }

            if !wants_depth10 {
                let topic = switchboard::get_book_depth10_topic(*target_id);
                msgbus::unsubscribe_book_depth10(topic.into(), &depth_handler);
            }

            // The updater is only discarded once no feed needs it any more
            if !wants_deltas && !wants_depth10 {
                self.book_updaters.remove(target_id);
                log::debug!("Removed BookUpdater for instrument ID {target_id}");
            }
        }
    }
2527
2528 fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
2529 self.book_snapshot_counts
2530 .keys()
2531 .any(|(id, _)| id == instrument_id)
2532 }
2533
2534 fn increment_book_snapshot_subscription(
2535 &mut self,
2536 cmd: &SubscribeBookSnapshots,
2537 parent: Option<(Ustr, InstrumentClass)>,
2538 ) -> bool {
2539 let key = (cmd.instrument_id, cmd.interval_ms);
2540
2541 if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
2542 *count += 1;
2543 return false;
2544 }
2545
2546 self.book_snapshot_counts.insert(key, 1);
2547
2548 let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
2549 {
2550 snapshot_infos.clone()
2551 } else {
2552 let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
2553 self.book_intervals
2554 .insert(cmd.interval_ms, snapshot_infos.clone());
2555 self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
2556 snapshot_infos
2557 };
2558
2559 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
2560 let snap_info = BookSnapshotInfo {
2561 instrument_id: cmd.instrument_id,
2562 venue: cmd.instrument_id.venue,
2563 parent,
2564 topic,
2565 interval_ms: cmd.interval_ms,
2566 };
2567
2568 snapshot_infos
2569 .borrow_mut()
2570 .insert(cmd.instrument_id, snap_info);
2571
2572 true
2573 }
2574
2575 fn decrement_book_snapshot_subscription(
2576 &mut self,
2577 instrument_id: InstrumentId,
2578 interval_ms: NonZeroUsize,
2579 ) -> BookSnapshotUnsubscribeResult {
2580 let key = (instrument_id, interval_ms);
2581
2582 let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
2583 return BookSnapshotUnsubscribeResult::NotSubscribed;
2584 };
2585
2586 if *count > 1 {
2587 *count -= 1;
2588 return BookSnapshotUnsubscribeResult::Decremented;
2589 }
2590
2591 self.book_snapshot_counts.shift_remove(&key);
2592
2593 let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
2594 let mut snapshot_infos = snapshot_infos.borrow_mut();
2595 snapshot_infos.shift_remove(&instrument_id);
2596 snapshot_infos.is_empty()
2597 } else {
2598 false
2599 };
2600
2601 if remove_interval {
2602 self.book_intervals.remove(&interval_ms);
2603
2604 if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
2605 let timer_name = snapshotter.timer_name;
2606 let mut clock = self.clock.borrow_mut();
2607 if clock.timer_exists(&timer_name) {
2608 clock.cancel_timer(&timer_name);
2609 }
2610 }
2611 }
2612
2613 BookSnapshotUnsubscribeResult::Removed
2614 }
2615
2616 fn schedule_book_snapshotter(
2617 &mut self,
2618 interval_ms: NonZeroUsize,
2619 snapshot_infos: BookSnapshotInfos,
2620 ) {
2621 let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
2622 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
2623 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
2624
2625 let snapshotter = Rc::new(BookSnapshotter::new(
2626 interval_ms,
2627 snapshot_infos,
2628 self.cache.clone(),
2629 ));
2630 let timer_name = snapshotter.timer_name;
2631 let snapshotter_callback = snapshotter.clone();
2632 let callback_fn: Rc<dyn Fn(TimeEvent)> =
2633 Rc::new(move |event| snapshotter_callback.snapshot(event));
2634 let callback = TimeEventCallback::from(callback_fn);
2635
2636 self.clock
2637 .borrow_mut()
2638 .set_timer_ns(
2639 &timer_name,
2640 interval_ns,
2641 Some(start_time_ns.into()),
2642 None,
2643 Some(callback),
2644 None,
2645 None,
2646 )
2647 .expect(FAILED);
2648
2649 self.book_snapshotters.insert(interval_ms, snapshotter);
2650 }
2651
2652 fn handle_instrument_response(&self, instrument: InstrumentAny) {
2653 let mut cache = self.cache.as_ref().borrow_mut();
2654 if let Err(e) = cache.add_instrument(instrument) {
2655 log_error_on_cache_insert(&e);
2656 }
2657 }
2658
2659 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
2660 let mut cache = self.cache.as_ref().borrow_mut();
2662 for instrument in instruments {
2663 if let Err(e) = cache.add_instrument(instrument.clone()) {
2664 log_error_on_cache_insert(&e);
2665 }
2666 }
2667 }
2668
2669 fn handle_quotes(&self, quotes: &[QuoteTick]) {
2670 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
2671 log_error_on_cache_insert(&e);
2672 }
2673 }
2674
2675 fn handle_trades(&self, trades: &[TradeTick]) {
2676 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
2677 log_error_on_cache_insert(&e);
2678 }
2679 }
2680
2681 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
2682 if let Err(e) = self
2683 .cache
2684 .as_ref()
2685 .borrow_mut()
2686 .add_funding_rates(funding_rates)
2687 {
2688 log_error_on_cache_insert(&e);
2689 }
2690 }
2691
2692 fn handle_bars(&self, bars: &[Bar]) {
2693 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
2694 log_error_on_cache_insert(&e);
2695 }
2696 }
2697
2698 fn handle_book_response(&self, book: &OrderBook) {
2699 log::debug!("Adding order book {} to cache", book.instrument_id);
2700
2701 if let Err(e) = self
2702 .cache
2703 .as_ref()
2704 .borrow_mut()
2705 .add_order_book(book.clone())
2706 {
2707 log_error_on_cache_insert(&e);
2708 }
2709 }
2710
2711 fn handle_forward_prices_response(
2714 &mut self,
2715 correlation_id: &UUID4,
2716 resp: &ForwardPricesResponse,
2717 ) {
2718 let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
2719 log::debug!(
2720 "No pending option chain request for correlation_id={correlation_id}, ignoring"
2721 );
2722 return;
2723 };
2724
2725 let series_id = cmd.series_id;
2726
2727 let cache = self.cache.borrow();
2730 let mut best_price: Option<Price> = None;
2731
2732 for fp in &resp.data {
2733 if let Some(instrument) = cache.instrument(&fp.instrument_id)
2735 && let Some(expiration) = instrument.expiration_ns()
2736 && expiration == series_id.expiration_ns
2737 && instrument.settlement_currency().code == series_id.settlement_currency
2738 {
2739 match Price::from_decimal(fp.forward_price) {
2740 Ok(price) => best_price = Some(price),
2741 Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
2742 }
2743 break;
2744 }
2745 }
2746 drop(cache);
2747
2748 if let Some(price) = best_price {
2749 log::info!("Forward price for {series_id}: {price} (instant bootstrap)");
2750 } else {
2751 log::info!(
2752 "No matching forward price found for {series_id}, will bootstrap from live data",
2753 );
2754 }
2755
2756 self.create_option_chain_manager(&cmd, best_price);
2757 }
2758
2759 fn setup_book_updater(
2760 &mut self,
2761 instrument_id: &InstrumentId,
2762 book_type: BookType,
2763 only_deltas: bool,
2764 parent: Option<(Ustr, InstrumentClass)>,
2765 ) -> anyhow::Result<()> {
2766 let target_ids: Vec<InstrumentId> = if let Some((root, class)) = parent {
2771 self.cache
2772 .borrow()
2773 .instruments_by_parent(&instrument_id.venue, &root, class)
2774 .iter()
2775 .map(|i| i.id())
2776 .collect()
2777 } else {
2778 vec![*instrument_id]
2779 };
2780
2781 if parent.is_some() {
2782 self.book_deltas_parent_expansions
2783 .insert(*instrument_id, target_ids.clone());
2784
2785 if !only_deltas {
2786 self.book_depth10_parent_expansions
2787 .insert(*instrument_id, target_ids.clone());
2788 }
2789 }
2790
2791 {
2792 let mut cache = self.cache.borrow_mut();
2793 for target_id in &target_ids {
2794 if !cache.has_order_book(target_id) {
2795 let book = OrderBook::new(*target_id, book_type);
2796 log::debug!("Created {book}");
2797 cache.add_order_book(book)?;
2798 }
2799 }
2800 }
2801
2802 for target_id in &target_ids {
2803 let updater = self
2804 .book_updaters
2805 .entry(*target_id)
2806 .or_insert_with(|| {
2807 Rc::new(BookUpdater::new(
2808 target_id,
2809 self.cache.clone(),
2810 self.config.emit_quotes_from_book,
2811 ))
2812 })
2813 .clone();
2814
2815 let deltas_topic = switchboard::get_book_deltas_topic(*target_id);
2820 let deltas_handler = TypedHandler::new(updater.clone());
2821 msgbus::subscribe_book_deltas(
2822 deltas_topic.into(),
2823 deltas_handler,
2824 Some(self.msgbus_priority),
2825 );
2826
2827 if !only_deltas {
2828 let depth_topic = switchboard::get_book_depth10_topic(*target_id);
2829 let depth_handler = TypedHandler::new(updater);
2830 msgbus::subscribe_book_depth10(
2831 depth_topic.into(),
2832 depth_handler,
2833 Some(self.msgbus_priority),
2834 );
2835 }
2836 }
2837
2838 Ok(())
2839 }
2840
2841 fn is_underlying_wanted_for_deltas(&self, target_id: &InstrumentId) -> bool {
2842 if self.book_deltas_subs.contains(target_id)
2846 || self.book_depth10_subs.contains(target_id)
2847 || self.has_book_snapshot_subscriptions(target_id)
2848 {
2849 return true;
2850 }
2851 self.book_deltas_parent_expansions
2852 .values()
2853 .any(|expansion| expansion.contains(target_id))
2854 }
2855
2856 fn is_underlying_wanted_for_depth10(&self, target_id: &InstrumentId) -> bool {
2857 if self.book_depth10_subs.contains(target_id)
2860 || self.has_book_snapshot_subscriptions(target_id)
2861 {
2862 return true;
2863 }
2864 self.book_depth10_parent_expansions
2865 .values()
2866 .any(|expansion| expansion.contains(target_id))
2867 }
2868
2869 fn create_bar_aggregator(
2870 &self,
2871 instrument: &InstrumentAny,
2872 bar_type: BarType,
2873 ) -> Box<dyn BarAggregator> {
2874 let cache = self.cache.clone();
2875 let validate_sequence = self.config.validate_data_sequence;
2876
2877 let handler = move |bar: Bar| {
2878 process_engine_bar(&cache, validate_sequence, true, bar);
2879 };
2880
2881 let clock = self.clock.clone();
2882 let config = self.config.clone();
2883
2884 let price_precision = instrument.price_precision();
2885 let size_precision = instrument.size_precision();
2886
2887 if bar_type.spec().is_time_aggregated() {
2888 let time_bars_origin_offset = config
2889 .time_bars_origin_offset
2890 .get(&bar_type.spec().aggregation)
2891 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
2892
2893 Box::new(TimeBarAggregator::new(
2894 bar_type,
2895 price_precision,
2896 size_precision,
2897 clock,
2898 handler,
2899 config.time_bars_build_with_no_updates,
2900 config.time_bars_timestamp_on_close,
2901 config.time_bars_interval_type,
2902 time_bars_origin_offset,
2903 config.time_bars_build_delay,
2904 config.time_bars_skip_first_non_full_bar,
2905 ))
2906 } else {
2907 match bar_type.spec().aggregation {
2908 BarAggregation::Tick => Box::new(TickBarAggregator::new(
2909 bar_type,
2910 price_precision,
2911 size_precision,
2912 handler,
2913 )) as Box<dyn BarAggregator>,
2914 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2915 bar_type,
2916 price_precision,
2917 size_precision,
2918 handler,
2919 )) as Box<dyn BarAggregator>,
2920 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2921 bar_type,
2922 price_precision,
2923 size_precision,
2924 handler,
2925 )) as Box<dyn BarAggregator>,
2926 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2927 bar_type,
2928 price_precision,
2929 size_precision,
2930 handler,
2931 )) as Box<dyn BarAggregator>,
2932 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2933 bar_type,
2934 price_precision,
2935 size_precision,
2936 handler,
2937 )) as Box<dyn BarAggregator>,
2938 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2939 bar_type,
2940 price_precision,
2941 size_precision,
2942 handler,
2943 )) as Box<dyn BarAggregator>,
2944 BarAggregation::Value => Box::new(ValueBarAggregator::new(
2945 bar_type,
2946 price_precision,
2947 size_precision,
2948 handler,
2949 )) as Box<dyn BarAggregator>,
2950 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2951 bar_type,
2952 price_precision,
2953 size_precision,
2954 handler,
2955 )) as Box<dyn BarAggregator>,
2956 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2957 bar_type,
2958 price_precision,
2959 size_precision,
2960 handler,
2961 )) as Box<dyn BarAggregator>,
2962 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2963 bar_type,
2964 price_precision,
2965 size_precision,
2966 instrument.price_increment(),
2967 handler,
2968 )) as Box<dyn BarAggregator>,
2969 other => unreachable!(
2970 "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
2971 ),
2972 }
2973 }
2974 }
2975
2976 fn start_bar_aggregator(
2980 &mut self,
2981 bar_type: BarType,
2982 request_id: Option<UUID4>,
2983 ) -> anyhow::Result<()> {
2984 let instrument = {
2986 let cache = self.cache.borrow();
2987 cache
2988 .instrument(&bar_type.instrument_id())
2989 .ok_or_else(|| {
2990 anyhow::anyhow!(
2991 "Cannot start bar aggregation: no instrument found for {}",
2992 bar_type.instrument_id(),
2993 )
2994 })?
2995 .clone()
2996 };
2997
2998 let key = bar_aggregator_key(bar_type, request_id);
2999 let bar_type_std = bar_type.standard();
3000
3001 let aggregator = if let Some(rc) = self.bar_aggregators.get(&key) {
3003 rc.clone()
3004 } else {
3005 let agg = self.create_bar_aggregator(&instrument, bar_type);
3006 let rc = Rc::new(RefCell::new(agg));
3007 self.bar_aggregators.insert(key, rc.clone());
3008 rc
3009 };
3010
3011 let mut subscriptions = Vec::new();
3013
3014 if bar_type.is_composite() {
3015 let topic = switchboard::get_bars_topic(bar_type.composite());
3016 let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_type_std));
3017 msgbus::subscribe_bars(topic.into(), handler.clone(), None);
3018 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
3019 } else if bar_type.spec().price_type == PriceType::Last {
3020 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
3021 let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_type_std));
3022 msgbus::subscribe_trades(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3023 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
3024 } else {
3025 if matches!(
3027 bar_type.spec().aggregation,
3028 BarAggregation::TickImbalance
3029 | BarAggregation::VolumeImbalance
3030 | BarAggregation::ValueImbalance
3031 | BarAggregation::TickRuns
3032 | BarAggregation::VolumeRuns
3033 | BarAggregation::ValueRuns
3034 ) {
3035 log::warn!(
3036 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
3037 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
3038 quote data: bars will not emit correctly",
3039 );
3040 }
3041
3042 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
3043 let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_type_std));
3044 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3045 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
3046 }
3047
3048 self.bar_aggregator_handlers.insert(key, subscriptions);
3049
3050 self.setup_bar_aggregator(bar_type, false, request_id)?;
3052
3053 aggregator.borrow_mut().set_is_running(true);
3054
3055 Ok(())
3056 }
3057
3058 fn setup_bar_aggregator(
3062 &self,
3063 bar_type: BarType,
3064 historical: bool,
3065 request_id: Option<UUID4>,
3066 ) -> anyhow::Result<()> {
3067 let key = bar_aggregator_key(bar_type, request_id);
3068 let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
3069 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
3070 })?;
3071
3072 let cache = self.cache.clone();
3074 let validate_sequence = self.config.validate_data_sequence;
3075 let publish = !historical;
3076 let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
3077 process_engine_bar(&cache, validate_sequence, publish, bar);
3078 });
3079
3080 aggregator
3081 .borrow_mut()
3082 .set_historical_mode(historical, handler);
3083
3084 if bar_type.spec().is_time_aggregated() {
3086 use nautilus_common::clock::TestClock;
3087
3088 if historical {
3089 let test_clock = Rc::new(RefCell::new(TestClock::new()));
3091 aggregator.borrow_mut().set_clock(test_clock);
3092 let aggregator_weak = Rc::downgrade(aggregator);
3095 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
3096 } else {
3097 aggregator.borrow_mut().set_clock(self.clock.clone());
3098 aggregator
3099 .borrow_mut()
3100 .start_timer(Some(aggregator.clone()));
3101 }
3102 }
3103
3104 Ok(())
3105 }
3106
3107 fn stop_bar_aggregator(
3108 &mut self,
3109 bar_type: BarType,
3110 request_id: Option<UUID4>,
3111 ) -> anyhow::Result<()> {
3112 let key = bar_aggregator_key(bar_type, request_id);
3113 let aggregator = self.bar_aggregators.shift_remove(&key).ok_or_else(|| {
3114 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
3115 })?;
3116
3117 aggregator.borrow_mut().stop();
3118
3119 if let Some(subs) = self.bar_aggregator_handlers.remove(&key) {
3121 for sub in subs {
3122 match sub {
3123 BarAggregatorSubscription::Bar { topic, handler } => {
3124 msgbus::unsubscribe_bars(topic.into(), &handler);
3125 }
3126 BarAggregatorSubscription::Trade { topic, handler } => {
3127 msgbus::unsubscribe_trades(topic.into(), &handler);
3128 }
3129 BarAggregatorSubscription::Quote { topic, handler } => {
3130 msgbus::unsubscribe_quotes(topic.into(), &handler);
3131 }
3132 }
3133 }
3134 }
3135
3136 Ok(())
3137 }
3138}
3139
3140fn resolve_parent_components(
3148 instrument_id: &InstrumentId,
3149 params: Option<&Params>,
3150) -> anyhow::Result<Option<(Ustr, InstrumentClass)>> {
3151 if !is_parent_subscription(params) {
3152 return Ok(None);
3153 }
3154 let Some((root, class)) = instrument_id.parse_parent_components() else {
3155 anyhow::bail!(
3156 "Cannot expand parent subscription for {instrument_id}: \
3157 symbol does not parse as `<root>.<class>` with a recognised class suffix"
3158 );
3159 };
3160 Ok(Some((Ustr::from(root), class)))
3161}
3162
3163fn spread_quote_update_interval_seconds(params: Option<&Params>) -> Option<u64> {
3164 match params.and_then(|params| params.get("update_interval_seconds")) {
3165 Some(value) if value.is_null() => None,
3166 Some(value) => value.as_u64().filter(|interval| *interval > 0),
3167 None => Some(1),
3168 }
3169}
3170
3171fn spread_instrument_legs(instrument: &InstrumentAny) -> Option<Vec<(InstrumentId, i64)>> {
3172 if !instrument.is_spread() {
3173 return None;
3174 }
3175
3176 let instrument_id = instrument.id();
3177 let symbol = instrument_id.symbol.as_str();
3178 if !symbol.contains(GENERIC_SPREAD_ID_SEPARATOR) {
3179 return Some(vec![(instrument_id, 1)]);
3180 }
3181
3182 symbol
3183 .split(GENERIC_SPREAD_ID_SEPARATOR)
3184 .map(|component| parse_spread_leg(component, instrument_id.venue))
3185 .collect()
3186}
3187
3188fn parse_spread_leg(component: &str, venue: Venue) -> Option<(InstrumentId, i64)> {
3189 if let Some(rest) = component.strip_prefix("((") {
3190 let (ratio, symbol) = rest.split_once("))")?;
3191 return parse_spread_leg_parts(ratio, symbol, venue, -1);
3192 }
3193
3194 let rest = component.strip_prefix('(')?;
3195 let (ratio, symbol) = rest.split_once(')')?;
3196 parse_spread_leg_parts(ratio, symbol, venue, 1)
3197}
3198
3199fn parse_spread_leg_parts(
3200 ratio: &str,
3201 symbol: &str,
3202 venue: Venue,
3203 sign: i64,
3204) -> Option<(InstrumentId, i64)> {
3205 if symbol.is_empty() {
3206 return None;
3207 }
3208
3209 let ratio = ratio.parse::<i64>().ok()?.checked_mul(sign)?;
3210 if ratio == 0 {
3211 return None;
3212 }
3213
3214 Some((InstrumentId::new(Symbol::new(symbol), venue), ratio))
3215}
3216
/// Logs a cache insertion failure at error level.
///
/// `e` can be any value implementing `Display`; it is only formatted into the log
/// message, never propagated.
#[inline(always)]
fn log_error_on_cache_insert<T: Display>(e: &T) {
    log::error!("Error on cache insert: {e}");
}
3221
3222#[inline]
3223fn historical_topic_of(live: MStr<Topic>) -> MStr<Topic> {
3224 MStr::<Topic>::from(format!("historical.{}", live.as_ref()))
3225}
3226
3227fn derive_quote_from_depth(depth: &OrderBookDepth10) -> Option<QuoteTick> {
3230 let bid = depth.bids.first()?;
3231 let ask = depth.asks.first()?;
3232
3233 if bid.side == OrderSide::NoOrderSide
3234 || ask.side == OrderSide::NoOrderSide
3235 || bid.size.raw == 0
3236 || ask.size.raw == 0
3237 {
3238 return None;
3239 }
3240
3241 Some(QuoteTick::new(
3242 depth.instrument_id,
3243 bid.price,
3244 ask.price,
3245 bid.size,
3246 ask.size,
3247 depth.ts_event,
3248 depth.ts_init,
3249 ))
3250}
3251
3252fn process_engine_bar(
3256 cache: &Rc<RefCell<Cache>>,
3257 validate_sequence: bool,
3258 publish: bool,
3259 bar: Bar,
3260) {
3261 if !validate_bar_sequence(cache, validate_sequence, &bar) {
3262 return;
3263 }
3264
3265 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
3266 log_error_on_cache_insert(&e);
3267 }
3268
3269 if publish {
3270 let topic = switchboard::get_bars_topic(bar.bar_type);
3271 msgbus::publish_bar(topic, &bar);
3272 }
3273}
3274
3275fn validate_bar_sequence(cache: &Rc<RefCell<Cache>>, validate_sequence: bool, bar: &Bar) -> bool {
3276 if !validate_sequence {
3277 return true;
3278 }
3279
3280 let Some(last_bar) = cache.as_ref().borrow().bar(&bar.bar_type).copied() else {
3281 return true;
3282 };
3283
3284 if bar.ts_event < last_bar.ts_event {
3285 log::warn!(
3286 "Bar {bar} was prior to last bar `ts_event` {}",
3287 last_bar.ts_event,
3288 );
3289 return false;
3290 }
3291
3292 if bar.ts_init < last_bar.ts_init {
3293 log::warn!(
3294 "Bar {bar} was prior to last bar `ts_init` {}",
3295 last_bar.ts_init,
3296 );
3297 return false;
3298 }
3299
3300 true
3303}
3304
3305#[inline(always)]
3306fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
3307 if data.is_empty() {
3308 let name = type_name::<T>();
3309 let short_name = name.rsplit("::").next().unwrap_or(name);
3310 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
3311 return true;
3312 }
3313 false
3314}