Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36
37#[cfg(feature = "defi")]
38pub mod pool;
39
40#[cfg(feature = "streaming")]
41mod streaming;
42
43use std::{
44    any::{Any, type_name},
45    cell::{Ref, RefCell},
46    collections::VecDeque,
47    fmt::{Debug, Display},
48    num::NonZeroUsize,
49    rc::Rc,
50};
51
52use ahash::{AHashMap, AHashSet};
53pub use bar::BarAggregatorSubscription;
54use bar::{BarAggregatorKey, bar_aggregator_key};
55use book::{
56    BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
57    BookSnapshotter, BookUpdater,
58};
59pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
60use config::DataEngineConfig;
61use futures::future::join_all;
62use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler, SpreadQuoteHandler};
63use indexmap::IndexMap;
64use nautilus_common::{
65    cache::Cache,
66    clock::Clock,
67    logging::{RECV, RES},
68    messages::data::{
69        DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
70        SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
71        SubscribeCommand, SubscribeOptionChain, SubscribeQuotes, UnsubscribeBars,
72        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
73        UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionChain,
74        UnsubscribeOptionGreeks, UnsubscribeQuotes, is_parent_subscription,
75    },
76    msgbus::{
77        self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
78        switchboard::{self, MessagingSwitchboard},
79    },
80    runner::get_data_cmd_sender,
81    timer::{TimeEvent, TimeEventCallback},
82};
83use nautilus_core::{
84    Params, UUID4, WeakCell,
85    correctness::{
86        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
87    },
88    datetime::millis_to_nanos_unchecked,
89};
90#[cfg(feature = "defi")]
91use nautilus_model::defi::DefiData;
92use nautilus_model::{
93    data::{
94        Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
95        InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
96        OrderBookDepth10, QuoteTick, TradeTick,
97        option_chain::{OptionGreeks, StrikeRange},
98    },
99    enums::{
100        AggregationSource, BarAggregation, BookType, InstrumentClass, MarketStatusAction,
101        OrderSide, PriceType, RecordFlag,
102    },
103    identifiers::{ClientId, InstrumentId, OptionSeriesId, Symbol, Venue},
104    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
105    orderbook::OrderBook,
106    types::{Price, Quantity},
107};
108#[cfg(feature = "streaming")]
109use streaming::CatalogMap;
110use ustr::Ustr;
111
112#[cfg(feature = "defi")]
113#[allow(unused_imports)] // Brings DeFi impl blocks into scope
114use crate::defi::engine as _;
115#[cfg(feature = "defi")]
116use crate::engine::pool::PoolUpdater;
117use crate::{
118    aggregation::{
119        BarAggregator, RenkoBarAggregator, SpreadQuoteAggregator, TickBarAggregator,
120        TickImbalanceBarAggregator, TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator,
121        ValueImbalanceBarAggregator, ValueRunsBarAggregator, VolumeBarAggregator,
122        VolumeImbalanceBarAggregator, VolumeRunsBarAggregator,
123    },
124    client::DataClientAdapter,
125    option_chains::OptionChainManager,
126};
127
128// Between built-in handlers (10) and default user handlers (0)
129const BAR_AGGREGATOR_PRIORITY: u32 = 5;
130const GENERIC_SPREAD_ID_SEPARATOR: &str = "___";
131
132/// Provides a high-performance `DataEngine` for all environments.
133#[derive(Debug)]
134pub struct DataEngine {
135    pub(crate) clock: Rc<RefCell<dyn Clock>>,
136    pub(crate) cache: Rc<RefCell<Cache>>,
137    pub(crate) external_clients: AHashSet<ClientId>,
138    clients: IndexMap<ClientId, DataClientAdapter>,
139    default_client: Option<DataClientAdapter>,
140    #[cfg(feature = "streaming")]
141    catalogs: CatalogMap,
142    routing_map: IndexMap<Venue, ClientId>,
143    book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
144    book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
145    book_deltas_subs: AHashSet<InstrumentId>,
146    book_depth10_subs: AHashSet<InstrumentId>,
147    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
148    book_deltas_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
149    book_depth10_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
150    book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
151    bar_aggregators: IndexMap<BarAggregatorKey, Rc<RefCell<Box<dyn BarAggregator>>>>,
152    bar_aggregator_handlers: AHashMap<BarAggregatorKey, Vec<BarAggregatorSubscription>>,
153    spread_quote_aggregators: AHashMap<InstrumentId, Rc<RefCell<SpreadQuoteAggregator>>>,
154    spread_quote_handlers: AHashMap<InstrumentId, Vec<(InstrumentId, TypedHandler<QuoteTick>)>>,
155    option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
156    option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
157    deferred_cmd_queue: DeferredCommandQueue,
158    pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
159    synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
160    synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
161    subscribed_synthetic_quotes: AHashSet<InstrumentId>,
162    subscribed_synthetic_trades: AHashSet<InstrumentId>,
163    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
164    command_count: u64,
165    data_count: u64,
166    request_count: u64,
167    response_count: u64,
168    pub(crate) msgbus_priority: u32,
169    pub(crate) config: DataEngineConfig,
170    #[cfg(feature = "defi")]
171    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
172    #[cfg(feature = "defi")]
173    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
174    #[cfg(feature = "defi")]
175    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
176    #[cfg(feature = "defi")]
177    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
178}
179
180impl DataEngine {
181    /// Creates a new [`DataEngine`] instance.
182    #[must_use]
183    pub fn new(
184        clock: Rc<RefCell<dyn Clock>>,
185        cache: Rc<RefCell<Cache>>,
186        config: Option<DataEngineConfig>,
187    ) -> Self {
188        let config = config.unwrap_or_default();
189
190        let external_clients: AHashSet<ClientId> = config
191            .external_clients
192            .clone()
193            .unwrap_or_default()
194            .into_iter()
195            .collect();
196
197        Self {
198            clock,
199            cache,
200            external_clients,
201            clients: IndexMap::new(),
202            default_client: None,
203            #[cfg(feature = "streaming")]
204            catalogs: CatalogMap::new(),
205            routing_map: IndexMap::new(),
206            book_intervals: AHashMap::new(),
207            book_snapshot_counts: IndexMap::new(),
208            book_deltas_subs: AHashSet::new(),
209            book_depth10_subs: AHashSet::new(),
210            book_updaters: AHashMap::new(),
211            book_deltas_parent_expansions: AHashMap::new(),
212            book_depth10_parent_expansions: AHashMap::new(),
213            book_snapshotters: AHashMap::new(),
214            bar_aggregators: IndexMap::new(),
215            bar_aggregator_handlers: AHashMap::new(),
216            spread_quote_aggregators: AHashMap::new(),
217            spread_quote_handlers: AHashMap::new(),
218            option_chain_managers: AHashMap::new(),
219            option_chain_instrument_index: AHashMap::new(),
220            deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
221            pending_option_chain_requests: AHashMap::new(),
222            synthetic_quote_feeds: AHashMap::new(),
223            synthetic_trade_feeds: AHashMap::new(),
224            subscribed_synthetic_quotes: AHashSet::new(),
225            subscribed_synthetic_trades: AHashSet::new(),
226            buffered_deltas_map: AHashMap::new(),
227            command_count: 0,
228            data_count: 0,
229            request_count: 0,
230            response_count: 0,
231            msgbus_priority: 10, // High-priority for built-in component
232            config,
233            #[cfg(feature = "defi")]
234            pool_updaters: AHashMap::new(),
235            #[cfg(feature = "defi")]
236            pool_updaters_pending: AHashSet::new(),
237            #[cfg(feature = "defi")]
238            pool_snapshot_pending: AHashSet::new(),
239            #[cfg(feature = "defi")]
240            pool_event_buffers: AHashMap::new(),
241        }
242    }
243
244    /// Registers all message bus handlers for the data engine.
245    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
246        let weak = WeakCell::from(Rc::downgrade(engine));
247
248        let weak1 = weak.clone();
249        msgbus::register_data_command_endpoint(
250            MessagingSwitchboard::data_engine_execute(),
251            TypedIntoHandler::from(move |cmd: DataCommand| {
252                if let Some(rc) = weak1.upgrade() {
253                    rc.borrow_mut().execute(cmd);
254                }
255            }),
256        );
257
258        msgbus::register_data_command_endpoint(
259            MessagingSwitchboard::data_engine_queue_execute(),
260            TypedIntoHandler::from(move |cmd: DataCommand| {
261                get_data_cmd_sender().clone().execute(cmd);
262            }),
263        );
264
265        // Register process handler (polymorphic - uses Any)
266        let weak2 = weak.clone();
267        msgbus::register_any(
268            MessagingSwitchboard::data_engine_process(),
269            ShareableMessageHandler::from_any(move |data: &dyn Any| {
270                if let Some(rc) = weak2.upgrade() {
271                    rc.borrow_mut().process(data);
272                }
273            }),
274        );
275
276        // Register process_data handler (typed - takes ownership)
277        let weak3 = weak.clone();
278        msgbus::register_data_endpoint(
279            MessagingSwitchboard::data_engine_process_data(),
280            TypedIntoHandler::from(move |data: Data| {
281                if let Some(rc) = weak3.upgrade() {
282                    rc.borrow_mut().process_data(data);
283                }
284            }),
285        );
286
287        // Register process_defi_data handler (typed - takes ownership)
288        #[cfg(feature = "defi")]
289        {
290            let weak4 = weak.clone();
291            msgbus::register_defi_data_endpoint(
292                MessagingSwitchboard::data_engine_process_defi_data(),
293                TypedIntoHandler::from(move |data: DefiData| {
294                    if let Some(rc) = weak4.upgrade() {
295                        rc.borrow_mut().process_defi_data(data);
296                    }
297                }),
298            );
299        }
300
301        let weak5 = weak;
302        msgbus::register_data_response_endpoint(
303            MessagingSwitchboard::data_engine_response(),
304            TypedIntoHandler::from(move |resp: DataResponse| {
305                if let Some(rc) = weak5.upgrade() {
306                    rc.borrow_mut().response(resp);
307                }
308            }),
309        );
310    }
311
312    /// Returns the total count of data commands received by the engine.
313    #[must_use]
314    pub const fn command_count(&self) -> u64 {
315        self.command_count
316    }
317
318    /// Returns the total count of data stream objects received by the engine.
319    #[must_use]
320    pub const fn data_count(&self) -> u64 {
321        self.data_count
322    }
323
324    #[cfg(feature = "defi")]
325    pub(crate) const fn increment_data_count(&mut self) {
326        self.data_count += 1;
327    }
328
329    /// Returns the total count of data requests received by the engine.
330    #[must_use]
331    pub const fn request_count(&self) -> u64 {
332        self.request_count
333    }
334
335    /// Returns the total count of data responses received by the engine.
336    #[must_use]
337    pub const fn response_count(&self) -> u64 {
338        self.response_count
339    }
340
341    /// Returns whether an `OptionChainManager` exists for the given series.
342    #[must_use]
343    pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
344        self.option_chain_managers.contains_key(series_id)
345    }
346
347    /// Returns the count of pending option-chain bootstrap requests.
348    #[must_use]
349    pub fn pending_option_chain_request_count(&self) -> usize {
350        self.pending_option_chain_requests.len()
351    }
352
353    /// Returns a read-only reference to the engines clock.
354    #[must_use]
355    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
356        self.clock.borrow()
357    }
358
359    /// Returns a read-only reference to the engines cache.
360    #[must_use]
361    pub fn get_cache(&self) -> Ref<'_, Cache> {
362        self.cache.borrow()
363    }
364
365    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
366    #[must_use]
367    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
368        Rc::clone(&self.cache)
369    }
370
371    /// Registers the `client` with the engine with an optional venue `routing`.
372    ///
373    ///
374    /// # Panics
375    ///
376    /// Panics if a client with the same client ID has already been registered.
377    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
378        let client_id = client.client_id();
379
380        if let Some(default_client) = &self.default_client {
381            check_predicate_false(
382                default_client.client_id() == client.client_id(),
383                "client_id already registered as default client",
384            )
385            .expect(FAILED);
386        }
387
388        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
389
390        if let Some(routing) = routing {
391            self.routing_map.insert(routing, client_id);
392            log::debug!("Set client {client_id} routing for {routing}");
393        }
394
395        if client.venue.is_none() && self.default_client.is_none() {
396            self.default_client = Some(client);
397            log::debug!("Registered client {client_id} for default routing");
398        } else {
399            self.clients.insert(client_id, client);
400            log::debug!("Registered client {client_id}");
401        }
402    }
403
404    /// Deregisters the client for the `client_id`.
405    ///
406    /// # Panics
407    ///
408    /// Panics if the client ID has not been registered.
409    pub fn deregister_client(&mut self, client_id: &ClientId) {
410        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
411
412        self.clients.shift_remove(client_id);
413        log::info!("Deregistered client {client_id}");
414    }
415
416    /// Registers the data `client` with the engine as the default routing client.
417    ///
418    /// When a specific venue routing cannot be found, this client will receive messages.
419    ///
420    /// # Warnings
421    ///
422    /// Any existing default routing client will be overwritten.
423    ///
424    /// # Panics
425    ///
426    /// Panics if a default client has already been registered.
427    pub fn register_default_client(&mut self, client: DataClientAdapter) {
428        check_predicate_true(
429            self.default_client.is_none(),
430            "default client already registered",
431        )
432        .expect(FAILED);
433
434        let client_id = client.client_id();
435
436        self.default_client = Some(client);
437        log::debug!("Registered default client {client_id}");
438    }
439
440    /// Starts all registered data clients and re-arms bar aggregator timers.
441    pub fn start(&mut self) {
442        for client in self.get_clients_mut() {
443            if let Err(e) = client.start() {
444                log::error!("{e}");
445            }
446        }
447
448        for aggregator in self.bar_aggregators.values() {
449            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
450                aggregator
451                    .borrow_mut()
452                    .start_timer(Some(aggregator.clone()));
453            }
454        }
455
456        for aggregator in self.spread_quote_aggregators.values() {
457            aggregator
458                .borrow_mut()
459                .start_timer(Some(aggregator.clone()));
460        }
461    }
462
463    /// Stops all registered data clients and bar aggregator timers.
464    pub fn stop(&mut self) {
465        for client in self.get_clients_mut() {
466            if let Err(e) = client.stop() {
467                log::error!("{e}");
468            }
469        }
470
471        for aggregator in self.bar_aggregators.values() {
472            aggregator.borrow_mut().stop();
473        }
474
475        for aggregator in self.spread_quote_aggregators.values() {
476            aggregator.borrow_mut().stop_timer();
477        }
478    }
479
480    /// Resets all registered data clients and clears engine state.
481    pub fn reset(&mut self) {
482        for client in self.get_clients_mut() {
483            if let Err(e) = client.reset() {
484                log::error!("{e}");
485            }
486        }
487
488        let keys: Vec<BarAggregatorKey> = self.bar_aggregators.keys().copied().collect();
489        for (bar_type, request_id) in keys {
490            if let Err(e) = self.stop_bar_aggregator(bar_type, request_id) {
491                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
492            }
493        }
494
495        let spread_ids: Vec<InstrumentId> = self.spread_quote_aggregators.keys().copied().collect();
496        for spread_id in spread_ids {
497            self.stop_spread_quote_aggregator(spread_id);
498        }
499
500        // Tear down option chain managers to unregister their msgbus handlers
501        let managers: Vec<_> = self.option_chain_managers.drain().collect();
502        for (_, manager) in managers {
503            manager.borrow_mut().teardown(&self.clock);
504        }
505        self.option_chain_instrument_index.clear();
506        self.pending_option_chain_requests.clear();
507
508        // Unsubscribe BookUpdaters before dropping; otherwise the typed router
509        // keeps dispatching to abandoned updaters. `book_updaters` is keyed by
510        // per-underlying id, so the literal per-underlying topic is the same
511        // string the subscribe path used.
512        let book_updaters: Vec<(InstrumentId, Rc<BookUpdater>)> =
513            self.book_updaters.drain().collect();
514        for (instrument_id, updater) in book_updaters {
515            let deltas_topic = switchboard::get_book_deltas_topic(instrument_id);
516            let depth_topic = switchboard::get_book_depth10_topic(instrument_id);
517            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
518            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
519            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
520            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
521        }
522        self.book_deltas_parent_expansions.clear();
523        self.book_depth10_parent_expansions.clear();
524
525        self.book_deltas_subs.clear();
526        self.book_depth10_subs.clear();
527        self.book_intervals.clear();
528        self.book_snapshot_counts.clear();
529        self.book_snapshotters.clear();
530        self.buffered_deltas_map.clear();
531
532        self.synthetic_quote_feeds.clear();
533        self.synthetic_trade_feeds.clear();
534        self.subscribed_synthetic_quotes.clear();
535        self.subscribed_synthetic_trades.clear();
536
537        self.deferred_cmd_queue.borrow_mut().clear();
538
539        self.clock.borrow_mut().cancel_timers();
540
541        self.command_count = 0;
542        self.data_count = 0;
543        self.request_count = 0;
544        self.response_count = 0;
545    }
546
547    /// Disposes the engine, stopping all clients and canceling any timers.
548    pub fn dispose(&mut self) {
549        for client in self.get_clients_mut() {
550            if let Err(e) = client.dispose() {
551                log::error!("{e}");
552            }
553        }
554
555        self.clock.borrow_mut().cancel_timers();
556    }
557
558    /// Connects all registered data clients concurrently.
559    ///
560    /// Connection failures are logged but do not prevent the node from running.
561    pub async fn connect(&mut self) {
562        let futures: Vec<_> = self
563            .get_clients_mut()
564            .into_iter()
565            .map(DataClientAdapter::connect)
566            .collect();
567
568        let results = join_all(futures).await;
569
570        for error in results.into_iter().filter_map(Result::err) {
571            log::error!("Failed to connect data client: {error}");
572        }
573    }
574
575    /// Disconnects all registered data clients concurrently.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if any client fails to disconnect.
580    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
581        let futures: Vec<_> = self
582            .get_clients_mut()
583            .into_iter()
584            .map(DataClientAdapter::disconnect)
585            .collect();
586
587        let results = join_all(futures).await;
588        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
589
590        if errors.is_empty() {
591            Ok(())
592        } else {
593            let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
594            anyhow::bail!(
595                "Failed to disconnect data clients: {}",
596                error_msgs.join("; ")
597            )
598        }
599    }
600
601    /// Returns `true` if all registered data clients are currently connected.
602    #[must_use]
603    pub fn check_connected(&self) -> bool {
604        self.get_clients()
605            .iter()
606            .all(|client| client.is_connected())
607    }
608
609    /// Returns `true` if all registered data clients are currently disconnected.
610    #[must_use]
611    pub fn check_disconnected(&self) -> bool {
612        self.get_clients()
613            .iter()
614            .all(|client| !client.is_connected())
615    }
616
617    /// Returns connection status for each registered client.
618    #[must_use]
619    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
620        self.get_clients()
621            .into_iter()
622            .map(|client| (client.client_id(), client.is_connected()))
623            .collect()
624    }
625
626    /// Returns a list of all registered client IDs, including the default client if set.
627    #[must_use]
628    pub fn registered_clients(&self) -> Vec<ClientId> {
629        self.get_clients()
630            .into_iter()
631            .map(|client| client.client_id())
632            .collect()
633    }
634
635    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
636    where
637        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
638        T: Clone,
639    {
640        self.get_clients()
641            .into_iter()
642            .flat_map(get_subs)
643            .cloned()
644            .collect()
645    }
646
647    #[must_use]
648    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
649        let (default_opt, clients_map) = (&self.default_client, &self.clients);
650        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
651
652        if let Some(default) = default_opt {
653            clients.push(default);
654        }
655
656        clients
657    }
658
659    #[must_use]
660    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
661        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
662        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
663
664        if let Some(default) = default_opt {
665            clients.push(default);
666        }
667
668        clients
669    }
670
671    pub fn get_client(
672        &mut self,
673        client_id: Option<&ClientId>,
674        venue: Option<&Venue>,
675    ) -> Option<&mut DataClientAdapter> {
676        if let Some(client_id) = client_id {
677            // Explicit ID: first look in registered clients
678            if let Some(client) = self.clients.get_mut(client_id) {
679                return Some(client);
680            }
681
682            // Then check if it matches the default client
683            if let Some(default) = self.default_client.as_mut()
684                && default.client_id() == *client_id
685            {
686                return Some(default);
687            }
688
689            // Unknown explicit client
690            return None;
691        }
692
693        if let Some(v) = venue {
694            // Route by venue if mapped client still registered
695            if let Some(client_id) = self.routing_map.get(v) {
696                return self.clients.get_mut(client_id);
697            }
698        }
699
700        // Fallback to default client
701        self.get_default_client()
702    }
703
704    /// Resolves the client for a subscribe/unsubscribe command.
705    ///
706    /// When `BACKTEST` is registered, all commands route through it regardless of
707    /// the command's `client_id` or `venue`. Request paths skip this override.
708    fn get_command_client(
709        &mut self,
710        client_id: Option<&ClientId>,
711        venue: Option<&Venue>,
712    ) -> Option<&mut DataClientAdapter> {
713        let backtest_id = ClientId::new("BACKTEST");
714        // BACKTEST may live in `clients` or as the default (venue=None branch in
715        // `register_client`)
716        if self.clients.contains_key(&backtest_id) {
717            return self.clients.get_mut(&backtest_id);
718        }
719        let default_is_backtest = self
720            .default_client
721            .as_ref()
722            .is_some_and(|c| c.client_id() == backtest_id);
723        if default_is_backtest {
724            return self.default_client.as_mut();
725        }
726        self.get_client(client_id, venue)
727    }
728
729    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
730        self.default_client.as_mut()
731    }
732
733    /// Returns all custom data types currently subscribed across all clients.
734    #[must_use]
735    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
736        self.collect_subscriptions(|client| &client.subscriptions_custom)
737    }
738
739    /// Returns all instrument IDs currently subscribed across all clients.
740    #[must_use]
741    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
742        self.collect_subscriptions(|client| &client.subscriptions_instrument)
743    }
744
745    /// Returns all instrument IDs for which book delta subscriptions exist.
746    #[must_use]
747    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
748        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
749    }
750
751    /// Returns all instrument IDs for which book depth10 subscriptions exist.
752    #[must_use]
753    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
754        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
755    }
756
757    /// Returns all instrument IDs for which book snapshot subscriptions exist.
758    #[must_use]
759    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
760        self.book_snapshot_counts
761            .keys()
762            .map(|(instrument_id, _)| *instrument_id)
763            .collect()
764    }
765
766    /// Returns all instrument IDs for which quote subscriptions exist.
767    #[must_use]
768    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
769        self.collect_subscriptions(|client| &client.subscriptions_quotes)
770    }
771
772    /// Returns all synthetic instrument IDs for which quote subscriptions exist.
773    #[must_use]
774    pub fn subscribed_synthetic_quotes(&self) -> Vec<InstrumentId> {
775        self.subscribed_synthetic_quotes.iter().copied().collect()
776    }
777
778    /// Returns all instrument IDs for which trade subscriptions exist.
779    #[must_use]
780    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
781        self.collect_subscriptions(|client| &client.subscriptions_trades)
782    }
783
784    /// Returns all synthetic instrument IDs for which trade subscriptions exist.
785    #[must_use]
786    pub fn subscribed_synthetic_trades(&self) -> Vec<InstrumentId> {
787        self.subscribed_synthetic_trades.iter().copied().collect()
788    }
789
790    /// Returns all bar types currently subscribed across all clients.
791    #[must_use]
792    pub fn subscribed_bars(&self) -> Vec<BarType> {
793        self.collect_subscriptions(|client| &client.subscriptions_bars)
794    }
795
796    /// Returns all instrument IDs for which mark price subscriptions exist.
797    #[must_use]
798    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
799        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
800    }
801
802    /// Returns all instrument IDs for which index price subscriptions exist.
803    #[must_use]
804    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
805        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
806    }
807
808    /// Returns all instrument IDs for which funding rate subscriptions exist.
809    #[must_use]
810    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
811        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
812    }
813
814    /// Returns all instrument IDs for which status subscriptions exist.
815    #[must_use]
816    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
817        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
818    }
819
820    /// Returns all instrument IDs for which instrument close subscriptions exist.
821    #[must_use]
822    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
823        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
824    }
825
826    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
827    ///
828    /// Errors during execution are logged.
829    pub fn execute(&mut self, cmd: DataCommand) {
830        match &cmd {
831            DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
832            DataCommand::Request(_) => self.request_count += 1,
833            #[cfg(feature = "defi")]
834            DataCommand::DefiRequest(_) => self.request_count += 1,
835            #[cfg(feature = "defi")]
836            DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
837                self.command_count += 1;
838            }
839            _ => {}
840        }
841
842        if let Err(e) = match cmd {
843            DataCommand::Subscribe(c) => self.execute_subscribe(c),
844            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
845            DataCommand::Request(c) => self.execute_request(c),
846            #[cfg(feature = "defi")]
847            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
848            #[cfg(feature = "defi")]
849            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
850            #[cfg(feature = "defi")]
851            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
852            _ => {
853                log::warn!("Unhandled DataCommand variant");
854                Ok(())
855            }
856        } {
857            log::error!("{e}");
858        }
859    }
860
861    /// Handles a subscribe command, updating internal state and forwarding to the client.
862    ///
863    /// # Errors
864    ///
865    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
866    /// or if the underlying client operation fails.
867    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
868        // Update internal engine state
869        match &cmd {
870            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
871            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
872            SubscribeCommand::BookSnapshots(cmd) => {
873                // Handles client forwarding internally (forwards as BookDeltas)
874                return self.subscribe_book_snapshots(cmd);
875            }
876            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
877            SubscribeCommand::OptionChain(cmd) => {
878                self.subscribe_option_chain(cmd);
879                return Ok(());
880            }
881            SubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
882                self.subscribe_synthetic_quotes(cmd.instrument_id);
883                return Ok(());
884            }
885            SubscribeCommand::Quotes(cmd)
886                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
887            {
888                self.subscribe_spread_quotes(cmd);
889                return Ok(());
890            }
891            SubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
892                self.subscribe_synthetic_trades(cmd.instrument_id);
893                return Ok(());
894            }
895            SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
896                anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
897            }
898            SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
899                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
900            }
901            SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
902                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
903            }
904            SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
905                anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
906            }
907            _ => {} // Do nothing else
908        }
909
910        if let Some(client_id) = cmd.client_id()
911            && self.external_clients.contains(client_id)
912        {
913            if self.config.debug {
914                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
915            }
916            return Ok(());
917        }
918
919        #[cfg(feature = "streaming")]
920        let cmd = self.subscribe_command_with_prefilled_start_ns(cmd)?;
921
922        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
923            client.execute_subscribe(cmd);
924        } else {
925            log::error!(
926                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
927                cmd.client_id(),
928                cmd.venue(),
929            );
930        }
931
932        Ok(())
933    }
934
935    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
936    ///
937    /// # Errors
938    ///
939    /// Returns an error if the underlying client operation fails.
940    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
941        match &cmd {
942            UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
943                return Ok(());
944            }
945            UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
946                return Ok(());
947            }
948            UnsubscribeCommand::BookSnapshots(cmd) => {
949                // Handles client forwarding internally (forwards as BookDeltas)
950                self.unsubscribe_book_snapshots(cmd);
951                return Ok(());
952            }
953            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
954            UnsubscribeCommand::OptionChain(cmd) => {
955                self.unsubscribe_option_chain(cmd);
956                return Ok(());
957            }
958            UnsubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
959                self.unsubscribe_synthetic_quotes(cmd.instrument_id);
960                return Ok(());
961            }
962            UnsubscribeCommand::Quotes(cmd)
963                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
964            {
965                self.unsubscribe_spread_quotes(cmd);
966                return Ok(());
967            }
968            UnsubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
969                self.unsubscribe_synthetic_trades(cmd.instrument_id);
970                return Ok(());
971            }
972            UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
973                anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
974            }
975            UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
976                anyhow::bail!(
977                    "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
978                );
979            }
980            UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
981                anyhow::bail!(
982                    "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
983                );
984            }
985            UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
986                anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
987            }
988            _ => {}
989        }
990
991        if let Some(client_id) = cmd.client_id()
992            && self.external_clients.contains(client_id)
993        {
994            if self.config.debug {
995                log::debug!(
996                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
997                );
998            }
999            return Ok(());
1000        }
1001
1002        // Keep client subscribed while exact-topic subscribers remain
1003        if Self::topic_has_remaining_subscribers(cmd) {
1004            return Ok(());
1005        }
1006
1007        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
1008            client.execute_unsubscribe(cmd);
1009        } else {
1010            log::error!(
1011                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1012                cmd.client_id(),
1013                cmd.venue(),
1014            );
1015        }
1016
1017        Ok(())
1018    }
1019
1020    fn topic_has_remaining_subscribers(cmd: &UnsubscribeCommand) -> bool {
1021        // Exact match only; wildcard observers must not block venue detach.
1022        // BookDeltas/Depth10 excluded: binary engine state cannot distinguish
1023        // the internal BookUpdater handler from external subscribers
1024        match cmd {
1025            UnsubscribeCommand::Quotes(c) => {
1026                let topic = switchboard::get_quotes_topic(c.instrument_id);
1027                msgbus::exact_subscriber_count_quotes(topic) > 0
1028            }
1029            UnsubscribeCommand::Trades(c) => {
1030                let topic = switchboard::get_trades_topic(c.instrument_id);
1031                msgbus::exact_subscriber_count_trades(topic) > 0
1032            }
1033            UnsubscribeCommand::MarkPrices(c) => {
1034                let topic = switchboard::get_mark_price_topic(c.instrument_id);
1035                msgbus::exact_subscriber_count_mark_prices(topic) > 0
1036            }
1037            UnsubscribeCommand::IndexPrices(c) => {
1038                let topic = switchboard::get_index_price_topic(c.instrument_id);
1039                msgbus::exact_subscriber_count_index_prices(topic) > 0
1040            }
1041            UnsubscribeCommand::FundingRates(c) => {
1042                let topic = switchboard::get_funding_rate_topic(c.instrument_id);
1043                msgbus::exact_subscriber_count_funding_rates(topic) > 0
1044            }
1045            UnsubscribeCommand::OptionGreeks(c) => {
1046                let topic = switchboard::get_option_greeks_topic(c.instrument_id);
1047                msgbus::exact_subscriber_count_option_greeks(topic) > 0
1048            }
1049            _ => false,
1050        }
1051    }
1052
1053    /// Sends a [`RequestCommand`] to a suitable data client implementation.
1054    ///
1055    /// # Errors
1056    ///
1057    /// Returns an error if no client is found for the given client ID or venue,
1058    /// or if the client fails to process the request.
1059    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1060        // Skip requests for external clients
1061        if let Some(cid) = req.client_id()
1062            && self.external_clients.contains(cid)
1063        {
1064            if self.config.debug {
1065                log::debug!("Skipping data request for external client {cid}: {req:?}");
1066            }
1067            return Ok(());
1068        }
1069
1070        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
1071            match req {
1072                RequestCommand::Data(req) => client.request_data(req),
1073                RequestCommand::Instrument(req) => client.request_instrument(req),
1074                RequestCommand::Instruments(req) => client.request_instruments(req),
1075                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
1076                RequestCommand::BookDepth(req) => client.request_book_depth(req),
1077                RequestCommand::Quotes(req) => client.request_quotes(req),
1078                RequestCommand::Trades(req) => client.request_trades(req),
1079                RequestCommand::FundingRates(req) => client.request_funding_rates(req),
1080                RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
1081                RequestCommand::Bars(req) => client.request_bars(req),
1082            }
1083        } else {
1084            anyhow::bail!(
1085                "Cannot handle request: no client found for {:?} {:?}",
1086                req.client_id(),
1087                req.venue()
1088            );
1089        }
1090    }
1091
1092    /// Processes a dynamically-typed data message.
1093    ///
1094    /// Currently supports `InstrumentAny`, funding rates, instrument status, option greeks, and
1095    /// custom data; unrecognized types are logged as errors.
1096    pub fn process(&mut self, data: &dyn Any) {
1097        self.data_count += 1;
1098        // TODO: Eventually these can be added to the `Data` enum (C/Cython blocking), process here for now
1099        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
1100            self.handle_instrument(instrument);
1101        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
1102            self.handle_funding_rate(*funding_rate);
1103        } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
1104            self.handle_instrument_status(*status);
1105        } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
1106            self.cache.borrow_mut().add_option_greeks(*option_greeks);
1107            let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
1108            msgbus::publish_option_greeks(topic, option_greeks);
1109            self.drain_deferred_commands();
1110        } else if let Some(custom) = data.downcast_ref::<CustomData>() {
1111            self.handle_custom_data(custom);
1112        } else {
1113            log::error!("Cannot process data {data:?}, type is unrecognized");
1114        }
1115    }
1116
1117    /// Processes a `Data` enum instance, dispatching to live handlers.
1118    pub fn process_data(&mut self, data: Data) {
1119        self.data_count += 1;
1120
1121        match data {
1122            Data::Delta(delta) => self.handle_delta(delta),
1123            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
1124            Data::Depth10(depth) => self.handle_depth10(*depth),
1125            Data::Quote(quote) => {
1126                self.handle_quote(quote);
1127                self.drain_deferred_commands();
1128            }
1129            Data::Trade(trade) => self.handle_trade(trade),
1130            Data::Bar(bar) => self.handle_bar(bar),
1131            Data::MarkPriceUpdate(mark_price) => {
1132                self.handle_mark_price(mark_price);
1133                self.drain_deferred_commands();
1134            }
1135            Data::IndexPriceUpdate(index_price) => {
1136                self.handle_index_price(index_price);
1137                self.drain_deferred_commands();
1138            }
1139            Data::InstrumentStatus(status) => {
1140                self.handle_instrument_status(status);
1141                self.drain_deferred_commands();
1142            }
1143            Data::InstrumentClose(close) => self.handle_instrument_close(close),
1144            Data::Custom(custom) => self.handle_custom_data(&custom),
1145        }
1146    }
1147
1148    /// Processes a `Data` instance through the historical pipeline.
1149    ///
1150    /// Pipeline mode publishes each item on the historical topic family
1151    /// (prefixed with `historical.`) and gates cache writes on
1152    /// `disable_historical_cache`. None of the live-only side effects
1153    /// (synthetic republish, option-chain expiry, depth-derived quotes,
1154    /// deferred-command drains) run in this path.
1155    pub fn process_historical(&mut self, data: Data) {
1156        self.data_count += 1;
1157
1158        match data {
1159            Data::Delta(delta) => self.handle_delta_pipeline(delta),
1160            Data::Deltas(deltas) => self.handle_deltas_pipeline(&deltas.into_inner()),
1161            Data::Depth10(depth) => self.handle_depth10_pipeline(*depth),
1162            Data::Quote(quote) => self.handle_quote_pipeline(quote),
1163            Data::Trade(trade) => self.handle_trade_pipeline(trade),
1164            Data::Bar(bar) => self.handle_bar_pipeline(bar),
1165            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price_pipeline(mark_price),
1166            Data::IndexPriceUpdate(index_price) => self.handle_index_price_pipeline(index_price),
1167            Data::InstrumentStatus(status) => self.handle_instrument_status_pipeline(status),
1168            Data::InstrumentClose(close) => self.handle_instrument_close_pipeline(close),
1169            Data::Custom(custom) => self.handle_custom_data_pipeline(&custom),
1170        }
1171    }
1172
1173    /// Processes a `DataResponse`, handling and publishing the response message.
1174    #[expect(clippy::needless_pass_by_value)] // Required by message bus dispatch
1175    pub fn response(&mut self, resp: DataResponse) {
1176        if log::log_enabled!(log::Level::Debug) {
1177            let correlation_id = resp.correlation_id();
1178            match resp.record_count() {
1179                Some(count) => log::debug!(
1180                    "{RECV}{RES} {} correlation_id={correlation_id} records={count}",
1181                    resp.kind(),
1182                ),
1183                None => log::debug!(
1184                    "{RECV}{RES} {} correlation_id={correlation_id}",
1185                    resp.kind(),
1186                ),
1187            }
1188        }
1189        log::trace!("{RECV}{RES} {resp:?}");
1190
1191        self.response_count += 1;
1192        let correlation_id = *resp.correlation_id();
1193
1194        match &resp {
1195            DataResponse::Instrument(r) => {
1196                self.handle_instrument_response(r.data.clone());
1197            }
1198            DataResponse::Instruments(r) => {
1199                self.handle_instruments(&r.data);
1200            }
1201            DataResponse::Quotes(r) => {
1202                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1203                    self.handle_quotes(&r.data);
1204                }
1205            }
1206            DataResponse::Trades(r) => {
1207                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1208                    self.handle_trades(&r.data);
1209                }
1210            }
1211            DataResponse::FundingRates(r) => {
1212                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1213                    self.handle_funding_rates(&r.data);
1214                }
1215            }
1216            DataResponse::Bars(r) => {
1217                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1218                    self.handle_bars(&r.data);
1219                }
1220            }
1221            DataResponse::Book(r) => self.handle_book_response(&r.data),
1222            DataResponse::ForwardPrices(r) => {
1223                return self.handle_forward_prices_response(&correlation_id, r);
1224            }
1225            DataResponse::Data(_) => {}
1226        }
1227
1228        msgbus::send_response(&correlation_id, &resp);
1229    }
1230
1231    #[inline]
1232    fn pipeline_cache_writes_allowed(&self) -> bool {
1233        !self.config.disable_historical_cache
1234    }
1235
1236    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1237        log::debug!("Handling instrument: {}", instrument.id());
1238
1239        if let Err(e) = self
1240            .cache
1241            .as_ref()
1242            .borrow_mut()
1243            .add_instrument(instrument.clone())
1244        {
1245            log_error_on_cache_insert(&e);
1246        }
1247
1248        let topic = switchboard::get_instrument_topic(instrument.id());
1249        log::debug!("Publishing instrument to topic: {topic}");
1250        msgbus::publish_instrument(topic, instrument);
1251
1252        self.update_option_chains(instrument);
1253    }
1254
1255    fn update_option_chains(&mut self, instrument: &InstrumentAny) {
1256        let Some(underlying) = instrument.underlying() else {
1257            return;
1258        };
1259        let Some(expiration_ns) = instrument.expiration_ns() else {
1260            return;
1261        };
1262        let Some(strike) = instrument.strike_price() else {
1263            return;
1264        };
1265        let Some(kind) = instrument.option_kind() else {
1266            return;
1267        };
1268
1269        let venue = instrument.id().venue;
1270        let settlement = instrument.settlement_currency().code;
1271        let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1272
1273        // Clone Rc to release borrow on self.option_chain_managers before accessing self.clients
1274        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1275            return;
1276        };
1277
1278        let clock = self.clock.clone();
1279        let client = self.get_command_client(None, Some(&venue));
1280
1281        if manager_rc
1282            .borrow_mut()
1283            .add_instrument(instrument.id(), strike, kind, client, &clock)
1284        {
1285            self.option_chain_instrument_index
1286                .insert(instrument.id(), series_id);
1287        }
1288    }
1289
1290    fn handle_delta(&mut self, delta: OrderBookDelta) {
1291        let deltas = if self.config.buffer_deltas {
1292            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1293                buffered_deltas.deltas.push(delta);
1294                buffered_deltas.flags = delta.flags;
1295                buffered_deltas.sequence = delta.sequence;
1296                buffered_deltas.ts_event = delta.ts_event;
1297                buffered_deltas.ts_init = delta.ts_init;
1298            } else {
1299                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1300                self.buffered_deltas_map
1301                    .insert(delta.instrument_id, buffered_deltas);
1302            }
1303
1304            if !RecordFlag::F_LAST.matches(delta.flags) {
1305                return; // Not the last delta for event
1306            }
1307
1308            self.buffered_deltas_map
1309                .remove(&delta.instrument_id)
1310                .expect("buffered deltas exist")
1311        } else {
1312            OrderBookDeltas::new(delta.instrument_id, vec![delta])
1313        };
1314
1315        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1316        msgbus::publish_deltas(topic, &deltas);
1317    }
1318
1319    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1320        if self.config.buffer_deltas {
1321            let instrument_id = deltas.instrument_id;
1322
1323            for delta in deltas.deltas {
1324                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1325                    buffered_deltas.deltas.push(delta);
1326                    buffered_deltas.flags = delta.flags;
1327                    buffered_deltas.sequence = delta.sequence;
1328                    buffered_deltas.ts_event = delta.ts_event;
1329                    buffered_deltas.ts_init = delta.ts_init;
1330                } else {
1331                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1332                    self.buffered_deltas_map
1333                        .insert(instrument_id, buffered_deltas);
1334                }
1335
1336                if RecordFlag::F_LAST.matches(delta.flags) {
1337                    let deltas_to_publish = self
1338                        .buffered_deltas_map
1339                        .remove(&instrument_id)
1340                        .expect("buffered deltas exist");
1341                    let topic = switchboard::get_book_deltas_topic(instrument_id);
1342                    msgbus::publish_deltas(topic, &deltas_to_publish);
1343                }
1344            }
1345        } else {
1346            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1347            msgbus::publish_deltas(topic, &deltas);
1348        }
1349    }
1350
1351    fn handle_depth10(&self, depth: OrderBookDepth10) {
1352        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1353        msgbus::publish_depth10(topic, &depth);
1354
1355        if self.config.emit_quotes_from_book_depths
1356            && let Some(quote) = derive_quote_from_depth(&depth)
1357        {
1358            book::publish_quote_if_changed(&self.cache, quote);
1359        }
1360    }
1361
1362    fn handle_quote(&self, quote: QuoteTick) {
1363        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1364            log_error_on_cache_insert(&e);
1365        }
1366
1367        for synthetic_quote in self.synthetic_quotes_from_quote(quote) {
1368            let topic = switchboard::get_quotes_topic(synthetic_quote.instrument_id);
1369            msgbus::publish_quote(topic, &synthetic_quote);
1370        }
1371
1372        let topic = switchboard::get_quotes_topic(quote.instrument_id);
1373        msgbus::publish_quote(topic, &quote);
1374    }
1375
1376    fn handle_trade(&self, trade: TradeTick) {
1377        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1378            log_error_on_cache_insert(&e);
1379        }
1380
1381        for synthetic_trade in self.synthetic_trades_from_trade(trade) {
1382            let topic = switchboard::get_trades_topic(synthetic_trade.instrument_id);
1383            msgbus::publish_trade(topic, &synthetic_trade);
1384        }
1385
1386        let topic = switchboard::get_trades_topic(trade.instrument_id);
1387        msgbus::publish_trade(topic, &trade);
1388    }
1389
1390    fn synthetic_quotes_from_quote(&self, update: QuoteTick) -> Vec<QuoteTick> {
1391        let Some(synthetics) = self.synthetic_quote_feeds.get(&update.instrument_id) else {
1392            return Vec::new();
1393        };
1394
1395        synthetics
1396            .iter()
1397            .filter_map(|synthetic| self.synthetic_quote_from_update(synthetic, update))
1398            .collect()
1399    }
1400
1401    fn synthetic_quote_from_update(
1402        &self,
1403        synthetic: &SyntheticInstrument,
1404        update: QuoteTick,
1405    ) -> Option<QuoteTick> {
1406        let cache = self.cache.borrow();
1407        let mut bid_inputs = Vec::with_capacity(synthetic.components.len());
1408        let mut ask_inputs = Vec::with_capacity(synthetic.components.len());
1409
1410        for instrument_id in &synthetic.components {
1411            let (bid_price, ask_price) = if *instrument_id == update.instrument_id {
1412                (update.bid_price, update.ask_price)
1413            } else {
1414                let Some(component_quote) = cache.quote(instrument_id) else {
1415                    log::warn!(
1416                        "Cannot calculate synthetic instrument {} price, no quotes for {} yet",
1417                        synthetic.id,
1418                        instrument_id,
1419                    );
1420                    return None;
1421                };
1422                (component_quote.bid_price, component_quote.ask_price)
1423            };
1424
1425            bid_inputs.push(bid_price.as_f64());
1426            ask_inputs.push(ask_price.as_f64());
1427        }
1428        drop(cache);
1429
1430        let bid_price = match synthetic.calculate(&bid_inputs) {
1431            Ok(price) => price,
1432            Err(e) => {
1433                log::error!(
1434                    "Cannot calculate synthetic instrument {} bid price: {e}",
1435                    synthetic.id
1436                );
1437                return None;
1438            }
1439        };
1440        let ask_price = match synthetic.calculate(&ask_inputs) {
1441            Ok(price) => price,
1442            Err(e) => {
1443                log::error!(
1444                    "Cannot calculate synthetic instrument {} ask price: {e}",
1445                    synthetic.id
1446                );
1447                return None;
1448            }
1449        };
1450        let size_one = Quantity::from(1);
1451
1452        Some(QuoteTick::new(
1453            synthetic.id,
1454            bid_price,
1455            ask_price,
1456            size_one,
1457            size_one,
1458            update.ts_event,
1459            self.clock.borrow().timestamp_ns(),
1460        ))
1461    }
1462
1463    fn synthetic_trades_from_trade(&self, update: TradeTick) -> Vec<TradeTick> {
1464        let Some(synthetics) = self.synthetic_trade_feeds.get(&update.instrument_id) else {
1465            return Vec::new();
1466        };
1467
1468        synthetics
1469            .iter()
1470            .filter_map(|synthetic| self.synthetic_trade_from_update(synthetic, update))
1471            .collect()
1472    }
1473
1474    fn synthetic_trade_from_update(
1475        &self,
1476        synthetic: &SyntheticInstrument,
1477        update: TradeTick,
1478    ) -> Option<TradeTick> {
1479        let cache = self.cache.borrow();
1480        let mut inputs = Vec::with_capacity(synthetic.components.len());
1481
1482        for instrument_id in &synthetic.components {
1483            let price = if *instrument_id == update.instrument_id {
1484                update.price
1485            } else {
1486                let Some(component_trade) = cache.trade(instrument_id) else {
1487                    log::warn!(
1488                        "Cannot calculate synthetic instrument {} price, no trades for {} yet",
1489                        synthetic.id,
1490                        instrument_id,
1491                    );
1492                    return None;
1493                };
1494                component_trade.price
1495            };
1496
1497            inputs.push(price.as_f64());
1498        }
1499        drop(cache);
1500
1501        let price = match synthetic.calculate(&inputs) {
1502            Ok(price) => price,
1503            Err(e) => {
1504                log::error!(
1505                    "Cannot calculate synthetic instrument {} trade price: {e}",
1506                    synthetic.id
1507                );
1508                return None;
1509            }
1510        };
1511
1512        Some(TradeTick::new(
1513            synthetic.id,
1514            price,
1515            Quantity::from(1),
1516            update.aggressor_side,
1517            update.trade_id,
1518            update.ts_event,
1519            self.clock.borrow().timestamp_ns(),
1520        ))
1521    }
1522
1523    fn handle_bar(&self, bar: Bar) {
1524        process_engine_bar(&self.cache, self.config.validate_data_sequence, true, bar);
1525    }
1526
1527    fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1528        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1529            log_error_on_cache_insert(&e);
1530        }
1531
1532        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1533        msgbus::publish_mark_price(topic, &mark_price);
1534    }
1535
1536    fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1537        if let Err(e) = self
1538            .cache
1539            .as_ref()
1540            .borrow_mut()
1541            .add_index_price(index_price)
1542        {
1543            log_error_on_cache_insert(&e);
1544        }
1545
1546        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1547        msgbus::publish_index_price(topic, &index_price);
1548    }
1549
1550    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
1551    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1552        if let Err(e) = self
1553            .cache
1554            .as_ref()
1555            .borrow_mut()
1556            .add_funding_rate(funding_rate)
1557        {
1558            log_error_on_cache_insert(&e);
1559        }
1560
1561        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1562        msgbus::publish_funding_rate(topic, &funding_rate);
1563    }
1564
1565    fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1566        if let Err(e) = self
1567            .cache
1568            .as_ref()
1569            .borrow_mut()
1570            .add_instrument_status(status)
1571        {
1572            log_error_on_cache_insert(&e);
1573        }
1574
1575        let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1576        msgbus::publish_any(topic, &status);
1577
1578        if self
1579            .option_chain_instrument_index
1580            .contains_key(&status.instrument_id)
1581            && matches!(
1582                status.action,
1583                MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1584            )
1585        {
1586            self.expire_option_chain_instrument(status.instrument_id);
1587        }
1588    }
1589
1590    /// Removes a settled/expired instrument from its option chain manager.
1591    ///
1592    /// Looks up the owning series via the reverse index, delegates removal to
1593    /// the manager (which unregisters msgbus handlers and pushes deferred wire
1594    /// unsubscribes), then drains those commands. When the series catalog
1595    /// becomes empty, the entire manager is torn down.
1596    fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
1597        let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
1598            return;
1599        };
1600
1601        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1602            return;
1603        };
1604
1605        let series_empty = manager_rc
1606            .borrow_mut()
1607            .handle_instrument_expired(&instrument_id);
1608
1609        // Drain deferred unsubscribe commands pushed by the manager
1610        self.drain_deferred_commands();
1611
1612        log::info!(
1613            "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
1614        );
1615
1616        if series_empty {
1617            manager_rc.borrow_mut().teardown(&self.clock);
1618            self.option_chain_managers.remove(&series_id);
1619
1620            log::info!("Torn down empty option chain manager for {series_id}");
1621        }
1622    }
1623
1624    fn handle_instrument_close(&self, close: InstrumentClose) {
1625        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1626        msgbus::publish_any(topic, &close);
1627    }
1628
1629    fn handle_custom_data(&self, custom: &CustomData) {
1630        log::debug!("Processing custom data: {}", custom.data.type_name());
1631        let topic = switchboard::get_custom_topic(&custom.data_type);
1632        msgbus::publish_any(topic, custom);
1633    }
1634
1635    fn handle_delta_pipeline(&self, delta: OrderBookDelta) {
1636        // Pipeline deltas are not buffered; replays arrive pre-batched
1637        let deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1638        let topic = historical_topic_of(switchboard::get_book_deltas_topic(deltas.instrument_id));
1639        msgbus::publish_deltas(topic, &deltas);
1640    }
1641
1642    fn handle_deltas_pipeline(&self, deltas: &OrderBookDeltas) {
1643        let topic = historical_topic_of(switchboard::get_book_deltas_topic(deltas.instrument_id));
1644        msgbus::publish_deltas(topic, deltas);
1645    }
1646
1647    fn handle_depth10_pipeline(&self, depth: OrderBookDepth10) {
1648        let topic = historical_topic_of(switchboard::get_book_depth10_topic(depth.instrument_id));
1649        msgbus::publish_depth10(topic, &depth);
1650    }
1651
1652    fn handle_quote_pipeline(&self, quote: QuoteTick) {
1653        if self.pipeline_cache_writes_allowed()
1654            && let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote)
1655        {
1656            log_error_on_cache_insert(&e);
1657        }
1658
1659        let topic = historical_topic_of(switchboard::get_quotes_topic(quote.instrument_id));
1660        msgbus::publish_quote(topic, &quote);
1661    }
1662
1663    fn handle_trade_pipeline(&self, trade: TradeTick) {
1664        if self.pipeline_cache_writes_allowed()
1665            && let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade)
1666        {
1667            log_error_on_cache_insert(&e);
1668        }
1669
1670        let topic = historical_topic_of(switchboard::get_trades_topic(trade.instrument_id));
1671        msgbus::publish_trade(topic, &trade);
1672    }
1673
1674    fn handle_bar_pipeline(&self, bar: Bar) {
1675        if !validate_bar_sequence(&self.cache, self.config.validate_data_sequence, &bar) {
1676            return;
1677        }
1678
1679        if self.pipeline_cache_writes_allowed()
1680            && let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar)
1681        {
1682            log_error_on_cache_insert(&e);
1683        }
1684
1685        let topic = historical_topic_of(switchboard::get_bars_topic(bar.bar_type));
1686        msgbus::publish_bar(topic, &bar);
1687    }
1688
1689    fn handle_mark_price_pipeline(&self, mark_price: MarkPriceUpdate) {
1690        if self.pipeline_cache_writes_allowed()
1691            && let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price)
1692        {
1693            log_error_on_cache_insert(&e);
1694        }
1695
1696        let topic =
1697            historical_topic_of(switchboard::get_mark_price_topic(mark_price.instrument_id));
1698        msgbus::publish_mark_price(topic, &mark_price);
1699    }
1700
1701    fn handle_index_price_pipeline(&self, index_price: IndexPriceUpdate) {
1702        if self.pipeline_cache_writes_allowed()
1703            && let Err(e) = self
1704                .cache
1705                .as_ref()
1706                .borrow_mut()
1707                .add_index_price(index_price)
1708        {
1709            log_error_on_cache_insert(&e);
1710        }
1711
1712        let topic = historical_topic_of(switchboard::get_index_price_topic(
1713            index_price.instrument_id,
1714        ));
1715        msgbus::publish_index_price(topic, &index_price);
1716    }
1717
1718    fn handle_instrument_status_pipeline(&self, status: InstrumentStatus) {
1719        if self.pipeline_cache_writes_allowed()
1720            && let Err(e) = self
1721                .cache
1722                .as_ref()
1723                .borrow_mut()
1724                .add_instrument_status(status)
1725        {
1726            log_error_on_cache_insert(&e);
1727        }
1728
1729        let topic = historical_topic_of(switchboard::get_instrument_status_topic(
1730            status.instrument_id,
1731        ));
1732        msgbus::publish_any(topic, &status);
1733    }
1734
1735    fn handle_instrument_close_pipeline(&self, close: InstrumentClose) {
1736        let topic =
1737            historical_topic_of(switchboard::get_instrument_close_topic(close.instrument_id));
1738        msgbus::publish_any(topic, &close);
1739    }
1740
1741    fn handle_custom_data_pipeline(&self, custom: &CustomData) {
1742        log::debug!("Pipeline custom data: {}", custom.data.type_name());
1743        let topic = historical_topic_of(switchboard::get_custom_topic(&custom.data_type));
1744        msgbus::publish_any(topic, custom);
1745    }
1746
1747    /// Drains deferred subscribe/unsubscribe commands pushed by option chain
1748    /// managers (or any other component) and executes them against the appropriate
1749    /// data client.
1750    fn drain_deferred_commands(&mut self) {
1751        // Loop because expire_series pushes Unsubscribe commands; converges in <= 3 iterations
1752        loop {
1753            let commands: VecDeque<DeferredCommand> =
1754                std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1755
1756            if commands.is_empty() {
1757                break;
1758            }
1759
1760            for cmd in commands {
1761                match cmd {
1762                    DeferredCommand::Subscribe(sub) => {
1763                        let client = self.get_command_client(sub.client_id(), sub.venue());
1764                        if let Some(client) = client {
1765                            client.execute_subscribe(sub);
1766                        }
1767                    }
1768                    DeferredCommand::Unsubscribe(unsub) => {
1769                        let client = self.get_command_client(unsub.client_id(), unsub.venue());
1770                        if let Some(client) = client {
1771                            client.execute_unsubscribe(&unsub);
1772                        }
1773                    }
1774                    DeferredCommand::ExpireInstrument(instrument_id) => {
1775                        self.expire_option_chain_instrument(instrument_id);
1776                    }
1777                    DeferredCommand::ExpireSeries(series_id) => {
1778                        self.expire_series(series_id);
1779                    }
1780                }
1781            }
1782        }
1783    }
1784
1785    /// Proactively expires all instruments for a series and tears down the manager.
1786    ///
1787    /// `handle_instrument_expired` removes each instrument from the aggregator and pushes
1788    /// deferred unsubscribe commands. `teardown` then cancels the snapshot timer and clears
1789    /// the handler lists (the aggregator is already empty at that point).
1790    fn expire_series(&mut self, series_id: OptionSeriesId) {
1791        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1792            return;
1793        };
1794
1795        let instrument_ids: Vec<InstrumentId> = self
1796            .option_chain_instrument_index
1797            .iter()
1798            .filter(|(_, sid)| **sid == series_id)
1799            .map(|(id, _)| *id)
1800            .collect();
1801
1802        for id in &instrument_ids {
1803            self.option_chain_instrument_index.remove(id);
1804            manager_rc.borrow_mut().handle_instrument_expired(id);
1805        }
1806
1807        manager_rc.borrow_mut().teardown(&self.clock);
1808        self.option_chain_managers.remove(&series_id);
1809
1810        log::info!("Proactively torn down expired option chain {series_id}");
1811    }
1812
1813    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1814        if cmd.instrument_id.is_synthetic() {
1815            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1816        }
1817
1818        // Validate parent shape BEFORE mutating subscription state so a parse
1819        // failure leaves the engine bookkeeping unchanged.
1820        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
1821
1822        self.book_deltas_subs.insert(cmd.instrument_id);
1823        if cmd.managed {
1824            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, parent)?;
1825        }
1826
1827        Ok(())
1828    }
1829
1830    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1831        if cmd.instrument_id.is_synthetic() {
1832            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1833        }
1834
1835        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
1836
1837        self.book_depth10_subs.insert(cmd.instrument_id);
1838        if cmd.managed {
1839            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
1840        }
1841
1842        Ok(())
1843    }
1844
1845    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1846        if cmd.instrument_id.is_synthetic() {
1847            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1848        }
1849
1850        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
1851
1852        let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
1853        let inserted = self.increment_book_snapshot_subscription(cmd, parent);
1854
1855        if inserted && !had_snapshots {
1856            // Always run setup so the depth10 handler is registered alongside
1857            // the deltas handler when this is the first snapshot for the id;
1858            // setup_book_updater is idempotent and the typed router dedups
1859            // overlapping subscribes.
1860            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
1861        }
1862
1863        if had_snapshots || self.book_deltas_subs.contains(&cmd.instrument_id) {
1864            return Ok(());
1865        }
1866
1867        if let Some(client_id) = cmd.client_id.as_ref()
1868            && self.external_clients.contains(client_id)
1869        {
1870            if self.config.debug {
1871                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
1872            }
1873            return Ok(());
1874        }
1875
1876        log::debug!(
1877            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1878            cmd.instrument_id,
1879            cmd.client_id,
1880            cmd.venue,
1881        );
1882
1883        if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1884            let deltas_cmd = SubscribeBookDeltas::new(
1885                cmd.instrument_id,
1886                cmd.book_type,
1887                cmd.client_id,
1888                cmd.venue,
1889                UUID4::new(),
1890                cmd.ts_init,
1891                cmd.depth,
1892                true, // managed
1893                Some(cmd.command_id),
1894                cmd.params.clone(),
1895            );
1896            log::debug!(
1897                "Calling client.execute_subscribe for BookDeltas: {}",
1898                cmd.instrument_id
1899            );
1900            client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
1901        } else {
1902            log::error!(
1903                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1904                cmd.client_id,
1905                cmd.venue,
1906            );
1907        }
1908
1909        Ok(())
1910    }
1911
1912    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1913        match cmd.bar_type.aggregation_source() {
1914            AggregationSource::Internal => {
1915                if !self
1916                    .bar_aggregators
1917                    .contains_key(&bar_aggregator_key(cmd.bar_type, None))
1918                {
1919                    self.start_bar_aggregator(cmd.bar_type, None)?;
1920                }
1921            }
1922            AggregationSource::External => {
1923                if cmd.bar_type.instrument_id().is_synthetic() {
1924                    anyhow::bail!(
1925                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1926                    );
1927                }
1928            }
1929        }
1930
1931        Ok(())
1932    }
1933
1934    fn subscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
1935        let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
1936            log::error!(
1937                "Cannot subscribe to `QuoteTick` data for synthetic instrument {instrument_id}, not found",
1938            );
1939            return;
1940        };
1941
1942        if !self.subscribed_synthetic_quotes.insert(instrument_id) {
1943            return;
1944        }
1945
1946        for component_id in &synthetic.components {
1947            let synthetics = self.synthetic_quote_feeds.entry(*component_id).or_default();
1948            if !synthetics
1949                .iter()
1950                .any(|registered| registered.id == synthetic.id)
1951            {
1952                synthetics.push(synthetic.clone());
1953            }
1954        }
1955    }
1956
1957    fn subscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
1958        let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
1959            log::error!(
1960                "Cannot subscribe to `TradeTick` data for synthetic instrument {instrument_id}, not found",
1961            );
1962            return;
1963        };
1964
1965        if !self.subscribed_synthetic_trades.insert(instrument_id) {
1966            return;
1967        }
1968
1969        for component_id in &synthetic.components {
1970            let synthetics = self.synthetic_trade_feeds.entry(*component_id).or_default();
1971            if !synthetics
1972                .iter()
1973                .any(|registered| registered.id == synthetic.id)
1974            {
1975                synthetics.push(synthetic.clone());
1976            }
1977        }
1978    }
1979
1980    fn is_spread_quote_command(
1981        &self,
1982        instrument_id: InstrumentId,
1983        params: Option<&Params>,
1984    ) -> bool {
1985        if !params
1986            .and_then(|params| params.get_bool("aggregate_spread_quotes"))
1987            .unwrap_or(false)
1988        {
1989            return false;
1990        }
1991
1992        self.cache
1993            .borrow()
1994            .instrument(&instrument_id)
1995            .is_some_and(InstrumentAny::is_spread)
1996    }
1997
1998    fn subscribe_spread_quotes(&mut self, cmd: &SubscribeQuotes) {
1999        if self
2000            .spread_quote_aggregators
2001            .contains_key(&cmd.instrument_id)
2002        {
2003            log::warn!(
2004                "SpreadQuoteAggregator for {} is currently in use, subscription can't be started",
2005                cmd.instrument_id,
2006            );
2007            return;
2008        }
2009
2010        let Some(instrument) = self.cache.borrow().instrument(&cmd.instrument_id).cloned() else {
2011            log::error!(
2012                "Cannot create spread quote aggregator: no instrument found for {}",
2013                cmd.instrument_id,
2014            );
2015            return;
2016        };
2017        let Some(legs) = spread_instrument_legs(&instrument) else {
2018            log::error!(
2019                "Cannot create spread quote aggregator: invalid spread legs for {}",
2020                cmd.instrument_id,
2021            );
2022            return;
2023        };
2024
2025        if legs.len() <= 1 {
2026            log::error!(
2027                "Cannot create spread quote aggregator: spread instrument {} should have more than one leg",
2028                cmd.instrument_id,
2029            );
2030            return;
2031        }
2032
2033        let cache = self.cache.clone();
2034        let handler = Box::new(move |quote: QuoteTick| {
2035            if let Err(e) = cache.borrow_mut().add_quote(quote) {
2036                log_error_on_cache_insert(&e);
2037            }
2038            let topic = switchboard::get_quotes_topic(quote.instrument_id);
2039            msgbus::publish_quote(topic, &quote);
2040        });
2041        let aggregator = Rc::new(RefCell::new(SpreadQuoteAggregator::new(
2042            cmd.instrument_id,
2043            &legs,
2044            matches!(instrument, InstrumentAny::FuturesSpread(_)),
2045            instrument.price_precision(),
2046            instrument.size_precision(),
2047            handler,
2048            self.clock.clone(),
2049            false,
2050            spread_quote_update_interval_seconds(cmd.params.as_ref()),
2051            cmd.params
2052                .as_ref()
2053                .and_then(|params| params.get_u64("quote_build_delay"))
2054                .unwrap_or(0),
2055            None,
2056            None,
2057        )));
2058
2059        let mut handlers = Vec::with_capacity(legs.len());
2060        for (leg_id, _) in &legs {
2061            let topic = switchboard::get_quotes_topic(*leg_id);
2062            let handler = TypedHandler::new(SpreadQuoteHandler::new(
2063                &aggregator,
2064                cmd.instrument_id,
2065                *leg_id,
2066            ));
2067            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
2068            handlers.push((*leg_id, handler));
2069        }
2070
2071        aggregator
2072            .borrow_mut()
2073            .start_timer(Some(aggregator.clone()));
2074        aggregator.borrow_mut().set_running(true);
2075        self.spread_quote_aggregators
2076            .insert(cmd.instrument_id, aggregator);
2077        self.spread_quote_handlers
2078            .insert(cmd.instrument_id, handlers);
2079
2080        for (leg_id, _) in legs {
2081            let subscribe = SubscribeQuotes::new(
2082                leg_id,
2083                cmd.client_id,
2084                cmd.venue,
2085                UUID4::new(),
2086                cmd.ts_init,
2087                Some(cmd.command_id),
2088                cmd.params.clone(),
2089            );
2090            self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
2091        }
2092    }
2093
2094    fn unsubscribe_spread_quotes(&mut self, cmd: &UnsubscribeQuotes) {
2095        let Some(leg_ids) = self.stop_spread_quote_aggregator(cmd.instrument_id) else {
2096            return;
2097        };
2098
2099        for leg_id in leg_ids {
2100            let unsubscribe = UnsubscribeQuotes::new(
2101                leg_id,
2102                cmd.client_id,
2103                cmd.venue,
2104                UUID4::new(),
2105                cmd.ts_init,
2106                Some(cmd.command_id),
2107                cmd.params.clone(),
2108            );
2109            self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
2110                unsubscribe,
2111            )));
2112        }
2113    }
2114
2115    fn stop_spread_quote_aggregator(
2116        &mut self,
2117        spread_instrument_id: InstrumentId,
2118    ) -> Option<Vec<InstrumentId>> {
2119        let Some(aggregator) = self.spread_quote_aggregators.remove(&spread_instrument_id) else {
2120            log::warn!(
2121                "Cannot stop spread quote aggregator: no aggregator to stop for {spread_instrument_id}",
2122            );
2123            return None;
2124        };
2125
2126        aggregator.borrow_mut().stop_timer();
2127        aggregator.borrow_mut().set_running(false);
2128
2129        let handlers = self
2130            .spread_quote_handlers
2131            .remove(&spread_instrument_id)
2132            .unwrap_or_default();
2133        let mut leg_ids = Vec::with_capacity(handlers.len());
2134        for (leg_id, handler) in handlers {
2135            let topic = switchboard::get_quotes_topic(leg_id);
2136            msgbus::unsubscribe_quotes(topic.into(), &handler);
2137            leg_ids.push(leg_id);
2138        }
2139
2140        Some(leg_ids)
2141    }
2142
2143    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
2144        if !self.book_deltas_subs.contains(&cmd.instrument_id) {
2145            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
2146            return false;
2147        }
2148
2149        self.book_deltas_subs.remove(&cmd.instrument_id);
2150        self.maintain_book_updater(&cmd.instrument_id);
2151
2152        // Snapshot subscriptions reuse the deltas feed.
2153        // Keep the client subscribed until the last snapshot consumer is gone.
2154        !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
2155    }
2156
2157    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
2158        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
2159            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
2160            return false;
2161        }
2162
2163        self.book_depth10_subs.remove(&cmd.instrument_id);
2164        self.maintain_book_updater(&cmd.instrument_id);
2165
2166        true
2167    }
2168
2169    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
2170        match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
2171            BookSnapshotUnsubscribeResult::NotSubscribed => {
2172                log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
2173                return;
2174            }
2175            BookSnapshotUnsubscribeResult::Decremented => return,
2176            BookSnapshotUnsubscribeResult::Removed => {}
2177        }
2178
2179        if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
2180            return;
2181        }
2182
2183        self.maintain_book_updater(&cmd.instrument_id);
2184
2185        if self.book_deltas_subs.contains(&cmd.instrument_id) {
2186            return;
2187        }
2188
2189        if let Some(client_id) = cmd.client_id.as_ref()
2190            && self.external_clients.contains(client_id)
2191        {
2192            return;
2193        }
2194
2195        if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
2196            let deltas_cmd = UnsubscribeBookDeltas::new(
2197                cmd.instrument_id,
2198                cmd.client_id,
2199                cmd.venue,
2200                UUID4::new(),
2201                cmd.ts_init,
2202                Some(cmd.command_id),
2203                cmd.params.clone(),
2204            );
2205            client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
2206        }
2207    }
2208
2209    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
2210        let bar_type = cmd.bar_type;
2211
2212        // Don't remove aggregator if other exact-topic subscribers still exist
2213        let topic = switchboard::get_bars_topic(bar_type.standard());
2214        if msgbus::exact_subscriber_count_bars(topic) > 0 {
2215            return;
2216        }
2217
2218        if self
2219            .bar_aggregators
2220            .contains_key(&bar_aggregator_key(bar_type, None))
2221            && let Err(e) = self.stop_bar_aggregator(bar_type, None)
2222        {
2223            log::error!("Error stopping bar aggregator for {bar_type}: {e}");
2224        }
2225
2226        // After stopping a composite, check if the source aggregator is now orphaned
2227        if bar_type.is_composite() {
2228            let source_type = bar_type.composite();
2229            let source_topic = switchboard::get_bars_topic(source_type);
2230            if msgbus::exact_subscriber_count_bars(source_topic) == 0
2231                && self
2232                    .bar_aggregators
2233                    .contains_key(&bar_aggregator_key(source_type, None))
2234                && let Err(e) = self.stop_bar_aggregator(source_type, None)
2235            {
2236                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
2237            }
2238        }
2239    }
2240
2241    fn unsubscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
2242        if !self.subscribed_synthetic_quotes.remove(&instrument_id) {
2243            log::warn!("Cannot unsubscribe from synthetic `QuoteTick` data: not subscribed");
2244            return;
2245        }
2246
2247        self.synthetic_quote_feeds.retain(|_, synthetics| {
2248            synthetics.retain(|synthetic| synthetic.id != instrument_id);
2249            !synthetics.is_empty()
2250        });
2251    }
2252
2253    fn unsubscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
2254        if !self.subscribed_synthetic_trades.remove(&instrument_id) {
2255            log::warn!("Cannot unsubscribe from synthetic `TradeTick` data: not subscribed");
2256            return;
2257        }
2258
2259        self.synthetic_trade_feeds.retain(|_, synthetics| {
2260            synthetics.retain(|synthetic| synthetic.id != instrument_id);
2261            !synthetics.is_empty()
2262        });
2263    }
2264
2265    fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
2266        let series_id = cmd.series_id;
2267
2268        // Handle edits to existing subscriptions by tearing down and re-setting up the OptionChainManager.
2269        if let Some(old) = self.option_chain_managers.remove(&series_id) {
2270            log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
2271            let all_ids = old.borrow().all_instrument_ids();
2272            let old_venue = old.borrow().venue();
2273            old.borrow_mut().teardown(&self.clock);
2274            self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
2275        }
2276
2277        // Drain any stale pending forward price requests for this series
2278        self.pending_option_chain_requests
2279            .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
2280
2281        // For ATM-based strike ranges, request forward prices from the adapter
2282        // to enable instant bootstrap without waiting for the first WebSocket tick.
2283        if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
2284            // Extract client_id first to avoid borrow conflicts
2285            let resolved_client_id = self
2286                .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
2287                .map(|c| c.client_id);
2288
2289            if let Some(client_id) = resolved_client_id {
2290                let request_id = UUID4::new();
2291                let ts_init = self.clock.borrow().timestamp_ns();
2292
2293                // Pick any one option instrument at this expiry from cache
2294                // to enable single-instrument forward price fetch (1 HTTP call)
2295                let sample_instrument_id = {
2296                    let cache = self.cache.borrow();
2297                    cache
2298                        .instruments(&series_id.venue, Some(&series_id.underlying))
2299                        .iter()
2300                        .find(|i| {
2301                            i.expiration_ns() == Some(series_id.expiration_ns)
2302                                && i.settlement_currency().code == series_id.settlement_currency
2303                        })
2304                        .map(|i| i.id())
2305                };
2306
2307                let request = RequestForwardPrices::new(
2308                    series_id.venue,
2309                    series_id.underlying,
2310                    sample_instrument_id,
2311                    Some(client_id),
2312                    request_id,
2313                    ts_init,
2314                    None,
2315                );
2316
2317                self.pending_option_chain_requests
2318                    .insert(request_id, cmd.clone());
2319
2320                let req_cmd = RequestCommand::ForwardPrices(request);
2321                if let Err(e) = self.execute_request(req_cmd) {
2322                    log::warn!("Failed to request forward prices for {series_id}: {e}");
2323                    let cmd = self
2324                        .pending_option_chain_requests
2325                        .remove(&request_id)
2326                        .expect("just inserted");
2327                    self.create_option_chain_manager(&cmd, None);
2328                }
2329
2330                return;
2331            }
2332        }
2333
2334        self.create_option_chain_manager(cmd, None);
2335    }
2336
2337    /// Creates and stores an `OptionChainManager` for the given subscription.
2338    fn create_option_chain_manager(
2339        &mut self,
2340        cmd: &SubscribeOptionChain,
2341        initial_atm_price: Option<Price>,
2342    ) {
2343        let series_id = cmd.series_id;
2344        let cache = self.cache.clone();
2345        let clock = self.clock.clone();
2346        let priority = self.msgbus_priority;
2347        let deferred_cmd_queue = self.deferred_cmd_queue.clone();
2348
2349        let manager_rc = {
2350            let client = self.get_command_client(cmd.client_id.as_ref(), Some(&series_id.venue));
2351            OptionChainManager::create_and_setup(
2352                series_id,
2353                &cache,
2354                cmd,
2355                &clock,
2356                priority,
2357                client,
2358                initial_atm_price,
2359                deferred_cmd_queue,
2360            )
2361        };
2362
2363        // Index all instruments for reverse lookup
2364        for id in manager_rc.borrow().all_instrument_ids() {
2365            self.option_chain_instrument_index.insert(id, series_id);
2366        }
2367
2368        self.option_chain_managers.insert(series_id, manager_rc);
2369    }
2370
2371    fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
2372        let series_id = cmd.series_id;
2373
2374        let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
2375            log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
2376            return;
2377        };
2378
2379        // Extract info before teardown
2380        let all_ids = manager_rc.borrow().all_instrument_ids();
2381        let venue = manager_rc.borrow().venue();
2382
2383        // Remove all instruments from reverse index
2384        for id in &all_ids {
2385            self.option_chain_instrument_index.remove(id);
2386        }
2387
2388        manager_rc.borrow_mut().teardown(&self.clock);
2389
2390        // Forward wire-level unsubscribes to the data client
2391        self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
2392
2393        log::info!("Unsubscribed option chain for {series_id}");
2394    }
2395
2396    /// Forwards wire-level unsubscribe commands for all option chain instruments.
2397    fn forward_option_chain_unsubscribes(
2398        &mut self,
2399        instrument_ids: &[InstrumentId],
2400        venue: Venue,
2401        client_id: Option<ClientId>,
2402    ) {
2403        let ts_init = self.clock.borrow().timestamp_ns();
2404
2405        let Some(client) = self.get_command_client(client_id.as_ref(), Some(&venue)) else {
2406            log::error!(
2407                "Cannot forward option chain unsubscribes: no client found for venue={venue}",
2408            );
2409            return;
2410        };
2411
2412        for instrument_id in instrument_ids {
2413            client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
2414                *instrument_id,
2415                client_id,
2416                Some(venue),
2417                UUID4::new(),
2418                ts_init,
2419                None,
2420                None,
2421            )));
2422            client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
2423                UnsubscribeOptionGreeks::new(
2424                    *instrument_id,
2425                    client_id,
2426                    Some(venue),
2427                    UUID4::new(),
2428                    ts_init,
2429                    None,
2430                    None,
2431                ),
2432            ));
2433            client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
2434                UnsubscribeInstrumentStatus::new(
2435                    *instrument_id,
2436                    client_id,
2437                    Some(venue),
2438                    UUID4::new(),
2439                    ts_init,
2440                    None,
2441                    None,
2442                ),
2443            ));
2444        }
2445    }
2446
2447    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
2448        // Determine which per-underlying books this subscription touched, then
2449        // for each book check whether any other active subscription still
2450        // wants it before unsubscribing/dropping the shared BookUpdater.
2451        //
2452        // The presence of a memoized expansion identifies a parent teardown.
2453        // Concrete subscriptions touch only the exact id.
2454        let is_parent = self
2455            .book_deltas_parent_expansions
2456            .contains_key(instrument_id)
2457            || self
2458                .book_depth10_parent_expansions
2459                .contains_key(instrument_id);
2460        let target_ids: Vec<InstrumentId> = if is_parent {
2461            let mut set: AHashSet<InstrumentId> = AHashSet::new();
2462
2463            if let Some(expansion) = self.book_deltas_parent_expansions.get(instrument_id) {
2464                set.extend(expansion.iter().copied());
2465            }
2466
2467            if let Some(expansion) = self.book_depth10_parent_expansions.get(instrument_id) {
2468                set.extend(expansion.iter().copied());
2469            }
2470
2471            if set.is_empty() {
2472                return;
2473            }
2474
2475            set.into_iter().collect()
2476        } else {
2477            vec![*instrument_id]
2478        };
2479
2480        if is_parent {
2481            // Each parent kind (deltas / depth10 / snapshots) writes its own
2482            // memo via setup_book_updater. Keep each memo alive while any
2483            // sibling subscription that drives the same handler kind remains
2484            // active for this parent id.
2485            let parent_still_needs_deltas = self.book_deltas_subs.contains(instrument_id)
2486                || self.book_depth10_subs.contains(instrument_id)
2487                || self.has_book_snapshot_subscriptions(instrument_id);
2488            let parent_still_needs_depth10 = self.book_depth10_subs.contains(instrument_id)
2489                || self.has_book_snapshot_subscriptions(instrument_id);
2490
2491            if !parent_still_needs_deltas {
2492                self.book_deltas_parent_expansions.remove(instrument_id);
2493            }
2494
2495            if !parent_still_needs_depth10 {
2496                self.book_depth10_parent_expansions.remove(instrument_id);
2497            }
2498        }
2499
2500        for target_id in &target_ids {
2501            let wants_deltas = self.is_underlying_wanted_for_deltas(target_id);
2502            let wants_depth10 = self.is_underlying_wanted_for_depth10(target_id);
2503
2504            let Some(updater) = self.book_updaters.get(target_id).cloned() else {
2505                continue;
2506            };
2507
2508            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
2509            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
2510
2511            if !wants_deltas {
2512                let topic = switchboard::get_book_deltas_topic(*target_id);
2513                msgbus::unsubscribe_book_deltas(topic.into(), &deltas_handler);
2514            }
2515
2516            if !wants_depth10 {
2517                let topic = switchboard::get_book_depth10_topic(*target_id);
2518                msgbus::unsubscribe_book_depth10(topic.into(), &depth_handler);
2519            }
2520
2521            if !wants_deltas && !wants_depth10 {
2522                self.book_updaters.remove(target_id);
2523                log::debug!("Removed BookUpdater for instrument ID {target_id}");
2524            }
2525        }
2526    }
2527
2528    fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
2529        self.book_snapshot_counts
2530            .keys()
2531            .any(|(id, _)| id == instrument_id)
2532    }
2533
2534    fn increment_book_snapshot_subscription(
2535        &mut self,
2536        cmd: &SubscribeBookSnapshots,
2537        parent: Option<(Ustr, InstrumentClass)>,
2538    ) -> bool {
2539        let key = (cmd.instrument_id, cmd.interval_ms);
2540
2541        if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
2542            *count += 1;
2543            return false;
2544        }
2545
2546        self.book_snapshot_counts.insert(key, 1);
2547
2548        let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
2549        {
2550            snapshot_infos.clone()
2551        } else {
2552            let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
2553            self.book_intervals
2554                .insert(cmd.interval_ms, snapshot_infos.clone());
2555            self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
2556            snapshot_infos
2557        };
2558
2559        let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
2560        let snap_info = BookSnapshotInfo {
2561            instrument_id: cmd.instrument_id,
2562            venue: cmd.instrument_id.venue,
2563            parent,
2564            topic,
2565            interval_ms: cmd.interval_ms,
2566        };
2567
2568        snapshot_infos
2569            .borrow_mut()
2570            .insert(cmd.instrument_id, snap_info);
2571
2572        true
2573    }
2574
2575    fn decrement_book_snapshot_subscription(
2576        &mut self,
2577        instrument_id: InstrumentId,
2578        interval_ms: NonZeroUsize,
2579    ) -> BookSnapshotUnsubscribeResult {
2580        let key = (instrument_id, interval_ms);
2581
2582        let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
2583            return BookSnapshotUnsubscribeResult::NotSubscribed;
2584        };
2585
2586        if *count > 1 {
2587            *count -= 1;
2588            return BookSnapshotUnsubscribeResult::Decremented;
2589        }
2590
2591        self.book_snapshot_counts.shift_remove(&key);
2592
2593        let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
2594            let mut snapshot_infos = snapshot_infos.borrow_mut();
2595            snapshot_infos.shift_remove(&instrument_id);
2596            snapshot_infos.is_empty()
2597        } else {
2598            false
2599        };
2600
2601        if remove_interval {
2602            self.book_intervals.remove(&interval_ms);
2603
2604            if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
2605                let timer_name = snapshotter.timer_name;
2606                let mut clock = self.clock.borrow_mut();
2607                if clock.timer_exists(&timer_name) {
2608                    clock.cancel_timer(&timer_name);
2609                }
2610            }
2611        }
2612
2613        BookSnapshotUnsubscribeResult::Removed
2614    }
2615
2616    fn schedule_book_snapshotter(
2617        &mut self,
2618        interval_ms: NonZeroUsize,
2619        snapshot_infos: BookSnapshotInfos,
2620    ) {
2621        let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
2622        let now_ns = self.clock.borrow().timestamp_ns().as_u64();
2623        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
2624
2625        let snapshotter = Rc::new(BookSnapshotter::new(
2626            interval_ms,
2627            snapshot_infos,
2628            self.cache.clone(),
2629        ));
2630        let timer_name = snapshotter.timer_name;
2631        let snapshotter_callback = snapshotter.clone();
2632        let callback_fn: Rc<dyn Fn(TimeEvent)> =
2633            Rc::new(move |event| snapshotter_callback.snapshot(event));
2634        let callback = TimeEventCallback::from(callback_fn);
2635
2636        self.clock
2637            .borrow_mut()
2638            .set_timer_ns(
2639                &timer_name,
2640                interval_ns,
2641                Some(start_time_ns.into()),
2642                None,
2643                Some(callback),
2644                None,
2645                None,
2646            )
2647            .expect(FAILED);
2648
2649        self.book_snapshotters.insert(interval_ms, snapshotter);
2650    }
2651
2652    fn handle_instrument_response(&self, instrument: InstrumentAny) {
2653        let mut cache = self.cache.as_ref().borrow_mut();
2654        if let Err(e) = cache.add_instrument(instrument) {
2655            log_error_on_cache_insert(&e);
2656        }
2657    }
2658
2659    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
2660        // TODO: Improve by adding bulk update methods to cache and database
2661        let mut cache = self.cache.as_ref().borrow_mut();
2662        for instrument in instruments {
2663            if let Err(e) = cache.add_instrument(instrument.clone()) {
2664                log_error_on_cache_insert(&e);
2665            }
2666        }
2667    }
2668
2669    fn handle_quotes(&self, quotes: &[QuoteTick]) {
2670        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
2671            log_error_on_cache_insert(&e);
2672        }
2673    }
2674
2675    fn handle_trades(&self, trades: &[TradeTick]) {
2676        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
2677            log_error_on_cache_insert(&e);
2678        }
2679    }
2680
2681    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
2682        if let Err(e) = self
2683            .cache
2684            .as_ref()
2685            .borrow_mut()
2686            .add_funding_rates(funding_rates)
2687        {
2688            log_error_on_cache_insert(&e);
2689        }
2690    }
2691
2692    fn handle_bars(&self, bars: &[Bar]) {
2693        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
2694            log_error_on_cache_insert(&e);
2695        }
2696    }
2697
2698    fn handle_book_response(&self, book: &OrderBook) {
2699        log::debug!("Adding order book {} to cache", book.instrument_id);
2700
2701        if let Err(e) = self
2702            .cache
2703            .as_ref()
2704            .borrow_mut()
2705            .add_order_book(book.clone())
2706        {
2707            log_error_on_cache_insert(&e);
2708        }
2709    }
2710
2711    /// Handles a `ForwardPricesResponse` by extracting the forward price
2712    /// for the pending option chain and creating the manager with instant bootstrap.
2713    fn handle_forward_prices_response(
2714        &mut self,
2715        correlation_id: &UUID4,
2716        resp: &ForwardPricesResponse,
2717    ) {
2718        let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
2719            log::debug!(
2720                "No pending option chain request for correlation_id={correlation_id}, ignoring"
2721            );
2722            return;
2723        };
2724
2725        let series_id = cmd.series_id;
2726
2727        // Find a forward price that matches an instrument in this series.
2728        // We look up each forward price instrument in the cache to match by expiry and currency.
2729        let cache = self.cache.borrow();
2730        let mut best_price: Option<Price> = None;
2731
2732        for fp in &resp.data {
2733            // Check if any cached instrument with this id belongs to our series
2734            if let Some(instrument) = cache.instrument(&fp.instrument_id)
2735                && let Some(expiration) = instrument.expiration_ns()
2736                && expiration == series_id.expiration_ns
2737                && instrument.settlement_currency().code == series_id.settlement_currency
2738            {
2739                match Price::from_decimal(fp.forward_price) {
2740                    Ok(price) => best_price = Some(price),
2741                    Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
2742                }
2743                break;
2744            }
2745        }
2746        drop(cache);
2747
2748        if let Some(price) = best_price {
2749            log::info!("Forward price for {series_id}: {price} (instant bootstrap)");
2750        } else {
2751            log::info!(
2752                "No matching forward price found for {series_id}, will bootstrap from live data",
2753            );
2754        }
2755
2756        self.create_option_chain_manager(&cmd, best_price);
2757    }
2758
2759    fn setup_book_updater(
2760        &mut self,
2761        instrument_id: &InstrumentId,
2762        book_type: BookType,
2763        only_deltas: bool,
2764        parent: Option<(Ustr, InstrumentClass)>,
2765    ) -> anyhow::Result<()> {
2766        // One BookUpdater per cache book (keyed by per-underlying id), shared
2767        // across overlapping subscriptions. Parent subs are expanded into
2768        // their underlyings here; the expansion is memoized so unsubscribe
2769        // mirrors the exact set even if the cache composition changes later.
2770        let target_ids: Vec<InstrumentId> = if let Some((root, class)) = parent {
2771            self.cache
2772                .borrow()
2773                .instruments_by_parent(&instrument_id.venue, &root, class)
2774                .iter()
2775                .map(|i| i.id())
2776                .collect()
2777        } else {
2778            vec![*instrument_id]
2779        };
2780
2781        if parent.is_some() {
2782            self.book_deltas_parent_expansions
2783                .insert(*instrument_id, target_ids.clone());
2784
2785            if !only_deltas {
2786                self.book_depth10_parent_expansions
2787                    .insert(*instrument_id, target_ids.clone());
2788            }
2789        }
2790
2791        {
2792            let mut cache = self.cache.borrow_mut();
2793            for target_id in &target_ids {
2794                if !cache.has_order_book(target_id) {
2795                    let book = OrderBook::new(*target_id, book_type);
2796                    log::debug!("Created {book}");
2797                    cache.add_order_book(book)?;
2798                }
2799            }
2800        }
2801
2802        for target_id in &target_ids {
2803            let updater = self
2804                .book_updaters
2805                .entry(*target_id)
2806                .or_insert_with(|| {
2807                    Rc::new(BookUpdater::new(
2808                        target_id,
2809                        self.cache.clone(),
2810                        self.config.emit_quotes_from_book,
2811                    ))
2812                })
2813                .clone();
2814
2815            // Subscribe handler to the literal per-underlying topic. The
2816            // typed router dedups (pattern, handler_id) pairs, so overlapping
2817            // composite + exact subscriptions register exactly one handler
2818            // entry per book and a single delta apply per publish.
2819            let deltas_topic = switchboard::get_book_deltas_topic(*target_id);
2820            let deltas_handler = TypedHandler::new(updater.clone());
2821            msgbus::subscribe_book_deltas(
2822                deltas_topic.into(),
2823                deltas_handler,
2824                Some(self.msgbus_priority),
2825            );
2826
2827            if !only_deltas {
2828                let depth_topic = switchboard::get_book_depth10_topic(*target_id);
2829                let depth_handler = TypedHandler::new(updater);
2830                msgbus::subscribe_book_depth10(
2831                    depth_topic.into(),
2832                    depth_handler,
2833                    Some(self.msgbus_priority),
2834                );
2835            }
2836        }
2837
2838        Ok(())
2839    }
2840
2841    fn is_underlying_wanted_for_deltas(&self, target_id: &InstrumentId) -> bool {
2842        // Any of {deltas, depth10, snapshots} subs causes setup_book_updater to
2843        // subscribe the deltas handler (depth10/snapshots use only_deltas=false),
2844        // so all three keep the per-underlying deltas handler alive.
2845        if self.book_deltas_subs.contains(target_id)
2846            || self.book_depth10_subs.contains(target_id)
2847            || self.has_book_snapshot_subscriptions(target_id)
2848        {
2849            return true;
2850        }
2851        self.book_deltas_parent_expansions
2852            .values()
2853            .any(|expansion| expansion.contains(target_id))
2854    }
2855
2856    fn is_underlying_wanted_for_depth10(&self, target_id: &InstrumentId) -> bool {
2857        // Snapshots use only_deltas=false, so they drive the depth10 handler
2858        // as well as the deltas handler.
2859        if self.book_depth10_subs.contains(target_id)
2860            || self.has_book_snapshot_subscriptions(target_id)
2861        {
2862            return true;
2863        }
2864        self.book_depth10_parent_expansions
2865            .values()
2866            .any(|expansion| expansion.contains(target_id))
2867    }
2868
2869    fn create_bar_aggregator(
2870        &self,
2871        instrument: &InstrumentAny,
2872        bar_type: BarType,
2873    ) -> Box<dyn BarAggregator> {
2874        let cache = self.cache.clone();
2875        let validate_sequence = self.config.validate_data_sequence;
2876
2877        let handler = move |bar: Bar| {
2878            process_engine_bar(&cache, validate_sequence, true, bar);
2879        };
2880
2881        let clock = self.clock.clone();
2882        let config = self.config.clone();
2883
2884        let price_precision = instrument.price_precision();
2885        let size_precision = instrument.size_precision();
2886
2887        if bar_type.spec().is_time_aggregated() {
2888            let time_bars_origin_offset = config
2889                .time_bars_origin_offset
2890                .get(&bar_type.spec().aggregation)
2891                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
2892
2893            Box::new(TimeBarAggregator::new(
2894                bar_type,
2895                price_precision,
2896                size_precision,
2897                clock,
2898                handler,
2899                config.time_bars_build_with_no_updates,
2900                config.time_bars_timestamp_on_close,
2901                config.time_bars_interval_type,
2902                time_bars_origin_offset,
2903                config.time_bars_build_delay,
2904                config.time_bars_skip_first_non_full_bar,
2905            ))
2906        } else {
2907            match bar_type.spec().aggregation {
2908                BarAggregation::Tick => Box::new(TickBarAggregator::new(
2909                    bar_type,
2910                    price_precision,
2911                    size_precision,
2912                    handler,
2913                )) as Box<dyn BarAggregator>,
2914                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2915                    bar_type,
2916                    price_precision,
2917                    size_precision,
2918                    handler,
2919                )) as Box<dyn BarAggregator>,
2920                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2921                    bar_type,
2922                    price_precision,
2923                    size_precision,
2924                    handler,
2925                )) as Box<dyn BarAggregator>,
2926                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2927                    bar_type,
2928                    price_precision,
2929                    size_precision,
2930                    handler,
2931                )) as Box<dyn BarAggregator>,
2932                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2933                    bar_type,
2934                    price_precision,
2935                    size_precision,
2936                    handler,
2937                )) as Box<dyn BarAggregator>,
2938                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2939                    bar_type,
2940                    price_precision,
2941                    size_precision,
2942                    handler,
2943                )) as Box<dyn BarAggregator>,
2944                BarAggregation::Value => Box::new(ValueBarAggregator::new(
2945                    bar_type,
2946                    price_precision,
2947                    size_precision,
2948                    handler,
2949                )) as Box<dyn BarAggregator>,
2950                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2951                    bar_type,
2952                    price_precision,
2953                    size_precision,
2954                    handler,
2955                )) as Box<dyn BarAggregator>,
2956                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2957                    bar_type,
2958                    price_precision,
2959                    size_precision,
2960                    handler,
2961                )) as Box<dyn BarAggregator>,
2962                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2963                    bar_type,
2964                    price_precision,
2965                    size_precision,
2966                    instrument.price_increment(),
2967                    handler,
2968                )) as Box<dyn BarAggregator>,
2969                other => unreachable!(
2970                    "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
2971                ),
2972            }
2973        }
2974    }
2975
2976    // Live-only path: callers must pass `request_id = None`. Handler IDs key
2977    // on `bar_type.standard()`; request-scoped aggregators (#5) will consume
2978    // response data directly without subscribing to the msgbus
2979    fn start_bar_aggregator(
2980        &mut self,
2981        bar_type: BarType,
2982        request_id: Option<UUID4>,
2983    ) -> anyhow::Result<()> {
2984        // Get the instrument for this bar type
2985        let instrument = {
2986            let cache = self.cache.borrow();
2987            cache
2988                .instrument(&bar_type.instrument_id())
2989                .ok_or_else(|| {
2990                    anyhow::anyhow!(
2991                        "Cannot start bar aggregation: no instrument found for {}",
2992                        bar_type.instrument_id(),
2993                    )
2994                })?
2995                .clone()
2996        };
2997
2998        let key = bar_aggregator_key(bar_type, request_id);
2999        let bar_type_std = bar_type.standard();
3000
3001        // Create or retrieve aggregator in Rc<RefCell>
3002        let aggregator = if let Some(rc) = self.bar_aggregators.get(&key) {
3003            rc.clone()
3004        } else {
3005            let agg = self.create_bar_aggregator(&instrument, bar_type);
3006            let rc = Rc::new(RefCell::new(agg));
3007            self.bar_aggregators.insert(key, rc.clone());
3008            rc
3009        };
3010
3011        // Subscribe to underlying data topics
3012        let mut subscriptions = Vec::new();
3013
3014        if bar_type.is_composite() {
3015            let topic = switchboard::get_bars_topic(bar_type.composite());
3016            let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_type_std));
3017            msgbus::subscribe_bars(topic.into(), handler.clone(), None);
3018            subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
3019        } else if bar_type.spec().price_type == PriceType::Last {
3020            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
3021            let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_type_std));
3022            msgbus::subscribe_trades(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3023            subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
3024        } else {
3025            // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
3026            if matches!(
3027                bar_type.spec().aggregation,
3028                BarAggregation::TickImbalance
3029                    | BarAggregation::VolumeImbalance
3030                    | BarAggregation::ValueImbalance
3031                    | BarAggregation::TickRuns
3032                    | BarAggregation::VolumeRuns
3033                    | BarAggregation::ValueRuns
3034            ) {
3035                log::warn!(
3036                    "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
3037                     data with `aggressor_side`, but `price_type` is not LAST so it will receive \
3038                     quote data: bars will not emit correctly",
3039                );
3040            }
3041
3042            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
3043            let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_type_std));
3044            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3045            subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
3046        }
3047
3048        self.bar_aggregator_handlers.insert(key, subscriptions);
3049
3050        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
3051        self.setup_bar_aggregator(bar_type, false, request_id)?;
3052
3053        aggregator.borrow_mut().set_is_running(true);
3054
3055        Ok(())
3056    }
3057
3058    /// Sets up a bar aggregator, matching Cython `_setup_bar_aggregator` logic.
3059    ///
3060    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
3061    fn setup_bar_aggregator(
3062        &self,
3063        bar_type: BarType,
3064        historical: bool,
3065        request_id: Option<UUID4>,
3066    ) -> anyhow::Result<()> {
3067        let key = bar_aggregator_key(bar_type, request_id);
3068        let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
3069            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
3070        })?;
3071
3072        // Set historical mode and handler
3073        let cache = self.cache.clone();
3074        let validate_sequence = self.config.validate_data_sequence;
3075        let publish = !historical;
3076        let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
3077            process_engine_bar(&cache, validate_sequence, publish, bar);
3078        });
3079
3080        aggregator
3081            .borrow_mut()
3082            .set_historical_mode(historical, handler);
3083
3084        // For TimeBarAggregator, set clock and start timer
3085        if bar_type.spec().is_time_aggregated() {
3086            use nautilus_common::clock::TestClock;
3087
3088            if historical {
3089                // Each aggregator gets its own independent clock
3090                let test_clock = Rc::new(RefCell::new(TestClock::new()));
3091                aggregator.borrow_mut().set_clock(test_clock);
3092                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
3093                // Store weak reference so start_timer can use it when called later
3094                let aggregator_weak = Rc::downgrade(aggregator);
3095                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
3096            } else {
3097                aggregator.borrow_mut().set_clock(self.clock.clone());
3098                aggregator
3099                    .borrow_mut()
3100                    .start_timer(Some(aggregator.clone()));
3101            }
3102        }
3103
3104        Ok(())
3105    }
3106
3107    fn stop_bar_aggregator(
3108        &mut self,
3109        bar_type: BarType,
3110        request_id: Option<UUID4>,
3111    ) -> anyhow::Result<()> {
3112        let key = bar_aggregator_key(bar_type, request_id);
3113        let aggregator = self.bar_aggregators.shift_remove(&key).ok_or_else(|| {
3114            anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
3115        })?;
3116
3117        aggregator.borrow_mut().stop();
3118
3119        // Unsubscribe any registered message handlers
3120        if let Some(subs) = self.bar_aggregator_handlers.remove(&key) {
3121            for sub in subs {
3122                match sub {
3123                    BarAggregatorSubscription::Bar { topic, handler } => {
3124                        msgbus::unsubscribe_bars(topic.into(), &handler);
3125                    }
3126                    BarAggregatorSubscription::Trade { topic, handler } => {
3127                        msgbus::unsubscribe_trades(topic.into(), &handler);
3128                    }
3129                    BarAggregatorSubscription::Quote { topic, handler } => {
3130                        msgbus::unsubscribe_quotes(topic.into(), &handler);
3131                    }
3132                }
3133            }
3134        }
3135
3136        Ok(())
3137    }
3138}
3139
3140// Resolves parent expansion components for a book subscription command.
3141//
3142// Returns Ok(Some((root, class))) when params carries PARAMS_IS_PARENT=true and
3143// the instrument_id parses as a recognised <root>.<class> shape; Ok(None) for
3144// concrete (non-parent) subscriptions; Err when the caller asserts a parent
3145// subscription but the id cannot be parsed, so subscribe entries can reject up
3146// front before touching state.
3147fn resolve_parent_components(
3148    instrument_id: &InstrumentId,
3149    params: Option<&Params>,
3150) -> anyhow::Result<Option<(Ustr, InstrumentClass)>> {
3151    if !is_parent_subscription(params) {
3152        return Ok(None);
3153    }
3154    let Some((root, class)) = instrument_id.parse_parent_components() else {
3155        anyhow::bail!(
3156            "Cannot expand parent subscription for {instrument_id}: \
3157             symbol does not parse as `<root>.<class>` with a recognised class suffix"
3158        );
3159    };
3160    Ok(Some((Ustr::from(root), class)))
3161}
3162
3163fn spread_quote_update_interval_seconds(params: Option<&Params>) -> Option<u64> {
3164    match params.and_then(|params| params.get("update_interval_seconds")) {
3165        Some(value) if value.is_null() => None,
3166        Some(value) => value.as_u64().filter(|interval| *interval > 0),
3167        None => Some(1),
3168    }
3169}
3170
3171fn spread_instrument_legs(instrument: &InstrumentAny) -> Option<Vec<(InstrumentId, i64)>> {
3172    if !instrument.is_spread() {
3173        return None;
3174    }
3175
3176    let instrument_id = instrument.id();
3177    let symbol = instrument_id.symbol.as_str();
3178    if !symbol.contains(GENERIC_SPREAD_ID_SEPARATOR) {
3179        return Some(vec![(instrument_id, 1)]);
3180    }
3181
3182    symbol
3183        .split(GENERIC_SPREAD_ID_SEPARATOR)
3184        .map(|component| parse_spread_leg(component, instrument_id.venue))
3185        .collect()
3186}
3187
3188fn parse_spread_leg(component: &str, venue: Venue) -> Option<(InstrumentId, i64)> {
3189    if let Some(rest) = component.strip_prefix("((") {
3190        let (ratio, symbol) = rest.split_once("))")?;
3191        return parse_spread_leg_parts(ratio, symbol, venue, -1);
3192    }
3193
3194    let rest = component.strip_prefix('(')?;
3195    let (ratio, symbol) = rest.split_once(')')?;
3196    parse_spread_leg_parts(ratio, symbol, venue, 1)
3197}
3198
3199fn parse_spread_leg_parts(
3200    ratio: &str,
3201    symbol: &str,
3202    venue: Venue,
3203    sign: i64,
3204) -> Option<(InstrumentId, i64)> {
3205    if symbol.is_empty() {
3206        return None;
3207    }
3208
3209    let ratio = ratio.parse::<i64>().ok()?.checked_mul(sign)?;
3210    if ratio == 0 {
3211        return None;
3212    }
3213
3214    Some((InstrumentId::new(Symbol::new(symbol), venue), ratio))
3215}
3216
3217#[inline(always)]
3218fn log_error_on_cache_insert<T: Display>(e: &T) {
3219    log::error!("Error on cache insert: {e}");
3220}
3221
3222#[inline]
3223fn historical_topic_of(live: MStr<Topic>) -> MStr<Topic> {
3224    MStr::<Topic>::from(format!("historical.{}", live.as_ref()))
3225}
3226
3227// Top-of-book `QuoteTick` from an `OrderBookDepth10`. Returns `None` for
3228// `NoOrderSide` padding or zero size.
3229fn derive_quote_from_depth(depth: &OrderBookDepth10) -> Option<QuoteTick> {
3230    let bid = depth.bids.first()?;
3231    let ask = depth.asks.first()?;
3232
3233    if bid.side == OrderSide::NoOrderSide
3234        || ask.side == OrderSide::NoOrderSide
3235        || bid.size.raw == 0
3236        || ask.size.raw == 0
3237    {
3238        return None;
3239    }
3240
3241    Some(QuoteTick::new(
3242        depth.instrument_id,
3243        bid.price,
3244        ask.price,
3245        bid.size,
3246        ask.size,
3247        depth.ts_event,
3248        depth.ts_init,
3249    ))
3250}
3251
3252// Validates a bar against `last_bar` before writing and (optionally) publishing.
3253// Shared by `handle_bar` and aggregator-emitted bars so both honour
3254// `validate_data_sequence`.
3255fn process_engine_bar(
3256    cache: &Rc<RefCell<Cache>>,
3257    validate_sequence: bool,
3258    publish: bool,
3259    bar: Bar,
3260) {
3261    if !validate_bar_sequence(cache, validate_sequence, &bar) {
3262        return;
3263    }
3264
3265    if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
3266        log_error_on_cache_insert(&e);
3267    }
3268
3269    if publish {
3270        let topic = switchboard::get_bars_topic(bar.bar_type);
3271        msgbus::publish_bar(topic, &bar);
3272    }
3273}
3274
3275fn validate_bar_sequence(cache: &Rc<RefCell<Cache>>, validate_sequence: bool, bar: &Bar) -> bool {
3276    if !validate_sequence {
3277        return true;
3278    }
3279
3280    let Some(last_bar) = cache.as_ref().borrow().bar(&bar.bar_type).copied() else {
3281        return true;
3282    };
3283
3284    if bar.ts_event < last_bar.ts_event {
3285        log::warn!(
3286            "Bar {bar} was prior to last bar `ts_event` {}",
3287            last_bar.ts_event,
3288        );
3289        return false;
3290    }
3291
3292    if bar.ts_init < last_bar.ts_init {
3293        log::warn!(
3294            "Bar {bar} was prior to last bar `ts_init` {}",
3295            last_bar.ts_init,
3296        );
3297        return false;
3298    }
3299
3300    // Bar revision overwrite needs a `Bar.is_revision` field on the model;
3301    // not present today. Tracked under #8 in the data engine parity plan
3302    true
3303}
3304
3305#[inline(always)]
3306fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
3307    if data.is_empty() {
3308        let name = type_name::<T>();
3309        let short_name = name.rsplit("::").next().unwrap_or(name);
3310        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
3311        return true;
3312    }
3313    false
3314}