Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::{Any, type_name},
40    cell::{Ref, RefCell},
41    collections::{VecDeque, hash_map::Entry},
42    fmt::{Debug, Display},
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
59        SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
60        SubscribeCommand, SubscribeOptionChain, UnsubscribeBars, UnsubscribeBookDeltas,
61        UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
62        UnsubscribeInstrumentStatus, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
63        UnsubscribeQuotes,
64    },
65    msgbus::{
66        self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
67        switchboard::{self, MessagingSwitchboard},
68    },
69    runner::get_data_cmd_sender,
70    timer::{TimeEvent, TimeEventCallback},
71};
72use nautilus_core::{
73    UUID4, WeakCell,
74    correctness::{
75        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
76    },
77    datetime::millis_to_nanos_unchecked,
78};
79#[cfg(feature = "defi")]
80use nautilus_model::defi::DefiData;
81use nautilus_model::{
82    data::{
83        Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
84        InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
85        OrderBookDepth10, QuoteTick, TradeTick,
86        option_chain::{OptionGreeks, StrikeRange},
87    },
88    enums::{
89        AggregationSource, BarAggregation, BookType, MarketStatusAction, PriceType, RecordFlag,
90    },
91    identifiers::{ClientId, InstrumentId, OptionSeriesId, Venue},
92    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
93    orderbook::OrderBook,
94    types::Price,
95};
96#[cfg(feature = "streaming")]
97use nautilus_persistence::backend::catalog::ParquetDataCatalog;
98use ustr::Ustr;
99
100#[cfg(feature = "defi")]
101#[allow(unused_imports)] // Brings DeFi impl blocks into scope
102use crate::defi::engine as _;
103#[cfg(feature = "defi")]
104use crate::engine::pool::PoolUpdater;
105use crate::{
106    aggregation::{
107        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
108        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
109        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
110        VolumeRunsBarAggregator,
111    },
112    client::DataClientAdapter,
113    option_chains::OptionChainManager,
114};
115
116/// Deferred subscribe/unsubscribe command.
117///
118/// Components that lack direct `DataClientAdapter` access (handlers, timers)
119/// push commands here; the `DataEngine` drains on each data tick.
120#[derive(Debug, Clone)]
121pub(crate) enum DeferredCommand {
122    Subscribe(SubscribeCommand),
123    Unsubscribe(UnsubscribeCommand),
124    ExpireSeries(OptionSeriesId),
125}
126
127/// Shared queue for deferred subscribe/unsubscribe commands.
128pub(crate) type DeferredCommandQueue = Rc<RefCell<VecDeque<DeferredCommand>>>;
129
130/// Typed subscription for bar aggregator handlers.
131///
132/// Stores the topic and handler for each data type so we can properly
133/// unsubscribe from the typed routers.
134#[derive(Clone)]
135pub enum BarAggregatorSubscription {
136    Bar {
137        topic: MStr<Topic>,
138        handler: TypedHandler<Bar>,
139    },
140    Trade {
141        topic: MStr<Topic>,
142        handler: TypedHandler<TradeTick>,
143    },
144    Quote {
145        topic: MStr<Topic>,
146        handler: TypedHandler<QuoteTick>,
147    },
148}
149
150impl Debug for BarAggregatorSubscription {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        match self {
153            Self::Bar { topic, handler } => f
154                .debug_struct(stringify!(Bar))
155                .field("topic", topic)
156                .field("handler_id", &handler.id())
157                .finish(),
158            Self::Trade { topic, handler } => f
159                .debug_struct(stringify!(Trade))
160                .field("topic", topic)
161                .field("handler_id", &handler.id())
162                .finish(),
163            Self::Quote { topic, handler } => f
164                .debug_struct(stringify!(Quote))
165                .field("topic", topic)
166                .field("handler_id", &handler.id())
167                .finish(),
168        }
169    }
170}
171
172/// Provides a high-performance `DataEngine` for all environments.
173#[derive(Debug)]
174pub struct DataEngine {
175    pub(crate) clock: Rc<RefCell<dyn Clock>>,
176    pub(crate) cache: Rc<RefCell<Cache>>,
177    pub(crate) external_clients: AHashSet<ClientId>,
178    clients: IndexMap<ClientId, DataClientAdapter>,
179    default_client: Option<DataClientAdapter>,
180    #[cfg(feature = "streaming")]
181    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
182    routing_map: IndexMap<Venue, ClientId>,
183    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
184    book_deltas_subs: AHashSet<InstrumentId>,
185    book_depth10_subs: AHashSet<InstrumentId>,
186    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
187    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
188    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
189    bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
190    option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
191    option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
192    deferred_cmd_queue: DeferredCommandQueue,
193    pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
194    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
195    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
196    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
197    pub(crate) msgbus_priority: u8,
198    pub(crate) config: DataEngineConfig,
199    #[cfg(feature = "defi")]
200    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
201    #[cfg(feature = "defi")]
202    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
203    #[cfg(feature = "defi")]
204    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
205    #[cfg(feature = "defi")]
206    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
207}
208
209impl DataEngine {
210    /// Creates a new [`DataEngine`] instance.
211    #[must_use]
212    pub fn new(
213        clock: Rc<RefCell<dyn Clock>>,
214        cache: Rc<RefCell<Cache>>,
215        config: Option<DataEngineConfig>,
216    ) -> Self {
217        let config = config.unwrap_or_default();
218
219        let external_clients: AHashSet<ClientId> = config
220            .external_clients
221            .clone()
222            .unwrap_or_default()
223            .into_iter()
224            .collect();
225
226        Self {
227            clock,
228            cache,
229            external_clients,
230            clients: IndexMap::new(),
231            default_client: None,
232            #[cfg(feature = "streaming")]
233            catalogs: AHashMap::new(),
234            routing_map: IndexMap::new(),
235            book_intervals: AHashMap::new(),
236            book_deltas_subs: AHashSet::new(),
237            book_depth10_subs: AHashSet::new(),
238            book_updaters: AHashMap::new(),
239            book_snapshotters: AHashMap::new(),
240            bar_aggregators: AHashMap::new(),
241            bar_aggregator_handlers: AHashMap::new(),
242            option_chain_managers: AHashMap::new(),
243            option_chain_instrument_index: AHashMap::new(),
244            deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
245            pending_option_chain_requests: AHashMap::new(),
246            _synthetic_quote_feeds: AHashMap::new(),
247            _synthetic_trade_feeds: AHashMap::new(),
248            buffered_deltas_map: AHashMap::new(),
249            msgbus_priority: 10, // High-priority for built-in component
250            config,
251            #[cfg(feature = "defi")]
252            pool_updaters: AHashMap::new(),
253            #[cfg(feature = "defi")]
254            pool_updaters_pending: AHashSet::new(),
255            #[cfg(feature = "defi")]
256            pool_snapshot_pending: AHashSet::new(),
257            #[cfg(feature = "defi")]
258            pool_event_buffers: AHashMap::new(),
259        }
260    }
261
262    /// Registers all message bus handlers for the data engine.
263    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
264        let weak = WeakCell::from(Rc::downgrade(engine));
265
266        let weak1 = weak.clone();
267        msgbus::register_data_command_endpoint(
268            MessagingSwitchboard::data_engine_execute(),
269            TypedIntoHandler::from(move |cmd: DataCommand| {
270                if let Some(rc) = weak1.upgrade() {
271                    rc.borrow_mut().execute(cmd);
272                }
273            }),
274        );
275
276        msgbus::register_data_command_endpoint(
277            MessagingSwitchboard::data_engine_queue_execute(),
278            TypedIntoHandler::from(move |cmd: DataCommand| {
279                get_data_cmd_sender().clone().execute(cmd);
280            }),
281        );
282
283        // Register process handler (polymorphic - uses Any)
284        let weak2 = weak.clone();
285        msgbus::register_any(
286            MessagingSwitchboard::data_engine_process(),
287            ShareableMessageHandler::from_any(move |data: &dyn Any| {
288                if let Some(rc) = weak2.upgrade() {
289                    rc.borrow_mut().process(data);
290                }
291            }),
292        );
293
294        // Register process_data handler (typed - takes ownership)
295        let weak3 = weak.clone();
296        msgbus::register_data_endpoint(
297            MessagingSwitchboard::data_engine_process_data(),
298            TypedIntoHandler::from(move |data: Data| {
299                if let Some(rc) = weak3.upgrade() {
300                    rc.borrow_mut().process_data(data);
301                }
302            }),
303        );
304
305        // Register process_defi_data handler (typed - takes ownership)
306        #[cfg(feature = "defi")]
307        {
308            let weak4 = weak.clone();
309            msgbus::register_defi_data_endpoint(
310                MessagingSwitchboard::data_engine_process_defi_data(),
311                TypedIntoHandler::from(move |data: DefiData| {
312                    if let Some(rc) = weak4.upgrade() {
313                        rc.borrow_mut().process_defi_data(data);
314                    }
315                }),
316            );
317        }
318
319        let weak5 = weak;
320        msgbus::register_data_response_endpoint(
321            MessagingSwitchboard::data_engine_response(),
322            TypedIntoHandler::from(move |resp: DataResponse| {
323                if let Some(rc) = weak5.upgrade() {
324                    rc.borrow_mut().response(resp);
325                }
326            }),
327        );
328    }
329
330    /// Returns a read-only reference to the engines clock.
331    #[must_use]
332    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
333        self.clock.borrow()
334    }
335
336    /// Returns a read-only reference to the engines cache.
337    #[must_use]
338    pub fn get_cache(&self) -> Ref<'_, Cache> {
339        self.cache.borrow()
340    }
341
342    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
343    #[must_use]
344    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
345        Rc::clone(&self.cache)
346    }
347
348    /// Registers the `catalog` with the engine with an optional specific `name`.
349    ///
350    /// # Panics
351    ///
352    /// Panics if a catalog with the same `name` has already been registered.
353    #[cfg(feature = "streaming")]
354    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
355        let name = Ustr::from(name.unwrap_or("catalog_0"));
356
357        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
358
359        self.catalogs.insert(name, catalog);
360        log::info!("Registered catalog <{name}>");
361    }
362
363    /// Registers the `client` with the engine with an optional venue `routing`.
364    ///
365    ///
366    /// # Panics
367    ///
368    /// Panics if a client with the same client ID has already been registered.
369    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
370        let client_id = client.client_id();
371
372        if let Some(default_client) = &self.default_client {
373            check_predicate_false(
374                default_client.client_id() == client.client_id(),
375                "client_id already registered as default client",
376            )
377            .expect(FAILED);
378        }
379
380        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
381
382        if let Some(routing) = routing {
383            self.routing_map.insert(routing, client_id);
384            log::debug!("Set client {client_id} routing for {routing}");
385        }
386
387        if client.venue.is_none() && self.default_client.is_none() {
388            self.default_client = Some(client);
389            log::debug!("Registered client {client_id} for default routing");
390        } else {
391            self.clients.insert(client_id, client);
392            log::debug!("Registered client {client_id}");
393        }
394    }
395
396    /// Deregisters the client for the `client_id`.
397    ///
398    /// # Panics
399    ///
400    /// Panics if the client ID has not been registered.
401    pub fn deregister_client(&mut self, client_id: &ClientId) {
402        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
403
404        self.clients.shift_remove(client_id);
405        log::info!("Deregistered client {client_id}");
406    }
407
408    /// Registers the data `client` with the engine as the default routing client.
409    ///
410    /// When a specific venue routing cannot be found, this client will receive messages.
411    ///
412    /// # Warnings
413    ///
414    /// Any existing default routing client will be overwritten.
415    ///
416    /// # Panics
417    ///
418    /// Panics if a default client has already been registered.
419    pub fn register_default_client(&mut self, client: DataClientAdapter) {
420        check_predicate_true(
421            self.default_client.is_none(),
422            "default client already registered",
423        )
424        .expect(FAILED);
425
426        let client_id = client.client_id();
427
428        self.default_client = Some(client);
429        log::debug!("Registered default client {client_id}");
430    }
431
432    /// Starts all registered data clients and re-arms bar aggregator timers.
433    pub fn start(&mut self) {
434        for client in self.get_clients_mut() {
435            if let Err(e) = client.start() {
436                log::error!("{e}");
437            }
438        }
439
440        for aggregator in self.bar_aggregators.values() {
441            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
442                aggregator
443                    .borrow_mut()
444                    .start_timer(Some(aggregator.clone()));
445            }
446        }
447    }
448
449    /// Stops all registered data clients and bar aggregator timers.
450    pub fn stop(&mut self) {
451        for client in self.get_clients_mut() {
452            if let Err(e) = client.stop() {
453                log::error!("{e}");
454            }
455        }
456
457        for aggregator in self.bar_aggregators.values() {
458            aggregator.borrow_mut().stop();
459        }
460    }
461
462    /// Resets all registered data clients and clears bar aggregator state.
463    pub fn reset(&mut self) {
464        for client in self.get_clients_mut() {
465            if let Err(e) = client.reset() {
466                log::error!("{e}");
467            }
468        }
469
470        let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
471        for bar_type in bar_types {
472            if let Err(e) = self.stop_bar_aggregator(bar_type) {
473                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
474            }
475        }
476    }
477
478    /// Disposes the engine, stopping all clients and canceling any timers.
479    pub fn dispose(&mut self) {
480        for client in self.get_clients_mut() {
481            if let Err(e) = client.dispose() {
482                log::error!("{e}");
483            }
484        }
485
486        self.clock.borrow_mut().cancel_timers();
487    }
488
489    /// Connects all registered data clients concurrently.
490    ///
491    /// Connection failures are logged but do not prevent the node from running.
492    pub async fn connect(&mut self) {
493        let futures: Vec<_> = self
494            .get_clients_mut()
495            .into_iter()
496            .map(|client| client.connect())
497            .collect();
498
499        let results = join_all(futures).await;
500
501        for error in results.into_iter().filter_map(Result::err) {
502            log::error!("Failed to connect data client: {error}");
503        }
504    }
505
506    /// Disconnects all registered data clients concurrently.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if any client fails to disconnect.
511    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
512        let futures: Vec<_> = self
513            .get_clients_mut()
514            .into_iter()
515            .map(|client| client.disconnect())
516            .collect();
517
518        let results = join_all(futures).await;
519        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
520
521        if errors.is_empty() {
522            Ok(())
523        } else {
524            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
525            anyhow::bail!(
526                "Failed to disconnect data clients: {}",
527                error_msgs.join("; ")
528            )
529        }
530    }
531
532    /// Returns `true` if all registered data clients are currently connected.
533    #[must_use]
534    pub fn check_connected(&self) -> bool {
535        self.get_clients()
536            .iter()
537            .all(|client| client.is_connected())
538    }
539
540    /// Returns `true` if all registered data clients are currently disconnected.
541    #[must_use]
542    pub fn check_disconnected(&self) -> bool {
543        self.get_clients()
544            .iter()
545            .all(|client| !client.is_connected())
546    }
547
548    /// Returns connection status for each registered client.
549    #[must_use]
550    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
551        self.get_clients()
552            .into_iter()
553            .map(|client| (client.client_id(), client.is_connected()))
554            .collect()
555    }
556
557    /// Returns a list of all registered client IDs, including the default client if set.
558    #[must_use]
559    pub fn registered_clients(&self) -> Vec<ClientId> {
560        self.get_clients()
561            .into_iter()
562            .map(|client| client.client_id())
563            .collect()
564    }
565
566    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
567
568    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
569    where
570        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
571        T: Clone,
572    {
573        self.get_clients()
574            .into_iter()
575            .flat_map(get_subs)
576            .cloned()
577            .collect()
578    }
579
580    #[must_use]
581    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
582        let (default_opt, clients_map) = (&self.default_client, &self.clients);
583        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
584
585        if let Some(default) = default_opt {
586            clients.push(default);
587        }
588
589        clients
590    }
591
592    #[must_use]
593    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
594        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
595        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
596
597        if let Some(default) = default_opt {
598            clients.push(default);
599        }
600
601        clients
602    }
603
604    pub fn get_client(
605        &mut self,
606        client_id: Option<&ClientId>,
607        venue: Option<&Venue>,
608    ) -> Option<&mut DataClientAdapter> {
609        if let Some(client_id) = client_id {
610            // Explicit ID: first look in registered clients
611            if let Some(client) = self.clients.get_mut(client_id) {
612                return Some(client);
613            }
614
615            // Then check if it matches the default client
616            if let Some(default) = self.default_client.as_mut()
617                && default.client_id() == *client_id
618            {
619                return Some(default);
620            }
621
622            // Unknown explicit client
623            return None;
624        }
625
626        if let Some(v) = venue {
627            // Route by venue if mapped client still registered
628            if let Some(client_id) = self.routing_map.get(v) {
629                return self.clients.get_mut(client_id);
630            }
631        }
632
633        // Fallback to default client
634        self.get_default_client()
635    }
636
637    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
638        self.default_client.as_mut()
639    }
640
641    /// Returns all custom data types currently subscribed across all clients.
642    #[must_use]
643    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
644        self.collect_subscriptions(|client| &client.subscriptions_custom)
645    }
646
647    /// Returns all instrument IDs currently subscribed across all clients.
648    #[must_use]
649    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
650        self.collect_subscriptions(|client| &client.subscriptions_instrument)
651    }
652
653    /// Returns all instrument IDs for which book delta subscriptions exist.
654    #[must_use]
655    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
656        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
657    }
658
659    /// Returns all instrument IDs for which book depth10 subscriptions exist.
660    #[must_use]
661    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
662        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
663    }
664
665    /// Returns all instrument IDs for which book snapshot subscriptions exist.
666    #[must_use]
667    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
668        self.book_intervals
669            .values()
670            .flat_map(|set| set.iter().copied())
671            .collect()
672    }
673
674    /// Returns all instrument IDs for which quote subscriptions exist.
675    #[must_use]
676    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
677        self.collect_subscriptions(|client| &client.subscriptions_quotes)
678    }
679
680    /// Returns all instrument IDs for which trade subscriptions exist.
681    #[must_use]
682    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
683        self.collect_subscriptions(|client| &client.subscriptions_trades)
684    }
685
686    /// Returns all bar types currently subscribed across all clients.
687    #[must_use]
688    pub fn subscribed_bars(&self) -> Vec<BarType> {
689        self.collect_subscriptions(|client| &client.subscriptions_bars)
690    }
691
692    /// Returns all instrument IDs for which mark price subscriptions exist.
693    #[must_use]
694    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
695        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
696    }
697
698    /// Returns all instrument IDs for which index price subscriptions exist.
699    #[must_use]
700    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
701        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
702    }
703
704    /// Returns all instrument IDs for which funding rate subscriptions exist.
705    #[must_use]
706    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
707        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
708    }
709
710    /// Returns all instrument IDs for which status subscriptions exist.
711    #[must_use]
712    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
713        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
714    }
715
716    /// Returns all instrument IDs for which instrument close subscriptions exist.
717    #[must_use]
718    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
719        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
720    }
721
722    // -- COMMANDS --------------------------------------------------------------------------------
723
724    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
725    ///
726    /// Errors during execution are logged.
727    pub fn execute(&mut self, cmd: DataCommand) {
728        if let Err(e) = match cmd {
729            DataCommand::Subscribe(c) => self.execute_subscribe(&c),
730            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
731            DataCommand::Request(c) => self.execute_request(c),
732            #[cfg(feature = "defi")]
733            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
734            #[cfg(feature = "defi")]
735            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
736            #[cfg(feature = "defi")]
737            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
738            _ => {
739                log::warn!("Unhandled DataCommand variant");
740                Ok(())
741            }
742        } {
743            log::error!("{e}");
744        }
745    }
746
747    /// Handles a subscribe command, updating internal state and forwarding to the client.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
752    /// or if the underlying client operation fails.
753    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
754        // Update internal engine state
755        match &cmd {
756            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
757            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
758            SubscribeCommand::BookSnapshots(cmd) => {
759                // Handles client forwarding internally (forwards as BookDeltas)
760                return self.subscribe_book_snapshots(cmd);
761            }
762            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
763            SubscribeCommand::OptionChain(cmd) => {
764                self.subscribe_option_chain(cmd);
765                return Ok(());
766            }
767            _ => {} // Do nothing else
768        }
769
770        if let Some(client_id) = cmd.client_id()
771            && self.external_clients.contains(client_id)
772        {
773            if self.config.debug {
774                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
775            }
776            return Ok(());
777        }
778
779        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
780            client.execute_subscribe(cmd);
781        } else {
782            log::error!(
783                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
784                cmd.client_id(),
785                cmd.venue(),
786            );
787        }
788
789        Ok(())
790    }
791
792    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the underlying client operation fails.
797    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
798        match &cmd {
799            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
800            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
801            UnsubscribeCommand::BookSnapshots(cmd) => {
802                // Handles client forwarding internally (forwards as BookDeltas)
803                self.unsubscribe_book_snapshots(cmd);
804                return Ok(());
805            }
806            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
807            UnsubscribeCommand::OptionChain(cmd) => {
808                self.unsubscribe_option_chain(cmd);
809                return Ok(());
810            }
811            _ => {} // Do nothing else
812        }
813
814        if let Some(client_id) = cmd.client_id()
815            && self.external_clients.contains(client_id)
816        {
817            if self.config.debug {
818                log::debug!(
819                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
820                );
821            }
822            return Ok(());
823        }
824
825        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
826            client.execute_unsubscribe(cmd);
827        } else {
828            log::error!(
829                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
830                cmd.client_id(),
831                cmd.venue(),
832            );
833        }
834
835        Ok(())
836    }
837
838    /// Sends a [`RequestCommand`] to a suitable data client implementation.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if no client is found for the given client ID or venue,
843    /// or if the client fails to process the request.
844    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
845        // Skip requests for external clients
846        if let Some(cid) = req.client_id()
847            && self.external_clients.contains(cid)
848        {
849            if self.config.debug {
850                log::debug!("Skipping data request for external client {cid}: {req:?}");
851            }
852            return Ok(());
853        }
854
855        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
856            match req {
857                RequestCommand::Data(req) => client.request_data(req),
858                RequestCommand::Instrument(req) => client.request_instrument(req),
859                RequestCommand::Instruments(req) => client.request_instruments(req),
860                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
861                RequestCommand::BookDepth(req) => client.request_book_depth(req),
862                RequestCommand::Quotes(req) => client.request_quotes(req),
863                RequestCommand::Trades(req) => client.request_trades(req),
864                RequestCommand::FundingRates(req) => client.request_funding_rates(req),
865                RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
866                RequestCommand::Bars(req) => client.request_bars(req),
867            }
868        } else {
869            anyhow::bail!(
870                "Cannot handle request: no client found for {:?} {:?}",
871                req.client_id(),
872                req.venue()
873            );
874        }
875    }
876
877    /// Processes a dynamically-typed data message.
878    ///
879    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
880    pub fn process(&mut self, data: &dyn Any) {
881        // TODO: Eventually these can be added to the `Data` enum (C/Cython blocking), process here for now
882        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
883            self.handle_instrument(instrument);
884        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
885            self.handle_funding_rate(*funding_rate);
886        } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
887            self.handle_instrument_status(*status);
888        } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
889            self.cache.borrow_mut().add_option_greeks(*option_greeks);
890            let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
891            msgbus::publish_option_greeks(topic, option_greeks);
892            self.drain_deferred_commands();
893        } else {
894            log::error!("Cannot process data {data:?}, type is unrecognized");
895        }
896
897        // TODO: Add custom data handling here
898    }
899
900    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
901    pub fn process_data(&mut self, data: Data) {
902        match data {
903            Data::Delta(delta) => self.handle_delta(delta),
904            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
905            Data::Depth10(depth) => self.handle_depth10(*depth),
906            Data::Quote(quote) => {
907                self.handle_quote(quote);
908                self.drain_deferred_commands();
909            }
910            Data::Trade(trade) => self.handle_trade(trade),
911            Data::Bar(bar) => self.handle_bar(bar),
912            Data::MarkPriceUpdate(mark_price) => {
913                self.handle_mark_price(mark_price);
914                self.drain_deferred_commands();
915            }
916            Data::IndexPriceUpdate(index_price) => {
917                self.handle_index_price(index_price);
918                self.drain_deferred_commands();
919            }
920            Data::InstrumentClose(close) => self.handle_instrument_close(close),
921            Data::Custom(custom) => self.handle_custom_data(&custom),
922        }
923    }
924
925    /// Processes a `DataResponse`, handling and publishing the response message.
926    #[allow(clippy::needless_pass_by_value)] // Required by message bus dispatch
927    pub fn response(&mut self, resp: DataResponse) {
928        log::debug!("{RECV}{RES} {resp:?}");
929
930        let correlation_id = *resp.correlation_id();
931
932        match &resp {
933            DataResponse::Instrument(r) => {
934                self.handle_instrument_response(r.data.clone());
935            }
936            DataResponse::Instruments(r) => {
937                self.handle_instruments(&r.data);
938            }
939            DataResponse::Quotes(r) => {
940                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
941                    self.handle_quotes(&r.data);
942                }
943            }
944            DataResponse::Trades(r) => {
945                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
946                    self.handle_trades(&r.data);
947                }
948            }
949            DataResponse::FundingRates(r) => {
950                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
951                    self.handle_funding_rates(&r.data);
952                }
953            }
954            DataResponse::Bars(r) => {
955                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
956                    self.handle_bars(&r.data);
957                }
958            }
959            DataResponse::Book(r) => self.handle_book_response(&r.data),
960            DataResponse::ForwardPrices(r) => {
961                return self.handle_forward_prices_response(&correlation_id, r);
962            }
963            _ => todo!("Handle other response types"),
964        }
965
966        msgbus::send_response(&correlation_id, &resp);
967    }
968
969    // -- DATA HANDLERS ---------------------------------------------------------------------------
970
971    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
972        log::debug!("Handling instrument: {}", instrument.id());
973
974        if let Err(e) = self
975            .cache
976            .as_ref()
977            .borrow_mut()
978            .add_instrument(instrument.clone())
979        {
980            log_error_on_cache_insert(&e);
981        }
982
983        let topic = switchboard::get_instrument_topic(instrument.id());
984        log::debug!("Publishing instrument to topic: {topic}");
985        msgbus::publish_any(topic, instrument);
986
987        self.update_option_chains(instrument);
988    }
989
990    fn update_option_chains(&mut self, instrument: &InstrumentAny) {
991        let Some(underlying) = instrument.underlying() else {
992            return;
993        };
994        let Some(expiration_ns) = instrument.expiration_ns() else {
995            return;
996        };
997        let Some(strike) = instrument.strike_price() else {
998            return;
999        };
1000        let Some(kind) = instrument.option_kind() else {
1001            return;
1002        };
1003
1004        let venue = instrument.id().venue;
1005        let settlement = instrument.settlement_currency().code;
1006        let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1007
1008        // Clone Rc to release borrow on self.option_chain_managers before accessing self.clients
1009        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1010            return;
1011        };
1012
1013        let clock = self.clock.clone();
1014        let client = self.get_client(None, Some(&venue));
1015
1016        if manager_rc
1017            .borrow_mut()
1018            .add_instrument(instrument.id(), strike, kind, client, &clock)
1019        {
1020            self.option_chain_instrument_index
1021                .insert(instrument.id(), series_id);
1022        }
1023    }
1024
1025    fn handle_delta(&mut self, delta: OrderBookDelta) {
1026        let deltas = if self.config.buffer_deltas {
1027            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1028                buffered_deltas.deltas.push(delta);
1029                buffered_deltas.flags = delta.flags;
1030                buffered_deltas.sequence = delta.sequence;
1031                buffered_deltas.ts_event = delta.ts_event;
1032                buffered_deltas.ts_init = delta.ts_init;
1033            } else {
1034                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1035                self.buffered_deltas_map
1036                    .insert(delta.instrument_id, buffered_deltas);
1037            }
1038
1039            if !RecordFlag::F_LAST.matches(delta.flags) {
1040                return; // Not the last delta for event
1041            }
1042
1043            self.buffered_deltas_map
1044                .remove(&delta.instrument_id)
1045                .expect("buffered deltas exist")
1046        } else {
1047            OrderBookDeltas::new(delta.instrument_id, vec![delta])
1048        };
1049
1050        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1051        msgbus::publish_deltas(topic, &deltas);
1052    }
1053
1054    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1055        if self.config.buffer_deltas {
1056            let instrument_id = deltas.instrument_id;
1057
1058            for delta in deltas.deltas {
1059                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1060                    buffered_deltas.deltas.push(delta);
1061                    buffered_deltas.flags = delta.flags;
1062                    buffered_deltas.sequence = delta.sequence;
1063                    buffered_deltas.ts_event = delta.ts_event;
1064                    buffered_deltas.ts_init = delta.ts_init;
1065                } else {
1066                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1067                    self.buffered_deltas_map
1068                        .insert(instrument_id, buffered_deltas);
1069                }
1070
1071                if RecordFlag::F_LAST.matches(delta.flags) {
1072                    let deltas_to_publish = self
1073                        .buffered_deltas_map
1074                        .remove(&instrument_id)
1075                        .expect("buffered deltas exist");
1076                    let topic = switchboard::get_book_deltas_topic(instrument_id);
1077                    msgbus::publish_deltas(topic, &deltas_to_publish);
1078                }
1079            }
1080        } else {
1081            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1082            msgbus::publish_deltas(topic, &deltas);
1083        }
1084    }
1085
1086    fn handle_depth10(&self, depth: OrderBookDepth10) {
1087        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1088        msgbus::publish_depth10(topic, &depth);
1089    }
1090
1091    fn handle_quote(&self, quote: QuoteTick) {
1092        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1093            log_error_on_cache_insert(&e);
1094        }
1095
1096        // TODO: Handle synthetics
1097
1098        let topic = switchboard::get_quotes_topic(quote.instrument_id);
1099        msgbus::publish_quote(topic, &quote);
1100    }
1101
1102    fn handle_trade(&self, trade: TradeTick) {
1103        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1104            log_error_on_cache_insert(&e);
1105        }
1106
1107        // TODO: Handle synthetics
1108
1109        let topic = switchboard::get_trades_topic(trade.instrument_id);
1110        msgbus::publish_trade(topic, &trade);
1111    }
1112
1113    fn handle_bar(&self, bar: Bar) {
1114        // TODO: Handle additional bar logic
1115        if self.config.validate_data_sequence
1116            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1117        {
1118            if bar.ts_event < last_bar.ts_event {
1119                log::warn!(
1120                    "Bar {bar} was prior to last bar `ts_event` {}",
1121                    last_bar.ts_event
1122                );
1123                return; // Bar is out of sequence
1124            }
1125
1126            if bar.ts_init < last_bar.ts_init {
1127                log::warn!(
1128                    "Bar {bar} was prior to last bar `ts_init` {}",
1129                    last_bar.ts_init
1130                );
1131                return; // Bar is out of sequence
1132            }
1133            // TODO: Implement `bar.is_revision` logic
1134        }
1135
1136        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1137            log_error_on_cache_insert(&e);
1138        }
1139
1140        let topic = switchboard::get_bars_topic(bar.bar_type);
1141        msgbus::publish_bar(topic, &bar);
1142    }
1143
1144    fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1145        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1146            log_error_on_cache_insert(&e);
1147        }
1148
1149        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1150        msgbus::publish_mark_price(topic, &mark_price);
1151    }
1152
1153    fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1154        if let Err(e) = self
1155            .cache
1156            .as_ref()
1157            .borrow_mut()
1158            .add_index_price(index_price)
1159        {
1160            log_error_on_cache_insert(&e);
1161        }
1162
1163        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1164        msgbus::publish_index_price(topic, &index_price);
1165    }
1166
1167    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
1168    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1169        if let Err(e) = self
1170            .cache
1171            .as_ref()
1172            .borrow_mut()
1173            .add_funding_rate(funding_rate)
1174        {
1175            log_error_on_cache_insert(&e);
1176        }
1177
1178        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1179        msgbus::publish_funding_rate(topic, &funding_rate);
1180    }
1181
1182    fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1183        let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1184        msgbus::publish_any(topic, &status);
1185
1186        // Check if this instrument belongs to an option chain before expiring
1187        if self
1188            .option_chain_instrument_index
1189            .contains_key(&status.instrument_id)
1190            && matches!(
1191                status.action,
1192                MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1193            )
1194        {
1195            self.expire_option_chain_instrument(status.instrument_id);
1196        }
1197    }
1198
1199    /// Removes a settled/expired instrument from its option chain manager.
1200    ///
1201    /// Looks up the owning series via the reverse index, delegates removal to
1202    /// the manager (which unregisters msgbus handlers and pushes deferred wire
1203    /// unsubscribes), then drains those commands. When the series catalog
1204    /// becomes empty, the entire manager is torn down.
1205    fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
1206        let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
1207            return;
1208        };
1209
1210        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1211            return;
1212        };
1213
1214        let series_empty = manager_rc
1215            .borrow_mut()
1216            .handle_instrument_expired(&instrument_id);
1217
1218        // Drain deferred unsubscribe commands pushed by the manager
1219        self.drain_deferred_commands();
1220
1221        log::info!(
1222            "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
1223        );
1224
1225        if series_empty {
1226            manager_rc.borrow_mut().teardown(&self.clock);
1227            self.option_chain_managers.remove(&series_id);
1228
1229            log::info!("Torn down empty option chain manager for {series_id}");
1230        }
1231    }
1232
1233    fn handle_instrument_close(&self, close: InstrumentClose) {
1234        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1235        msgbus::publish_any(topic, &close);
1236    }
1237
1238    fn handle_custom_data(&self, custom: &CustomData) {
1239        log::debug!("Processing custom data: {}", custom.data.type_name());
1240        let topic = switchboard::get_custom_topic(&custom.data_type);
1241        msgbus::publish_any(topic, custom);
1242    }
1243
1244    /// Drains deferred subscribe/unsubscribe commands pushed by option chain
1245    /// managers (or any other component) and executes them against the appropriate
1246    /// data client.
1247    fn drain_deferred_commands(&mut self) {
1248        // Loop because expire_series pushes Unsubscribe commands; converges in <= 3 iterations
1249        loop {
1250            let commands: VecDeque<DeferredCommand> =
1251                std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1252
1253            if commands.is_empty() {
1254                break;
1255            }
1256
1257            for cmd in commands {
1258                match cmd {
1259                    DeferredCommand::Subscribe(sub) => {
1260                        let client = self.get_client(sub.client_id(), sub.venue());
1261                        if let Some(client) = client {
1262                            client.execute_subscribe(&sub);
1263                        }
1264                    }
1265                    DeferredCommand::Unsubscribe(unsub) => {
1266                        let client = self.get_client(unsub.client_id(), unsub.venue());
1267                        if let Some(client) = client {
1268                            client.execute_unsubscribe(&unsub);
1269                        }
1270                    }
1271                    DeferredCommand::ExpireSeries(series_id) => {
1272                        self.expire_series(series_id);
1273                    }
1274                }
1275            }
1276        }
1277    }
1278
1279    /// Proactively expires all instruments for a series and tears down the manager.
1280    ///
1281    /// `handle_instrument_expired` removes each instrument from the aggregator and pushes
1282    /// deferred unsubscribe commands. `teardown` then cancels the snapshot timer and clears
1283    /// the handler lists (the aggregator is already empty at that point).
1284    fn expire_series(&mut self, series_id: OptionSeriesId) {
1285        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1286            return;
1287        };
1288
1289        let instrument_ids: Vec<InstrumentId> = self
1290            .option_chain_instrument_index
1291            .iter()
1292            .filter(|(_, sid)| **sid == series_id)
1293            .map(|(id, _)| *id)
1294            .collect();
1295
1296        for id in &instrument_ids {
1297            self.option_chain_instrument_index.remove(id);
1298            manager_rc.borrow_mut().handle_instrument_expired(id);
1299        }
1300
1301        manager_rc.borrow_mut().teardown(&self.clock);
1302        self.option_chain_managers.remove(&series_id);
1303
1304        log::info!("Proactively torn down expired option chain {series_id}");
1305    }
1306
1307    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
1308
1309    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1310        if cmd.instrument_id.is_synthetic() {
1311            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1312        }
1313
1314        self.book_deltas_subs.insert(cmd.instrument_id);
1315        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1316
1317        Ok(())
1318    }
1319
1320    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1321        if cmd.instrument_id.is_synthetic() {
1322            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1323        }
1324
1325        self.book_depth10_subs.insert(cmd.instrument_id);
1326        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1327
1328        Ok(())
1329    }
1330
1331    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1332        if cmd.instrument_id.is_synthetic() {
1333            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1334        }
1335
1336        // Track snapshot intervals per instrument, and set up timer on first subscription
1337        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1338            Entry::Vacant(e) => {
1339                let mut set = AHashSet::new();
1340                set.insert(cmd.instrument_id);
1341                e.insert(set);
1342                true
1343            }
1344            Entry::Occupied(mut e) => {
1345                e.get_mut().insert(cmd.instrument_id);
1346                false
1347            }
1348        };
1349
1350        if first_for_interval {
1351            // Initialize snapshotter and schedule its timer
1352            let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1353            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1354
1355            let snap_info = BookSnapshotInfo {
1356                instrument_id: cmd.instrument_id,
1357                venue: cmd.instrument_id.venue,
1358                is_composite: cmd.instrument_id.symbol.is_composite(),
1359                root: Ustr::from(cmd.instrument_id.symbol.root()),
1360                topic,
1361                interval_ms: cmd.interval_ms,
1362            };
1363
1364            // Schedule the first snapshot at the next interval boundary
1365            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1366            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1367
1368            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1369            self.book_snapshotters
1370                .insert(cmd.instrument_id, snapshotter.clone());
1371            let timer_name = snapshotter.timer_name;
1372
1373            let callback_fn: Rc<dyn Fn(TimeEvent)> =
1374                Rc::new(move |event| snapshotter.snapshot(event));
1375            let callback = TimeEventCallback::from(callback_fn);
1376
1377            self.clock
1378                .borrow_mut()
1379                .set_timer_ns(
1380                    &timer_name,
1381                    interval_ns,
1382                    Some(start_time_ns.into()),
1383                    None,
1384                    Some(callback),
1385                    None,
1386                    None,
1387                )
1388                .expect(FAILED);
1389        }
1390
1391        // Only set up book updater if not already subscribed to deltas
1392        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1393            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1394        }
1395
1396        if let Some(client_id) = cmd.client_id.as_ref()
1397            && self.external_clients.contains(client_id)
1398        {
1399            if self.config.debug {
1400                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1401            }
1402            return Ok(());
1403        }
1404
1405        log::debug!(
1406            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1407            cmd.instrument_id,
1408            cmd.client_id,
1409            cmd.venue,
1410        );
1411
1412        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1413            let deltas_cmd = SubscribeBookDeltas::new(
1414                cmd.instrument_id,
1415                cmd.book_type,
1416                cmd.client_id,
1417                cmd.venue,
1418                UUID4::new(),
1419                cmd.ts_init,
1420                cmd.depth,
1421                true, // managed
1422                Some(cmd.command_id),
1423                cmd.params.clone(),
1424            );
1425            log::debug!(
1426                "Calling client.execute_subscribe for BookDeltas: {}",
1427                cmd.instrument_id
1428            );
1429            client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1430        } else {
1431            log::error!(
1432                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1433                cmd.client_id,
1434                cmd.venue,
1435            );
1436        }
1437
1438        Ok(())
1439    }
1440
1441    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1442        match cmd.bar_type.aggregation_source() {
1443            AggregationSource::Internal => {
1444                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1445                    self.start_bar_aggregator(cmd.bar_type)?;
1446                }
1447            }
1448            AggregationSource::External => {
1449                if cmd.bar_type.instrument_id().is_synthetic() {
1450                    anyhow::bail!(
1451                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1452                    );
1453                }
1454            }
1455        }
1456
1457        Ok(())
1458    }
1459
1460    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) {
1461        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1462            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1463            return;
1464        }
1465
1466        self.book_deltas_subs.remove(&cmd.instrument_id);
1467
1468        let topics = vec![
1469            switchboard::get_book_deltas_topic(cmd.instrument_id),
1470            switchboard::get_book_depth10_topic(cmd.instrument_id),
1471            // TODO: Unsubscribe from snapshots?
1472        ];
1473
1474        self.maintain_book_updater(&cmd.instrument_id, &topics);
1475        self.maintain_book_snapshotter(&cmd.instrument_id);
1476    }
1477
1478    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) {
1479        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1480            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1481            return;
1482        }
1483
1484        self.book_depth10_subs.remove(&cmd.instrument_id);
1485
1486        let topics = vec![
1487            switchboard::get_book_deltas_topic(cmd.instrument_id),
1488            switchboard::get_book_depth10_topic(cmd.instrument_id),
1489        ];
1490
1491        self.maintain_book_updater(&cmd.instrument_id, &topics);
1492        self.maintain_book_snapshotter(&cmd.instrument_id);
1493    }
1494
1495    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
1496        let is_subscribed = self
1497            .book_intervals
1498            .values()
1499            .any(|set| set.contains(&cmd.instrument_id));
1500
1501        if !is_subscribed {
1502            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1503            return;
1504        }
1505
1506        // Remove instrument from interval tracking, and drop empty intervals
1507        let mut to_remove = Vec::new();
1508        for (interval, set) in &mut self.book_intervals {
1509            if set.remove(&cmd.instrument_id) && set.is_empty() {
1510                to_remove.push(*interval);
1511            }
1512        }
1513
1514        for interval in to_remove {
1515            self.book_intervals.remove(&interval);
1516        }
1517
1518        let topics = vec![
1519            switchboard::get_book_deltas_topic(cmd.instrument_id),
1520            switchboard::get_book_depth10_topic(cmd.instrument_id),
1521        ];
1522
1523        self.maintain_book_updater(&cmd.instrument_id, &topics);
1524        self.maintain_book_snapshotter(&cmd.instrument_id);
1525
1526        let still_in_intervals = self
1527            .book_intervals
1528            .values()
1529            .any(|set| set.contains(&cmd.instrument_id));
1530
1531        if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1532            if let Some(client_id) = cmd.client_id.as_ref()
1533                && self.external_clients.contains(client_id)
1534            {
1535                return;
1536            }
1537
1538            if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1539                let deltas_cmd = UnsubscribeBookDeltas::new(
1540                    cmd.instrument_id,
1541                    cmd.client_id,
1542                    cmd.venue,
1543                    UUID4::new(),
1544                    cmd.ts_init,
1545                    Some(cmd.command_id),
1546                    cmd.params.clone(),
1547                );
1548                client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1549            }
1550        }
1551    }
1552
1553    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
1554        let bar_type = cmd.bar_type;
1555
1556        // Don't remove aggregator if other exact-topic subscribers still exist
1557        let topic = switchboard::get_bars_topic(bar_type.standard());
1558        if msgbus::exact_subscriber_count_bars(topic) > 0 {
1559            return;
1560        }
1561
1562        if self.bar_aggregators.contains_key(&bar_type.standard())
1563            && let Err(e) = self.stop_bar_aggregator(bar_type)
1564        {
1565            log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1566        }
1567
1568        // After stopping a composite, check if the source aggregator is now orphaned
1569        if bar_type.is_composite() {
1570            let source_type = bar_type.composite();
1571            let source_topic = switchboard::get_bars_topic(source_type);
1572            if msgbus::exact_subscriber_count_bars(source_topic) == 0
1573                && self.bar_aggregators.contains_key(&source_type)
1574                && let Err(e) = self.stop_bar_aggregator(source_type)
1575            {
1576                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1577            }
1578        }
1579    }
1580
1581    fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
1582        let series_id = cmd.series_id;
1583
1584        // Handle edits to existing subscriptions by tearing down and re-setting up the OptionChainManager.
1585        if let Some(old) = self.option_chain_managers.remove(&series_id) {
1586            log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
1587            let all_ids = old.borrow().all_instrument_ids();
1588            let old_venue = old.borrow().venue();
1589            old.borrow_mut().teardown(&self.clock);
1590            self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
1591        }
1592
1593        // Drain any stale pending forward price requests for this series
1594        self.pending_option_chain_requests
1595            .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
1596
1597        // For ATM-based strike ranges, request forward prices from the adapter
1598        // to enable instant bootstrap without waiting for the first WebSocket tick.
1599        if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
1600            // Extract client_id first to avoid borrow conflicts
1601            let resolved_client_id = self
1602                .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
1603                .map(|c| c.client_id);
1604
1605            if let Some(client_id) = resolved_client_id {
1606                let request_id = UUID4::new();
1607                let ts_init = self.clock.borrow().timestamp_ns();
1608
1609                // Pick any one option instrument at this expiry from cache
1610                // to enable single-instrument forward price fetch (1 HTTP call)
1611                let sample_instrument_id = {
1612                    let cache = self.cache.borrow();
1613                    cache
1614                        .instruments(&series_id.venue, Some(&series_id.underlying))
1615                        .iter()
1616                        .find(|i| {
1617                            i.expiration_ns() == Some(series_id.expiration_ns)
1618                                && i.settlement_currency().code == series_id.settlement_currency
1619                        })
1620                        .map(|i| i.id())
1621                };
1622
1623                let request = RequestForwardPrices::new(
1624                    series_id.venue,
1625                    series_id.underlying,
1626                    sample_instrument_id,
1627                    Some(client_id),
1628                    request_id,
1629                    ts_init,
1630                    None,
1631                );
1632
1633                self.pending_option_chain_requests
1634                    .insert(request_id, cmd.clone());
1635
1636                let req_cmd = RequestCommand::ForwardPrices(request);
1637                if let Err(e) = self.execute_request(req_cmd) {
1638                    log::warn!("Failed to request forward prices for {series_id}: {e}");
1639                    let cmd = self
1640                        .pending_option_chain_requests
1641                        .remove(&request_id)
1642                        .expect("just inserted");
1643                    self.create_option_chain_manager(&cmd, None);
1644                }
1645
1646                return;
1647            }
1648        }
1649
1650        self.create_option_chain_manager(cmd, None);
1651    }
1652
1653    /// Creates and stores an `OptionChainManager` for the given subscription.
1654    fn create_option_chain_manager(
1655        &mut self,
1656        cmd: &SubscribeOptionChain,
1657        initial_atm_price: Option<Price>,
1658    ) {
1659        let series_id = cmd.series_id;
1660        let cache = self.cache.clone();
1661        let clock = self.clock.clone();
1662        let priority = self.msgbus_priority;
1663        let deferred_cmd_queue = self.deferred_cmd_queue.clone();
1664
1665        let manager_rc = {
1666            let client = self.get_client(cmd.client_id.as_ref(), Some(&series_id.venue));
1667            OptionChainManager::create_and_setup(
1668                series_id,
1669                &cache,
1670                cmd,
1671                &clock,
1672                priority,
1673                client,
1674                initial_atm_price,
1675                deferred_cmd_queue,
1676            )
1677        };
1678
1679        // Index all instruments for reverse lookup
1680        for id in manager_rc.borrow().all_instrument_ids() {
1681            self.option_chain_instrument_index.insert(id, series_id);
1682        }
1683
1684        self.option_chain_managers.insert(series_id, manager_rc);
1685    }
1686
1687    fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
1688        let series_id = cmd.series_id;
1689
1690        let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
1691            log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
1692            return;
1693        };
1694
1695        // Extract info before teardown
1696        let all_ids = manager_rc.borrow().all_instrument_ids();
1697        let venue = manager_rc.borrow().venue();
1698
1699        // Remove all instruments from reverse index
1700        for id in &all_ids {
1701            self.option_chain_instrument_index.remove(id);
1702        }
1703
1704        manager_rc.borrow_mut().teardown(&self.clock);
1705
1706        // Forward wire-level unsubscribes to the data client
1707        self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
1708
1709        log::info!("Unsubscribed option chain for {series_id}");
1710    }
1711
1712    /// Forwards wire-level unsubscribe commands for all option chain instruments.
1713    fn forward_option_chain_unsubscribes(
1714        &mut self,
1715        instrument_ids: &[InstrumentId],
1716        venue: Venue,
1717        client_id: Option<ClientId>,
1718    ) {
1719        let ts_init = self.clock.borrow().timestamp_ns();
1720
1721        let Some(client) = self.get_client(client_id.as_ref(), Some(&venue)) else {
1722            log::error!(
1723                "Cannot forward option chain unsubscribes: no client found for venue={venue}",
1724            );
1725            return;
1726        };
1727
1728        for instrument_id in instrument_ids {
1729            client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
1730                *instrument_id,
1731                client_id,
1732                Some(venue),
1733                UUID4::new(),
1734                ts_init,
1735                None,
1736                None,
1737            )));
1738            client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
1739                UnsubscribeOptionGreeks::new(
1740                    *instrument_id,
1741                    client_id,
1742                    Some(venue),
1743                    UUID4::new(),
1744                    ts_init,
1745                    None,
1746                    None,
1747                ),
1748            ));
1749            client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
1750                UnsubscribeInstrumentStatus::new(
1751                    *instrument_id,
1752                    client_id,
1753                    Some(venue),
1754                    UUID4::new(),
1755                    ts_init,
1756                    None,
1757                    None,
1758                ),
1759            ));
1760        }
1761    }
1762
1763    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1764        let Some(updater) = self.book_updaters.get(instrument_id) else {
1765            return;
1766        };
1767
1768        // Check which internal subscriptions still exist
1769        let has_deltas = self.book_deltas_subs.contains(instrument_id);
1770        let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1771
1772        let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1773        let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1774        let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1775        let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1776
1777        // Unsubscribe from topics that no longer have subscriptions
1778        if !has_deltas {
1779            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1780        }
1781
1782        if !has_depth10 {
1783            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1784        }
1785
1786        // Remove BookUpdater only when no subscriptions remain
1787        if !has_deltas && !has_depth10 {
1788            self.book_updaters.remove(instrument_id);
1789            log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1790        }
1791    }
1792
1793    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1794        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1795            let topic = switchboard::get_book_snapshots_topic(
1796                *instrument_id,
1797                snapshotter.snap_info.interval_ms,
1798            );
1799
1800            // Check remaining snapshot subscriptions, if none then remove snapshotter
1801            if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1802                let timer_name = snapshotter.timer_name;
1803                self.book_snapshotters.remove(instrument_id);
1804                let mut clock = self.clock.borrow_mut();
1805                if clock.timer_exists(&timer_name) {
1806                    clock.cancel_timer(&timer_name);
1807                }
1808                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1809            }
1810        }
1811    }
1812
1813    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1814
1815    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1816        let mut cache = self.cache.as_ref().borrow_mut();
1817        if let Err(e) = cache.add_instrument(instrument) {
1818            log_error_on_cache_insert(&e);
1819        }
1820    }
1821
1822    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1823        // TODO: Improve by adding bulk update methods to cache and database
1824        let mut cache = self.cache.as_ref().borrow_mut();
1825        for instrument in instruments {
1826            if let Err(e) = cache.add_instrument(instrument.clone()) {
1827                log_error_on_cache_insert(&e);
1828            }
1829        }
1830    }
1831
1832    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1833        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1834            log_error_on_cache_insert(&e);
1835        }
1836    }
1837
1838    fn handle_trades(&self, trades: &[TradeTick]) {
1839        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1840            log_error_on_cache_insert(&e);
1841        }
1842    }
1843
1844    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1845        if let Err(e) = self
1846            .cache
1847            .as_ref()
1848            .borrow_mut()
1849            .add_funding_rates(funding_rates)
1850        {
1851            log_error_on_cache_insert(&e);
1852        }
1853    }
1854
1855    fn handle_bars(&self, bars: &[Bar]) {
1856        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1857            log_error_on_cache_insert(&e);
1858        }
1859    }
1860
1861    fn handle_book_response(&self, book: &OrderBook) {
1862        log::debug!("Adding order book {} to cache", book.instrument_id);
1863
1864        if let Err(e) = self
1865            .cache
1866            .as_ref()
1867            .borrow_mut()
1868            .add_order_book(book.clone())
1869        {
1870            log_error_on_cache_insert(&e);
1871        }
1872    }
1873
1874    /// Handles a `ForwardPricesResponse` by extracting the forward price
1875    /// for the pending option chain and creating the manager with instant bootstrap.
1876    fn handle_forward_prices_response(
1877        &mut self,
1878        correlation_id: &UUID4,
1879        resp: &ForwardPricesResponse,
1880    ) {
1881        let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
1882            log::debug!(
1883                "No pending option chain request for correlation_id={correlation_id}, ignoring"
1884            );
1885            return;
1886        };
1887
1888        let series_id = cmd.series_id;
1889
1890        // Find a forward price that matches an instrument in this series.
1891        // We look up each forward price instrument in the cache to match by expiry and currency.
1892        let cache = self.cache.borrow();
1893        let mut best_price: Option<Price> = None;
1894
1895        for fp in &resp.data {
1896            // Check if any cached instrument with this id belongs to our series
1897            if let Some(instrument) = cache.instrument(&fp.instrument_id)
1898                && let Some(expiration) = instrument.expiration_ns()
1899                && expiration == series_id.expiration_ns
1900                && instrument.settlement_currency().code == series_id.settlement_currency
1901            {
1902                match Price::from_decimal(fp.forward_price) {
1903                    Ok(price) => best_price = Some(price),
1904                    Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
1905                }
1906                break;
1907            }
1908        }
1909        drop(cache);
1910
1911        if let Some(price) = best_price {
1912            log::info!("Forward price for {series_id}: {price} (instant bootstrap)",);
1913        } else {
1914            log::info!(
1915                "No matching forward price found for {series_id}, will bootstrap from live data",
1916            );
1917        }
1918
1919        self.create_option_chain_manager(&cmd, best_price);
1920    }
1921
1922    // -- INTERNAL --------------------------------------------------------------------------------
1923
1924    #[allow(clippy::too_many_arguments)]
1925    fn setup_book_updater(
1926        &mut self,
1927        instrument_id: &InstrumentId,
1928        book_type: BookType,
1929        only_deltas: bool,
1930        managed: bool,
1931    ) -> anyhow::Result<()> {
1932        let mut cache = self.cache.borrow_mut();
1933        if managed && !cache.has_order_book(instrument_id) {
1934            let book = OrderBook::new(*instrument_id, book_type);
1935            log::debug!("Created {book}");
1936            cache.add_order_book(book)?;
1937        }
1938
1939        // Reuse existing BookUpdater or create a new one
1940        let updater = self
1941            .book_updaters
1942            .entry(*instrument_id)
1943            .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1944            .clone();
1945
1946        // Subscribe to deltas (typed router handles duplicates)
1947        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1948        let deltas_handler = TypedHandler::new(updater.clone());
1949        msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1950
1951        // Subscribe to depth10 if not only_deltas
1952        if !only_deltas {
1953            let topic = switchboard::get_book_depth10_topic(*instrument_id);
1954            let depth_handler = TypedHandler::new(updater);
1955            msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1956        }
1957
1958        Ok(())
1959    }
1960
1961    fn create_bar_aggregator(
1962        &self,
1963        instrument: &InstrumentAny,
1964        bar_type: BarType,
1965    ) -> Box<dyn BarAggregator> {
1966        let cache = self.cache.clone();
1967
1968        let handler = move |bar: Bar| {
1969            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1970                log_error_on_cache_insert(&e);
1971            }
1972
1973            let topic = switchboard::get_bars_topic(bar.bar_type);
1974            msgbus::publish_bar(topic, &bar);
1975        };
1976
1977        let clock = self.clock.clone();
1978        let config = self.config.clone();
1979
1980        let price_precision = instrument.price_precision();
1981        let size_precision = instrument.size_precision();
1982
1983        if bar_type.spec().is_time_aggregated() {
1984            // Get time_bars_origin_offset from config
1985            let time_bars_origin_offset = config
1986                .time_bars_origins
1987                .get(&bar_type.spec().aggregation)
1988                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1989
1990            Box::new(TimeBarAggregator::new(
1991                bar_type,
1992                price_precision,
1993                size_precision,
1994                clock,
1995                handler,
1996                config.time_bars_build_with_no_updates,
1997                config.time_bars_timestamp_on_close,
1998                config.time_bars_interval_type,
1999                time_bars_origin_offset,
2000                config.time_bars_build_delay,
2001                config.time_bars_skip_first_non_full_bar,
2002            ))
2003        } else {
2004            match bar_type.spec().aggregation {
2005                BarAggregation::Tick => Box::new(TickBarAggregator::new(
2006                    bar_type,
2007                    price_precision,
2008                    size_precision,
2009                    handler,
2010                )) as Box<dyn BarAggregator>,
2011                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2012                    bar_type,
2013                    price_precision,
2014                    size_precision,
2015                    handler,
2016                )) as Box<dyn BarAggregator>,
2017                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2018                    bar_type,
2019                    price_precision,
2020                    size_precision,
2021                    handler,
2022                )) as Box<dyn BarAggregator>,
2023                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2024                    bar_type,
2025                    price_precision,
2026                    size_precision,
2027                    handler,
2028                )) as Box<dyn BarAggregator>,
2029                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2030                    bar_type,
2031                    price_precision,
2032                    size_precision,
2033                    handler,
2034                )) as Box<dyn BarAggregator>,
2035                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2036                    bar_type,
2037                    price_precision,
2038                    size_precision,
2039                    handler,
2040                )) as Box<dyn BarAggregator>,
2041                BarAggregation::Value => Box::new(ValueBarAggregator::new(
2042                    bar_type,
2043                    price_precision,
2044                    size_precision,
2045                    handler,
2046                )) as Box<dyn BarAggregator>,
2047                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2048                    bar_type,
2049                    price_precision,
2050                    size_precision,
2051                    handler,
2052                )) as Box<dyn BarAggregator>,
2053                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2054                    bar_type,
2055                    price_precision,
2056                    size_precision,
2057                    handler,
2058                )) as Box<dyn BarAggregator>,
2059                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2060                    bar_type,
2061                    price_precision,
2062                    size_precision,
2063                    instrument.price_increment(),
2064                    handler,
2065                )) as Box<dyn BarAggregator>,
2066                _ => panic!(
2067                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
2068                    bar_type.spec().aggregation
2069                ),
2070            }
2071        }
2072    }
2073
2074    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2075        // Get the instrument for this bar type
2076        let instrument = {
2077            let cache = self.cache.borrow();
2078            cache
2079                .instrument(&bar_type.instrument_id())
2080                .ok_or_else(|| {
2081                    anyhow::anyhow!(
2082                        "Cannot start bar aggregation: no instrument found for {}",
2083                        bar_type.instrument_id(),
2084                    )
2085                })?
2086                .clone()
2087        };
2088
2089        // Use standard form of bar type as key
2090        let bar_key = bar_type.standard();
2091
2092        // Create or retrieve aggregator in Rc<RefCell>
2093        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
2094            rc.clone()
2095        } else {
2096            let agg = self.create_bar_aggregator(&instrument, bar_type);
2097            let rc = Rc::new(RefCell::new(agg));
2098            self.bar_aggregators.insert(bar_key, rc.clone());
2099            rc
2100        };
2101
2102        // Subscribe to underlying data topics
2103        let mut subscriptions = Vec::new();
2104
2105        if bar_type.is_composite() {
2106            let topic = switchboard::get_bars_topic(bar_type.composite());
2107            let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_key));
2108            msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
2109            subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
2110        } else if bar_type.spec().price_type == PriceType::Last {
2111            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
2112            let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_key));
2113            msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
2114            subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
2115        } else {
2116            // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
2117            if matches!(
2118                bar_type.spec().aggregation,
2119                BarAggregation::TickImbalance
2120                    | BarAggregation::VolumeImbalance
2121                    | BarAggregation::ValueImbalance
2122                    | BarAggregation::TickRuns
2123                    | BarAggregation::VolumeRuns
2124                    | BarAggregation::ValueRuns
2125            ) {
2126                log::warn!(
2127                    "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
2128                     data with `aggressor_side`, but `price_type` is not LAST so it will receive \
2129                     quote data: bars will not emit correctly",
2130                );
2131            }
2132
2133            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
2134            let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_key));
2135            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
2136            subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
2137        }
2138
2139        self.bar_aggregator_handlers.insert(bar_key, subscriptions);
2140
2141        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
2142        self.setup_bar_aggregator(bar_type, false)?;
2143
2144        aggregator.borrow_mut().set_is_running(true);
2145
2146        Ok(())
2147    }
2148
2149    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
2150    ///
2151    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
2152    fn setup_bar_aggregator(&self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
2153        let bar_key = bar_type.standard();
2154        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
2155            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
2156        })?;
2157
2158        // Set historical mode and handler
2159        let handler: Box<dyn FnMut(Bar)> = if historical {
2160            // Historical handler - process_historical equivalent
2161            let cache = self.cache.clone();
2162            Box::new(move |bar: Bar| {
2163                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2164                    log_error_on_cache_insert(&e);
2165                }
2166                // In historical mode, bars are processed but not published to message bus
2167            })
2168        } else {
2169            // Regular handler - process equivalent
2170            let cache = self.cache.clone();
2171            Box::new(move |bar: Bar| {
2172                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2173                    log_error_on_cache_insert(&e);
2174                }
2175                let topic = switchboard::get_bars_topic(bar.bar_type);
2176                msgbus::publish_bar(topic, &bar);
2177            })
2178        };
2179
2180        aggregator
2181            .borrow_mut()
2182            .set_historical_mode(historical, handler);
2183
2184        // For TimeBarAggregator, set clock and start timer
2185        if bar_type.spec().is_time_aggregated() {
2186            use nautilus_common::clock::TestClock;
2187
2188            if historical {
2189                // Each aggregator gets its own independent clock
2190                let test_clock = Rc::new(RefCell::new(TestClock::new()));
2191                aggregator.borrow_mut().set_clock(test_clock);
2192                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
2193                // Store weak reference so start_timer can use it when called later
2194                let aggregator_weak = Rc::downgrade(aggregator);
2195                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
2196            } else {
2197                aggregator.borrow_mut().set_clock(self.clock.clone());
2198                aggregator
2199                    .borrow_mut()
2200                    .start_timer(Some(aggregator.clone()));
2201            }
2202        }
2203
2204        Ok(())
2205    }
2206
2207    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2208        let aggregator = self
2209            .bar_aggregators
2210            .remove(&bar_type.standard())
2211            .ok_or_else(|| {
2212                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
2213            })?;
2214
2215        aggregator.borrow_mut().stop();
2216
2217        // Unsubscribe any registered message handlers
2218        let bar_key = bar_type.standard();
2219        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
2220            for sub in subs {
2221                match sub {
2222                    BarAggregatorSubscription::Bar { topic, handler } => {
2223                        msgbus::unsubscribe_bars(topic.into(), &handler);
2224                    }
2225                    BarAggregatorSubscription::Trade { topic, handler } => {
2226                        msgbus::unsubscribe_trades(topic.into(), &handler);
2227                    }
2228                    BarAggregatorSubscription::Quote { topic, handler } => {
2229                        msgbus::unsubscribe_quotes(topic.into(), &handler);
2230                    }
2231                }
2232            }
2233        }
2234
2235        Ok(())
2236    }
2237}
2238
2239#[inline(always)]
2240fn log_error_on_cache_insert<T: Display>(e: &T) {
2241    log::error!("Error on cache insert: {e}");
2242}
2243
2244#[inline(always)]
2245fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
2246    if data.is_empty() {
2247        let name = type_name::<T>();
2248        let short_name = name.rsplit("::").next().unwrap_or(name);
2249        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
2250        return true;
2251    }
2252    false
2253}