1pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36mod requests;
37mod time_range;
38
39#[cfg(feature = "defi")]
40pub mod pool;
41
42#[cfg(feature = "streaming")]
43mod streaming;
44
45use std::{
46 any::{Any, type_name},
47 cell::{Ref, RefCell},
48 collections::VecDeque,
49 fmt::{Debug, Display},
50 num::NonZeroUsize,
51 rc::Rc,
52 str::FromStr,
53};
54
55use ahash::{AHashMap, AHashSet};
56use anyhow::Context;
57pub use bar::BarAggregatorSubscription;
58use bar::{BarAggregatorKey, bar_aggregator_key};
59use book::{
60 BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
61 BookSnapshotter, BookUpdater,
62};
63pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
64use config::DataEngineConfig;
65use futures::future::join_all;
66use handlers::{
67 BAR_AGGREGATOR_PRIORITY, BarBarHandler, BarQuoteHandler, BarTradeHandler, SpreadQuoteHandler,
68};
69use indexmap::IndexMap;
70use nautilus_common::{
71 cache::Cache,
72 clock::Clock,
73 logging::{RECV, RES},
74 messages::data::{
75 BarsResponse, BookDeltasResponse, BookDepthResponse, CustomDataResponse, DataCommand,
76 DataResponse, ForwardPricesResponse, FundingRatesResponse, QuotesResponse, RequestBars,
77 RequestCommand, RequestForwardPrices, RequestJoin, RequestQuotes, RequestTrades,
78 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
79 SubscribeCommand, SubscribeOptionChain, SubscribeQuotes, SubscribeTrades, TradesResponse,
80 UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
81 UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionChain,
82 UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades, is_parent_subscription,
83 },
84 msgbus::{
85 self, ShareableMessageHandler, TypedHandler, TypedIntoHandler,
86 switchboard::{self, MessagingSwitchboard},
87 },
88 runner::get_data_cmd_sender,
89 timer::{TimeEvent, TimeEventCallback},
90};
91use nautilus_core::{
92 Params, UUID4, UnixNanos, WeakCell,
93 correctness::{
94 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
95 },
96 datetime::{NANOSECONDS_IN_DAY, millis_to_nanos_unchecked},
97};
98#[cfg(feature = "defi")]
99use nautilus_model::defi::DefiData;
100use nautilus_model::{
101 data::{
102 Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, HasTsInit, IndexPriceUpdate,
103 InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
104 OrderBookDepth10, QuoteTick, TradeTick,
105 option_chain::{OptionGreeks, StrikeRange},
106 },
107 enums::{
108 AggregationSource, BarAggregation, BookType, InstrumentClass, MarketStatusAction,
109 OrderSide, PriceType, RecordFlag,
110 },
111 identifiers::{ClientId, InstrumentId, OptionSeriesId, Symbol, Venue},
112 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
113 orderbook::OrderBook,
114 types::{Price, Quantity},
115};
116use requests::{
117 ContinuousFutureRequest, ContinuousFutureRequestState, ContinuousFutureSegment,
118 ContinuousFutureSource, RequestBarAggregation, continuous_future_parent_request_id,
119 continuous_future_request_from_bars, continuous_future_subscription_from_bars,
120 has_continuous_future_params, request_bar_aggregation_from_params, request_params,
121 response_params,
122};
123#[cfg(feature = "streaming")]
124use streaming::CatalogMap;
125use time_range::{
126 TimeRangePipelineState, has_time_range_pipeline_params, is_time_range_pipeline_variant,
127};
128use ustr::Ustr;
129
130#[cfg(feature = "defi")]
131#[allow(unused_imports)] use crate::defi::engine as _;
133#[cfg(feature = "defi")]
134use crate::engine::pool::PoolUpdater;
135use crate::{
136 aggregation::{
137 BarAggregator, RenkoBarAggregator, SpreadQuoteAggregator, TickBarAggregator,
138 TickImbalanceBarAggregator, TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator,
139 ValueImbalanceBarAggregator, ValueRunsBarAggregator, VolumeBarAggregator,
140 VolumeImbalanceBarAggregator, VolumeRunsBarAggregator,
141 },
142 client::DataClientAdapter,
143 option_chains::OptionChainManager,
144};
145
146#[derive(Debug)]
148pub struct DataEngine {
149 pub(crate) clock: Rc<RefCell<dyn Clock>>,
150 pub(crate) cache: Rc<RefCell<Cache>>,
151 pub(crate) external_clients: AHashSet<ClientId>,
152 clients: IndexMap<ClientId, DataClientAdapter>,
153 default_client: Option<DataClientAdapter>,
154 routing_map: IndexMap<Venue, ClientId>,
155 book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
156 book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
157 book_deltas_counts: IndexMap<BookDeltasKey, usize>,
158 book_depth10_subs: AHashSet<InstrumentId>,
159 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
160 book_deltas_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
161 book_depth10_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
162 book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
163 bar_aggregators: IndexMap<BarAggregatorKey, Rc<RefCell<Box<dyn BarAggregator>>>>,
164 bar_aggregator_handlers: AHashMap<BarAggregatorKey, Vec<BarAggregatorSubscription>>,
165 request_bar_aggregations: AHashMap<UUID4, RequestBarAggregation>,
166 request_pipeline_parent_request: AHashMap<UUID4, RequestCommand>,
167 request_pipeline_n_components: AHashMap<UUID4, usize>,
168 request_pipeline_parent_request_id: AHashMap<UUID4, UUID4>,
169 request_pipeline_responses: AHashMap<UUID4, Vec<DataResponse>>,
170 time_range_pipeline_requests: AHashMap<UUID4, TimeRangePipelineState>,
171 time_range_pipeline_parent_request_id: AHashMap<UUID4, UUID4>,
172 parent_join_request_id: AHashMap<UUID4, UUID4>,
173 pending_join_requests: AHashMap<UUID4, RequestJoin>,
174 continuous_future_requests: AHashMap<UUID4, ContinuousFutureRequestState>,
175 continuous_future_subscriptions: AHashMap<BarType, ContinuousFutureSubscriptionState>,
176 continuous_future_roller: Option<Rc<ContinuousFutureRoller>>,
177 spread_quote_aggregators: AHashMap<InstrumentId, Rc<RefCell<SpreadQuoteAggregator>>>,
178 spread_quote_handlers: AHashMap<InstrumentId, Vec<(InstrumentId, TypedHandler<QuoteTick>)>>,
179 option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
180 option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
181 deferred_cmd_queue: DeferredCommandQueue,
182 pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
183 synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
184 synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
185 subscribed_synthetic_quotes: AHashSet<InstrumentId>,
186 subscribed_synthetic_trades: AHashSet<InstrumentId>,
187 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
188 command_count: u64,
189 data_count: u64,
190 request_count: u64,
191 response_count: u64,
192 pub(crate) msgbus_priority: u32,
193 pub(crate) config: DataEngineConfig,
194 #[cfg(feature = "streaming")]
195 catalogs: CatalogMap,
196 #[cfg(feature = "defi")]
197 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
198 #[cfg(feature = "defi")]
199 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
200 #[cfg(feature = "defi")]
201 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
202 #[cfg(feature = "defi")]
203 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
204}
205
206enum BookDeltasUnsubscribeResult {
207 NotSubscribed,
208 Decremented,
209 Removed,
210}
211
212type BookDeltasKey = (InstrumentId, Option<ClientId>, Option<Venue>);
213
214impl DataEngine {
215 #[must_use]
217 pub fn new(
218 clock: Rc<RefCell<dyn Clock>>,
219 cache: Rc<RefCell<Cache>>,
220 config: Option<DataEngineConfig>,
221 ) -> Self {
222 let config = config.unwrap_or_default();
223
224 let external_clients: AHashSet<ClientId> = config
225 .external_clients
226 .clone()
227 .unwrap_or_default()
228 .into_iter()
229 .collect();
230
231 Self {
232 clock,
233 cache,
234 external_clients,
235 clients: IndexMap::new(),
236 default_client: None,
237 routing_map: IndexMap::new(),
238 book_intervals: AHashMap::new(),
239 book_snapshot_counts: IndexMap::new(),
240 book_deltas_counts: IndexMap::new(),
241 book_depth10_subs: AHashSet::new(),
242 book_updaters: AHashMap::new(),
243 book_deltas_parent_expansions: AHashMap::new(),
244 book_depth10_parent_expansions: AHashMap::new(),
245 book_snapshotters: AHashMap::new(),
246 bar_aggregators: IndexMap::new(),
247 bar_aggregator_handlers: AHashMap::new(),
248 request_bar_aggregations: AHashMap::new(),
249 request_pipeline_parent_request: AHashMap::new(),
250 request_pipeline_n_components: AHashMap::new(),
251 request_pipeline_parent_request_id: AHashMap::new(),
252 request_pipeline_responses: AHashMap::new(),
253 time_range_pipeline_requests: AHashMap::new(),
254 time_range_pipeline_parent_request_id: AHashMap::new(),
255 parent_join_request_id: AHashMap::new(),
256 pending_join_requests: AHashMap::new(),
257 continuous_future_requests: AHashMap::new(),
258 continuous_future_subscriptions: AHashMap::new(),
259 continuous_future_roller: None,
260 spread_quote_aggregators: AHashMap::new(),
261 spread_quote_handlers: AHashMap::new(),
262 option_chain_managers: AHashMap::new(),
263 option_chain_instrument_index: AHashMap::new(),
264 deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
265 pending_option_chain_requests: AHashMap::new(),
266 synthetic_quote_feeds: AHashMap::new(),
267 synthetic_trade_feeds: AHashMap::new(),
268 subscribed_synthetic_quotes: AHashSet::new(),
269 subscribed_synthetic_trades: AHashSet::new(),
270 buffered_deltas_map: AHashMap::new(),
271 command_count: 0,
272 data_count: 0,
273 request_count: 0,
274 response_count: 0,
275 msgbus_priority: 10, config,
277 #[cfg(feature = "streaming")]
278 catalogs: CatalogMap::new(),
279 #[cfg(feature = "defi")]
280 pool_updaters: AHashMap::new(),
281 #[cfg(feature = "defi")]
282 pool_updaters_pending: AHashSet::new(),
283 #[cfg(feature = "defi")]
284 pool_snapshot_pending: AHashSet::new(),
285 #[cfg(feature = "defi")]
286 pool_event_buffers: AHashMap::new(),
287 }
288 }
289
290 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
292 let weak = WeakCell::from(Rc::downgrade(engine));
293 engine.borrow_mut().continuous_future_roller =
294 Some(Rc::new(ContinuousFutureRoller::new(engine)));
295
296 let weak1 = weak.clone();
297 msgbus::register_data_command_endpoint(
298 MessagingSwitchboard::data_engine_execute(),
299 TypedIntoHandler::from(move |cmd: DataCommand| {
300 if let Some(rc) = weak1.upgrade() {
301 rc.borrow_mut().execute(cmd);
302 }
303 }),
304 );
305
306 msgbus::register_data_command_endpoint(
307 MessagingSwitchboard::data_engine_queue_execute(),
308 TypedIntoHandler::from(move |cmd: DataCommand| {
309 get_data_cmd_sender().clone().execute(cmd);
310 }),
311 );
312
313 let weak2 = weak.clone();
315 msgbus::register_any(
316 MessagingSwitchboard::data_engine_process(),
317 ShareableMessageHandler::from_any(move |data: &dyn Any| {
318 if let Some(rc) = weak2.upgrade() {
319 rc.borrow_mut().process(data);
320 }
321 }),
322 );
323
324 let weak3 = weak.clone();
326 msgbus::register_data_endpoint(
327 MessagingSwitchboard::data_engine_process_data(),
328 TypedIntoHandler::from(move |data: Data| {
329 if let Some(rc) = weak3.upgrade() {
330 rc.borrow_mut().process_data(data);
331 }
332 }),
333 );
334
335 #[cfg(feature = "defi")]
337 {
338 let weak4 = weak.clone();
339 msgbus::register_defi_data_endpoint(
340 MessagingSwitchboard::data_engine_process_defi_data(),
341 TypedIntoHandler::from(move |data: DefiData| {
342 if let Some(rc) = weak4.upgrade() {
343 rc.borrow_mut().process_defi_data(data);
344 }
345 }),
346 );
347 }
348
349 let weak5 = weak;
350 msgbus::register_data_response_endpoint(
351 MessagingSwitchboard::data_engine_response(),
352 TypedIntoHandler::from(move |resp: DataResponse| {
353 if let Some(rc) = weak5.upgrade() {
354 rc.borrow_mut().response(resp);
355 }
356 }),
357 );
358 }
359
360 #[must_use]
362 pub const fn command_count(&self) -> u64 {
363 self.command_count
364 }
365
366 #[must_use]
368 pub const fn data_count(&self) -> u64 {
369 self.data_count
370 }
371
372 #[cfg(feature = "defi")]
373 pub(crate) const fn increment_data_count(&mut self) {
374 self.data_count += 1;
375 }
376
377 #[must_use]
379 pub const fn request_count(&self) -> u64 {
380 self.request_count
381 }
382
383 #[must_use]
385 pub const fn response_count(&self) -> u64 {
386 self.response_count
387 }
388
389 #[must_use]
391 pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
392 self.option_chain_managers.contains_key(series_id)
393 }
394
395 #[must_use]
397 pub fn pending_option_chain_request_count(&self) -> usize {
398 self.pending_option_chain_requests.len()
399 }
400
401 #[must_use]
403 pub fn request_pipeline_count(&self) -> usize {
404 self.request_pipeline_parent_request.len()
405 }
406
407 #[must_use]
409 pub fn time_range_pipeline_count(&self) -> usize {
410 self.time_range_pipeline_requests.len()
411 }
412
413 #[must_use]
415 pub fn pending_join_request_count(&self) -> usize {
416 self.pending_join_requests.len()
417 }
418
419 #[must_use]
421 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
422 self.clock.borrow()
423 }
424
425 #[must_use]
427 pub fn get_cache(&self) -> Ref<'_, Cache> {
428 self.cache.borrow()
429 }
430
431 #[must_use]
433 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
434 Rc::clone(&self.cache)
435 }
436
437 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
444 let client_id = client.client_id();
445
446 if let Some(default_client) = &self.default_client {
447 check_predicate_false(
448 default_client.client_id() == client.client_id(),
449 "client_id already registered as default client",
450 )
451 .expect(FAILED);
452 }
453
454 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
455
456 if let Some(routing) = routing {
457 self.routing_map.insert(routing, client_id);
458 log::debug!("Set client {client_id} routing for {routing}");
459 }
460
461 if client.venue.is_none() && self.default_client.is_none() {
462 self.default_client = Some(client);
463 log::debug!("Registered client {client_id} for default routing");
464 } else {
465 self.clients.insert(client_id, client);
466 log::debug!("Registered client {client_id}");
467 }
468 }
469
470 pub fn deregister_client(&mut self, client_id: &ClientId) {
476 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
477
478 self.clients.shift_remove(client_id);
479 log::info!("Deregistered client {client_id}");
480 }
481
482 pub fn register_default_client(&mut self, client: DataClientAdapter) {
494 check_predicate_true(
495 self.default_client.is_none(),
496 "default client already registered",
497 )
498 .expect(FAILED);
499
500 let client_id = client.client_id();
501
502 self.default_client = Some(client);
503 log::debug!("Registered default client {client_id}");
504 }
505
506 pub fn start(&mut self) {
508 for client in self.get_clients_mut() {
509 if let Err(e) = client.start() {
510 log::error!("{e}");
511 }
512 }
513
514 for aggregator in self.bar_aggregators.values() {
515 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
516 aggregator
517 .borrow_mut()
518 .start_timer(Some(aggregator.clone()));
519 }
520 }
521
522 for aggregator in self.spread_quote_aggregators.values() {
523 aggregator
524 .borrow_mut()
525 .start_timer(Some(aggregator.clone()));
526 }
527 }
528
529 pub fn stop(&mut self) {
531 for client in self.get_clients_mut() {
532 if let Err(e) = client.stop() {
533 log::error!("{e}");
534 }
535 }
536
537 for aggregator in self.bar_aggregators.values() {
538 aggregator.borrow_mut().stop();
539 }
540
541 for aggregator in self.spread_quote_aggregators.values() {
542 aggregator.borrow_mut().stop_timer();
543 }
544 }
545
546 pub fn reset(&mut self) {
548 for client in self.get_clients_mut() {
549 if let Err(e) = client.reset() {
550 log::error!("{e}");
551 }
552 }
553
554 let keys: Vec<BarAggregatorKey> = self.bar_aggregators.keys().copied().collect();
555 for (bar_type, request_id) in keys {
556 if let Err(e) = self.stop_bar_aggregator(bar_type, request_id) {
557 log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
558 }
559 }
560
561 self.request_bar_aggregations.clear();
562 self.request_pipeline_parent_request.clear();
563 self.request_pipeline_n_components.clear();
564 self.request_pipeline_parent_request_id.clear();
565 self.request_pipeline_responses.clear();
566 self.time_range_pipeline_requests.clear();
567 self.time_range_pipeline_parent_request_id.clear();
568 self.parent_join_request_id.clear();
569 self.pending_join_requests.clear();
570 self.continuous_future_requests.clear();
571
572 for state in self.continuous_future_subscriptions.values_mut() {
573 if let Some(name) = state.timer_name.take() {
574 self.clock.borrow_mut().cancel_timer(&name);
575 }
576 }
577 self.continuous_future_subscriptions.clear();
578
579 let spread_ids: Vec<InstrumentId> = self.spread_quote_aggregators.keys().copied().collect();
580 for spread_id in spread_ids {
581 self.stop_spread_quote_aggregator(spread_id);
582 }
583
584 let managers: Vec<_> = self.option_chain_managers.drain().collect();
586 for (_, manager) in managers {
587 manager.borrow_mut().teardown(&self.clock);
588 }
589
590 self.option_chain_instrument_index.clear();
591 self.pending_option_chain_requests.clear();
592
593 let book_updaters: Vec<(InstrumentId, Rc<BookUpdater>)> =
598 self.book_updaters.drain().collect();
599 for (instrument_id, updater) in book_updaters {
600 let deltas_topic = switchboard::get_book_deltas_topic(instrument_id);
601 let depth_topic = switchboard::get_book_depth10_topic(instrument_id);
602 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
603 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
604 msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
605 msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
606 }
607
608 self.book_deltas_parent_expansions.clear();
609 self.book_depth10_parent_expansions.clear();
610
611 self.book_deltas_counts.clear();
612 self.book_depth10_subs.clear();
613 self.book_intervals.clear();
614 self.book_snapshot_counts.clear();
615 self.book_snapshotters.clear();
616 self.buffered_deltas_map.clear();
617
618 self.synthetic_quote_feeds.clear();
619 self.synthetic_trade_feeds.clear();
620 self.subscribed_synthetic_quotes.clear();
621 self.subscribed_synthetic_trades.clear();
622
623 self.deferred_cmd_queue.borrow_mut().clear();
624
625 self.clock.borrow_mut().cancel_timers();
626
627 self.command_count = 0;
628 self.data_count = 0;
629 self.request_count = 0;
630 self.response_count = 0;
631 }
632
633 pub fn dispose(&mut self) {
635 for client in self.get_clients_mut() {
636 if let Err(e) = client.dispose() {
637 log::error!("{e}");
638 }
639 }
640
641 self.clock.borrow_mut().cancel_timers();
642 }
643
644 pub async fn connect(&mut self) {
648 let futures: Vec<_> = self
649 .get_clients_mut()
650 .into_iter()
651 .map(DataClientAdapter::connect)
652 .collect();
653
654 let results = join_all(futures).await;
655
656 for error in results.into_iter().filter_map(Result::err) {
657 log::error!("Failed to connect data client: {error}");
658 }
659 }
660
661 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
667 let futures: Vec<_> = self
668 .get_clients_mut()
669 .into_iter()
670 .map(DataClientAdapter::disconnect)
671 .collect();
672
673 let results = join_all(futures).await;
674 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
675
676 if errors.is_empty() {
677 Ok(())
678 } else {
679 let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
680 anyhow::bail!(
681 "Failed to disconnect data clients: {}",
682 error_msgs.join("; ")
683 )
684 }
685 }
686
687 #[must_use]
689 pub fn check_connected(&self) -> bool {
690 self.get_clients()
691 .iter()
692 .all(|client| client.is_connected())
693 }
694
695 #[must_use]
697 pub fn check_disconnected(&self) -> bool {
698 self.get_clients()
699 .iter()
700 .all(|client| !client.is_connected())
701 }
702
703 #[must_use]
705 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
706 self.get_clients()
707 .into_iter()
708 .map(|client| (client.client_id(), client.is_connected()))
709 .collect()
710 }
711
712 #[must_use]
714 pub fn registered_clients(&self) -> Vec<ClientId> {
715 self.get_clients()
716 .into_iter()
717 .map(|client| client.client_id())
718 .collect()
719 }
720
721 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
722 where
723 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
724 T: Clone,
725 {
726 self.get_clients()
727 .into_iter()
728 .flat_map(get_subs)
729 .cloned()
730 .collect()
731 }
732
733 #[must_use]
734 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
735 let (default_opt, clients_map) = (&self.default_client, &self.clients);
736 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
737
738 if let Some(default) = default_opt {
739 clients.push(default);
740 }
741
742 clients
743 }
744
745 #[must_use]
746 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
747 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
748 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
749
750 if let Some(default) = default_opt {
751 clients.push(default);
752 }
753
754 clients
755 }
756
757 pub fn get_client(
758 &mut self,
759 client_id: Option<&ClientId>,
760 venue: Option<&Venue>,
761 ) -> Option<&mut DataClientAdapter> {
762 if let Some(client_id) = client_id {
763 if let Some(client) = self.clients.get_mut(client_id) {
765 return Some(client);
766 }
767
768 if let Some(default) = self.default_client.as_mut()
770 && default.client_id() == *client_id
771 {
772 return Some(default);
773 }
774
775 return None;
777 }
778
779 if let Some(v) = venue {
780 if let Some(client_id) = self.routing_map.get(v) {
782 return self.clients.get_mut(client_id);
783 }
784 }
785
786 self.get_default_client()
788 }
789
790 fn get_command_client(
795 &mut self,
796 client_id: Option<&ClientId>,
797 venue: Option<&Venue>,
798 ) -> Option<&mut DataClientAdapter> {
799 let backtest_id = ClientId::new("BACKTEST");
800 if self.clients.contains_key(&backtest_id) {
803 return self.clients.get_mut(&backtest_id);
804 }
805 let default_is_backtest = self
806 .default_client
807 .as_ref()
808 .is_some_and(|c| c.client_id() == backtest_id);
809 if default_is_backtest {
810 return self.default_client.as_mut();
811 }
812 self.get_client(client_id, venue)
813 }
814
815 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
816 self.default_client.as_mut()
817 }
818
819 #[must_use]
821 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
822 self.collect_subscriptions(|client| &client.subscriptions_custom)
823 }
824
825 #[must_use]
827 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
828 self.collect_subscriptions(|client| &client.subscriptions_instrument)
829 }
830
831 #[must_use]
833 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
834 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
835 }
836
837 #[must_use]
839 pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
840 self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
841 }
842
843 #[must_use]
845 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
846 self.book_snapshot_counts
847 .keys()
848 .map(|(instrument_id, _)| *instrument_id)
849 .collect()
850 }
851
852 #[must_use]
854 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
855 self.collect_subscriptions(|client| &client.subscriptions_quotes)
856 }
857
858 #[must_use]
860 pub fn subscribed_synthetic_quotes(&self) -> Vec<InstrumentId> {
861 self.subscribed_synthetic_quotes.iter().copied().collect()
862 }
863
864 #[must_use]
866 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
867 self.collect_subscriptions(|client| &client.subscriptions_trades)
868 }
869
870 #[must_use]
872 pub fn subscribed_synthetic_trades(&self) -> Vec<InstrumentId> {
873 self.subscribed_synthetic_trades.iter().copied().collect()
874 }
875
876 #[must_use]
878 pub fn subscribed_bars(&self) -> Vec<BarType> {
879 self.collect_subscriptions(|client| &client.subscriptions_bars)
880 }
881
882 #[must_use]
884 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
885 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
886 }
887
888 #[must_use]
890 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
891 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
892 }
893
894 #[must_use]
896 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
897 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
898 }
899
900 #[must_use]
902 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
903 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
904 }
905
906 #[must_use]
908 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
909 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
910 }
911
912 pub fn execute(&mut self, cmd: DataCommand) {
921 match &cmd {
922 DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
923 DataCommand::Request(_) => self.request_count += 1,
924 #[cfg(feature = "defi")]
925 DataCommand::DefiRequest(_) => self.request_count += 1,
926 #[cfg(feature = "defi")]
927 DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
928 self.command_count += 1;
929 }
930 _ => {}
931 }
932
933 if let Err(e) = match cmd {
934 DataCommand::Subscribe(c) => self.execute_subscribe(c),
935 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
936 DataCommand::Request(c) => self.execute_request(c),
937 #[cfg(feature = "defi")]
938 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
939 #[cfg(feature = "defi")]
940 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
941 #[cfg(feature = "defi")]
942 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
943 _ => {
944 log::warn!("Unhandled DataCommand variant");
945 Ok(())
946 }
947 } {
948 log::error!("{e}");
949 }
950 }
951
952 pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
959 if let Some(client_id) = cmd.client_id()
960 && self.external_clients.contains(client_id)
961 {
962 if self.config.debug {
963 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
964 }
965 return Ok(());
966 }
967
968 match &cmd {
970 SubscribeCommand::BookDeltas(cmd) if !self.subscribe_book_deltas(cmd)? => {
971 return Ok(());
972 }
973 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
974 SubscribeCommand::BookSnapshots(cmd) => {
975 return self.subscribe_book_snapshots(cmd);
977 }
978 SubscribeCommand::Bars(cmd) if has_continuous_future_params(cmd.params.as_ref()) => {
979 return self.subscribe_continuous_future_bars(cmd);
980 }
981 SubscribeCommand::Bars(cmd) => {
982 self.subscribe_bars(cmd)?;
983 if cmd.bar_type.is_internally_aggregated() {
984 return Ok(());
985 }
986 }
987 SubscribeCommand::OptionChain(cmd) => {
988 self.subscribe_option_chain(cmd);
989 return Ok(());
990 }
991 SubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
992 self.subscribe_synthetic_quotes(cmd.instrument_id);
993 return Ok(());
994 }
995 SubscribeCommand::Quotes(cmd)
996 if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
997 {
998 self.subscribe_spread_quotes(cmd);
999 return Ok(());
1000 }
1001 SubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
1002 self.subscribe_synthetic_trades(cmd.instrument_id);
1003 return Ok(());
1004 }
1005 SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
1006 anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
1007 }
1008 SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
1009 anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
1010 }
1011 SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
1012 anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
1013 }
1014 SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
1015 anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
1016 }
1017 _ => {} }
1019
1020 #[cfg(feature = "streaming")]
1021 let cmd = self.subscribe_command_with_prefilled_start_ns(cmd)?;
1022
1023 if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
1024 client.execute_subscribe(cmd);
1025 } else {
1026 log::error!(
1027 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1028 cmd.client_id(),
1029 cmd.venue(),
1030 );
1031 }
1032
1033 Ok(())
1034 }
1035
1036 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
1042 if let Some(client_id) = cmd.client_id()
1043 && self.external_clients.contains(client_id)
1044 {
1045 if self.config.debug {
1046 log::debug!(
1047 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
1048 );
1049 }
1050 return Ok(());
1051 }
1052
1053 match &cmd {
1054 UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
1055 return Ok(());
1056 }
1057 UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
1058 return Ok(());
1059 }
1060 UnsubscribeCommand::BookSnapshots(cmd) => {
1061 self.unsubscribe_book_snapshots(cmd);
1063 return Ok(());
1064 }
1065 UnsubscribeCommand::Bars(cmd)
1066 if self
1067 .continuous_future_subscriptions
1068 .contains_key(&cmd.bar_type.standard()) =>
1069 {
1070 self.unsubscribe_continuous_future_bars(cmd);
1071 return Ok(());
1072 }
1073 UnsubscribeCommand::Bars(cmd) => {
1074 self.unsubscribe_bars(cmd);
1075 if cmd.bar_type.is_internally_aggregated() {
1076 return Ok(());
1077 }
1078 }
1079 UnsubscribeCommand::OptionChain(cmd) => {
1080 self.unsubscribe_option_chain(cmd);
1081 return Ok(());
1082 }
1083 UnsubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
1084 self.unsubscribe_synthetic_quotes(cmd.instrument_id);
1085 return Ok(());
1086 }
1087 UnsubscribeCommand::Quotes(cmd)
1088 if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
1089 {
1090 self.unsubscribe_spread_quotes(cmd);
1091 return Ok(());
1092 }
1093 UnsubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
1094 self.unsubscribe_synthetic_trades(cmd.instrument_id);
1095 return Ok(());
1096 }
1097 UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
1098 anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
1099 }
1100 UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
1101 anyhow::bail!(
1102 "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
1103 );
1104 }
1105 UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
1106 anyhow::bail!(
1107 "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
1108 );
1109 }
1110 UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
1111 anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
1112 }
1113 _ => {}
1114 }
1115
1116 if Self::topic_has_remaining_subscribers(cmd) {
1118 return Ok(());
1119 }
1120
1121 if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
1122 client.execute_unsubscribe(cmd);
1123 } else {
1124 log::error!(
1125 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1126 cmd.client_id(),
1127 cmd.venue(),
1128 );
1129 }
1130
1131 Ok(())
1132 }
1133
1134 fn topic_has_remaining_subscribers(cmd: &UnsubscribeCommand) -> bool {
1135 match cmd {
1139 UnsubscribeCommand::Quotes(c) => {
1140 let topic = switchboard::get_quotes_topic(c.instrument_id);
1141 msgbus::exact_subscriber_count_quotes(topic) > 0
1142 }
1143 UnsubscribeCommand::Trades(c) => {
1144 let topic = switchboard::get_trades_topic(c.instrument_id);
1145 msgbus::exact_subscriber_count_trades(topic) > 0
1146 }
1147 UnsubscribeCommand::MarkPrices(c) => {
1148 let topic = switchboard::get_mark_price_topic(c.instrument_id);
1149 msgbus::exact_subscriber_count_mark_prices(topic) > 0
1150 }
1151 UnsubscribeCommand::IndexPrices(c) => {
1152 let topic = switchboard::get_index_price_topic(c.instrument_id);
1153 msgbus::exact_subscriber_count_index_prices(topic) > 0
1154 }
1155 UnsubscribeCommand::FundingRates(c) => {
1156 let topic = switchboard::get_funding_rate_topic(c.instrument_id);
1157 msgbus::exact_subscriber_count_funding_rates(topic) > 0
1158 }
1159 UnsubscribeCommand::OptionGreeks(c) => {
1160 let topic = switchboard::get_option_greeks_topic(c.instrument_id);
1161 msgbus::exact_subscriber_count_option_greeks(topic) > 0
1162 }
1163 _ => false,
1164 }
1165 }
1166
1167 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1174 if let Some(cid) = req.client_id()
1176 && self.external_clients.contains(cid)
1177 {
1178 if self.config.debug {
1179 log::debug!("Skipping data request for external client {cid}: {req:?}");
1180 }
1181 return Ok(());
1182 }
1183
1184 if let RequestCommand::Join(join) = req {
1185 return self.handle_request_join(join);
1186 }
1187
1188 if has_continuous_future_params(request_params(&req)) {
1189 return self.execute_continuous_future_request(req);
1190 }
1191
1192 let request_id = *req.request_id();
1193 self.prepare_request_bar_aggregators(&req)?;
1194
1195 if has_time_range_pipeline_params(request_params(&req))
1196 && is_time_range_pipeline_variant(&req)
1197 {
1198 let result = self.execute_time_range_pipeline_request(req);
1199 if result.is_err() {
1200 self.cleanup_request_bar_aggregators(&request_id);
1201 }
1202 return result;
1203 }
1204
1205 #[cfg(feature = "streaming")]
1206 if self.catalogs_registered() && streaming::is_date_range_variant(&req) {
1207 let result = self.dispatch_date_range_request(req);
1208 if result.is_err() {
1209 self.cleanup_request_bar_aggregators(&request_id);
1210 }
1211 return result;
1212 }
1213
1214 let result = self.dispatch_request_to_client(req);
1215
1216 if result.is_err() {
1217 self.cleanup_request_bar_aggregators(&request_id);
1218 }
1219
1220 result.map(|_| ())
1221 }
1222
1223 pub(super) fn dispatch_request_to_client(
1224 &mut self,
1225 req: RequestCommand,
1226 ) -> anyhow::Result<ClientId> {
1227 let client_id = req.client_id().copied();
1228 let venue = req.venue().copied();
1229 let Some(client) = self.get_client(client_id.as_ref(), venue.as_ref()) else {
1230 anyhow::bail!("Cannot handle request: no client found for {client_id:?} {venue:?}");
1231 };
1232 let resolved_client_id = client.client_id();
1233
1234 match req {
1235 RequestCommand::Data(req) => client.request_data(req),
1236 RequestCommand::Instrument(req) => client.request_instrument(req),
1237 RequestCommand::Instruments(req) => client.request_instruments(req),
1238 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
1239 RequestCommand::BookDeltas(req) => client.request_book_deltas(req),
1240 RequestCommand::BookDepth(req) => client.request_book_depth(req),
1241 RequestCommand::Quotes(req) => client.request_quotes(req),
1242 RequestCommand::Trades(req) => client.request_trades(req),
1243 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
1244 RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
1245 RequestCommand::Bars(req) => client.request_bars(req),
1246 RequestCommand::Join(_) => {
1247 anyhow::bail!("RequestJoin must be handled by handle_request_join")
1248 }
1249 }?;
1250
1251 Ok(resolved_client_id)
1252 }
1253
1254 fn execute_continuous_future_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1255 let RequestCommand::Bars(parent) = req else {
1256 anyhow::bail!("Continuous future requests require `RequestBars`");
1257 };
1258 let request_id = parent.request_id;
1259 let Some(continuous_request) = continuous_future_request_from_bars(&parent)? else {
1260 return Ok(());
1261 };
1262
1263 self.ensure_continuous_future_target_instrument(&continuous_request);
1264 self.prepare_request_bar_aggregators_from_state(
1265 request_id,
1266 &continuous_request.request_bar_aggregation,
1267 )?;
1268
1269 let response_client_id = match self.resolve_request_client_id(
1270 parent.client_id.as_ref(),
1271 Some(&continuous_request.primary_bar_type.instrument_id().venue),
1272 ) {
1273 Ok(client_id) => client_id,
1274 Err(e) => {
1275 self.cleanup_request_bar_aggregators(&request_id);
1276 return Err(e);
1277 }
1278 };
1279 let (cursor_ns, end_ns) = match self.bound_continuous_future_dates(&parent) {
1280 Ok(bounds) => bounds,
1281 Err(e) => {
1282 self.cleanup_request_bar_aggregators(&request_id);
1283 return Err(e);
1284 }
1285 };
1286
1287 self.continuous_future_requests.insert(
1288 request_id,
1289 ContinuousFutureRequestState {
1290 parent,
1291 request: continuous_request,
1292 start_ns: cursor_ns,
1293 cursor_ns,
1294 end_ns,
1295 response_client_id,
1296 data_count: 0,
1297 },
1298 );
1299
1300 if let Err(e) = self.dispatch_next_continuous_future_segment(request_id) {
1301 self.continuous_future_requests.remove(&request_id);
1302 self.cleanup_request_bar_aggregators(&request_id);
1303 return Err(e);
1304 }
1305
1306 Ok(())
1307 }
1308
1309 fn resolve_request_client_id(
1310 &mut self,
1311 client_id: Option<&ClientId>,
1312 venue: Option<&Venue>,
1313 ) -> anyhow::Result<ClientId> {
1314 self.get_client(client_id, venue)
1315 .map(|client| client.client_id())
1316 .ok_or_else(|| {
1317 anyhow::anyhow!(
1318 "Cannot handle request: no client found for {client_id:?} {venue:?}"
1319 )
1320 })
1321 }
1322
1323 fn bound_continuous_future_dates(
1324 &self,
1325 request: &RequestBars,
1326 ) -> anyhow::Result<(UnixNanos, UnixNanos)> {
1327 let now = self.clock.borrow().timestamp_ns();
1328 let start = request
1329 .start
1330 .map(datetime_to_unix_nanos)
1331 .transpose()?
1332 .unwrap_or_default();
1333 let end = request
1334 .end
1335 .map(datetime_to_unix_nanos)
1336 .transpose()?
1337 .unwrap_or(now);
1338
1339 Ok((start.min(now), end.min(now)))
1340 }
1341
1342 fn ensure_continuous_future_target_instrument(&self, request: &ContinuousFutureRequest) {
1343 let target_id = request.primary_bar_type.instrument_id();
1344 if self.cache.borrow().instrument(&target_id).is_some() {
1345 return;
1346 }
1347
1348 let segment_id = request.first_segment_instrument_id();
1349 let segment_instrument = self.cache.borrow().instrument(&segment_id).cloned();
1350 let Some(segment_instrument) = segment_instrument else {
1351 log::warn!(
1352 "Cannot synthesize continuous future instrument {target_id}: first segment {segment_id} not in cache"
1353 );
1354 return;
1355 };
1356
1357 let InstrumentAny::FuturesContract(mut target) = segment_instrument else {
1358 log::warn!(
1359 "Cannot synthesize continuous future instrument {target_id}: segment {segment_id} is not a FuturesContract",
1360 );
1361 return;
1362 };
1363
1364 target.id = target_id;
1365 target.raw_symbol = target_id.symbol;
1366 target.activation_ns = UnixNanos::default();
1367 target.expiration_ns = UnixNanos::default();
1368
1369 if let Err(e) = self
1370 .cache
1371 .borrow_mut()
1372 .add_instrument(InstrumentAny::FuturesContract(target))
1373 {
1374 log_error_on_cache_insert(&e);
1375 }
1376 }
1377
1378 fn prepare_request_bar_aggregators_from_state(
1379 &mut self,
1380 request_id: UUID4,
1381 state: &RequestBarAggregation,
1382 ) -> anyhow::Result<()> {
1383 if !self.can_start_request_bar_aggregators(request_id, state) {
1384 anyhow::bail!(
1385 "Cannot request aggregated bars: one of the aggregators in `bar_types` is already running"
1386 );
1387 }
1388
1389 self.request_bar_aggregations
1390 .insert(request_id, state.clone());
1391
1392 if let Err(e) = self.init_request_bar_aggregators(request_id, state) {
1393 self.cleanup_request_bar_aggregators(&request_id);
1394 return Err(e);
1395 }
1396
1397 Ok(())
1398 }
1399
1400 fn dispatch_next_continuous_future_segment(&mut self, request_id: UUID4) -> anyhow::Result<()> {
1401 let Some(state) = self.continuous_future_requests.get(&request_id).cloned() else {
1402 anyhow::bail!("No active continuous future request for {request_id}");
1403 };
1404
1405 let Some(segment) = state
1406 .request
1407 .next_segment(state.cursor_ns.as_u64(), state.end_ns.as_u64())
1408 else {
1409 self.emit_empty_continuous_future_response(request_id);
1410 return Ok(());
1411 };
1412
1413 self.apply_continuous_future_adjustment(request_id, &state.request, segment.index)?;
1414 let child = self.build_continuous_future_child_request(request_id, &state, segment);
1415 if let Some(active) = self.continuous_future_requests.get_mut(&request_id) {
1416 active.cursor_ns = UnixNanos::from(segment.end_ns.saturating_add(1));
1417 }
1418
1419 self.dispatch_request_to_client(child).map(|_| ())
1420 }
1421
1422 fn apply_continuous_future_adjustment(
1423 &self,
1424 request_id: UUID4,
1425 request: &ContinuousFutureRequest,
1426 segment_index: usize,
1427 ) -> anyhow::Result<()> {
1428 let adjustment = request.adjustment_for_segment(segment_index);
1429 let key = bar_aggregator_key(request.primary_bar_type, Some(request_id));
1430 let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
1431 anyhow::anyhow!("No aggregator for continuous future request {request_id}")
1432 })?;
1433 aggregator
1434 .borrow_mut()
1435 .set_adjustment(adjustment, request.adjustment_mode);
1436
1437 Ok(())
1438 }
1439
1440 fn build_continuous_future_child_request(
1441 &self,
1442 request_id: UUID4,
1443 state: &ContinuousFutureRequestState,
1444 segment: ContinuousFutureSegment,
1445 ) -> RequestCommand {
1446 let source = state.request.source_for_segment(segment.instrument_id);
1447 let start = Some(UnixNanos::from(segment.start_ns).to_datetime_utc());
1448 let end = Some(UnixNanos::from(segment.end_ns).to_datetime_utc());
1449 let child_params = Some(
1450 state
1451 .request
1452 .child_params(state.parent.params.as_ref(), request_id),
1453 );
1454 let child_request_id = UUID4::new();
1455 let ts_init = self.clock.borrow().timestamp_ns();
1456
1457 match source {
1458 ContinuousFutureSource::Bars(bar_type) => RequestCommand::Bars(RequestBars::new(
1459 bar_type,
1460 start,
1461 end,
1462 state.parent.limit,
1463 state.parent.client_id,
1464 child_request_id,
1465 ts_init,
1466 child_params,
1467 )),
1468 ContinuousFutureSource::Trades => RequestCommand::Trades(RequestTrades::new(
1469 segment.instrument_id,
1470 start,
1471 end,
1472 state.parent.limit,
1473 state.parent.client_id,
1474 child_request_id,
1475 ts_init,
1476 child_params,
1477 )),
1478 ContinuousFutureSource::Quotes => RequestCommand::Quotes(RequestQuotes::new(
1479 segment.instrument_id,
1480 start,
1481 end,
1482 state.parent.limit,
1483 state.parent.client_id,
1484 child_request_id,
1485 ts_init,
1486 child_params,
1487 )),
1488 }
1489 }
1490
1491 fn emit_empty_continuous_future_response(&mut self, request_id: UUID4) {
1492 let Some(state) = self.continuous_future_requests.remove(&request_id) else {
1493 return;
1494 };
1495
1496 let mut params = state.parent.params.unwrap_or_default();
1497 if state.data_count != 0 {
1498 params.insert(
1499 "data_count".to_string(),
1500 serde_json::json!(state.data_count),
1501 );
1502 }
1503
1504 let response = DataResponse::Bars(BarsResponse::new(
1505 request_id,
1506 state.response_client_id,
1507 state.parent.bar_type,
1508 Vec::new(),
1509 Some(state.start_ns),
1510 Some(state.end_ns),
1511 self.clock.borrow().timestamp_ns(),
1512 Some(params),
1513 ));
1514 self.response(response);
1515 }
1516
1517 fn prepare_request_bar_aggregators(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
1518 let request_id = *req.request_id();
1519 let Some(state) = request_bar_aggregation_from_params(request_params(req))? else {
1520 return Ok(());
1521 };
1522
1523 self.prepare_request_bar_aggregators_from_state(request_id, &state)
1524 }
1525
1526 fn can_start_request_bar_aggregators(
1527 &self,
1528 request_id: UUID4,
1529 state: &RequestBarAggregation,
1530 ) -> bool {
1531 let aggregator_request_id = state.aggregator_request_id(request_id);
1532 state.bar_types.iter().all(|bar_type| {
1533 let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1534 self.bar_aggregators
1535 .get(&key)
1536 .is_none_or(|aggregator| !aggregator.borrow().is_running())
1537 })
1538 }
1539
1540 fn init_request_bar_aggregators(
1541 &mut self,
1542 request_id: UUID4,
1543 state: &RequestBarAggregation,
1544 ) -> anyhow::Result<()> {
1545 let aggregator_request_id = state.aggregator_request_id(request_id);
1546
1547 for bar_type in &state.bar_types {
1548 self.create_bar_aggregator_for_key(*bar_type, aggregator_request_id)?;
1549 self.setup_bar_aggregator(*bar_type, true, aggregator_request_id)?;
1550
1551 let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1552 if let Some(aggregator) = self.bar_aggregators.get(&key) {
1553 aggregator.borrow_mut().set_is_running(true);
1554 }
1555 }
1556
1557 self.set_request_bar_aggregator_chain_handlers(request_id, state);
1558
1559 Ok(())
1560 }
1561
1562 fn set_request_bar_aggregator_chain_handlers(
1563 &self,
1564 request_id: UUID4,
1565 state: &RequestBarAggregation,
1566 ) {
1567 let aggregator_request_id = state.aggregator_request_id(request_id);
1568
1569 for bar_type in &state.bar_types {
1570 let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1571 let Some(aggregator) = self.bar_aggregators.get(&key).cloned() else {
1572 continue;
1573 };
1574
1575 let downstream: Vec<_> = state
1576 .bar_types
1577 .iter()
1578 .filter(|candidate| {
1579 candidate.is_composite()
1580 && candidate.composite().standard() == bar_type.standard()
1581 })
1582 .filter_map(|candidate| {
1583 let key = bar_aggregator_key(*candidate, aggregator_request_id);
1584 self.bar_aggregators.get(&key).cloned()
1585 })
1586 .collect();
1587 let cache = self.cache.clone();
1588 let validate_sequence = self.config.validate_data_sequence;
1589 let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
1590 process_engine_bar(&cache, validate_sequence, false, bar);
1591
1592 for aggregator in &downstream {
1593 aggregator.borrow_mut().handle_bar(bar);
1594 }
1595 });
1596
1597 aggregator.borrow_mut().set_historical_mode(true, handler);
1598 }
1599 }
1600
1601 fn cleanup_request_bar_aggregators(&mut self, request_id: &UUID4) -> bool {
1602 let Some(state) = self.request_bar_aggregations.remove(request_id) else {
1603 return false;
1604 };
1605 let aggregator_request_id = state.aggregator_request_id(*request_id);
1606
1607 for bar_type in state.bar_types {
1608 let key = bar_aggregator_key(bar_type, aggregator_request_id);
1609 let has_live_handlers =
1610 state.update_subscriptions && self.bar_aggregator_handlers.contains_key(&key);
1611 let keep_running = if has_live_handlers {
1612 match self.setup_bar_aggregator(bar_type, false, aggregator_request_id) {
1613 Ok(()) => true,
1614 Err(e) => {
1615 log::error!(
1616 "Error starting live request bar aggregator for {bar_type}: {e}"
1617 );
1618 false
1619 }
1620 }
1621 } else {
1622 false
1623 };
1624
1625 if let Some(aggregator) = self.bar_aggregators.get(&key) {
1626 aggregator.borrow_mut().set_is_running(keep_running);
1627 }
1628
1629 if !state.update_subscriptions
1630 && let Err(e) = self.stop_bar_aggregator(bar_type, aggregator_request_id)
1631 {
1632 log::error!("Error stopping request bar aggregator for {bar_type}: {e}");
1633 }
1634 }
1635
1636 true
1637 }
1638
1639 pub fn process(&mut self, data: &dyn Any) {
1644 self.data_count += 1;
1645 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
1649 self.handle_instrument(instrument);
1650 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
1651 self.handle_funding_rate(*funding_rate);
1652 } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
1653 self.handle_instrument_status(*status);
1654 } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
1655 self.cache.borrow_mut().add_option_greeks(*option_greeks);
1656 let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
1657 msgbus::publish_option_greeks(topic, option_greeks);
1658 self.drain_deferred_commands();
1659 } else if let Some(custom) = data.downcast_ref::<CustomData>() {
1660 self.handle_custom_data(custom);
1661 } else {
1662 log::error!("Cannot process data {data:?}, type is unrecognized");
1663 }
1664 }
1665
1666 pub fn process_data(&mut self, data: Data) {
1668 #[cfg(feature = "defi")]
1669 let data = match data {
1670 Data::Defi(defi) => {
1671 self.process_defi_data(*defi);
1672 return;
1673 }
1674 data => data,
1675 };
1676
1677 self.data_count += 1;
1678
1679 match data {
1680 Data::Delta(delta) => self.handle_delta(delta),
1681 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
1682 Data::Depth10(depth) => self.handle_depth10(*depth),
1683 Data::Quote(quote) => {
1684 self.handle_quote(quote);
1685 self.drain_deferred_commands();
1686 }
1687 Data::Trade(trade) => self.handle_trade(trade),
1688 Data::Bar(bar) => self.handle_bar(bar),
1689 Data::MarkPriceUpdate(mark_price) => {
1690 self.handle_mark_price(mark_price);
1691 self.drain_deferred_commands();
1692 }
1693 Data::IndexPriceUpdate(index_price) => {
1694 self.handle_index_price(index_price);
1695 self.drain_deferred_commands();
1696 }
1697 Data::FundingRateUpdate(funding_rate) => {
1698 self.handle_funding_rate(funding_rate);
1699 self.drain_deferred_commands();
1700 }
1701 Data::InstrumentStatus(status) => {
1702 self.handle_instrument_status(status);
1703 self.drain_deferred_commands();
1704 }
1705 Data::OptionGreeks(greeks) => {
1706 self.cache.borrow_mut().add_option_greeks(greeks);
1707 let topic = switchboard::get_option_greeks_topic(greeks.instrument_id);
1708 msgbus::publish_option_greeks(topic, &greeks);
1709 self.drain_deferred_commands();
1710 }
1711 Data::InstrumentClose(close) => self.handle_instrument_close(close),
1712 Data::Custom(custom) => self.handle_custom_data(&custom),
1713 #[cfg(feature = "defi")]
1714 Data::Defi(_) => unreachable!("handled before market data dispatch"),
1715 }
1716 }
1717
1718 pub fn process_pipeline(&mut self, data: Data) {
1725 #[cfg(feature = "defi")]
1726 let data = match data {
1727 Data::Defi(defi) => {
1728 self.process_defi_data(*defi);
1729 return;
1730 }
1731 data => data,
1732 };
1733
1734 self.data_count += 1;
1735
1736 match data {
1737 Data::Delta(delta) => self.handle_delta_pipeline(delta),
1738 Data::Deltas(deltas) => self.handle_deltas_pipeline(&deltas.into_inner()),
1739 Data::Depth10(depth) => self.handle_depth10_pipeline(*depth),
1740 Data::Quote(quote) => self.handle_quote_pipeline(quote),
1741 Data::Trade(trade) => self.handle_trade_pipeline(trade),
1742 Data::Bar(bar) => self.handle_bar_pipeline(bar),
1743 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price_pipeline(mark_price),
1744 Data::IndexPriceUpdate(index_price) => self.handle_index_price_pipeline(index_price),
1745 Data::FundingRateUpdate(funding_rate) => {
1746 self.handle_funding_rate_pipeline(funding_rate);
1747 }
1748 Data::InstrumentStatus(status) => self.handle_instrument_status_pipeline(status),
1749 Data::OptionGreeks(greeks) => self.handle_option_greeks_pipeline(greeks),
1750 Data::InstrumentClose(close) => self.handle_instrument_close_pipeline(close),
1751 Data::Custom(custom) => self.handle_custom_data_pipeline(&custom),
1752 #[cfg(feature = "defi")]
1753 Data::Defi(_) => unreachable!("handled before market data dispatch"),
1754 }
1755 }
1756
1757 pub fn response(&mut self, mut resp: DataResponse) {
1759 if log::log_enabled!(log::Level::Debug) {
1760 let correlation_id = resp.correlation_id();
1761 match resp.record_count() {
1762 Some(count) => log::debug!(
1763 "{RECV}{RES} {} correlation_id={correlation_id} records={count}",
1764 resp.kind(),
1765 ),
1766 None => log::debug!(
1767 "{RECV}{RES} {} correlation_id={correlation_id}",
1768 resp.kind(),
1769 ),
1770 }
1771 }
1772 log::trace!("{RECV}{RES} {resp:?}");
1773
1774 self.response_count += 1;
1775
1776 resp.trim_to_bounds();
1777
1778 if let Some(parent_id) = continuous_future_parent_request_id(response_params(&resp)) {
1779 self.handle_continuous_future_child_response(parent_id, &resp);
1780 return;
1781 }
1782
1783 let Some(resp) = self.handle_request_pipeline_response(resp) else {
1784 return;
1785 };
1786
1787 if let Some(parent_id) = self
1788 .time_range_pipeline_parent_request_id
1789 .remove(resp.correlation_id())
1790 {
1791 self.handle_time_range_pipeline_child_response(parent_id, &resp);
1792 return;
1793 }
1794
1795 if self
1796 .parent_join_request_id
1797 .contains_key(resp.correlation_id())
1798 {
1799 self.finalize_request_join(resp);
1800 return;
1801 }
1802
1803 let correlation_id = *resp.correlation_id();
1804
1805 match &resp {
1806 DataResponse::Instrument(r) => {
1807 self.handle_instrument_response(r.data.clone());
1808 }
1809 DataResponse::Instruments(r) => {
1810 self.handle_instruments(&r.data);
1811 }
1812 DataResponse::Quotes(r) => {
1813 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1814 self.handle_quotes(&r.data);
1815 }
1816 }
1817 DataResponse::Trades(r) => {
1818 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1819 self.handle_trades(&r.data);
1820 }
1821 }
1822 DataResponse::FundingRates(r) => {
1823 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1824 self.handle_funding_rates(&r.data);
1825 }
1826 }
1827 DataResponse::Bars(r) => {
1828 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1829 self.handle_bars(&r.data);
1830 }
1831 }
1832 DataResponse::Book(r) => self.handle_book_response(&r.data),
1833 DataResponse::BookDeltas(r) => {
1834 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1835 self.handle_book_deltas_response(r);
1836 }
1837 }
1838 DataResponse::BookDepth(r) => {
1839 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1840 self.handle_book_depth_response(r);
1841 }
1842 }
1843 DataResponse::ForwardPrices(r) => {
1844 self.process_request_bar_aggregation_response(&resp);
1845 return self.handle_forward_prices_response(&correlation_id, r);
1846 }
1847 DataResponse::Data(_) => {}
1848 }
1849
1850 self.process_request_bar_aggregation_response(&resp);
1851
1852 msgbus::send_response(&correlation_id, &resp);
1853 }
1854
1855 pub fn new_request_pipeline(&mut self, parent: RequestCommand, n_components: usize) {
1857 let parent_id = *parent.request_id();
1858 self.request_pipeline_n_components
1859 .insert(parent_id, n_components);
1860 self.request_pipeline_parent_request
1861 .insert(parent_id, parent);
1862 self.request_pipeline_responses
1863 .insert(parent_id, Vec::with_capacity(n_components));
1864 }
1865
1866 pub fn register_request_pipeline_leg(&mut self, leg_id: UUID4, parent_id: UUID4) {
1868 self.request_pipeline_parent_request_id
1869 .insert(leg_id, parent_id);
1870 }
1871
1872 fn handle_request_pipeline_response(&mut self, resp: DataResponse) -> Option<DataResponse> {
1877 let leg_id = *resp.correlation_id();
1878 let Some(parent_id) = self.request_pipeline_parent_request_id.remove(&leg_id) else {
1879 return Some(resp);
1880 };
1881
1882 let Some(buf) = self.request_pipeline_responses.get_mut(&parent_id) else {
1883 log::error!("Pipeline response buffer missing for parent {parent_id} (leg {leg_id})");
1884 return Some(resp);
1885 };
1886 buf.push(resp);
1887
1888 let expected = self.request_pipeline_n_components.get(&parent_id).copied();
1889 let received = buf.len();
1890 match expected {
1891 Some(n) if received < n => return None,
1892 Some(_) => {}
1893 None => {
1894 log::error!("Pipeline n_components missing for parent {parent_id}");
1895 return None;
1896 }
1897 }
1898
1899 let mut legs = self.request_pipeline_responses.remove(&parent_id)?;
1900 self.request_pipeline_n_components.remove(&parent_id);
1901 let parent = self.request_pipeline_parent_request.remove(&parent_id);
1902
1903 for leg in &mut legs {
1904 leg.trim_to_bounds();
1905 }
1906
1907 let (parent_start, parent_end) = parent_request_window(parent.as_ref());
1908 let rebuilt = rebuild_pipeline_response(parent_id, parent.as_ref(), legs);
1909
1910 if rebuilt.is_none()
1916 && let Some(original_id) = self.parent_join_request_id.remove(&parent_id)
1917 {
1918 self.pending_join_requests.remove(&original_id);
1919 log::error!(
1920 "Dropped RequestJoin {original_id} because pipeline rebuild failed for dated parent {parent_id}"
1921 );
1922 }
1923
1924 let mut rebuilt = rebuilt?;
1925
1926 if let DataResponse::BookDeltas(r) = &mut rebuilt {
1929 self.book_deltas_snapshot_replay(r);
1930 }
1931
1932 if parent_start.is_some() || parent_end.is_some() {
1938 rebuilt.trim_to_bounds();
1939 }
1940
1941 Some(rebuilt)
1942 }
1943
1944 fn book_deltas_snapshot_replay(&self, resp: &mut BookDeltasResponse) {
1949 let Some(original_start_ns) = resp.start else {
1950 return;
1951 };
1952
1953 let Some(first) = resp.data.first().copied() else {
1954 return;
1955 };
1956
1957 if !RecordFlag::F_SNAPSHOT.matches(first.flags) {
1958 return;
1959 }
1960
1961 if first.ts_init.as_u64() % NANOSECONDS_IN_DAY != 0 {
1962 return;
1963 }
1964
1965 if original_start_ns <= first.ts_init {
1967 return;
1968 }
1969
1970 if self
1971 .cache
1972 .borrow()
1973 .instrument(&resp.instrument_id)
1974 .is_none()
1975 {
1976 log::warn!(
1977 "Instrument {} not found in cache, skipping snapshot replay",
1978 resp.instrument_id,
1979 );
1980 return;
1981 }
1982
1983 let book_type = resp
1984 .params
1985 .as_ref()
1986 .and_then(|p| p.get_str("book_type"))
1987 .and_then(|s| BookType::from_str(s).ok())
1988 .unwrap_or(BookType::L2_MBP);
1989
1990 let mut book = OrderBook::new(resp.instrument_id, book_type);
1991 let mut before: Vec<OrderBookDelta> = Vec::new();
1992 let mut after: Vec<OrderBookDelta> = Vec::new();
1993 let mut last_applied_ts: Option<UnixNanos> = None;
1994 let mut crossed = false;
1995
1996 for delta in &resp.data {
1997 if crossed {
1998 after.push(*delta);
1999 } else {
2000 before.push(*delta);
2001 if delta.ts_init >= original_start_ns {
2002 crossed = true;
2003 last_applied_ts = Some(delta.ts_init);
2004 }
2005 }
2006 }
2007
2008 if !before.is_empty() {
2009 if last_applied_ts.is_none() {
2010 last_applied_ts = before.last().map(|d| d.ts_init);
2011 }
2012
2013 let batch = OrderBookDeltas::new(resp.instrument_id, before);
2014 if let Err(e) = book.apply_deltas(&batch) {
2015 log::error!(
2016 "Failed to rebuild book for snapshot replay on {}: {e}",
2017 resp.instrument_id,
2018 );
2019 return;
2020 }
2021 }
2022
2023 let Some(last_ts) = last_applied_ts else {
2024 return;
2025 };
2026
2027 let snapshot_ts = last_ts.max(original_start_ns);
2028 let mut new_data = book.to_deltas(snapshot_ts, snapshot_ts).deltas;
2029 new_data.extend(after);
2030 resp.data = new_data;
2031 }
2032
2033 fn handle_request_join(&mut self, req: RequestJoin) -> anyhow::Result<()> {
2034 if has_time_range_pipeline_params(req.params.as_ref()) {
2035 return self.execute_time_range_pipeline_request(RequestCommand::Join(req));
2036 }
2037
2038 let now_ns = self.clock.borrow().timestamp_ns();
2039 let now_dt = now_ns.to_datetime_utc();
2040 let zero = chrono::DateTime::<chrono::Utc>::from_timestamp_nanos(0);
2041 let start = req.start.unwrap_or(zero).min(now_dt);
2042 let end = req.end.unwrap_or(now_dt).min(now_dt);
2043 let dated = req.with_dates(Some(start), Some(end), now_ns);
2044
2045 let original_id = req.request_id;
2046 let dated_id = dated.request_id;
2047
2048 self.pending_join_requests.insert(original_id, req);
2049 self.parent_join_request_id.insert(dated_id, original_id);
2050
2051 let leg_ids: Vec<UUID4> = dated.request_ids.clone();
2052 self.new_request_pipeline(RequestCommand::Join(dated), leg_ids.len());
2053 for leg_id in leg_ids {
2054 self.register_request_pipeline_leg(leg_id, dated_id);
2055 }
2056
2057 Ok(())
2058 }
2059
2060 fn finalize_request_join(&mut self, resp: DataResponse) {
2061 let dated_id = *resp.correlation_id();
2062 let Some(original_id) = self.parent_join_request_id.remove(&dated_id) else {
2063 log::error!("parent_join_request_id missing for dated correlation {dated_id}");
2064 return;
2065 };
2066
2067 let Some(original) = self.pending_join_requests.remove(&original_id) else {
2068 log::error!("pending_join_requests missing for original {original_id}");
2069 return;
2070 };
2071
2072 let now_ns = self.clock.borrow().timestamp_ns();
2073
2074 for leg_request_id in &original.request_ids {
2080 let empty = empty_response_like(&resp, *leg_request_id, now_ns);
2081 msgbus::send_response(leg_request_id, &empty);
2082 }
2083
2084 let final_resp = rebind_response_correlation(resp, original_id);
2090 self.response(final_resp);
2091 }
2092
2093 fn process_request_bar_aggregation_response(&mut self, resp: &DataResponse) {
2094 let correlation_id = *resp.correlation_id();
2095 let Some(state) = self.request_bar_aggregations.get(&correlation_id).cloned() else {
2096 return;
2097 };
2098
2099 match resp {
2100 DataResponse::Quotes(r) => {
2101 for quote in &r.data {
2102 self.update_request_bar_aggregators_from_quote(&state, correlation_id, *quote);
2103 }
2104 }
2105 DataResponse::Trades(r) => {
2106 for trade in &r.data {
2107 self.update_request_bar_aggregators_from_trade(&state, correlation_id, *trade);
2108 }
2109 }
2110 DataResponse::Bars(r) => {
2111 for bar in &r.data {
2112 self.update_request_bar_aggregators_from_bar(&state, correlation_id, *bar);
2113 }
2114 }
2115 _ => {}
2116 }
2117
2118 self.cleanup_request_bar_aggregators(&correlation_id);
2119 }
2120
2121 fn handle_continuous_future_child_response(&mut self, parent_id: UUID4, resp: &DataResponse) {
2122 if !self.continuous_future_requests.contains_key(&parent_id) {
2123 log::error!("No active continuous future request for child response {parent_id}");
2124 return;
2125 }
2126
2127 let data_count = response_params(resp)
2128 .and_then(|params| params.get("data_count"))
2129 .and_then(serde_json::Value::as_u64)
2130 .or_else(|| resp.record_count().map(|count| count as u64))
2131 .unwrap_or(0);
2132
2133 if let Some(state) = self.continuous_future_requests.get_mut(&parent_id) {
2134 state.data_count += data_count;
2135 }
2136
2137 match resp {
2138 DataResponse::Quotes(r) => {
2139 if !log_if_empty_response(&r.data, &r.instrument_id, resp.correlation_id()) {
2140 self.handle_quotes(&r.data);
2141 }
2142 }
2143 DataResponse::Trades(r) => {
2144 if !log_if_empty_response(&r.data, &r.instrument_id, resp.correlation_id()) {
2145 self.handle_trades(&r.data);
2146 }
2147 }
2148 DataResponse::Bars(r) => {
2149 if !log_if_empty_response(&r.data, &r.bar_type, resp.correlation_id()) {
2150 self.handle_bars(&r.data);
2151 }
2152 }
2153 _ => {
2154 log::error!(
2155 "Continuous future child response {parent_id} must contain quotes, trades, or bars"
2156 );
2157 return;
2158 }
2159 }
2160
2161 self.process_continuous_future_aggregation_response(parent_id, resp);
2162 if let Err(e) = self.dispatch_next_continuous_future_segment(parent_id) {
2163 log::error!("Error dispatching continuous future segment for {parent_id}: {e}");
2164 self.emit_empty_continuous_future_response(parent_id);
2165 }
2166 }
2167
2168 fn process_continuous_future_aggregation_response(
2169 &self,
2170 parent_id: UUID4,
2171 resp: &DataResponse,
2172 ) {
2173 let Some(state) = self.continuous_future_requests.get(&parent_id) else {
2174 return;
2175 };
2176 let primary_bar_type = state.request.primary_bar_type;
2177 let aggregator_request_id = Some(parent_id);
2178
2179 match resp {
2180 DataResponse::Quotes(r) => {
2181 for quote in &r.data {
2182 self.update_request_bar_aggregator(
2183 primary_bar_type,
2184 aggregator_request_id,
2185 |aggregator| {
2186 aggregator.handle_quote(*quote);
2187 },
2188 );
2189 }
2190 }
2191 DataResponse::Trades(r) => {
2192 for trade in &r.data {
2193 self.update_request_bar_aggregator(
2194 primary_bar_type,
2195 aggregator_request_id,
2196 |aggregator| {
2197 aggregator.handle_trade(*trade);
2198 },
2199 );
2200 }
2201 }
2202 DataResponse::Bars(r) => {
2203 for bar in &r.data {
2204 self.update_request_bar_aggregator(
2205 primary_bar_type,
2206 aggregator_request_id,
2207 |aggregator| {
2208 aggregator.handle_bar(*bar);
2209 },
2210 );
2211 }
2212 }
2213 _ => {}
2214 }
2215 }
2216
2217 fn update_request_bar_aggregators_from_quote(
2218 &self,
2219 state: &RequestBarAggregation,
2220 request_id: UUID4,
2221 quote: QuoteTick,
2222 ) {
2223 let aggregator_request_id = state.aggregator_request_id(request_id);
2224
2225 for bar_type in &state.bar_types {
2226 if bar_type.is_composite()
2227 || bar_type.instrument_id() != quote.instrument_id
2228 || bar_type.spec().price_type == PriceType::Last
2229 {
2230 continue;
2231 }
2232
2233 self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2234 aggregator.handle_quote(quote);
2235 });
2236 }
2237 }
2238
2239 fn update_request_bar_aggregators_from_trade(
2240 &self,
2241 state: &RequestBarAggregation,
2242 request_id: UUID4,
2243 trade: TradeTick,
2244 ) {
2245 let aggregator_request_id = state.aggregator_request_id(request_id);
2246
2247 for bar_type in &state.bar_types {
2248 if bar_type.is_composite()
2249 || bar_type.instrument_id() != trade.instrument_id
2250 || bar_type.spec().price_type != PriceType::Last
2251 {
2252 continue;
2253 }
2254
2255 self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2256 aggregator.handle_trade(trade);
2257 });
2258 }
2259 }
2260
2261 fn update_request_bar_aggregators_from_bar(
2262 &self,
2263 state: &RequestBarAggregation,
2264 request_id: UUID4,
2265 bar: Bar,
2266 ) {
2267 let aggregator_request_id = state.aggregator_request_id(request_id);
2268
2269 for bar_type in &state.bar_types {
2270 if !bar_type.is_composite()
2271 || bar_type.composite().standard() != bar.bar_type.standard()
2272 {
2273 continue;
2274 }
2275
2276 self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2277 aggregator.handle_bar(bar);
2278 });
2279 }
2280 }
2281
2282 fn update_request_bar_aggregator<F>(
2283 &self,
2284 bar_type: BarType,
2285 request_id: Option<UUID4>,
2286 update: F,
2287 ) where
2288 F: FnOnce(&mut dyn BarAggregator),
2289 {
2290 let key = bar_aggregator_key(bar_type, request_id);
2291 let Some(aggregator) = self.bar_aggregators.get(&key) else {
2292 log::error!("Cannot update request bar aggregator: no aggregator found for {bar_type}");
2293 return;
2294 };
2295
2296 update(aggregator.borrow_mut().as_mut());
2297 }
2298
2299 #[inline]
2300 fn pipeline_cache_writes_allowed(&self) -> bool {
2301 !self.config.disable_historical_cache
2302 }
2303
2304 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
2305 log::debug!("Handling instrument: {}", instrument.id());
2306
2307 if let Err(e) = self
2308 .cache
2309 .as_ref()
2310 .borrow_mut()
2311 .add_instrument(instrument.clone())
2312 {
2313 log_error_on_cache_insert(&e);
2314 }
2315
2316 let topic = switchboard::get_instrument_topic(instrument.id());
2317 log::debug!("Publishing instrument to topic: {topic}");
2318 msgbus::publish_instrument(topic, instrument);
2319
2320 self.update_option_chains(instrument);
2321 }
2322
2323 fn update_option_chains(&mut self, instrument: &InstrumentAny) {
2324 let Some(underlying) = instrument.underlying() else {
2325 return;
2326 };
2327 let Some(expiration_ns) = instrument.expiration_ns() else {
2328 return;
2329 };
2330 let Some(strike) = instrument.strike_price() else {
2331 return;
2332 };
2333 let Some(kind) = instrument.option_kind() else {
2334 return;
2335 };
2336
2337 let venue = instrument.id().venue;
2338 let settlement = instrument.settlement_currency().code;
2339 let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
2340
2341 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2343 return;
2344 };
2345
2346 let clock = self.clock.clone();
2347 let client = self.get_command_client(None, Some(&venue));
2348
2349 if manager_rc
2350 .borrow_mut()
2351 .add_instrument(instrument.id(), strike, kind, client, &clock)
2352 {
2353 self.option_chain_instrument_index
2354 .insert(instrument.id(), series_id);
2355 }
2356 }
2357
2358 fn handle_delta(&mut self, delta: OrderBookDelta) {
2359 let deltas = if self.config.buffer_deltas {
2360 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
2361 buffered_deltas.deltas.push(delta);
2362 buffered_deltas.flags = delta.flags;
2363 buffered_deltas.sequence = delta.sequence;
2364 buffered_deltas.ts_event = delta.ts_event;
2365 buffered_deltas.ts_init = delta.ts_init;
2366 } else {
2367 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
2368 self.buffered_deltas_map
2369 .insert(delta.instrument_id, buffered_deltas);
2370 }
2371
2372 if !RecordFlag::F_LAST.matches(delta.flags) {
2373 return; }
2375
2376 self.buffered_deltas_map
2377 .remove(&delta.instrument_id)
2378 .expect("buffered deltas exist")
2379 } else {
2380 OrderBookDeltas::new(delta.instrument_id, vec![delta])
2381 };
2382
2383 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
2384 msgbus::publish_deltas(topic, &deltas);
2385 }
2386
2387 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
2388 if self.config.buffer_deltas {
2389 let instrument_id = deltas.instrument_id;
2390
2391 for delta in deltas.deltas {
2392 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
2393 buffered_deltas.deltas.push(delta);
2394 buffered_deltas.flags = delta.flags;
2395 buffered_deltas.sequence = delta.sequence;
2396 buffered_deltas.ts_event = delta.ts_event;
2397 buffered_deltas.ts_init = delta.ts_init;
2398 } else {
2399 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
2400 self.buffered_deltas_map
2401 .insert(instrument_id, buffered_deltas);
2402 }
2403
2404 if RecordFlag::F_LAST.matches(delta.flags) {
2405 let deltas_to_publish = self
2406 .buffered_deltas_map
2407 .remove(&instrument_id)
2408 .expect("buffered deltas exist");
2409 let topic = switchboard::get_book_deltas_topic(instrument_id);
2410 msgbus::publish_deltas(topic, &deltas_to_publish);
2411 }
2412 }
2413 } else {
2414 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
2415 msgbus::publish_deltas(topic, &deltas);
2416 }
2417 }
2418
2419 fn handle_depth10(&self, depth: OrderBookDepth10) {
2420 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
2421 msgbus::publish_depth10(topic, &depth);
2422
2423 if self.config.emit_quotes_from_book_depths
2424 && let Some(quote) = derive_quote_from_depth(&depth)
2425 {
2426 book::publish_quote_if_changed(&self.cache, quote);
2427 }
2428 }
2429
2430 fn handle_quote(&self, quote: QuoteTick) {
2431 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
2432 log_error_on_cache_insert(&e);
2433 }
2434
2435 for synthetic_quote in self.synthetic_quotes_from_quote(quote) {
2436 let topic = switchboard::get_quotes_topic(synthetic_quote.instrument_id);
2437 msgbus::publish_quote(topic, &synthetic_quote);
2438 }
2439
2440 let topic = switchboard::get_quotes_topic(quote.instrument_id);
2441 msgbus::publish_quote(topic, "e);
2442 }
2443
2444 fn handle_trade(&self, trade: TradeTick) {
2445 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
2446 log_error_on_cache_insert(&e);
2447 }
2448
2449 for synthetic_trade in self.synthetic_trades_from_trade(trade) {
2450 let topic = switchboard::get_trades_topic(synthetic_trade.instrument_id);
2451 msgbus::publish_trade(topic, &synthetic_trade);
2452 }
2453
2454 let topic = switchboard::get_trades_topic(trade.instrument_id);
2455 msgbus::publish_trade(topic, &trade);
2456 }
2457
2458 fn synthetic_quotes_from_quote(&self, update: QuoteTick) -> Vec<QuoteTick> {
2459 let Some(synthetics) = self.synthetic_quote_feeds.get(&update.instrument_id) else {
2460 return Vec::new();
2461 };
2462
2463 synthetics
2464 .iter()
2465 .filter_map(|synthetic| self.synthetic_quote_from_update(synthetic, update))
2466 .collect()
2467 }
2468
2469 fn synthetic_quote_from_update(
2470 &self,
2471 synthetic: &SyntheticInstrument,
2472 update: QuoteTick,
2473 ) -> Option<QuoteTick> {
2474 let cache = self.cache.borrow();
2475 let mut bid_inputs = Vec::with_capacity(synthetic.components.len());
2476 let mut ask_inputs = Vec::with_capacity(synthetic.components.len());
2477
2478 for instrument_id in &synthetic.components {
2479 let (bid_price, ask_price) = if *instrument_id == update.instrument_id {
2480 (update.bid_price, update.ask_price)
2481 } else {
2482 let Some(component_quote) = cache.quote(instrument_id) else {
2483 log::warn!(
2484 "Cannot calculate synthetic instrument {} price, no quotes for {} yet",
2485 synthetic.id,
2486 instrument_id,
2487 );
2488 return None;
2489 };
2490 (component_quote.bid_price, component_quote.ask_price)
2491 };
2492
2493 bid_inputs.push(bid_price.as_f64());
2494 ask_inputs.push(ask_price.as_f64());
2495 }
2496 drop(cache);
2497
2498 let bid_price = match synthetic.calculate(&bid_inputs) {
2499 Ok(price) => price,
2500 Err(e) => {
2501 log::error!(
2502 "Cannot calculate synthetic instrument {} bid price: {e}",
2503 synthetic.id
2504 );
2505 return None;
2506 }
2507 };
2508 let ask_price = match synthetic.calculate(&ask_inputs) {
2509 Ok(price) => price,
2510 Err(e) => {
2511 log::error!(
2512 "Cannot calculate synthetic instrument {} ask price: {e}",
2513 synthetic.id
2514 );
2515 return None;
2516 }
2517 };
2518 let size_one = Quantity::from(1);
2519
2520 Some(QuoteTick::new(
2521 synthetic.id,
2522 bid_price,
2523 ask_price,
2524 size_one,
2525 size_one,
2526 update.ts_event,
2527 self.clock.borrow().timestamp_ns(),
2528 ))
2529 }
2530
2531 fn synthetic_trades_from_trade(&self, update: TradeTick) -> Vec<TradeTick> {
2532 let Some(synthetics) = self.synthetic_trade_feeds.get(&update.instrument_id) else {
2533 return Vec::new();
2534 };
2535
2536 synthetics
2537 .iter()
2538 .filter_map(|synthetic| self.synthetic_trade_from_update(synthetic, update))
2539 .collect()
2540 }
2541
2542 fn synthetic_trade_from_update(
2543 &self,
2544 synthetic: &SyntheticInstrument,
2545 update: TradeTick,
2546 ) -> Option<TradeTick> {
2547 let cache = self.cache.borrow();
2548 let mut inputs = Vec::with_capacity(synthetic.components.len());
2549
2550 for instrument_id in &synthetic.components {
2551 let price = if *instrument_id == update.instrument_id {
2552 update.price
2553 } else {
2554 let Some(component_trade) = cache.trade(instrument_id) else {
2555 log::warn!(
2556 "Cannot calculate synthetic instrument {} price, no trades for {} yet",
2557 synthetic.id,
2558 instrument_id,
2559 );
2560 return None;
2561 };
2562 component_trade.price
2563 };
2564
2565 inputs.push(price.as_f64());
2566 }
2567 drop(cache);
2568
2569 let price = match synthetic.calculate(&inputs) {
2570 Ok(price) => price,
2571 Err(e) => {
2572 log::error!(
2573 "Cannot calculate synthetic instrument {} trade price: {e}",
2574 synthetic.id
2575 );
2576 return None;
2577 }
2578 };
2579
2580 Some(TradeTick::new(
2581 synthetic.id,
2582 price,
2583 Quantity::from(1),
2584 update.aggressor_side,
2585 update.trade_id,
2586 update.ts_event,
2587 self.clock.borrow().timestamp_ns(),
2588 ))
2589 }
2590
2591 fn handle_bar(&self, bar: Bar) {
2592 process_engine_bar(&self.cache, self.config.validate_data_sequence, true, bar);
2593 }
2594
2595 fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
2596 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
2597 log_error_on_cache_insert(&e);
2598 }
2599
2600 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
2601 msgbus::publish_mark_price(topic, &mark_price);
2602 }
2603
2604 fn handle_index_price(&self, index_price: IndexPriceUpdate) {
2605 if let Err(e) = self
2606 .cache
2607 .as_ref()
2608 .borrow_mut()
2609 .add_index_price(index_price)
2610 {
2611 log_error_on_cache_insert(&e);
2612 }
2613
2614 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
2615 msgbus::publish_index_price(topic, &index_price);
2616 }
2617
2618 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
2620 if let Err(e) = self
2621 .cache
2622 .as_ref()
2623 .borrow_mut()
2624 .add_funding_rate(funding_rate)
2625 {
2626 log_error_on_cache_insert(&e);
2627 }
2628
2629 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
2630 msgbus::publish_funding_rate(topic, &funding_rate);
2631 }
2632
2633 fn handle_instrument_status(&mut self, status: InstrumentStatus) {
2634 if let Err(e) = self
2635 .cache
2636 .as_ref()
2637 .borrow_mut()
2638 .add_instrument_status(status)
2639 {
2640 log_error_on_cache_insert(&e);
2641 }
2642
2643 let topic = switchboard::get_instrument_status_topic(status.instrument_id);
2644 msgbus::publish_any(topic, &status);
2645
2646 if self
2647 .option_chain_instrument_index
2648 .contains_key(&status.instrument_id)
2649 && matches!(
2650 status.action,
2651 MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
2652 )
2653 {
2654 self.expire_option_chain_instrument(status.instrument_id);
2655 }
2656 }
2657
2658 fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
2665 let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
2666 return;
2667 };
2668
2669 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2670 return;
2671 };
2672
2673 let series_empty = manager_rc
2674 .borrow_mut()
2675 .handle_instrument_expired(&instrument_id);
2676
2677 self.drain_deferred_commands();
2679
2680 log::info!(
2681 "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
2682 );
2683
2684 if series_empty {
2685 manager_rc.borrow_mut().teardown(&self.clock);
2686 self.option_chain_managers.remove(&series_id);
2687
2688 log::info!("Torn down empty option chain manager for {series_id}");
2689 }
2690 }
2691
2692 fn handle_instrument_close(&self, close: InstrumentClose) {
2693 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
2694 msgbus::publish_any(topic, &close);
2695 }
2696
2697 fn handle_custom_data(&self, custom: &CustomData) {
2698 log::debug!("Processing custom data: {}", custom.data.type_name());
2699 let topic = switchboard::get_custom_topic(&custom.data_type);
2700 msgbus::publish_any(topic, custom);
2701 }
2702
2703 fn handle_delta_pipeline(&self, delta: OrderBookDelta) {
2704 let deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
2706 let topic = switchboard::get_pipeline_book_deltas_topic(deltas.instrument_id);
2707 msgbus::publish_deltas(topic, &deltas);
2708 }
2709
2710 fn handle_deltas_pipeline(&self, deltas: &OrderBookDeltas) {
2711 let topic = switchboard::get_pipeline_book_deltas_topic(deltas.instrument_id);
2712 msgbus::publish_deltas(topic, deltas);
2713 }
2714
2715 fn handle_depth10_pipeline(&self, depth: OrderBookDepth10) {
2716 let topic = switchboard::get_pipeline_book_depth10_topic(depth.instrument_id);
2717 msgbus::publish_depth10(topic, &depth);
2718 }
2719
2720 fn handle_quote_pipeline(&self, quote: QuoteTick) {
2721 if self.pipeline_cache_writes_allowed()
2722 && let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote)
2723 {
2724 log_error_on_cache_insert(&e);
2725 }
2726
2727 let topic = switchboard::get_pipeline_quotes_topic(quote.instrument_id);
2728 msgbus::publish_quote(topic, "e);
2729 }
2730
2731 fn handle_trade_pipeline(&self, trade: TradeTick) {
2732 if self.pipeline_cache_writes_allowed()
2733 && let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade)
2734 {
2735 log_error_on_cache_insert(&e);
2736 }
2737
2738 let topic = switchboard::get_pipeline_trades_topic(trade.instrument_id);
2739 msgbus::publish_trade(topic, &trade);
2740 }
2741
2742 fn handle_bar_pipeline(&self, bar: Bar) {
2743 if !validate_bar_sequence(&self.cache, self.config.validate_data_sequence, &bar) {
2744 return;
2745 }
2746
2747 if self.pipeline_cache_writes_allowed()
2748 && let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar)
2749 {
2750 log_error_on_cache_insert(&e);
2751 }
2752
2753 let topic = switchboard::get_pipeline_bars_topic(bar.bar_type);
2754 msgbus::publish_bar(topic, &bar);
2755 }
2756
2757 fn handle_mark_price_pipeline(&self, mark_price: MarkPriceUpdate) {
2758 if self.pipeline_cache_writes_allowed()
2759 && let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price)
2760 {
2761 log_error_on_cache_insert(&e);
2762 }
2763
2764 let topic = switchboard::get_pipeline_mark_price_topic(mark_price.instrument_id);
2765 msgbus::publish_mark_price(topic, &mark_price);
2766 }
2767
2768 fn handle_index_price_pipeline(&self, index_price: IndexPriceUpdate) {
2769 if self.pipeline_cache_writes_allowed()
2770 && let Err(e) = self
2771 .cache
2772 .as_ref()
2773 .borrow_mut()
2774 .add_index_price(index_price)
2775 {
2776 log_error_on_cache_insert(&e);
2777 }
2778
2779 let topic = switchboard::get_pipeline_index_price_topic(index_price.instrument_id);
2780 msgbus::publish_index_price(topic, &index_price);
2781 }
2782
2783 fn handle_funding_rate_pipeline(&self, funding_rate: FundingRateUpdate) {
2784 if self.pipeline_cache_writes_allowed()
2785 && let Err(e) = self
2786 .cache
2787 .as_ref()
2788 .borrow_mut()
2789 .add_funding_rate(funding_rate)
2790 {
2791 log_error_on_cache_insert(&e);
2792 }
2793
2794 let topic = switchboard::get_pipeline_funding_rate_topic(funding_rate.instrument_id);
2795 msgbus::publish_funding_rate(topic, &funding_rate);
2796 }
2797
2798 fn handle_instrument_status_pipeline(&self, status: InstrumentStatus) {
2799 if self.pipeline_cache_writes_allowed()
2800 && let Err(e) = self
2801 .cache
2802 .as_ref()
2803 .borrow_mut()
2804 .add_instrument_status(status)
2805 {
2806 log_error_on_cache_insert(&e);
2807 }
2808
2809 let topic = switchboard::get_pipeline_instrument_status_topic(status.instrument_id);
2810 msgbus::publish_any(topic, &status);
2811 }
2812
2813 fn handle_option_greeks_pipeline(&self, greeks: OptionGreeks) {
2814 if self.pipeline_cache_writes_allowed() {
2815 self.cache.borrow_mut().add_option_greeks(greeks);
2816 }
2817
2818 let topic = switchboard::get_pipeline_option_greeks_topic(greeks.instrument_id);
2819 msgbus::publish_option_greeks(topic, &greeks);
2820 }
2821
2822 fn handle_instrument_close_pipeline(&self, close: InstrumentClose) {
2823 let topic = switchboard::get_pipeline_instrument_close_topic(close.instrument_id);
2824 msgbus::publish_any(topic, &close);
2825 }
2826
2827 fn handle_custom_data_pipeline(&self, custom: &CustomData) {
2828 log::debug!("Pipeline custom data: {}", custom.data.type_name());
2829 let topic = switchboard::get_pipeline_custom_topic(&custom.data_type);
2830 msgbus::publish_any(topic, custom);
2831 }
2832
2833 fn drain_deferred_commands(&mut self) {
2837 loop {
2839 let commands: VecDeque<DeferredCommand> =
2840 std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
2841
2842 if commands.is_empty() {
2843 break;
2844 }
2845
2846 for cmd in commands {
2847 match cmd {
2848 DeferredCommand::Subscribe(sub) => {
2849 let client = self.get_command_client(sub.client_id(), sub.venue());
2850 if let Some(client) = client {
2851 client.execute_subscribe(sub);
2852 }
2853 }
2854 DeferredCommand::Unsubscribe(unsub) => {
2855 let client = self.get_command_client(unsub.client_id(), unsub.venue());
2856 if let Some(client) = client {
2857 client.execute_unsubscribe(&unsub);
2858 }
2859 }
2860 DeferredCommand::ExpireInstrument(instrument_id) => {
2861 self.expire_option_chain_instrument(instrument_id);
2862 }
2863 DeferredCommand::ExpireSeries(series_id) => {
2864 self.expire_series(series_id);
2865 }
2866 }
2867 }
2868 }
2869 }
2870
2871 fn expire_series(&mut self, series_id: OptionSeriesId) {
2877 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2878 return;
2879 };
2880
2881 let instrument_ids: Vec<InstrumentId> = self
2882 .option_chain_instrument_index
2883 .iter()
2884 .filter(|(_, sid)| **sid == series_id)
2885 .map(|(id, _)| *id)
2886 .collect();
2887
2888 for id in &instrument_ids {
2889 self.option_chain_instrument_index.remove(id);
2890 manager_rc.borrow_mut().handle_instrument_expired(id);
2891 }
2892
2893 manager_rc.borrow_mut().teardown(&self.clock);
2894 self.option_chain_managers.remove(&series_id);
2895
2896 log::info!("Proactively torn down expired option chain {series_id}");
2897 }
2898
2899 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<bool> {
2900 if cmd.instrument_id.is_synthetic() {
2901 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
2902 }
2903
2904 let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2907
2908 let had_deltas =
2909 self.has_book_delta_subscription_key(cmd.instrument_id, cmd.client_id, cmd.venue);
2910 self.increment_book_delta_subscription(cmd.instrument_id, cmd.client_id, cmd.venue);
2911
2912 if cmd.managed {
2913 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, parent)?;
2914 }
2915
2916 Ok(!had_deltas)
2917 }
2918
2919 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
2920 if cmd.instrument_id.is_synthetic() {
2921 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
2922 }
2923
2924 let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2925
2926 self.book_depth10_subs.insert(cmd.instrument_id);
2927 if cmd.managed {
2928 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
2929 }
2930
2931 Ok(())
2932 }
2933
2934 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
2935 if cmd.instrument_id.is_synthetic() {
2936 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
2937 }
2938
2939 let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2940
2941 let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
2942 let inserted = self.increment_book_snapshot_subscription(cmd, parent);
2943
2944 if inserted && !had_snapshots {
2945 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
2950 }
2951
2952 if had_snapshots || self.has_book_delta_subscriptions(&cmd.instrument_id) {
2953 return Ok(());
2954 }
2955
2956 if let Some(client_id) = cmd.client_id.as_ref()
2957 && self.external_clients.contains(client_id)
2958 {
2959 if self.config.debug {
2960 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
2961 }
2962 return Ok(());
2963 }
2964
2965 log::debug!(
2966 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
2967 cmd.instrument_id,
2968 cmd.client_id,
2969 cmd.venue,
2970 );
2971
2972 if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
2973 let deltas_cmd = SubscribeBookDeltas::new(
2974 cmd.instrument_id,
2975 cmd.book_type,
2976 cmd.client_id,
2977 cmd.venue,
2978 UUID4::new(),
2979 cmd.ts_init,
2980 cmd.depth,
2981 true, Some(cmd.command_id),
2983 cmd.params.clone(),
2984 );
2985 log::debug!(
2986 "Calling client.execute_subscribe for BookDeltas: {}",
2987 cmd.instrument_id
2988 );
2989 client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
2990 } else {
2991 log::error!(
2992 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
2993 cmd.client_id,
2994 cmd.venue,
2995 );
2996 }
2997
2998 Ok(())
2999 }
3000
3001 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
3002 match cmd.bar_type.aggregation_source() {
3003 AggregationSource::Internal => self.start_live_bar_aggregator(cmd)?,
3004 AggregationSource::External => {
3005 if cmd.bar_type.instrument_id().is_synthetic() {
3006 anyhow::bail!(
3007 "Cannot subscribe for externally aggregated synthetic instrument bar data"
3008 );
3009 }
3010 }
3011 }
3012
3013 Ok(())
3014 }
3015
3016 fn subscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
3017 let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
3018 log::error!(
3019 "Cannot subscribe to `QuoteTick` data for synthetic instrument {instrument_id}, not found",
3020 );
3021 return;
3022 };
3023
3024 if !self.subscribed_synthetic_quotes.insert(instrument_id) {
3025 return;
3026 }
3027
3028 for component_id in &synthetic.components {
3029 let synthetics = self.synthetic_quote_feeds.entry(*component_id).or_default();
3030 if !synthetics
3031 .iter()
3032 .any(|registered| registered.id == synthetic.id)
3033 {
3034 synthetics.push(synthetic.clone());
3035 }
3036 }
3037 }
3038
3039 fn subscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
3040 let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
3041 log::error!(
3042 "Cannot subscribe to `TradeTick` data for synthetic instrument {instrument_id}, not found",
3043 );
3044 return;
3045 };
3046
3047 if !self.subscribed_synthetic_trades.insert(instrument_id) {
3048 return;
3049 }
3050
3051 for component_id in &synthetic.components {
3052 let synthetics = self.synthetic_trade_feeds.entry(*component_id).or_default();
3053 if !synthetics
3054 .iter()
3055 .any(|registered| registered.id == synthetic.id)
3056 {
3057 synthetics.push(synthetic.clone());
3058 }
3059 }
3060 }
3061
3062 fn is_spread_quote_command(
3063 &self,
3064 instrument_id: InstrumentId,
3065 params: Option<&Params>,
3066 ) -> bool {
3067 if !params
3068 .and_then(|params| params.get_bool("aggregate_spread_quotes"))
3069 .unwrap_or(false)
3070 {
3071 return false;
3072 }
3073
3074 self.cache
3075 .borrow()
3076 .instrument(&instrument_id)
3077 .is_some_and(InstrumentAny::is_spread)
3078 }
3079
3080 fn subscribe_spread_quotes(&mut self, cmd: &SubscribeQuotes) {
3081 if self
3082 .spread_quote_aggregators
3083 .contains_key(&cmd.instrument_id)
3084 {
3085 log::warn!(
3086 "SpreadQuoteAggregator for {} is currently in use, subscription can't be started",
3087 cmd.instrument_id,
3088 );
3089 return;
3090 }
3091
3092 let Some(instrument) = self.cache.borrow().instrument(&cmd.instrument_id).cloned() else {
3093 log::error!(
3094 "Cannot create spread quote aggregator: no instrument found for {}",
3095 cmd.instrument_id,
3096 );
3097 return;
3098 };
3099 let Some(legs) = spread_instrument_legs(&instrument) else {
3100 log::error!(
3101 "Cannot create spread quote aggregator: invalid spread legs for {}",
3102 cmd.instrument_id,
3103 );
3104 return;
3105 };
3106
3107 if legs.len() <= 1 {
3108 log::error!(
3109 "Cannot create spread quote aggregator: spread instrument {} should have more than one leg",
3110 cmd.instrument_id,
3111 );
3112 return;
3113 }
3114
3115 let cache = self.cache.clone();
3116 let handler = Box::new(move |quote: QuoteTick| {
3117 let exchange_endpoint = format!(
3118 "SimulatedExchange.process_new_quote.{}",
3119 quote.instrument_id.venue
3120 );
3121 let exchange_endpoint = exchange_endpoint.into();
3122 if msgbus::has_quote_endpoint(exchange_endpoint) {
3123 msgbus::send_quote(exchange_endpoint, "e);
3124 }
3125
3126 if let Err(e) = cache.borrow_mut().add_quote(quote) {
3127 log_error_on_cache_insert(&e);
3128 }
3129 let topic = switchboard::get_quotes_topic(quote.instrument_id);
3130 msgbus::publish_quote(topic, "e);
3131 });
3132 let aggregator = Rc::new(RefCell::new(SpreadQuoteAggregator::new(
3133 cmd.instrument_id,
3134 &legs,
3135 matches!(
3136 instrument,
3137 InstrumentAny::FuturesSpread(_) | InstrumentAny::CryptoFuturesSpread(_)
3138 ),
3139 instrument.price_precision(),
3140 instrument.size_precision(),
3141 handler,
3142 self.clock.clone(),
3143 false,
3144 spread_quote_update_interval_seconds(cmd.params.as_ref()),
3145 cmd.params
3146 .as_ref()
3147 .and_then(|params| params.get_u64("quote_build_delay"))
3148 .unwrap_or(0),
3149 None,
3150 None,
3151 )));
3152
3153 let mut handlers = Vec::with_capacity(legs.len());
3154 for (leg_id, _) in &legs {
3155 let topic = switchboard::get_quotes_topic(*leg_id);
3156 let handler = TypedHandler::new(SpreadQuoteHandler::new(
3157 &aggregator,
3158 cmd.instrument_id,
3159 *leg_id,
3160 ));
3161 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3162 handlers.push((*leg_id, handler));
3163 }
3164
3165 aggregator
3166 .borrow_mut()
3167 .start_timer(Some(aggregator.clone()));
3168 aggregator.borrow_mut().set_running(true);
3169 self.spread_quote_aggregators
3170 .insert(cmd.instrument_id, aggregator);
3171 self.spread_quote_handlers
3172 .insert(cmd.instrument_id, handlers);
3173
3174 for (leg_id, _) in legs {
3175 let subscribe = SubscribeQuotes::new(
3176 leg_id,
3177 cmd.client_id,
3178 cmd.venue,
3179 UUID4::new(),
3180 cmd.ts_init,
3181 Some(cmd.command_id),
3182 cmd.params.clone(),
3183 );
3184 self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
3185 }
3186 }
3187
3188 fn unsubscribe_spread_quotes(&mut self, cmd: &UnsubscribeQuotes) {
3189 let Some(leg_ids) = self.stop_spread_quote_aggregator(cmd.instrument_id) else {
3190 return;
3191 };
3192
3193 for leg_id in leg_ids {
3194 let unsubscribe = UnsubscribeQuotes::new(
3195 leg_id,
3196 cmd.client_id,
3197 cmd.venue,
3198 UUID4::new(),
3199 cmd.ts_init,
3200 Some(cmd.command_id),
3201 cmd.params.clone(),
3202 );
3203 self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
3204 unsubscribe,
3205 )));
3206 }
3207 }
3208
3209 fn stop_spread_quote_aggregator(
3210 &mut self,
3211 spread_instrument_id: InstrumentId,
3212 ) -> Option<Vec<InstrumentId>> {
3213 let Some(aggregator) = self.spread_quote_aggregators.remove(&spread_instrument_id) else {
3214 log::warn!(
3215 "Cannot stop spread quote aggregator: no aggregator to stop for {spread_instrument_id}",
3216 );
3217 return None;
3218 };
3219
3220 aggregator.borrow_mut().stop_timer();
3221 aggregator.borrow_mut().set_running(false);
3222
3223 let handlers = self
3224 .spread_quote_handlers
3225 .remove(&spread_instrument_id)
3226 .unwrap_or_default();
3227 let mut leg_ids = Vec::with_capacity(handlers.len());
3228 for (leg_id, handler) in handlers {
3229 let topic = switchboard::get_quotes_topic(leg_id);
3230 msgbus::unsubscribe_quotes(topic.into(), &handler);
3231 leg_ids.push(leg_id);
3232 }
3233
3234 Some(leg_ids)
3235 }
3236
3237 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
3238 match self.decrement_book_delta_subscription(cmd.instrument_id, cmd.client_id, cmd.venue) {
3239 BookDeltasUnsubscribeResult::NotSubscribed => {
3240 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
3241 return false;
3242 }
3243 BookDeltasUnsubscribeResult::Decremented => return false,
3244 BookDeltasUnsubscribeResult::Removed => {}
3245 }
3246
3247 self.maintain_book_updater(&cmd.instrument_id);
3248
3249 !self.has_book_delta_subscriptions(&cmd.instrument_id)
3252 && !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
3253 }
3254
3255 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
3256 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
3257 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
3258 return false;
3259 }
3260
3261 self.book_depth10_subs.remove(&cmd.instrument_id);
3262 self.maintain_book_updater(&cmd.instrument_id);
3263
3264 true
3265 }
3266
3267 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
3268 match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
3269 BookSnapshotUnsubscribeResult::NotSubscribed => {
3270 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
3271 return;
3272 }
3273 BookSnapshotUnsubscribeResult::Decremented => return,
3274 BookSnapshotUnsubscribeResult::Removed => {}
3275 }
3276
3277 if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
3278 return;
3279 }
3280
3281 self.maintain_book_updater(&cmd.instrument_id);
3282
3283 if self.has_book_delta_subscriptions(&cmd.instrument_id) {
3284 return;
3285 }
3286
3287 if let Some(client_id) = cmd.client_id.as_ref()
3288 && self.external_clients.contains(client_id)
3289 {
3290 return;
3291 }
3292
3293 if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
3294 let deltas_cmd = UnsubscribeBookDeltas::new(
3295 cmd.instrument_id,
3296 cmd.client_id,
3297 cmd.venue,
3298 UUID4::new(),
3299 cmd.ts_init,
3300 Some(cmd.command_id),
3301 cmd.params.clone(),
3302 );
3303 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
3304 }
3305 }
3306
3307 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
3308 let bar_type = cmd.bar_type;
3309
3310 let topic = switchboard::get_bars_topic(bar_type.standard());
3312 if msgbus::exact_subscriber_count_bars(topic) > 0 {
3313 return;
3314 }
3315
3316 if self
3317 .bar_aggregators
3318 .contains_key(&bar_aggregator_key(bar_type, None))
3319 {
3320 match self.stop_bar_aggregator(bar_type, None) {
3321 Ok(()) => self.unsubscribe_bar_aggregator(cmd),
3322 Err(e) => log::error!("Error stopping bar aggregator for {bar_type}: {e}"),
3323 }
3324 }
3325
3326 if bar_type.is_composite() {
3328 let source_type = bar_type.composite();
3329 let source_topic = switchboard::get_bars_topic(source_type);
3330 if msgbus::exact_subscriber_count_bars(source_topic) == 0
3331 && self
3332 .bar_aggregators
3333 .contains_key(&bar_aggregator_key(source_type, None))
3334 && let Err(e) = self.stop_bar_aggregator(source_type, None)
3335 {
3336 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
3337 }
3338 }
3339 }
3340
3341 fn unsubscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
3342 if !self.subscribed_synthetic_quotes.remove(&instrument_id) {
3343 log::warn!("Cannot unsubscribe from synthetic `QuoteTick` data: not subscribed");
3344 return;
3345 }
3346
3347 self.synthetic_quote_feeds.retain(|_, synthetics| {
3348 synthetics.retain(|synthetic| synthetic.id != instrument_id);
3349 !synthetics.is_empty()
3350 });
3351 }
3352
3353 fn unsubscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
3354 if !self.subscribed_synthetic_trades.remove(&instrument_id) {
3355 log::warn!("Cannot unsubscribe from synthetic `TradeTick` data: not subscribed");
3356 return;
3357 }
3358
3359 self.synthetic_trade_feeds.retain(|_, synthetics| {
3360 synthetics.retain(|synthetic| synthetic.id != instrument_id);
3361 !synthetics.is_empty()
3362 });
3363 }
3364
3365 fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
3366 let series_id = cmd.series_id;
3367
3368 if let Some(old) = self.option_chain_managers.remove(&series_id) {
3370 log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
3371 let all_ids = old.borrow().all_instrument_ids();
3372 let old_venue = old.borrow().venue();
3373 old.borrow_mut().teardown(&self.clock);
3374 self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
3375 }
3376
3377 self.pending_option_chain_requests
3379 .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
3380
3381 if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
3384 let resolved_client_id = self
3386 .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
3387 .map(|c| c.client_id);
3388
3389 if let Some(client_id) = resolved_client_id {
3390 let request_id = UUID4::new();
3391 let ts_init = self.clock.borrow().timestamp_ns();
3392
3393 let sample_instrument_id = {
3396 let cache = self.cache.borrow();
3397 cache
3398 .instruments(&series_id.venue, Some(&series_id.underlying))
3399 .iter()
3400 .find(|i| {
3401 i.expiration_ns() == Some(series_id.expiration_ns)
3402 && i.settlement_currency().code == series_id.settlement_currency
3403 })
3404 .map(|i| i.id())
3405 };
3406
3407 let request = RequestForwardPrices::new(
3408 series_id.venue,
3409 series_id.underlying,
3410 sample_instrument_id,
3411 Some(client_id),
3412 request_id,
3413 ts_init,
3414 None,
3415 );
3416
3417 self.pending_option_chain_requests
3418 .insert(request_id, cmd.clone());
3419
3420 let req_cmd = RequestCommand::ForwardPrices(request);
3421 if let Err(e) = self.execute_request(req_cmd) {
3422 log::warn!("Failed to request forward prices for {series_id}: {e}");
3423 let cmd = self
3424 .pending_option_chain_requests
3425 .remove(&request_id)
3426 .expect("just inserted");
3427 self.create_option_chain_manager(&cmd, None);
3428 }
3429
3430 return;
3431 }
3432 }
3433
3434 self.create_option_chain_manager(cmd, None);
3435 }
3436
3437 fn create_option_chain_manager(
3439 &mut self,
3440 cmd: &SubscribeOptionChain,
3441 initial_atm_price: Option<Price>,
3442 ) {
3443 let series_id = cmd.series_id;
3444 let cache = self.cache.clone();
3445 let clock = self.clock.clone();
3446 let priority = self.msgbus_priority;
3447 let deferred_cmd_queue = self.deferred_cmd_queue.clone();
3448
3449 let manager_rc = {
3450 let client = self.get_command_client(cmd.client_id.as_ref(), Some(&series_id.venue));
3451 OptionChainManager::create_and_setup(
3452 series_id,
3453 &cache,
3454 cmd,
3455 &clock,
3456 priority,
3457 client,
3458 initial_atm_price,
3459 deferred_cmd_queue,
3460 )
3461 };
3462
3463 for id in manager_rc.borrow().all_instrument_ids() {
3465 self.option_chain_instrument_index.insert(id, series_id);
3466 }
3467
3468 self.option_chain_managers.insert(series_id, manager_rc);
3469 }
3470
3471 fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
3472 let series_id = cmd.series_id;
3473
3474 let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
3475 log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
3476 return;
3477 };
3478
3479 let all_ids = manager_rc.borrow().all_instrument_ids();
3481 let venue = manager_rc.borrow().venue();
3482
3483 for id in &all_ids {
3485 self.option_chain_instrument_index.remove(id);
3486 }
3487
3488 manager_rc.borrow_mut().teardown(&self.clock);
3489
3490 self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
3492
3493 log::info!("Unsubscribed option chain for {series_id}");
3494 }
3495
3496 fn forward_option_chain_unsubscribes(
3498 &mut self,
3499 instrument_ids: &[InstrumentId],
3500 venue: Venue,
3501 client_id: Option<ClientId>,
3502 ) {
3503 let ts_init = self.clock.borrow().timestamp_ns();
3504
3505 let Some(client) = self.get_command_client(client_id.as_ref(), Some(&venue)) else {
3506 log::error!(
3507 "Cannot forward option chain unsubscribes: no client found for venue={venue}",
3508 );
3509 return;
3510 };
3511
3512 for instrument_id in instrument_ids {
3513 client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
3514 *instrument_id,
3515 client_id,
3516 Some(venue),
3517 UUID4::new(),
3518 ts_init,
3519 None,
3520 None,
3521 )));
3522 client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
3523 UnsubscribeOptionGreeks::new(
3524 *instrument_id,
3525 client_id,
3526 Some(venue),
3527 UUID4::new(),
3528 ts_init,
3529 None,
3530 None,
3531 ),
3532 ));
3533 client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
3534 UnsubscribeInstrumentStatus::new(
3535 *instrument_id,
3536 client_id,
3537 Some(venue),
3538 UUID4::new(),
3539 ts_init,
3540 None,
3541 None,
3542 ),
3543 ));
3544 }
3545 }
3546
3547 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
3548 let is_parent = self
3555 .book_deltas_parent_expansions
3556 .contains_key(instrument_id)
3557 || self
3558 .book_depth10_parent_expansions
3559 .contains_key(instrument_id);
3560 let target_ids: Vec<InstrumentId> = if is_parent {
3561 let mut set: AHashSet<InstrumentId> = AHashSet::new();
3562
3563 if let Some(expansion) = self.book_deltas_parent_expansions.get(instrument_id) {
3564 set.extend(expansion.iter().copied());
3565 }
3566
3567 if let Some(expansion) = self.book_depth10_parent_expansions.get(instrument_id) {
3568 set.extend(expansion.iter().copied());
3569 }
3570
3571 if set.is_empty() {
3572 return;
3573 }
3574
3575 set.into_iter().collect()
3576 } else {
3577 vec![*instrument_id]
3578 };
3579
3580 if is_parent {
3581 let parent_still_needs_deltas = self.has_book_delta_subscriptions(instrument_id)
3586 || self.book_depth10_subs.contains(instrument_id)
3587 || self.has_book_snapshot_subscriptions(instrument_id);
3588 let parent_still_needs_depth10 = self.book_depth10_subs.contains(instrument_id)
3589 || self.has_book_snapshot_subscriptions(instrument_id);
3590
3591 if !parent_still_needs_deltas {
3592 self.book_deltas_parent_expansions.remove(instrument_id);
3593 }
3594
3595 if !parent_still_needs_depth10 {
3596 self.book_depth10_parent_expansions.remove(instrument_id);
3597 }
3598 }
3599
3600 for target_id in &target_ids {
3601 let wants_deltas = self.is_underlying_wanted_for_deltas(target_id);
3602 let wants_depth10 = self.is_underlying_wanted_for_depth10(target_id);
3603
3604 let Some(updater) = self.book_updaters.get(target_id).cloned() else {
3605 continue;
3606 };
3607
3608 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
3609 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
3610
3611 if !wants_deltas {
3612 let topic = switchboard::get_book_deltas_topic(*target_id);
3613 msgbus::unsubscribe_book_deltas(topic.into(), &deltas_handler);
3614 }
3615
3616 if !wants_depth10 {
3617 let topic = switchboard::get_book_depth10_topic(*target_id);
3618 msgbus::unsubscribe_book_depth10(topic.into(), &depth_handler);
3619 }
3620
3621 if !wants_deltas && !wants_depth10 {
3622 self.book_updaters.remove(target_id);
3623 log::debug!("Removed BookUpdater for instrument ID {target_id}");
3624 }
3625 }
3626 }
3627
3628 fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
3629 self.book_snapshot_counts
3630 .keys()
3631 .any(|(id, _)| id == instrument_id)
3632 }
3633
3634 fn has_book_delta_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
3635 self.book_deltas_counts
3636 .keys()
3637 .any(|(id, _, _)| id == instrument_id)
3638 }
3639
3640 fn has_book_delta_subscription_key(
3641 &self,
3642 instrument_id: InstrumentId,
3643 client_id: Option<ClientId>,
3644 venue: Option<Venue>,
3645 ) -> bool {
3646 self.book_deltas_counts
3647 .contains_key(&(instrument_id, client_id, venue))
3648 }
3649
3650 fn increment_book_delta_subscription(
3651 &mut self,
3652 instrument_id: InstrumentId,
3653 client_id: Option<ClientId>,
3654 venue: Option<Venue>,
3655 ) {
3656 let key = (instrument_id, client_id, venue);
3657
3658 if let Some(count) = self.book_deltas_counts.get_mut(&key) {
3659 *count += 1;
3660 } else {
3661 self.book_deltas_counts.insert(key, 1);
3662 }
3663 }
3664
3665 fn decrement_book_delta_subscription(
3666 &mut self,
3667 instrument_id: InstrumentId,
3668 client_id: Option<ClientId>,
3669 venue: Option<Venue>,
3670 ) -> BookDeltasUnsubscribeResult {
3671 let key = (instrument_id, client_id, venue);
3672
3673 let Some(count) = self.book_deltas_counts.get_mut(&key) else {
3674 return BookDeltasUnsubscribeResult::NotSubscribed;
3675 };
3676
3677 if *count > 1 {
3678 *count -= 1;
3679 return BookDeltasUnsubscribeResult::Decremented;
3680 }
3681
3682 self.book_deltas_counts.shift_remove(&key);
3683 BookDeltasUnsubscribeResult::Removed
3684 }
3685
3686 fn increment_book_snapshot_subscription(
3687 &mut self,
3688 cmd: &SubscribeBookSnapshots,
3689 parent: Option<(Ustr, InstrumentClass)>,
3690 ) -> bool {
3691 let key = (cmd.instrument_id, cmd.interval_ms);
3692
3693 if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
3694 *count += 1;
3695 return false;
3696 }
3697
3698 self.book_snapshot_counts.insert(key, 1);
3699
3700 let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
3701 {
3702 snapshot_infos.clone()
3703 } else {
3704 let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
3705 self.book_intervals
3706 .insert(cmd.interval_ms, snapshot_infos.clone());
3707 self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
3708 snapshot_infos
3709 };
3710
3711 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
3712 let snap_info = BookSnapshotInfo {
3713 instrument_id: cmd.instrument_id,
3714 venue: cmd.instrument_id.venue,
3715 parent,
3716 topic,
3717 interval_ms: cmd.interval_ms,
3718 };
3719
3720 snapshot_infos
3721 .borrow_mut()
3722 .insert(cmd.instrument_id, snap_info);
3723
3724 true
3725 }
3726
3727 fn decrement_book_snapshot_subscription(
3728 &mut self,
3729 instrument_id: InstrumentId,
3730 interval_ms: NonZeroUsize,
3731 ) -> BookSnapshotUnsubscribeResult {
3732 let key = (instrument_id, interval_ms);
3733
3734 let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
3735 return BookSnapshotUnsubscribeResult::NotSubscribed;
3736 };
3737
3738 if *count > 1 {
3739 *count -= 1;
3740 return BookSnapshotUnsubscribeResult::Decremented;
3741 }
3742
3743 self.book_snapshot_counts.shift_remove(&key);
3744
3745 let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
3746 let mut snapshot_infos = snapshot_infos.borrow_mut();
3747 snapshot_infos.shift_remove(&instrument_id);
3748 snapshot_infos.is_empty()
3749 } else {
3750 false
3751 };
3752
3753 if remove_interval {
3754 self.book_intervals.remove(&interval_ms);
3755
3756 if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
3757 let timer_name = snapshotter.timer_name;
3758 let mut clock = self.clock.borrow_mut();
3759 if clock.timer_exists(&timer_name) {
3760 clock.cancel_timer(&timer_name);
3761 }
3762 }
3763 }
3764
3765 BookSnapshotUnsubscribeResult::Removed
3766 }
3767
3768 fn schedule_book_snapshotter(
3769 &mut self,
3770 interval_ms: NonZeroUsize,
3771 snapshot_infos: BookSnapshotInfos,
3772 ) {
3773 let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
3774 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
3775 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
3776
3777 let snapshotter = Rc::new(BookSnapshotter::new(
3778 interval_ms,
3779 snapshot_infos,
3780 self.cache.clone(),
3781 ));
3782 let timer_name = snapshotter.timer_name;
3783 let snapshotter_callback = snapshotter.clone();
3784 let callback_fn: Rc<dyn Fn(TimeEvent)> =
3785 Rc::new(move |event| snapshotter_callback.snapshot(event));
3786 let callback = TimeEventCallback::from(callback_fn);
3787
3788 self.clock
3789 .borrow_mut()
3790 .set_timer_ns(
3791 &timer_name,
3792 interval_ns,
3793 Some(start_time_ns.into()),
3794 None,
3795 Some(callback),
3796 None,
3797 None,
3798 )
3799 .expect(FAILED);
3800
3801 self.book_snapshotters.insert(interval_ms, snapshotter);
3802 }
3803
3804 fn handle_instrument_response(&self, instrument: InstrumentAny) {
3805 let mut cache = self.cache.as_ref().borrow_mut();
3806 if let Err(e) = cache.add_instrument(instrument) {
3807 log_error_on_cache_insert(&e);
3808 }
3809 }
3810
3811 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
3812 let mut cache = self.cache.as_ref().borrow_mut();
3814 for instrument in instruments {
3815 if let Err(e) = cache.add_instrument(instrument.clone()) {
3816 log_error_on_cache_insert(&e);
3817 }
3818 }
3819 }
3820
3821 fn handle_quotes(&self, quotes: &[QuoteTick]) {
3822 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
3823 log_error_on_cache_insert(&e);
3824 }
3825 }
3826
3827 fn handle_trades(&self, trades: &[TradeTick]) {
3828 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
3829 log_error_on_cache_insert(&e);
3830 }
3831 }
3832
3833 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
3834 if let Err(e) = self
3835 .cache
3836 .as_ref()
3837 .borrow_mut()
3838 .add_funding_rates(funding_rates)
3839 {
3840 log_error_on_cache_insert(&e);
3841 }
3842 }
3843
3844 fn handle_bars(&self, bars: &[Bar]) {
3845 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
3846 log_error_on_cache_insert(&e);
3847 }
3848 }
3849
3850 fn cache_is_owned_by_live_subscription(&self, instrument_id: &InstrumentId) -> bool {
3853 self.book_updaters.contains_key(instrument_id)
3854 }
3855
3856 fn handle_book_response(&self, book: &OrderBook) {
3857 if self.cache_is_owned_by_live_subscription(&book.instrument_id) {
3858 log::debug!(
3859 "Skipping cache write for order book {}: live subscription owns the book",
3860 book.instrument_id,
3861 );
3862 return;
3863 }
3864
3865 log::debug!("Adding order book {} to cache", book.instrument_id);
3866
3867 if let Err(e) = self
3868 .cache
3869 .as_ref()
3870 .borrow_mut()
3871 .add_order_book(book.clone())
3872 {
3873 log_error_on_cache_insert(&e);
3874 }
3875 }
3876
3877 fn handle_book_deltas_response(&self, resp: &BookDeltasResponse) {
3878 if !self.cache_is_owned_by_live_subscription(&resp.instrument_id) {
3879 let mut cache = self.cache.as_ref().borrow_mut();
3880 if let Some(book) = cache.order_book_mut(&resp.instrument_id) {
3881 for delta in &resp.data {
3882 if let Err(e) = book.apply_delta(delta) {
3883 log::error!("Failed to apply historical delta to cache: {e}");
3884 }
3885 }
3886 } else {
3887 log::debug!(
3888 "Skipping cache write for {} historical deltas on {}: no cache book yet",
3889 resp.data.len(),
3890 resp.instrument_id,
3891 );
3892 }
3893 }
3894
3895 if resp.data.is_empty() {
3900 return;
3901 }
3902
3903 let topic = switchboard::get_pipeline_book_deltas_topic(resp.instrument_id);
3904 let mut frame: Vec<OrderBookDelta> = Vec::new();
3905
3906 for delta in &resp.data {
3907 frame.push(*delta);
3908 if RecordFlag::F_LAST.matches(delta.flags) {
3909 let batch = OrderBookDeltas::new(resp.instrument_id, std::mem::take(&mut frame));
3910 msgbus::publish_deltas(topic, &batch);
3911 }
3912 }
3913
3914 if !frame.is_empty() {
3915 let batch = OrderBookDeltas::new(resp.instrument_id, frame);
3916 msgbus::publish_deltas(topic, &batch);
3917 }
3918 }
3919
3920 fn handle_book_depth_response(&self, resp: &BookDepthResponse) {
3921 let topic = switchboard::get_pipeline_book_depth10_topic(resp.instrument_id);
3922
3923 for depth in &resp.data {
3924 msgbus::publish_depth10(topic, depth);
3925 }
3926 }
3927
3928 fn handle_forward_prices_response(
3931 &mut self,
3932 correlation_id: &UUID4,
3933 resp: &ForwardPricesResponse,
3934 ) {
3935 let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
3936 log::debug!(
3937 "No pending option chain request for correlation_id={correlation_id}, ignoring"
3938 );
3939 return;
3940 };
3941
3942 let series_id = cmd.series_id;
3943
3944 let cache = self.cache.borrow();
3947 let mut best_price: Option<Price> = None;
3948
3949 for fp in &resp.data {
3950 if let Some(instrument) = cache.instrument(&fp.instrument_id)
3952 && let Some(expiration) = instrument.expiration_ns()
3953 && expiration == series_id.expiration_ns
3954 && instrument.settlement_currency().code == series_id.settlement_currency
3955 {
3956 match Price::from_decimal(fp.forward_price) {
3957 Ok(price) => best_price = Some(price),
3958 Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
3959 }
3960 break;
3961 }
3962 }
3963 drop(cache);
3964
3965 if let Some(price) = best_price {
3966 log::info!("Forward price for {series_id}: {price} (instant bootstrap)");
3967 } else {
3968 log::info!(
3969 "No matching forward price found for {series_id}, will bootstrap from live data",
3970 );
3971 }
3972
3973 self.create_option_chain_manager(&cmd, best_price);
3974 }
3975
3976 fn setup_book_updater(
3977 &mut self,
3978 instrument_id: &InstrumentId,
3979 book_type: BookType,
3980 only_deltas: bool,
3981 parent: Option<(Ustr, InstrumentClass)>,
3982 ) -> anyhow::Result<()> {
3983 let target_ids: Vec<InstrumentId> = if let Some((root, class)) = parent {
3988 self.cache
3989 .borrow()
3990 .instruments_by_parent(&instrument_id.venue, &root, class)
3991 .iter()
3992 .map(|i| i.id())
3993 .collect()
3994 } else {
3995 vec![*instrument_id]
3996 };
3997
3998 if parent.is_some() {
3999 self.book_deltas_parent_expansions
4000 .insert(*instrument_id, target_ids.clone());
4001
4002 if !only_deltas {
4003 self.book_depth10_parent_expansions
4004 .insert(*instrument_id, target_ids.clone());
4005 }
4006 }
4007
4008 {
4009 let mut cache = self.cache.borrow_mut();
4010 for target_id in &target_ids {
4011 if !cache.has_order_book(target_id) {
4012 let book = OrderBook::new(*target_id, book_type);
4013 log::debug!("Created {book}");
4014 cache.add_order_book(book)?;
4015 }
4016 }
4017 }
4018
4019 for target_id in &target_ids {
4020 let updater = self
4021 .book_updaters
4022 .entry(*target_id)
4023 .or_insert_with(|| {
4024 Rc::new(BookUpdater::new(
4025 target_id,
4026 self.cache.clone(),
4027 self.config.emit_quotes_from_book,
4028 ))
4029 })
4030 .clone();
4031
4032 let deltas_topic = switchboard::get_book_deltas_topic(*target_id);
4037 let deltas_handler = TypedHandler::new(updater.clone());
4038 msgbus::subscribe_book_deltas(
4039 deltas_topic.into(),
4040 deltas_handler,
4041 Some(self.msgbus_priority),
4042 );
4043
4044 if !only_deltas {
4045 let depth_topic = switchboard::get_book_depth10_topic(*target_id);
4046 let depth_handler = TypedHandler::new(updater);
4047 msgbus::subscribe_book_depth10(
4048 depth_topic.into(),
4049 depth_handler,
4050 Some(self.msgbus_priority),
4051 );
4052 }
4053 }
4054
4055 Ok(())
4056 }
4057
4058 fn is_underlying_wanted_for_deltas(&self, target_id: &InstrumentId) -> bool {
4059 if self.has_book_delta_subscriptions(target_id)
4063 || self.book_depth10_subs.contains(target_id)
4064 || self.has_book_snapshot_subscriptions(target_id)
4065 {
4066 return true;
4067 }
4068 self.book_deltas_parent_expansions
4069 .values()
4070 .any(|expansion| expansion.contains(target_id))
4071 }
4072
4073 fn is_underlying_wanted_for_depth10(&self, target_id: &InstrumentId) -> bool {
4074 if self.book_depth10_subs.contains(target_id)
4077 || self.has_book_snapshot_subscriptions(target_id)
4078 {
4079 return true;
4080 }
4081 self.book_depth10_parent_expansions
4082 .values()
4083 .any(|expansion| expansion.contains(target_id))
4084 }
4085
4086 fn create_bar_aggregator(
4087 &self,
4088 instrument: &InstrumentAny,
4089 bar_type: BarType,
4090 ) -> Box<dyn BarAggregator> {
4091 let cache = self.cache.clone();
4092 let validate_sequence = self.config.validate_data_sequence;
4093
4094 let handler = move |bar: Bar| {
4095 process_engine_bar(&cache, validate_sequence, true, bar);
4096 };
4097
4098 let clock = self.clock.clone();
4099 let config = self.config.clone();
4100
4101 let price_precision = instrument.price_precision();
4102 let size_precision = instrument.size_precision();
4103
4104 if bar_type.spec().is_time_aggregated() {
4105 let time_bars_origin_offset = config
4106 .time_bars_origin_offset
4107 .get(&bar_type.spec().aggregation)
4108 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
4109
4110 Box::new(TimeBarAggregator::new(
4111 bar_type,
4112 price_precision,
4113 size_precision,
4114 clock,
4115 handler,
4116 config.time_bars_build_with_no_updates,
4117 config.time_bars_timestamp_on_close,
4118 config.time_bars_interval_type,
4119 time_bars_origin_offset,
4120 config.time_bars_build_delay,
4121 config.time_bars_skip_first_non_full_bar,
4122 ))
4123 } else {
4124 match bar_type.spec().aggregation {
4125 BarAggregation::Tick => Box::new(TickBarAggregator::new(
4126 bar_type,
4127 price_precision,
4128 size_precision,
4129 handler,
4130 )) as Box<dyn BarAggregator>,
4131 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
4132 bar_type,
4133 price_precision,
4134 size_precision,
4135 handler,
4136 )) as Box<dyn BarAggregator>,
4137 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
4138 bar_type,
4139 price_precision,
4140 size_precision,
4141 handler,
4142 )) as Box<dyn BarAggregator>,
4143 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
4144 bar_type,
4145 price_precision,
4146 size_precision,
4147 handler,
4148 )) as Box<dyn BarAggregator>,
4149 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
4150 bar_type,
4151 price_precision,
4152 size_precision,
4153 handler,
4154 )) as Box<dyn BarAggregator>,
4155 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
4156 bar_type,
4157 price_precision,
4158 size_precision,
4159 handler,
4160 )) as Box<dyn BarAggregator>,
4161 BarAggregation::Value => Box::new(ValueBarAggregator::new(
4162 bar_type,
4163 price_precision,
4164 size_precision,
4165 handler,
4166 )) as Box<dyn BarAggregator>,
4167 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
4168 bar_type,
4169 price_precision,
4170 size_precision,
4171 handler,
4172 )) as Box<dyn BarAggregator>,
4173 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
4174 bar_type,
4175 price_precision,
4176 size_precision,
4177 handler,
4178 )) as Box<dyn BarAggregator>,
4179 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
4180 bar_type,
4181 price_precision,
4182 size_precision,
4183 instrument.price_increment(),
4184 handler,
4185 )) as Box<dyn BarAggregator>,
4186 other => unreachable!(
4187 "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
4188 ),
4189 }
4190 }
4191 }
4192
4193 fn create_bar_aggregator_for_key(
4194 &mut self,
4195 bar_type: BarType,
4196 request_id: Option<UUID4>,
4197 ) -> anyhow::Result<()> {
4198 let key = bar_aggregator_key(bar_type, request_id);
4199 if self.bar_aggregators.contains_key(&key) {
4200 return Ok(());
4201 }
4202
4203 let instrument = {
4204 let cache = self.cache.borrow();
4205 cache
4206 .instrument(&bar_type.instrument_id())
4207 .ok_or_else(|| {
4208 anyhow::anyhow!(
4209 "Cannot start bar aggregation: no instrument found for {}",
4210 bar_type.instrument_id(),
4211 )
4212 })?
4213 .clone()
4214 };
4215 let aggregator = self.create_bar_aggregator(&instrument, bar_type);
4216 self.bar_aggregators
4217 .insert(key, Rc::new(RefCell::new(aggregator)));
4218
4219 Ok(())
4220 }
4221
4222 fn start_live_bar_aggregator(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
4223 let key = bar_aggregator_key(cmd.bar_type, None);
4224
4225 if self
4226 .bar_aggregators
4227 .get(&key)
4228 .is_some_and(|aggregator| aggregator.borrow().is_running())
4229 && self.bar_aggregator_handlers.contains_key(&key)
4230 {
4231 log::warn!(
4232 "Aggregator for {} is currently in use, subscription can't be started",
4233 cmd.bar_type,
4234 );
4235 return Ok(());
4236 }
4237
4238 self.start_bar_aggregator(cmd.bar_type, None)?;
4239 self.subscribe_bar_aggregator(cmd);
4240
4241 Ok(())
4242 }
4243
4244 fn start_bar_aggregator(
4245 &mut self,
4246 bar_type: BarType,
4247 request_id: Option<UUID4>,
4248 ) -> anyhow::Result<()> {
4249 let key = bar_aggregator_key(bar_type, request_id);
4250 let bar_type_std = bar_type.standard();
4251
4252 self.create_bar_aggregator_for_key(bar_type, request_id)?;
4253 let aggregator = self
4254 .bar_aggregators
4255 .get(&key)
4256 .ok_or_else(|| anyhow::anyhow!("Cannot start bar aggregation for {bar_type}"))?
4257 .clone();
4258 let defer_live_activation = request_id.is_none()
4259 && aggregator.borrow().is_running()
4260 && !self.bar_aggregator_handlers.contains_key(&key);
4261
4262 if !self.bar_aggregator_handlers.contains_key(&key) {
4263 let mut subscriptions = Vec::new();
4265
4266 if bar_type.is_composite() {
4267 let topic = switchboard::get_bars_topic(bar_type.composite());
4268 let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_type_std));
4269 msgbus::subscribe_bars(topic.into(), handler.clone(), None);
4270 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
4271 } else if bar_type.spec().price_type == PriceType::Last {
4272 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
4273 let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_type_std));
4274 msgbus::subscribe_trades(
4275 topic.into(),
4276 handler.clone(),
4277 Some(BAR_AGGREGATOR_PRIORITY),
4278 );
4279 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
4280 } else {
4281 if matches!(
4283 bar_type.spec().aggregation,
4284 BarAggregation::TickImbalance
4285 | BarAggregation::VolumeImbalance
4286 | BarAggregation::ValueImbalance
4287 | BarAggregation::TickRuns
4288 | BarAggregation::VolumeRuns
4289 | BarAggregation::ValueRuns
4290 ) {
4291 log::warn!(
4292 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
4293 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
4294 quote data: bars will not emit correctly",
4295 );
4296 }
4297
4298 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
4299 let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_type_std));
4300 msgbus::subscribe_quotes(
4301 topic.into(),
4302 handler.clone(),
4303 Some(BAR_AGGREGATOR_PRIORITY),
4304 );
4305 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
4306 }
4307
4308 self.bar_aggregator_handlers.insert(key, subscriptions);
4309 }
4310
4311 if defer_live_activation {
4312 return Ok(());
4313 }
4314
4315 self.setup_bar_aggregator(bar_type, false, request_id)?;
4317
4318 aggregator.borrow_mut().set_is_running(true);
4319
4320 Ok(())
4321 }
4322
4323 fn subscribe_bar_aggregator(&mut self, cmd: &SubscribeBars) {
4324 let key = bar_aggregator_key(cmd.bar_type, None);
4325 if !self.bar_aggregators.contains_key(&key) {
4326 log::error!(
4327 "Cannot subscribe bar aggregator: no aggregator found for {}",
4328 cmd.bar_type,
4329 );
4330 return;
4331 }
4332
4333 if cmd.bar_type.is_composite() {
4334 let composite_bar_type = cmd.bar_type.composite();
4335 if composite_bar_type.is_externally_aggregated() {
4336 let subscribe = SubscribeBars::new(
4337 composite_bar_type,
4338 cmd.client_id,
4339 cmd.venue,
4340 UUID4::new(),
4341 cmd.ts_init,
4342 Some(cmd.command_id),
4343 cmd.params.clone(),
4344 );
4345 self.execute(DataCommand::Subscribe(SubscribeCommand::Bars(subscribe)));
4346 }
4347 } else if cmd.bar_type.spec().price_type == PriceType::Last {
4348 let subscribe = SubscribeTrades::new(
4349 cmd.bar_type.instrument_id(),
4350 cmd.client_id,
4351 cmd.venue,
4352 UUID4::new(),
4353 cmd.ts_init,
4354 Some(cmd.command_id),
4355 cmd.params.clone(),
4356 );
4357 self.execute(DataCommand::Subscribe(SubscribeCommand::Trades(subscribe)));
4358 } else {
4359 let subscribe = SubscribeQuotes::new(
4360 cmd.bar_type.instrument_id(),
4361 cmd.client_id,
4362 cmd.venue,
4363 UUID4::new(),
4364 cmd.ts_init,
4365 Some(cmd.command_id),
4366 cmd.params.clone(),
4367 );
4368 self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
4369 }
4370 }
4371
4372 fn setup_bar_aggregator(
4376 &self,
4377 bar_type: BarType,
4378 historical: bool,
4379 request_id: Option<UUID4>,
4380 ) -> anyhow::Result<()> {
4381 let key = bar_aggregator_key(bar_type, request_id);
4382 let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
4383 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
4384 })?;
4385
4386 let cache = self.cache.clone();
4388 let validate_sequence = self.config.validate_data_sequence;
4389 let publish = !historical;
4390 let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
4391 process_engine_bar(&cache, validate_sequence, publish, bar);
4392 });
4393
4394 aggregator
4395 .borrow_mut()
4396 .set_historical_mode(historical, handler);
4397
4398 if bar_type.spec().is_time_aggregated() {
4400 use nautilus_common::clock::TestClock;
4401
4402 if historical {
4403 let test_clock = Rc::new(RefCell::new(TestClock::new()));
4405 aggregator.borrow_mut().set_clock(test_clock);
4406 let aggregator_weak = Rc::downgrade(aggregator);
4409 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
4410 } else {
4411 aggregator.borrow_mut().set_clock(self.clock.clone());
4412 aggregator
4413 .borrow_mut()
4414 .start_timer(Some(aggregator.clone()));
4415 }
4416 }
4417
4418 Ok(())
4419 }
4420
4421 fn unsubscribe_bar_aggregator(&mut self, cmd: &UnsubscribeBars) {
4422 if cmd.bar_type.is_composite() {
4423 let composite_bar_type = cmd.bar_type.composite();
4424 if composite_bar_type.is_externally_aggregated() {
4425 let unsubscribe = UnsubscribeBars::new(
4426 composite_bar_type,
4427 cmd.client_id,
4428 cmd.venue,
4429 UUID4::new(),
4430 cmd.ts_init,
4431 Some(cmd.command_id),
4432 cmd.params.clone(),
4433 );
4434 self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Bars(
4435 unsubscribe,
4436 )));
4437 }
4438 } else if cmd.bar_type.spec().price_type == PriceType::Last {
4439 let unsubscribe = UnsubscribeTrades::new(
4440 cmd.bar_type.instrument_id(),
4441 cmd.client_id,
4442 cmd.venue,
4443 UUID4::new(),
4444 cmd.ts_init,
4445 Some(cmd.command_id),
4446 cmd.params.clone(),
4447 );
4448 self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Trades(
4449 unsubscribe,
4450 )));
4451 } else {
4452 let unsubscribe = UnsubscribeQuotes::new(
4453 cmd.bar_type.instrument_id(),
4454 cmd.client_id,
4455 cmd.venue,
4456 UUID4::new(),
4457 cmd.ts_init,
4458 Some(cmd.command_id),
4459 cmd.params.clone(),
4460 );
4461 self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
4462 unsubscribe,
4463 )));
4464 }
4465 }
4466
4467 fn stop_bar_aggregator(
4468 &mut self,
4469 bar_type: BarType,
4470 request_id: Option<UUID4>,
4471 ) -> anyhow::Result<()> {
4472 let key = bar_aggregator_key(bar_type, request_id);
4473 let aggregator = self.bar_aggregators.shift_remove(&key).ok_or_else(|| {
4474 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
4475 })?;
4476
4477 aggregator.borrow_mut().stop();
4478
4479 if let Some(subs) = self.bar_aggregator_handlers.remove(&key) {
4481 for sub in subs {
4482 match sub {
4483 BarAggregatorSubscription::Bar { topic, handler } => {
4484 msgbus::unsubscribe_bars(topic.into(), &handler);
4485 }
4486 BarAggregatorSubscription::Trade { topic, handler } => {
4487 msgbus::unsubscribe_trades(topic.into(), &handler);
4488 }
4489 BarAggregatorSubscription::Quote { topic, handler } => {
4490 msgbus::unsubscribe_quotes(topic.into(), &handler);
4491 }
4492 }
4493 }
4494 }
4495
4496 Ok(())
4497 }
4498
4499 fn subscribe_continuous_future_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
4500 let target_bar_type = cmd.bar_type;
4501 let target_key = target_bar_type.standard();
4502
4503 if !target_bar_type.is_internally_aggregated() {
4504 anyhow::bail!(
4505 "Continuous future bar subscriptions require an internally aggregated target, was {target_bar_type}"
4506 );
4507 }
4508
4509 if self.continuous_future_roller.is_none() {
4510 anyhow::bail!(
4511 "Cannot subscribe continuous future bars for {target_bar_type}: roller is not initialized; ensure `register_msgbus_handlers` runs before subscribing"
4512 );
4513 }
4514
4515 let request = continuous_future_subscription_from_bars(cmd)?.ok_or_else(|| {
4516 anyhow::anyhow!(
4517 "Continuous future bar subscription requires `continuous_future_transitions`, was {cmd:?}"
4518 )
4519 })?;
4520
4521 self.ensure_continuous_future_target_instrument(&request);
4522
4523 if self
4524 .continuous_future_subscriptions
4525 .contains_key(&target_key)
4526 {
4527 log::warn!("Continuous future bars already subscribed for {target_bar_type}");
4528 return Ok(());
4529 }
4530
4531 let aggregator_key = bar_aggregator_key(target_bar_type, None);
4532 if let Some(aggregator) = self.bar_aggregators.get(&aggregator_key)
4533 && aggregator.borrow().is_running()
4534 {
4535 log::warn!(
4536 "Aggregator for {target_bar_type} is currently in use, continuous future subscription can't be started"
4537 );
4538 return Ok(());
4539 }
4540
4541 self.create_bar_aggregator_for_key(target_bar_type, None)?;
4542 self.setup_bar_aggregator(target_bar_type, false, None)?;
4543
4544 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
4545 let Some(segment) = request.next_segment(now_ns, now_ns) else {
4546 log::error!("Cannot determine active continuous future segment for {target_bar_type}");
4547 if let Err(e) = self.stop_bar_aggregator(target_bar_type, None) {
4548 log::error!(
4549 "Error rolling back continuous future aggregator for {target_bar_type}: {e}"
4550 );
4551 }
4552 return Ok(());
4553 };
4554
4555 self.apply_continuous_future_subscription_adjustment(&request, segment.index)?;
4556 let source = request.source_for_segment(segment.instrument_id);
4557 let source_subscription =
4558 self.subscribe_continuous_future_source(target_bar_type, source, segment.instrument_id);
4559
4560 if let Some(aggregator) = self.bar_aggregators.get(&aggregator_key) {
4561 aggregator.borrow_mut().set_is_running(true);
4562 }
4563
4564 let next_transition_index =
4565 (segment.index < request.transitions.len()).then_some(segment.index);
4566
4567 self.continuous_future_subscriptions.insert(
4568 target_key,
4569 ContinuousFutureSubscriptionState {
4570 target_bar_type,
4571 client_id: cmd.client_id,
4572 venue: cmd.venue,
4573 command_id: cmd.command_id,
4574 params: cmd.params.clone(),
4575 request,
4576 active_segment_instrument_id: segment.instrument_id,
4577 active_source: source,
4578 active_source_subscription: Some(source_subscription),
4579 next_transition_index,
4580 timer_name: None,
4581 },
4582 );
4583
4584 let child_cmd = self.build_continuous_future_subscribe_command(
4585 &target_key,
4586 source,
4587 segment.instrument_id,
4588 cmd.command_id,
4589 cmd.ts_init,
4590 true,
4591 );
4592
4593 if let Some(child) = child_cmd {
4594 self.execute(child);
4595 }
4596
4597 self.schedule_continuous_future_transition(target_key);
4598
4599 Ok(())
4600 }
4601
4602 fn unsubscribe_continuous_future_bars(&mut self, cmd: &UnsubscribeBars) {
4603 let target_key = cmd.bar_type.standard();
4604 let Some(mut state) = self.continuous_future_subscriptions.remove(&target_key) else {
4605 log::warn!(
4606 "Cannot unsubscribe continuous future bars: no subscription state for {target_key}"
4607 );
4608 return;
4609 };
4610
4611 if let Some(name) = state.timer_name.take() {
4612 self.clock.borrow_mut().cancel_timer(&name);
4613 }
4614
4615 let ts_init = self.clock.borrow().timestamp_ns();
4616 let segment_instrument_id = state.active_segment_instrument_id;
4617 let source = state.active_source;
4618 let source_subscription = state.active_source_subscription.take();
4619 let client_id = state.client_id;
4620 let venue = state.venue;
4621 let params = state.params.clone();
4622 let target_bar_type = state.target_bar_type;
4623 drop(state);
4624
4625 if let Some(subscription) = source_subscription {
4626 self.unsubscribe_continuous_future_source(target_bar_type, subscription);
4627 }
4628
4629 let child_cmd = build_continuous_future_unsubscribe_command(
4630 source,
4631 segment_instrument_id,
4632 client_id,
4633 venue,
4634 params.as_ref(),
4635 cmd.command_id,
4636 ts_init,
4637 );
4638 self.execute(child_cmd);
4639
4640 if let Err(e) = self.stop_bar_aggregator(target_bar_type, None) {
4641 log::error!("Error stopping continuous future aggregator for {target_bar_type}: {e}");
4642 }
4643 }
4644
4645 fn handle_continuous_future_subscription_transition(&mut self, event: &TimeEvent) {
4646 let event_name = event.name.as_str();
4647 let Some((target_key, transition_index)) = parse_transition_timer_name(event_name) else {
4648 log::warn!(
4649 "Ignoring continuous future transition event with unparsable name {event_name}"
4650 );
4651 return;
4652 };
4653
4654 let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) else {
4655 log::warn!(
4656 "Ignoring continuous future transition event {event_name}: no subscription state for {target_key}"
4657 );
4658 return;
4659 };
4660
4661 if state.timer_name.as_deref() != Some(event_name) {
4662 return;
4663 }
4664 state.timer_name = None;
4665
4666 let Some(next_index) = state.next_transition_index else {
4667 return;
4668 };
4669
4670 if next_index != transition_index || next_index >= state.request.transitions.len() {
4671 return;
4672 }
4673
4674 let prev_segment_instrument_id = state.active_segment_instrument_id;
4675 let next_segment_instrument_id = state.request.transitions[next_index].post_instrument_id;
4676 let new_segment_index = next_index + 1;
4677 state.active_segment_instrument_id = next_segment_instrument_id;
4678 state.next_transition_index =
4679 (new_segment_index < state.request.transitions.len()).then_some(new_segment_index);
4680
4681 let old_source = state.active_source;
4682 let old_source_subscription = state.active_source_subscription.take();
4683 let client_id = state.client_id;
4684 let venue = state.venue;
4685 let params = state.params.clone();
4686 let command_id = state.command_id;
4687 let target_bar_type = state.target_bar_type;
4688
4689 let ts_init = self.clock.borrow().timestamp_ns();
4690
4691 if let Some(subscription) = old_source_subscription {
4692 self.unsubscribe_continuous_future_source(target_bar_type, subscription);
4693 }
4694
4695 let unsub_child = build_continuous_future_unsubscribe_command(
4696 old_source,
4697 prev_segment_instrument_id,
4698 client_id,
4699 venue,
4700 params.as_ref(),
4701 command_id,
4702 ts_init,
4703 );
4704 self.execute(unsub_child);
4705
4706 if let Err(e) = self
4707 .apply_continuous_future_subscription_adjustment_for(target_bar_type, new_segment_index)
4708 {
4709 log::error!("Error applying continuous future adjustment for {target_bar_type}: {e}");
4710 return;
4711 }
4712
4713 let new_source = {
4714 let Some(state) = self.continuous_future_subscriptions.get(&target_key) else {
4715 return;
4716 };
4717 state.request.source_for_segment(next_segment_instrument_id)
4718 };
4719 let new_subscription = self.subscribe_continuous_future_source(
4720 target_bar_type,
4721 new_source,
4722 next_segment_instrument_id,
4723 );
4724
4725 if let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) {
4726 state.active_source = new_source;
4727 state.active_source_subscription = Some(new_subscription);
4728 }
4729
4730 let sub_child = self.build_continuous_future_subscribe_command(
4731 &target_key,
4732 new_source,
4733 next_segment_instrument_id,
4734 command_id,
4735 ts_init,
4736 true,
4737 );
4738
4739 if let Some(child) = sub_child {
4740 self.execute(child);
4741 }
4742
4743 self.schedule_continuous_future_transition(target_key);
4744 }
4745
4746 fn apply_continuous_future_subscription_adjustment(
4747 &self,
4748 request: &ContinuousFutureRequest,
4749 segment_index: usize,
4750 ) -> anyhow::Result<()> {
4751 let key = bar_aggregator_key(request.primary_bar_type, None);
4752 let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
4753 anyhow::anyhow!(
4754 "No live aggregator for continuous future subscription {}",
4755 request.primary_bar_type
4756 )
4757 })?;
4758 let adjustment = request.adjustment_for_segment(segment_index);
4759 aggregator
4760 .borrow_mut()
4761 .set_adjustment(adjustment, request.adjustment_mode);
4762 Ok(())
4763 }
4764
4765 fn apply_continuous_future_subscription_adjustment_for(
4766 &self,
4767 target_bar_type: BarType,
4768 segment_index: usize,
4769 ) -> anyhow::Result<()> {
4770 let Some(state) = self
4771 .continuous_future_subscriptions
4772 .get(&target_bar_type.standard())
4773 else {
4774 anyhow::bail!("No continuous future subscription state for {target_bar_type}");
4775 };
4776 self.apply_continuous_future_subscription_adjustment(&state.request, segment_index)
4777 }
4778
4779 fn subscribe_continuous_future_source(
4780 &mut self,
4781 target_bar_type: BarType,
4782 source: ContinuousFutureSource,
4783 segment_instrument_id: InstrumentId,
4784 ) -> BarAggregatorSubscription {
4785 let key = bar_aggregator_key(target_bar_type, None);
4786 let aggregator = self
4787 .bar_aggregators
4788 .get(&key)
4789 .cloned()
4790 .expect("aggregator was created before subscribe_continuous_future_source");
4791
4792 let subscription = match source {
4793 ContinuousFutureSource::Bars(source_bar_type) => {
4794 let topic = switchboard::get_bars_topic(source_bar_type);
4795 let handler =
4796 TypedHandler::new(BarBarHandler::new(&aggregator, target_bar_type.standard()));
4797 msgbus::subscribe_bars(topic.into(), handler.clone(), None);
4798 BarAggregatorSubscription::Bar { topic, handler }
4799 }
4800 ContinuousFutureSource::Trades => {
4801 let topic = switchboard::get_trades_topic(segment_instrument_id);
4802 let handler = TypedHandler::new(BarTradeHandler::new(
4803 &aggregator,
4804 target_bar_type.standard(),
4805 ));
4806 msgbus::subscribe_trades(
4807 topic.into(),
4808 handler.clone(),
4809 Some(BAR_AGGREGATOR_PRIORITY),
4810 );
4811 BarAggregatorSubscription::Trade { topic, handler }
4812 }
4813 ContinuousFutureSource::Quotes => {
4814 let topic = switchboard::get_quotes_topic(segment_instrument_id);
4815 let handler = TypedHandler::new(BarQuoteHandler::new(
4816 &aggregator,
4817 target_bar_type.standard(),
4818 ));
4819 msgbus::subscribe_quotes(
4820 topic.into(),
4821 handler.clone(),
4822 Some(BAR_AGGREGATOR_PRIORITY),
4823 );
4824 BarAggregatorSubscription::Quote { topic, handler }
4825 }
4826 };
4827
4828 self.bar_aggregator_handlers
4829 .entry(key)
4830 .or_default()
4831 .push(subscription.clone());
4832
4833 subscription
4834 }
4835
4836 fn unsubscribe_continuous_future_source(
4837 &mut self,
4838 target_bar_type: BarType,
4839 subscription: BarAggregatorSubscription,
4840 ) {
4841 let key = bar_aggregator_key(target_bar_type, None);
4842 if let Some(subs) = self.bar_aggregator_handlers.get_mut(&key) {
4843 subs.retain(|registered| !same_subscription(registered, &subscription));
4844 }
4845
4846 match subscription {
4847 BarAggregatorSubscription::Bar { topic, handler } => {
4848 msgbus::unsubscribe_bars(topic.into(), &handler);
4849 }
4850 BarAggregatorSubscription::Trade { topic, handler } => {
4851 msgbus::unsubscribe_trades(topic.into(), &handler);
4852 }
4853 BarAggregatorSubscription::Quote { topic, handler } => {
4854 msgbus::unsubscribe_quotes(topic.into(), &handler);
4855 }
4856 }
4857 }
4858
4859 fn build_continuous_future_subscribe_command(
4860 &self,
4861 target_key: &BarType,
4862 source: ContinuousFutureSource,
4863 segment_instrument_id: InstrumentId,
4864 command_id: UUID4,
4865 ts_init: UnixNanos,
4866 subscribe: bool,
4867 ) -> Option<DataCommand> {
4868 let state = self.continuous_future_subscriptions.get(target_key)?;
4869
4870 if !subscribe {
4871 return Some(build_continuous_future_unsubscribe_command(
4872 source,
4873 segment_instrument_id,
4874 state.client_id,
4875 state.venue,
4876 state.params.as_ref(),
4877 command_id,
4878 ts_init,
4879 ));
4880 }
4881
4882 let child_params = state
4883 .request
4884 .child_params(state.params.as_ref(), command_id);
4885
4886 Some(build_continuous_future_subscribe_inner(
4887 source,
4888 segment_instrument_id,
4889 state.client_id,
4890 state.venue,
4891 child_params,
4892 command_id,
4893 ts_init,
4894 ))
4895 }
4896
4897 fn schedule_continuous_future_transition(&mut self, target_key: BarType) {
4898 let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) else {
4899 return;
4900 };
4901
4902 if let Some(name) = state.timer_name.take() {
4903 self.clock.borrow_mut().cancel_timer(&name);
4904 }
4905
4906 let Some(transition_index) = state.next_transition_index else {
4907 return;
4908 };
4909 let Some(row) = state.request.transitions.get(transition_index) else {
4910 return;
4911 };
4912 let transition_ns = row.transition_time_ns;
4913 let timer_name = format!("continuous-future-roll:{target_key}:{transition_index}");
4914
4915 let Some(roller) = self.continuous_future_roller.clone() else {
4916 log::error!(
4917 "Cannot schedule continuous future transition timer for {target_key}: roller not initialized"
4918 );
4919 return;
4920 };
4921
4922 let callback_fn: Rc<dyn Fn(TimeEvent)> =
4923 Rc::new(move |event| roller.handle_transition(&event));
4924 let callback = TimeEventCallback::from(callback_fn);
4925
4926 if let Err(e) = self.clock.borrow_mut().set_time_alert_ns(
4927 &timer_name,
4928 UnixNanos::from(transition_ns),
4929 Some(callback),
4930 Some(true),
4931 ) {
4932 log::error!("Failed to schedule continuous future transition {timer_name}: {e}");
4933 return;
4934 }
4935
4936 if let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) {
4937 state.timer_name = Some(timer_name);
4938 }
4939 }
4940}
4941
4942fn resolve_parent_components(
4950 instrument_id: &InstrumentId,
4951 params: Option<&Params>,
4952) -> anyhow::Result<Option<(Ustr, InstrumentClass)>> {
4953 if !is_parent_subscription(params) {
4954 return Ok(None);
4955 }
4956 let Some((root, class)) = instrument_id.parse_parent_components() else {
4957 anyhow::bail!(
4958 "Cannot expand parent subscription for {instrument_id}: \
4959 symbol does not parse as `<root>.<class>` with a recognised class suffix"
4960 );
4961 };
4962 Ok(Some((Ustr::from(root), class)))
4963}
4964
4965fn spread_quote_update_interval_seconds(params: Option<&Params>) -> Option<u64> {
4966 match params.and_then(|params| params.get("update_interval_seconds")) {
4967 Some(value) if value.is_null() => None,
4968 Some(value) => value.as_u64().filter(|interval| *interval > 0),
4969 None => Some(1),
4970 }
4971}
4972
4973const GENERIC_SPREAD_ID_SEPARATOR: &str = "___";
4974
4975fn spread_instrument_legs(instrument: &InstrumentAny) -> Option<Vec<(InstrumentId, i64)>> {
4976 if !instrument.is_spread() {
4977 return None;
4978 }
4979
4980 let instrument_id = instrument.id();
4981 let symbol = instrument_id.symbol.as_str();
4982 if !symbol.contains(GENERIC_SPREAD_ID_SEPARATOR) {
4983 return Some(vec![(instrument_id, 1)]);
4984 }
4985
4986 symbol
4987 .split(GENERIC_SPREAD_ID_SEPARATOR)
4988 .map(|component| parse_spread_leg(component, instrument_id.venue))
4989 .collect()
4990}
4991
4992fn parse_spread_leg(component: &str, venue: Venue) -> Option<(InstrumentId, i64)> {
4993 if let Some(rest) = component.strip_prefix("((") {
4994 let (ratio, symbol) = rest.split_once("))")?;
4995 return parse_spread_leg_parts(ratio, symbol, venue, -1);
4996 }
4997
4998 let rest = component.strip_prefix('(')?;
4999 let (ratio, symbol) = rest.split_once(')')?;
5000 parse_spread_leg_parts(ratio, symbol, venue, 1)
5001}
5002
5003fn parse_spread_leg_parts(
5004 ratio: &str,
5005 symbol: &str,
5006 venue: Venue,
5007 sign: i64,
5008) -> Option<(InstrumentId, i64)> {
5009 if symbol.is_empty() {
5010 return None;
5011 }
5012
5013 let ratio = ratio.parse::<i64>().ok()?.checked_mul(sign)?;
5014 if ratio == 0 {
5015 return None;
5016 }
5017
5018 Some((InstrumentId::new(Symbol::new(symbol), venue), ratio))
5019}
5020
5021#[inline(always)]
5022fn log_error_on_cache_insert<T: Display>(e: &T) {
5023 log::error!("Error on cache insert: {e}");
5024}
5025
5026#[derive(Debug)]
5032struct ContinuousFutureRoller {
5033 engine: WeakCell<DataEngine>,
5034}
5035
5036impl ContinuousFutureRoller {
5037 fn new(engine: &Rc<RefCell<DataEngine>>) -> Self {
5038 Self {
5039 engine: WeakCell::from(Rc::downgrade(engine)),
5040 }
5041 }
5042
5043 fn handle_transition(&self, event: &TimeEvent) {
5044 if let Some(engine) = self.engine.upgrade() {
5045 engine
5046 .borrow_mut()
5047 .handle_continuous_future_subscription_transition(event);
5048 }
5049 }
5050}
5051
5052#[derive(Debug)]
5053struct ContinuousFutureSubscriptionState {
5054 target_bar_type: BarType,
5055 client_id: Option<ClientId>,
5056 venue: Option<Venue>,
5057 command_id: UUID4,
5058 params: Option<Params>,
5059 request: ContinuousFutureRequest,
5060 active_segment_instrument_id: InstrumentId,
5061 active_source: ContinuousFutureSource,
5062 active_source_subscription: Option<BarAggregatorSubscription>,
5063 next_transition_index: Option<usize>,
5064 timer_name: Option<String>,
5065}
5066
5067fn same_subscription(a: &BarAggregatorSubscription, b: &BarAggregatorSubscription) -> bool {
5068 match (a, b) {
5069 (
5070 BarAggregatorSubscription::Bar { handler: h1, .. },
5071 BarAggregatorSubscription::Bar { handler: h2, .. },
5072 ) => h1.id() == h2.id(),
5073 (
5074 BarAggregatorSubscription::Trade { handler: h1, .. },
5075 BarAggregatorSubscription::Trade { handler: h2, .. },
5076 ) => h1.id() == h2.id(),
5077 (
5078 BarAggregatorSubscription::Quote { handler: h1, .. },
5079 BarAggregatorSubscription::Quote { handler: h2, .. },
5080 ) => h1.id() == h2.id(),
5081 _ => false,
5082 }
5083}
5084
5085fn parse_transition_timer_name(name: &str) -> Option<(BarType, usize)> {
5086 let rest = name.strip_prefix("continuous-future-roll:")?;
5087 let (target, index) = rest.rsplit_once(':')?;
5088 let bar_type = BarType::from_str(target).ok()?;
5089 let index = index.parse::<usize>().ok()?;
5090 Some((bar_type, index))
5091}
5092
5093fn build_continuous_future_subscribe_inner(
5094 source: ContinuousFutureSource,
5095 segment_instrument_id: InstrumentId,
5096 client_id: Option<ClientId>,
5097 _venue: Option<Venue>,
5098 child_params: Params,
5099 correlation_id: UUID4,
5100 ts_init: UnixNanos,
5101) -> DataCommand {
5102 let command_id = UUID4::new();
5103 let child_venue = Some(segment_instrument_id.venue);
5104
5105 match source {
5106 ContinuousFutureSource::Bars(source_bar_type) => {
5107 DataCommand::Subscribe(SubscribeCommand::Bars(SubscribeBars::new(
5108 source_bar_type,
5109 client_id,
5110 child_venue,
5111 command_id,
5112 ts_init,
5113 Some(correlation_id),
5114 Some(child_params),
5115 )))
5116 }
5117 ContinuousFutureSource::Trades => {
5118 DataCommand::Subscribe(SubscribeCommand::Trades(SubscribeTrades::new(
5119 segment_instrument_id,
5120 client_id,
5121 child_venue,
5122 command_id,
5123 ts_init,
5124 Some(correlation_id),
5125 Some(child_params),
5126 )))
5127 }
5128 ContinuousFutureSource::Quotes => {
5129 DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
5130 segment_instrument_id,
5131 client_id,
5132 child_venue,
5133 command_id,
5134 ts_init,
5135 Some(correlation_id),
5136 Some(child_params),
5137 )))
5138 }
5139 }
5140}
5141
5142fn build_continuous_future_unsubscribe_command(
5143 source: ContinuousFutureSource,
5144 segment_instrument_id: InstrumentId,
5145 client_id: Option<ClientId>,
5146 _venue: Option<Venue>,
5147 parent_params: Option<&Params>,
5148 correlation_id: UUID4,
5149 ts_init: UnixNanos,
5150) -> DataCommand {
5151 let mut child_params = parent_params.cloned().unwrap_or_default();
5152 child_params.shift_remove("continuous_future_transitions");
5153 child_params.shift_remove("continuous_future_adjustment_mode");
5154 child_params.shift_remove("last_post_instrument_id");
5155 child_params.shift_remove("first_pre_instrument_id");
5156 child_params.shift_remove("bar_types");
5157 let command_id = UUID4::new();
5158 let child_venue = Some(segment_instrument_id.venue);
5159
5160 match source {
5161 ContinuousFutureSource::Bars(source_bar_type) => {
5162 DataCommand::Unsubscribe(UnsubscribeCommand::Bars(UnsubscribeBars::new(
5163 source_bar_type,
5164 client_id,
5165 child_venue,
5166 command_id,
5167 ts_init,
5168 Some(correlation_id),
5169 Some(child_params),
5170 )))
5171 }
5172 ContinuousFutureSource::Trades => {
5173 DataCommand::Unsubscribe(UnsubscribeCommand::Trades(UnsubscribeTrades::new(
5174 segment_instrument_id,
5175 client_id,
5176 child_venue,
5177 command_id,
5178 ts_init,
5179 Some(correlation_id),
5180 Some(child_params),
5181 )))
5182 }
5183 ContinuousFutureSource::Quotes => {
5184 DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
5185 segment_instrument_id,
5186 client_id,
5187 child_venue,
5188 command_id,
5189 ts_init,
5190 Some(correlation_id),
5191 Some(child_params),
5192 )))
5193 }
5194 }
5195}
5196
5197fn datetime_to_unix_nanos(datetime: chrono::DateTime<chrono::Utc>) -> anyhow::Result<UnixNanos> {
5198 let timestamp = datetime
5199 .timestamp_nanos_opt()
5200 .ok_or_else(|| anyhow::anyhow!("datetime is outside the supported nanosecond range"))?;
5201 let timestamp = u64::try_from(timestamp)
5202 .context("datetime is before the UNIX epoch and cannot be represented as UnixNanos")?;
5203 Ok(UnixNanos::from(timestamp))
5204}
5205
5206fn derive_quote_from_depth(depth: &OrderBookDepth10) -> Option<QuoteTick> {
5209 let bid = depth.bids.first()?;
5210 let ask = depth.asks.first()?;
5211
5212 if bid.side == OrderSide::NoOrderSide
5213 || ask.side == OrderSide::NoOrderSide
5214 || bid.size.raw == 0
5215 || ask.size.raw == 0
5216 {
5217 return None;
5218 }
5219
5220 Some(QuoteTick::new(
5221 depth.instrument_id,
5222 bid.price,
5223 ask.price,
5224 bid.size,
5225 ask.size,
5226 depth.ts_event,
5227 depth.ts_init,
5228 ))
5229}
5230
5231fn process_engine_bar(
5235 cache: &Rc<RefCell<Cache>>,
5236 validate_sequence: bool,
5237 publish: bool,
5238 bar: Bar,
5239) {
5240 if !validate_bar_sequence(cache, validate_sequence, &bar) {
5241 return;
5242 }
5243
5244 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
5245 log_error_on_cache_insert(&e);
5246 }
5247
5248 if publish {
5249 let topic = switchboard::get_bars_topic(bar.bar_type);
5250 msgbus::publish_bar(topic, &bar);
5251 }
5252}
5253
5254fn validate_bar_sequence(cache: &Rc<RefCell<Cache>>, validate_sequence: bool, bar: &Bar) -> bool {
5255 if !validate_sequence {
5256 return true;
5257 }
5258
5259 let Some(last_bar) = cache.as_ref().borrow().bar(&bar.bar_type).copied() else {
5260 return true;
5261 };
5262
5263 if bar.ts_event < last_bar.ts_event {
5264 log::warn!(
5265 "Bar {bar} was prior to last bar `ts_event` {}",
5266 last_bar.ts_event,
5267 );
5268 return false;
5269 }
5270
5271 if bar.ts_init < last_bar.ts_init {
5272 log::warn!(
5273 "Bar {bar} was prior to last bar `ts_init` {}",
5274 last_bar.ts_init,
5275 );
5276 return false;
5277 }
5278
5279 true
5282}
5283
5284#[inline(always)]
5285fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
5286 if data.is_empty() {
5287 let name = type_name::<T>();
5288 let short_name = name.rsplit("::").next().unwrap_or(name);
5289 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
5290 return true;
5291 }
5292 false
5293}
5294
5295fn rebuild_pipeline_response(
5301 parent_id: UUID4,
5302 parent: Option<&RequestCommand>,
5303 legs: Vec<DataResponse>,
5304) -> Option<DataResponse> {
5305 if legs.is_empty() {
5306 return None;
5307 }
5308
5309 let (parent_start, parent_end) = parent_request_window(parent);
5310
5311 let mut iter = legs.into_iter();
5312 let first = iter.next()?;
5313
5314 match first {
5315 DataResponse::Data(mut acc) => {
5316 let mut data = custom_response_data(&acc, parent_id)?;
5317
5318 for leg in iter {
5319 let DataResponse::Data(other) = leg else {
5320 log::error!("Mixed-variant legs in pipeline {parent_id}");
5321 return None;
5322 };
5323 data.extend(custom_response_data(&other, parent_id)?);
5324 }
5325
5326 data.sort_by_key(CustomData::ts_init);
5327 acc.data = std::sync::Arc::new(data);
5328 acc.correlation_id = parent_id;
5329 if parent_start.is_some() {
5330 acc.start = parent_start;
5331 }
5332
5333 if parent_end.is_some() {
5334 acc.end = parent_end;
5335 }
5336 Some(DataResponse::Data(acc))
5337 }
5338 DataResponse::Quotes(mut acc) => {
5339 for leg in iter {
5340 let DataResponse::Quotes(other) = leg else {
5341 log::error!("Mixed-variant legs in pipeline {parent_id}");
5342 return None;
5343 };
5344 acc.data.extend(other.data);
5345 }
5346 acc.data.sort_by_key(|q| q.ts_init);
5347 acc.correlation_id = parent_id;
5348 if parent_start.is_some() {
5349 acc.start = parent_start;
5350 }
5351
5352 if parent_end.is_some() {
5353 acc.end = parent_end;
5354 }
5355 Some(DataResponse::Quotes(acc))
5356 }
5357 DataResponse::Trades(mut acc) => {
5358 for leg in iter {
5359 let DataResponse::Trades(other) = leg else {
5360 log::error!("Mixed-variant legs in pipeline {parent_id}");
5361 return None;
5362 };
5363 acc.data.extend(other.data);
5364 }
5365 acc.data.sort_by_key(|t| t.ts_init);
5366 acc.correlation_id = parent_id;
5367 if parent_start.is_some() {
5368 acc.start = parent_start;
5369 }
5370
5371 if parent_end.is_some() {
5372 acc.end = parent_end;
5373 }
5374 Some(DataResponse::Trades(acc))
5375 }
5376 DataResponse::FundingRates(mut acc) => {
5377 for leg in iter {
5378 let DataResponse::FundingRates(other) = leg else {
5379 log::error!("Mixed-variant legs in pipeline {parent_id}");
5380 return None;
5381 };
5382 acc.data.extend(other.data);
5383 }
5384 acc.data.sort_by_key(|r| r.ts_init);
5385 acc.correlation_id = parent_id;
5386 if parent_start.is_some() {
5387 acc.start = parent_start;
5388 }
5389
5390 if parent_end.is_some() {
5391 acc.end = parent_end;
5392 }
5393 Some(DataResponse::FundingRates(acc))
5394 }
5395 DataResponse::Bars(mut acc) => {
5396 for leg in iter {
5397 let DataResponse::Bars(other) = leg else {
5398 log::error!("Mixed-variant legs in pipeline {parent_id}");
5399 return None;
5400 };
5401 acc.data.extend(other.data);
5402 }
5403 acc.data.sort_by_key(|b| b.ts_init);
5404 acc.correlation_id = parent_id;
5405 if parent_start.is_some() {
5406 acc.start = parent_start;
5407 }
5408
5409 if parent_end.is_some() {
5410 acc.end = parent_end;
5411 }
5412 Some(DataResponse::Bars(acc))
5413 }
5414 DataResponse::Instruments(mut acc) => {
5415 for leg in iter {
5416 let DataResponse::Instruments(other) = leg else {
5417 log::error!("Mixed-variant legs in pipeline {parent_id}");
5418 return None;
5419 };
5420 acc.data.extend(other.data);
5421 }
5422 acc.correlation_id = parent_id;
5423 Some(DataResponse::Instruments(acc))
5424 }
5425 DataResponse::BookDeltas(mut acc) => {
5426 for leg in iter {
5427 let DataResponse::BookDeltas(other) = leg else {
5428 log::error!("Mixed-variant legs in pipeline {parent_id}");
5429 return None;
5430 };
5431 acc.data.extend(other.data);
5432 }
5433 acc.data.sort_by_key(|d| d.ts_init);
5434 acc.correlation_id = parent_id;
5435 if parent_start.is_some() {
5436 acc.start = parent_start;
5437 }
5438
5439 if parent_end.is_some() {
5440 acc.end = parent_end;
5441 }
5442 Some(DataResponse::BookDeltas(acc))
5443 }
5444 DataResponse::BookDepth(mut acc) => {
5445 for leg in iter {
5446 let DataResponse::BookDepth(other) = leg else {
5447 log::error!("Mixed-variant legs in pipeline {parent_id}");
5448 return None;
5449 };
5450 acc.data.extend(other.data);
5451 }
5452 acc.data.sort_by_key(|d| d.ts_init);
5453 acc.correlation_id = parent_id;
5454 if parent_start.is_some() {
5455 acc.start = parent_start;
5456 }
5457
5458 if parent_end.is_some() {
5459 acc.end = parent_end;
5460 }
5461 Some(DataResponse::BookDepth(acc))
5462 }
5463 other => {
5464 log::error!(
5469 "Pipeline rebuild not supported for variant {} (parent {parent_id})",
5470 other.kind(),
5471 );
5472 None
5473 }
5474 }
5475}
5476
5477fn custom_response_data(resp: &CustomDataResponse, parent_id: UUID4) -> Option<Vec<CustomData>> {
5478 if let Some(data) = resp.data.as_ref().downcast_ref::<Vec<CustomData>>() {
5479 return Some(data.clone());
5480 }
5481
5482 if let Some(data) = resp.data.as_ref().downcast_ref::<CustomData>() {
5483 return Some(vec![data.clone()]);
5484 }
5485
5486 if let Some(data) = resp.data.as_ref().downcast_ref::<Vec<Data>>() {
5487 let mut custom = Vec::with_capacity(data.len());
5488 for item in data {
5489 let Data::Custom(value) = item else {
5490 log::error!("Custom data pipeline {parent_id} received non-custom data {item:?}");
5491 return None;
5492 };
5493 custom.push(value.clone());
5494 }
5495 return Some(custom);
5496 }
5497
5498 log::error!(
5499 "Custom data pipeline {parent_id} received unsupported payload for {}",
5500 resp.data_type,
5501 );
5502 None
5503}
5504
5505fn parent_request_window(
5506 parent: Option<&RequestCommand>,
5507) -> (Option<UnixNanos>, Option<UnixNanos>) {
5508 let Some(parent) = parent else {
5509 return (None, None);
5510 };
5511
5512 let (start, end) = match parent {
5513 RequestCommand::Data(cmd) => (cmd.start, cmd.end),
5514 RequestCommand::Instrument(cmd) => (cmd.start, cmd.end),
5515 RequestCommand::Instruments(cmd) => (cmd.start, cmd.end),
5516 RequestCommand::BookDeltas(cmd) => (cmd.start, cmd.end),
5517 RequestCommand::BookDepth(cmd) => (cmd.start, cmd.end),
5518 RequestCommand::Quotes(cmd) => (cmd.start, cmd.end),
5519 RequestCommand::Trades(cmd) => (cmd.start, cmd.end),
5520 RequestCommand::FundingRates(cmd) => (cmd.start, cmd.end),
5521 RequestCommand::Bars(cmd) => (cmd.start, cmd.end),
5522 RequestCommand::Join(cmd) => (cmd.start, cmd.end),
5523 RequestCommand::BookSnapshot(_) | RequestCommand::ForwardPrices(_) => return (None, None),
5524 };
5525
5526 (
5527 start.map(datetime_to_unix_nanos_or_zero),
5528 end.map(datetime_to_unix_nanos_or_zero),
5529 )
5530}
5531
5532fn datetime_to_unix_nanos_or_zero(dt: chrono::DateTime<chrono::Utc>) -> UnixNanos {
5533 UnixNanos::from(u64::try_from(dt.timestamp_nanos_opt().unwrap_or(0).max(0)).unwrap_or(0))
5534}
5535
5536fn empty_response_like(
5537 template: &DataResponse,
5538 correlation_id: UUID4,
5539 ts_init: UnixNanos,
5540) -> DataResponse {
5541 match template {
5542 DataResponse::Quotes(r) => DataResponse::Quotes(QuotesResponse::new(
5543 correlation_id,
5544 r.client_id,
5545 r.instrument_id,
5546 Vec::new(),
5547 r.start,
5548 r.end,
5549 ts_init,
5550 r.params.clone(),
5551 )),
5552 DataResponse::Trades(r) => DataResponse::Trades(TradesResponse::new(
5553 correlation_id,
5554 r.client_id,
5555 r.instrument_id,
5556 Vec::new(),
5557 r.start,
5558 r.end,
5559 ts_init,
5560 r.params.clone(),
5561 )),
5562 DataResponse::FundingRates(r) => DataResponse::FundingRates(FundingRatesResponse::new(
5563 correlation_id,
5564 r.client_id,
5565 r.instrument_id,
5566 Vec::new(),
5567 r.start,
5568 r.end,
5569 ts_init,
5570 r.params.clone(),
5571 )),
5572 DataResponse::Bars(r) => DataResponse::Bars(BarsResponse::new(
5573 correlation_id,
5574 r.client_id,
5575 r.bar_type,
5576 Vec::new(),
5577 r.start,
5578 r.end,
5579 ts_init,
5580 r.params.clone(),
5581 )),
5582 DataResponse::BookDeltas(r) => DataResponse::BookDeltas(BookDeltasResponse::new(
5583 correlation_id,
5584 r.client_id,
5585 r.instrument_id,
5586 Vec::new(),
5587 r.start,
5588 r.end,
5589 ts_init,
5590 r.params.clone(),
5591 )),
5592 DataResponse::BookDepth(r) => DataResponse::BookDepth(BookDepthResponse::new(
5593 correlation_id,
5594 r.client_id,
5595 r.instrument_id,
5596 Vec::new(),
5597 r.start,
5598 r.end,
5599 ts_init,
5600 r.params.clone(),
5601 )),
5602 other => {
5603 log::error!(
5604 "Cannot fabricate empty leg response for variant {}",
5605 other.kind(),
5606 );
5607 other.clone()
5608 }
5609 }
5610}
5611
5612fn rebind_response_correlation(mut resp: DataResponse, new_id: UUID4) -> DataResponse {
5613 match &mut resp {
5614 DataResponse::Data(r) => r.correlation_id = new_id,
5615 DataResponse::Instrument(r) => r.correlation_id = new_id,
5616 DataResponse::Instruments(r) => r.correlation_id = new_id,
5617 DataResponse::Book(r) => r.correlation_id = new_id,
5618 DataResponse::BookDeltas(r) => r.correlation_id = new_id,
5619 DataResponse::BookDepth(r) => r.correlation_id = new_id,
5620 DataResponse::Quotes(r) => r.correlation_id = new_id,
5621 DataResponse::Trades(r) => r.correlation_id = new_id,
5622 DataResponse::FundingRates(r) => r.correlation_id = new_id,
5623 DataResponse::ForwardPrices(r) => r.correlation_id = new_id,
5624 DataResponse::Bars(r) => r.correlation_id = new_id,
5625 }
5626 resp
5627}