Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::{Any, type_name},
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::{Debug, Display},
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{
64        self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
65        switchboard::{self, MessagingSwitchboard},
66    },
67    runner::get_data_cmd_sender,
68    timer::{TimeEvent, TimeEventCallback},
69};
70use nautilus_core::{
71    UUID4, WeakCell,
72    correctness::{
73        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
74    },
75    datetime::millis_to_nanos_unchecked,
76};
77#[cfg(feature = "defi")]
78use nautilus_model::defi::DefiData;
79use nautilus_model::{
80    data::{
81        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
82        InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
83        QuoteTick, TradeTick,
84    },
85    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
86    identifiers::{ClientId, InstrumentId, Venue},
87    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
88    orderbook::OrderBook,
89};
90#[cfg(feature = "streaming")]
91use nautilus_persistence::backend::catalog::ParquetDataCatalog;
92use ustr::Ustr;
93
94#[cfg(feature = "defi")]
95#[allow(unused_imports)] // Brings DeFi impl blocks into scope
96use crate::defi::engine as _;
97#[cfg(feature = "defi")]
98use crate::engine::pool::PoolUpdater;
99use crate::{
100    aggregation::{
101        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
102        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
103        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
104        VolumeRunsBarAggregator,
105    },
106    client::DataClientAdapter,
107};
108
109/// Typed subscription for bar aggregator handlers.
110///
111/// Stores the topic and handler for each data type so we can properly
112/// unsubscribe from the typed routers.
113#[derive(Clone)]
114pub enum BarAggregatorSubscription {
115    Bar {
116        topic: MStr<Topic>,
117        handler: TypedHandler<Bar>,
118    },
119    Trade {
120        topic: MStr<Topic>,
121        handler: TypedHandler<TradeTick>,
122    },
123    Quote {
124        topic: MStr<Topic>,
125        handler: TypedHandler<QuoteTick>,
126    },
127}
128
129impl Debug for BarAggregatorSubscription {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        match self {
132            Self::Bar { topic, handler } => f
133                .debug_struct(stringify!(Bar))
134                .field("topic", topic)
135                .field("handler_id", &handler.id())
136                .finish(),
137            Self::Trade { topic, handler } => f
138                .debug_struct(stringify!(Trade))
139                .field("topic", topic)
140                .field("handler_id", &handler.id())
141                .finish(),
142            Self::Quote { topic, handler } => f
143                .debug_struct(stringify!(Quote))
144                .field("topic", topic)
145                .field("handler_id", &handler.id())
146                .finish(),
147        }
148    }
149}
150
151/// Provides a high-performance `DataEngine` for all environments.
152#[derive(Debug)]
153pub struct DataEngine {
154    pub(crate) clock: Rc<RefCell<dyn Clock>>,
155    pub(crate) cache: Rc<RefCell<Cache>>,
156    pub(crate) external_clients: AHashSet<ClientId>,
157    clients: IndexMap<ClientId, DataClientAdapter>,
158    default_client: Option<DataClientAdapter>,
159    #[cfg(feature = "streaming")]
160    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
161    routing_map: IndexMap<Venue, ClientId>,
162    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
163    book_deltas_subs: AHashSet<InstrumentId>,
164    book_depth10_subs: AHashSet<InstrumentId>,
165    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
166    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
167    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
168    bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
169    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
170    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
171    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
172    pub(crate) msgbus_priority: u8,
173    pub(crate) config: DataEngineConfig,
174    #[cfg(feature = "defi")]
175    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
176    #[cfg(feature = "defi")]
177    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
178    #[cfg(feature = "defi")]
179    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
180    #[cfg(feature = "defi")]
181    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
182}
183
184impl DataEngine {
185    /// Creates a new [`DataEngine`] instance.
186    #[must_use]
187    pub fn new(
188        clock: Rc<RefCell<dyn Clock>>,
189        cache: Rc<RefCell<Cache>>,
190        config: Option<DataEngineConfig>,
191    ) -> Self {
192        let config = config.unwrap_or_default();
193
194        let external_clients: AHashSet<ClientId> = config
195            .external_clients
196            .clone()
197            .unwrap_or_default()
198            .into_iter()
199            .collect();
200
201        Self {
202            clock,
203            cache,
204            external_clients,
205            clients: IndexMap::new(),
206            default_client: None,
207            #[cfg(feature = "streaming")]
208            catalogs: AHashMap::new(),
209            routing_map: IndexMap::new(),
210            book_intervals: AHashMap::new(),
211            book_deltas_subs: AHashSet::new(),
212            book_depth10_subs: AHashSet::new(),
213            book_updaters: AHashMap::new(),
214            book_snapshotters: AHashMap::new(),
215            bar_aggregators: AHashMap::new(),
216            bar_aggregator_handlers: AHashMap::new(),
217            _synthetic_quote_feeds: AHashMap::new(),
218            _synthetic_trade_feeds: AHashMap::new(),
219            buffered_deltas_map: AHashMap::new(),
220            msgbus_priority: 10, // High-priority for built-in component
221            config,
222            #[cfg(feature = "defi")]
223            pool_updaters: AHashMap::new(),
224            #[cfg(feature = "defi")]
225            pool_updaters_pending: AHashSet::new(),
226            #[cfg(feature = "defi")]
227            pool_snapshot_pending: AHashSet::new(),
228            #[cfg(feature = "defi")]
229            pool_event_buffers: AHashMap::new(),
230        }
231    }
232
233    /// Registers all message bus handlers for the data engine.
234    pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
235        let weak = WeakCell::from(Rc::downgrade(&engine));
236
237        let weak1 = weak.clone();
238        msgbus::register_data_command_endpoint(
239            MessagingSwitchboard::data_engine_execute(),
240            TypedIntoHandler::from(move |cmd: DataCommand| {
241                if let Some(rc) = weak1.upgrade() {
242                    rc.borrow_mut().execute(cmd);
243                }
244            }),
245        );
246
247        msgbus::register_data_command_endpoint(
248            MessagingSwitchboard::data_engine_queue_execute(),
249            TypedIntoHandler::from(move |cmd: DataCommand| {
250                get_data_cmd_sender().clone().execute(cmd);
251            }),
252        );
253
254        // Register process handler (polymorphic - uses Any)
255        let weak2 = weak.clone();
256        msgbus::register_any(
257            MessagingSwitchboard::data_engine_process(),
258            ShareableMessageHandler::from_any(move |data: &dyn Any| {
259                if let Some(rc) = weak2.upgrade() {
260                    rc.borrow_mut().process(data);
261                }
262            }),
263        );
264
265        // Register process_data handler (typed - takes ownership)
266        let weak3 = weak.clone();
267        msgbus::register_data_endpoint(
268            MessagingSwitchboard::data_engine_process_data(),
269            TypedIntoHandler::from(move |data: Data| {
270                if let Some(rc) = weak3.upgrade() {
271                    rc.borrow_mut().process_data(data);
272                }
273            }),
274        );
275
276        // Register process_defi_data handler (typed - takes ownership)
277        #[cfg(feature = "defi")]
278        {
279            let weak4 = weak.clone();
280            msgbus::register_defi_data_endpoint(
281                MessagingSwitchboard::data_engine_process_defi_data(),
282                TypedIntoHandler::from(move |data: DefiData| {
283                    if let Some(rc) = weak4.upgrade() {
284                        rc.borrow_mut().process_defi_data(data);
285                    }
286                }),
287            );
288        }
289
290        let weak5 = weak;
291        msgbus::register_data_response_endpoint(
292            MessagingSwitchboard::data_engine_response(),
293            TypedIntoHandler::from(move |resp: DataResponse| {
294                if let Some(rc) = weak5.upgrade() {
295                    rc.borrow_mut().response(resp);
296                }
297            }),
298        );
299    }
300
301    /// Returns a read-only reference to the engines clock.
302    #[must_use]
303    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
304        self.clock.borrow()
305    }
306
307    /// Returns a read-only reference to the engines cache.
308    #[must_use]
309    pub fn get_cache(&self) -> Ref<'_, Cache> {
310        self.cache.borrow()
311    }
312
313    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
314    #[must_use]
315    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
316        Rc::clone(&self.cache)
317    }
318
319    /// Registers the `catalog` with the engine with an optional specific `name`.
320    ///
321    /// # Panics
322    ///
323    /// Panics if a catalog with the same `name` has already been registered.
324    #[cfg(feature = "streaming")]
325    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
326        let name = Ustr::from(name.as_deref().unwrap_or("catalog_0"));
327
328        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
329
330        self.catalogs.insert(name, catalog);
331        log::info!("Registered catalog <{name}>");
332    }
333
334    /// Registers the `client` with the engine with an optional venue `routing`.
335    ///
336    ///
337    /// # Panics
338    ///
339    /// Panics if a client with the same client ID has already been registered.
340    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
341        let client_id = client.client_id();
342
343        if let Some(default_client) = &self.default_client {
344            check_predicate_false(
345                default_client.client_id() == client.client_id(),
346                "client_id already registered as default client",
347            )
348            .expect(FAILED);
349        }
350
351        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
352
353        if let Some(routing) = routing {
354            self.routing_map.insert(routing, client_id);
355            log::debug!("Set client {client_id} routing for {routing}");
356        }
357
358        if client.venue.is_none() && self.default_client.is_none() {
359            self.default_client = Some(client);
360            log::debug!("Registered client {client_id} for default routing");
361        } else {
362            self.clients.insert(client_id, client);
363            log::debug!("Registered client {client_id}");
364        }
365    }
366
367    /// Deregisters the client for the `client_id`.
368    ///
369    /// # Panics
370    ///
371    /// Panics if the client ID has not been registered.
372    pub fn deregister_client(&mut self, client_id: &ClientId) {
373        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
374
375        self.clients.shift_remove(client_id);
376        log::info!("Deregistered client {client_id}");
377    }
378
379    /// Registers the data `client` with the engine as the default routing client.
380    ///
381    /// When a specific venue routing cannot be found, this client will receive messages.
382    ///
383    /// # Warnings
384    ///
385    /// Any existing default routing client will be overwritten.
386    ///
387    /// # Panics
388    ///
389    /// Panics if a default client has already been registered.
390    pub fn register_default_client(&mut self, client: DataClientAdapter) {
391        check_predicate_true(
392            self.default_client.is_none(),
393            "default client already registered",
394        )
395        .expect(FAILED);
396
397        let client_id = client.client_id();
398
399        self.default_client = Some(client);
400        log::debug!("Registered default client {client_id}");
401    }
402
403    /// Starts all registered data clients and re-arms bar aggregator timers.
404    pub fn start(&mut self) {
405        for client in self.get_clients_mut() {
406            if let Err(e) = client.start() {
407                log::error!("{e}");
408            }
409        }
410
411        for aggregator in self.bar_aggregators.values() {
412            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
413                aggregator
414                    .borrow_mut()
415                    .start_timer(Some(aggregator.clone()));
416            }
417        }
418    }
419
420    /// Stops all registered data clients and bar aggregator timers.
421    pub fn stop(&mut self) {
422        for client in self.get_clients_mut() {
423            if let Err(e) = client.stop() {
424                log::error!("{e}");
425            }
426        }
427
428        for aggregator in self.bar_aggregators.values() {
429            aggregator.borrow_mut().stop();
430        }
431    }
432
433    /// Resets all registered data clients and clears bar aggregator state.
434    pub fn reset(&mut self) {
435        for client in self.get_clients_mut() {
436            if let Err(e) = client.reset() {
437                log::error!("{e}");
438            }
439        }
440
441        let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
442        for bar_type in bar_types {
443            if let Err(e) = self.stop_bar_aggregator(bar_type) {
444                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
445            }
446        }
447    }
448
449    /// Disposes the engine, stopping all clients and canceling any timers.
450    pub fn dispose(&mut self) {
451        for client in self.get_clients_mut() {
452            if let Err(e) = client.dispose() {
453                log::error!("{e}");
454            }
455        }
456
457        self.clock.borrow_mut().cancel_timers();
458    }
459
460    /// Connects all registered data clients concurrently.
461    ///
462    /// Connection failures are logged but do not prevent the node from running.
463    pub async fn connect(&mut self) {
464        let futures: Vec<_> = self
465            .get_clients_mut()
466            .into_iter()
467            .map(|client| client.connect())
468            .collect();
469
470        let results = join_all(futures).await;
471
472        for error in results.into_iter().filter_map(Result::err) {
473            log::error!("Failed to connect data client: {error}");
474        }
475    }
476
477    /// Disconnects all registered data clients concurrently.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if any client fails to disconnect.
482    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
483        let futures: Vec<_> = self
484            .get_clients_mut()
485            .into_iter()
486            .map(|client| client.disconnect())
487            .collect();
488
489        let results = join_all(futures).await;
490        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
491
492        if errors.is_empty() {
493            Ok(())
494        } else {
495            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
496            anyhow::bail!(
497                "Failed to disconnect data clients: {}",
498                error_msgs.join("; ")
499            )
500        }
501    }
502
503    /// Returns `true` if all registered data clients are currently connected.
504    #[must_use]
505    pub fn check_connected(&self) -> bool {
506        self.get_clients()
507            .iter()
508            .all(|client| client.is_connected())
509    }
510
511    /// Returns `true` if all registered data clients are currently disconnected.
512    #[must_use]
513    pub fn check_disconnected(&self) -> bool {
514        self.get_clients()
515            .iter()
516            .all(|client| !client.is_connected())
517    }
518
519    /// Returns connection status for each registered client.
520    #[must_use]
521    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
522        self.get_clients()
523            .into_iter()
524            .map(|client| (client.client_id(), client.is_connected()))
525            .collect()
526    }
527
528    /// Returns a list of all registered client IDs, including the default client if set.
529    #[must_use]
530    pub fn registered_clients(&self) -> Vec<ClientId> {
531        self.get_clients()
532            .into_iter()
533            .map(|client| client.client_id())
534            .collect()
535    }
536
537    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
538
539    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
540    where
541        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
542        T: Clone,
543    {
544        self.get_clients()
545            .into_iter()
546            .flat_map(get_subs)
547            .cloned()
548            .collect()
549    }
550
551    #[must_use]
552    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
553        let (default_opt, clients_map) = (&self.default_client, &self.clients);
554        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
555
556        if let Some(default) = default_opt {
557            clients.push(default);
558        }
559
560        clients
561    }
562
563    #[must_use]
564    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
565        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
566        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
567
568        if let Some(default) = default_opt {
569            clients.push(default);
570        }
571
572        clients
573    }
574
575    pub fn get_client(
576        &mut self,
577        client_id: Option<&ClientId>,
578        venue: Option<&Venue>,
579    ) -> Option<&mut DataClientAdapter> {
580        if let Some(client_id) = client_id {
581            // Explicit ID: first look in registered clients
582            if let Some(client) = self.clients.get_mut(client_id) {
583                return Some(client);
584            }
585
586            // Then check if it matches the default client
587            if let Some(default) = self.default_client.as_mut()
588                && default.client_id() == *client_id
589            {
590                return Some(default);
591            }
592
593            // Unknown explicit client
594            return None;
595        }
596
597        if let Some(v) = venue {
598            // Route by venue if mapped client still registered
599            if let Some(client_id) = self.routing_map.get(v) {
600                return self.clients.get_mut(client_id);
601            }
602        }
603
604        // Fallback to default client
605        self.get_default_client()
606    }
607
608    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
609        self.default_client.as_mut()
610    }
611
612    /// Returns all custom data types currently subscribed across all clients.
613    #[must_use]
614    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
615        self.collect_subscriptions(|client| &client.subscriptions_custom)
616    }
617
618    /// Returns all instrument IDs currently subscribed across all clients.
619    #[must_use]
620    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
621        self.collect_subscriptions(|client| &client.subscriptions_instrument)
622    }
623
624    /// Returns all instrument IDs for which book delta subscriptions exist.
625    #[must_use]
626    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
627        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
628    }
629
630    /// Returns all instrument IDs for which book depth10 subscriptions exist.
631    #[must_use]
632    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
633        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
634    }
635
636    /// Returns all instrument IDs for which book snapshot subscriptions exist.
637    #[must_use]
638    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
639        self.book_intervals
640            .values()
641            .flat_map(|set| set.iter().copied())
642            .collect()
643    }
644
645    /// Returns all instrument IDs for which quote subscriptions exist.
646    #[must_use]
647    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
648        self.collect_subscriptions(|client| &client.subscriptions_quotes)
649    }
650
651    /// Returns all instrument IDs for which trade subscriptions exist.
652    #[must_use]
653    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
654        self.collect_subscriptions(|client| &client.subscriptions_trades)
655    }
656
657    /// Returns all bar types currently subscribed across all clients.
658    #[must_use]
659    pub fn subscribed_bars(&self) -> Vec<BarType> {
660        self.collect_subscriptions(|client| &client.subscriptions_bars)
661    }
662
663    /// Returns all instrument IDs for which mark price subscriptions exist.
664    #[must_use]
665    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
666        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
667    }
668
669    /// Returns all instrument IDs for which index price subscriptions exist.
670    #[must_use]
671    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
672        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
673    }
674
675    /// Returns all instrument IDs for which funding rate subscriptions exist.
676    #[must_use]
677    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
678        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
679    }
680
681    /// Returns all instrument IDs for which status subscriptions exist.
682    #[must_use]
683    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
684        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
685    }
686
687    /// Returns all instrument IDs for which instrument close subscriptions exist.
688    #[must_use]
689    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
690        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
691    }
692
693    // -- COMMANDS --------------------------------------------------------------------------------
694
695    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
696    ///
697    /// Errors during execution are logged.
698    pub fn execute(&mut self, cmd: DataCommand) {
699        if let Err(e) = match cmd {
700            DataCommand::Subscribe(c) => self.execute_subscribe(&c),
701            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
702            DataCommand::Request(c) => self.execute_request(c),
703            #[cfg(feature = "defi")]
704            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
705            #[cfg(feature = "defi")]
706            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
707            #[cfg(feature = "defi")]
708            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
709            _ => {
710                log::warn!("Unhandled DataCommand variant");
711                Ok(())
712            }
713        } {
714            log::error!("{e}");
715        }
716    }
717
718    /// Handles a subscribe command, updating internal state and forwarding to the client.
719    ///
720    /// # Errors
721    ///
722    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
723    /// or if the underlying client operation fails.
724    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
725        // Update internal engine state
726        match &cmd {
727            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
728            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
729            SubscribeCommand::BookSnapshots(cmd) => {
730                // Handles client forwarding internally (forwards as BookDeltas)
731                return self.subscribe_book_snapshots(cmd);
732            }
733            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
734            _ => {} // Do nothing else
735        }
736
737        if let Some(client_id) = cmd.client_id()
738            && self.external_clients.contains(client_id)
739        {
740            if self.config.debug {
741                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
742            }
743            return Ok(());
744        }
745
746        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
747            client.execute_subscribe(cmd);
748        } else {
749            log::error!(
750                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
751                cmd.client_id(),
752                cmd.venue(),
753            );
754        }
755
756        Ok(())
757    }
758
759    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the underlying client operation fails.
764    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
765        match &cmd {
766            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
767            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
768            UnsubscribeCommand::BookSnapshots(cmd) => {
769                // Handles client forwarding internally (forwards as BookDeltas)
770                return self.unsubscribe_book_snapshots(cmd);
771            }
772            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
773            _ => {} // Do nothing else
774        }
775
776        if let Some(client_id) = cmd.client_id()
777            && self.external_clients.contains(client_id)
778        {
779            if self.config.debug {
780                log::debug!(
781                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
782                );
783            }
784            return Ok(());
785        }
786
787        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
788            client.execute_unsubscribe(cmd);
789        } else {
790            log::error!(
791                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
792                cmd.client_id(),
793                cmd.venue(),
794            );
795        }
796
797        Ok(())
798    }
799
800    /// Sends a [`RequestCommand`] to a suitable data client implementation.
801    ///
802    /// # Errors
803    ///
804    /// Returns an error if no client is found for the given client ID or venue,
805    /// or if the client fails to process the request.
806    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
807        // Skip requests for external clients
808        if let Some(cid) = req.client_id()
809            && self.external_clients.contains(cid)
810        {
811            if self.config.debug {
812                log::debug!("Skipping data request for external client {cid}: {req:?}");
813            }
814            return Ok(());
815        }
816
817        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
818            match req {
819                RequestCommand::Data(req) => client.request_data(req),
820                RequestCommand::Instrument(req) => client.request_instrument(req),
821                RequestCommand::Instruments(req) => client.request_instruments(req),
822                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
823                RequestCommand::BookDepth(req) => client.request_book_depth(req),
824                RequestCommand::Quotes(req) => client.request_quotes(req),
825                RequestCommand::Trades(req) => client.request_trades(req),
826                RequestCommand::FundingRates(req) => client.request_funding_rates(req),
827                RequestCommand::Bars(req) => client.request_bars(req),
828            }
829        } else {
830            anyhow::bail!(
831                "Cannot handle request: no client found for {:?} {:?}",
832                req.client_id(),
833                req.venue()
834            );
835        }
836    }
837
838    /// Processes a dynamically-typed data message.
839    ///
840    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
841    pub fn process(&mut self, data: &dyn Any) {
842        // TODO: Eventually these can be added to the `Data` enum (C/Cython blocking), process here for now
843        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
844            self.handle_instrument(instrument.clone());
845        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
846            self.handle_funding_rate(*funding_rate);
847        } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
848            self.handle_instrument_status(*status);
849        } else {
850            log::error!("Cannot process data {data:?}, type is unrecognized");
851        }
852
853        // TODO: Add custom data handling here
854    }
855
856    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
857    pub fn process_data(&mut self, data: Data) {
858        match data {
859            Data::Delta(delta) => self.handle_delta(delta),
860            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
861            Data::Depth10(depth) => self.handle_depth10(*depth),
862            Data::Quote(quote) => self.handle_quote(quote),
863            Data::Trade(trade) => self.handle_trade(trade),
864            Data::Bar(bar) => self.handle_bar(bar),
865            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
866            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
867            Data::InstrumentClose(close) => self.handle_instrument_close(close),
868        }
869    }
870
871    /// Processes a `DataResponse`, handling and publishing the response message.
872    pub fn response(&self, resp: DataResponse) {
873        log::debug!("{RECV}{RES} {resp:?}");
874
875        let correlation_id = *resp.correlation_id();
876
877        match &resp {
878            DataResponse::Instrument(r) => {
879                self.handle_instrument_response(r.data.clone());
880            }
881            DataResponse::Instruments(r) => {
882                self.handle_instruments(&r.data);
883            }
884            DataResponse::Quotes(r) => {
885                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
886                    self.handle_quotes(&r.data);
887                }
888            }
889            DataResponse::Trades(r) => {
890                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
891                    self.handle_trades(&r.data);
892                }
893            }
894            DataResponse::FundingRates(r) => {
895                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
896                    self.handle_funding_rates(&r.data);
897                }
898            }
899            DataResponse::Bars(r) => {
900                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
901                    self.handle_bars(&r.data);
902                }
903            }
904            DataResponse::Book(r) => self.handle_book_response(&r.data),
905            _ => todo!("Handle other response types"),
906        }
907
908        msgbus::send_response(&correlation_id, resp);
909    }
910
911    // -- DATA HANDLERS ---------------------------------------------------------------------------
912
913    fn handle_instrument(&mut self, instrument: InstrumentAny) {
914        log::debug!("Handling instrument: {}", instrument.id());
915
916        if let Err(e) = self
917            .cache
918            .as_ref()
919            .borrow_mut()
920            .add_instrument(instrument.clone())
921        {
922            log_error_on_cache_insert(&e);
923        }
924
925        let topic = switchboard::get_instrument_topic(instrument.id());
926        log::debug!("Publishing instrument to topic: {topic}");
927        msgbus::publish_any(topic, &instrument);
928    }
929
930    fn handle_delta(&mut self, delta: OrderBookDelta) {
931        let deltas = if self.config.buffer_deltas {
932            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
933                buffered_deltas.deltas.push(delta);
934                buffered_deltas.flags = delta.flags;
935                buffered_deltas.sequence = delta.sequence;
936                buffered_deltas.ts_event = delta.ts_event;
937                buffered_deltas.ts_init = delta.ts_init;
938            } else {
939                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
940                self.buffered_deltas_map
941                    .insert(delta.instrument_id, buffered_deltas);
942            }
943
944            if !RecordFlag::F_LAST.matches(delta.flags) {
945                return; // Not the last delta for event
946            }
947
948            self.buffered_deltas_map
949                .remove(&delta.instrument_id)
950                .expect("buffered deltas exist")
951        } else {
952            OrderBookDeltas::new(delta.instrument_id, vec![delta])
953        };
954
955        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
956        msgbus::publish_deltas(topic, &deltas);
957    }
958
959    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
960        if self.config.buffer_deltas {
961            let instrument_id = deltas.instrument_id;
962
963            for delta in deltas.deltas {
964                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
965                    buffered_deltas.deltas.push(delta);
966                    buffered_deltas.flags = delta.flags;
967                    buffered_deltas.sequence = delta.sequence;
968                    buffered_deltas.ts_event = delta.ts_event;
969                    buffered_deltas.ts_init = delta.ts_init;
970                } else {
971                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
972                    self.buffered_deltas_map
973                        .insert(instrument_id, buffered_deltas);
974                }
975
976                if RecordFlag::F_LAST.matches(delta.flags) {
977                    let deltas_to_publish = self
978                        .buffered_deltas_map
979                        .remove(&instrument_id)
980                        .expect("buffered deltas exist");
981                    let topic = switchboard::get_book_deltas_topic(instrument_id);
982                    msgbus::publish_deltas(topic, &deltas_to_publish);
983                }
984            }
985        } else {
986            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
987            msgbus::publish_deltas(topic, &deltas);
988        }
989    }
990
991    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
992        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
993        msgbus::publish_depth10(topic, &depth);
994    }
995
996    fn handle_quote(&mut self, quote: QuoteTick) {
997        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
998            log_error_on_cache_insert(&e);
999        }
1000
1001        // TODO: Handle synthetics
1002
1003        let topic = switchboard::get_quotes_topic(quote.instrument_id);
1004        msgbus::publish_quote(topic, &quote);
1005    }
1006
1007    fn handle_trade(&mut self, trade: TradeTick) {
1008        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1009            log_error_on_cache_insert(&e);
1010        }
1011
1012        // TODO: Handle synthetics
1013
1014        let topic = switchboard::get_trades_topic(trade.instrument_id);
1015        msgbus::publish_trade(topic, &trade);
1016    }
1017
1018    fn handle_bar(&mut self, bar: Bar) {
1019        // TODO: Handle additional bar logic
1020        if self.config.validate_data_sequence
1021            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1022        {
1023            if bar.ts_event < last_bar.ts_event {
1024                log::warn!(
1025                    "Bar {bar} was prior to last bar `ts_event` {}",
1026                    last_bar.ts_event
1027                );
1028                return; // Bar is out of sequence
1029            }
1030
1031            if bar.ts_init < last_bar.ts_init {
1032                log::warn!(
1033                    "Bar {bar} was prior to last bar `ts_init` {}",
1034                    last_bar.ts_init
1035                );
1036                return; // Bar is out of sequence
1037            }
1038            // TODO: Implement `bar.is_revision` logic
1039        }
1040
1041        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1042            log_error_on_cache_insert(&e);
1043        }
1044
1045        let topic = switchboard::get_bars_topic(bar.bar_type);
1046        msgbus::publish_bar(topic, &bar);
1047    }
1048
1049    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
1050        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1051            log_error_on_cache_insert(&e);
1052        }
1053
1054        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1055        msgbus::publish_mark_price(topic, &mark_price);
1056    }
1057
1058    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
1059        if let Err(e) = self
1060            .cache
1061            .as_ref()
1062            .borrow_mut()
1063            .add_index_price(index_price)
1064        {
1065            log_error_on_cache_insert(&e);
1066        }
1067
1068        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1069        msgbus::publish_index_price(topic, &index_price);
1070    }
1071
1072    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
1073    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1074        if let Err(e) = self
1075            .cache
1076            .as_ref()
1077            .borrow_mut()
1078            .add_funding_rate(funding_rate)
1079        {
1080            log_error_on_cache_insert(&e);
1081        }
1082
1083        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1084        msgbus::publish_funding_rate(topic, &funding_rate);
1085    }
1086
1087    fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1088        let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1089        msgbus::publish_any(topic, &status);
1090    }
1091
1092    fn handle_instrument_close(&mut self, close: InstrumentClose) {
1093        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1094        msgbus::publish_any(topic, &close);
1095    }
1096
1097    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
1098
1099    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1100        if cmd.instrument_id.is_synthetic() {
1101            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1102        }
1103
1104        self.book_deltas_subs.insert(cmd.instrument_id);
1105        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1106
1107        Ok(())
1108    }
1109
1110    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1111        if cmd.instrument_id.is_synthetic() {
1112            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1113        }
1114
1115        self.book_depth10_subs.insert(cmd.instrument_id);
1116        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1117
1118        Ok(())
1119    }
1120
1121    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1122        if cmd.instrument_id.is_synthetic() {
1123            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1124        }
1125
1126        // Track snapshot intervals per instrument, and set up timer on first subscription
1127        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1128            Entry::Vacant(e) => {
1129                let mut set = AHashSet::new();
1130                set.insert(cmd.instrument_id);
1131                e.insert(set);
1132                true
1133            }
1134            Entry::Occupied(mut e) => {
1135                e.get_mut().insert(cmd.instrument_id);
1136                false
1137            }
1138        };
1139
1140        if first_for_interval {
1141            // Initialize snapshotter and schedule its timer
1142            let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1143            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1144
1145            let snap_info = BookSnapshotInfo {
1146                instrument_id: cmd.instrument_id,
1147                venue: cmd.instrument_id.venue,
1148                is_composite: cmd.instrument_id.symbol.is_composite(),
1149                root: Ustr::from(cmd.instrument_id.symbol.root()),
1150                topic,
1151                interval_ms: cmd.interval_ms,
1152            };
1153
1154            // Schedule the first snapshot at the next interval boundary
1155            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1156            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1157
1158            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1159            self.book_snapshotters
1160                .insert(cmd.instrument_id, snapshotter.clone());
1161            let timer_name = snapshotter.timer_name;
1162
1163            let callback_fn: Rc<dyn Fn(TimeEvent)> =
1164                Rc::new(move |event| snapshotter.snapshot(event));
1165            let callback = TimeEventCallback::from(callback_fn);
1166
1167            self.clock
1168                .borrow_mut()
1169                .set_timer_ns(
1170                    &timer_name,
1171                    interval_ns,
1172                    Some(start_time_ns.into()),
1173                    None,
1174                    Some(callback),
1175                    None,
1176                    None,
1177                )
1178                .expect(FAILED);
1179        }
1180
1181        // Only set up book updater if not already subscribed to deltas
1182        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1183            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1184        }
1185
1186        if let Some(client_id) = cmd.client_id.as_ref()
1187            && self.external_clients.contains(client_id)
1188        {
1189            if self.config.debug {
1190                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1191            }
1192            return Ok(());
1193        }
1194
1195        log::debug!(
1196            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1197            cmd.instrument_id,
1198            cmd.client_id,
1199            cmd.venue,
1200        );
1201
1202        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1203            let deltas_cmd = SubscribeBookDeltas::new(
1204                cmd.instrument_id,
1205                cmd.book_type,
1206                cmd.client_id,
1207                cmd.venue,
1208                UUID4::new(),
1209                cmd.ts_init,
1210                cmd.depth,
1211                true, // managed
1212                Some(cmd.command_id),
1213                cmd.params.clone(),
1214            );
1215            log::debug!(
1216                "Calling client.execute_subscribe for BookDeltas: {}",
1217                cmd.instrument_id
1218            );
1219            client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1220        } else {
1221            log::error!(
1222                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1223                cmd.client_id,
1224                cmd.venue,
1225            );
1226        }
1227
1228        Ok(())
1229    }
1230
1231    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1232        match cmd.bar_type.aggregation_source() {
1233            AggregationSource::Internal => {
1234                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1235                    self.start_bar_aggregator(cmd.bar_type)?;
1236                }
1237            }
1238            AggregationSource::External => {
1239                if cmd.bar_type.instrument_id().is_synthetic() {
1240                    anyhow::bail!(
1241                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1242                    );
1243                }
1244            }
1245        }
1246
1247        Ok(())
1248    }
1249
1250    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1251        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1252            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1253            return Ok(());
1254        }
1255
1256        self.book_deltas_subs.remove(&cmd.instrument_id);
1257
1258        let topics = vec![
1259            switchboard::get_book_deltas_topic(cmd.instrument_id),
1260            switchboard::get_book_depth10_topic(cmd.instrument_id),
1261            // TODO: Unsubscribe from snapshots?
1262        ];
1263
1264        self.maintain_book_updater(&cmd.instrument_id, &topics);
1265        self.maintain_book_snapshotter(&cmd.instrument_id);
1266
1267        Ok(())
1268    }
1269
1270    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1271        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1272            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1273            return Ok(());
1274        }
1275
1276        self.book_depth10_subs.remove(&cmd.instrument_id);
1277
1278        let topics = vec![
1279            switchboard::get_book_deltas_topic(cmd.instrument_id),
1280            switchboard::get_book_depth10_topic(cmd.instrument_id),
1281        ];
1282
1283        self.maintain_book_updater(&cmd.instrument_id, &topics);
1284        self.maintain_book_snapshotter(&cmd.instrument_id);
1285
1286        Ok(())
1287    }
1288
1289    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1290        let is_subscribed = self
1291            .book_intervals
1292            .values()
1293            .any(|set| set.contains(&cmd.instrument_id));
1294
1295        if !is_subscribed {
1296            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1297            return Ok(());
1298        }
1299
1300        // Remove instrument from interval tracking, and drop empty intervals
1301        let mut to_remove = Vec::new();
1302        for (interval, set) in &mut self.book_intervals {
1303            if set.remove(&cmd.instrument_id) && set.is_empty() {
1304                to_remove.push(*interval);
1305            }
1306        }
1307
1308        for interval in to_remove {
1309            self.book_intervals.remove(&interval);
1310        }
1311
1312        let topics = vec![
1313            switchboard::get_book_deltas_topic(cmd.instrument_id),
1314            switchboard::get_book_depth10_topic(cmd.instrument_id),
1315        ];
1316
1317        self.maintain_book_updater(&cmd.instrument_id, &topics);
1318        self.maintain_book_snapshotter(&cmd.instrument_id);
1319
1320        let still_in_intervals = self
1321            .book_intervals
1322            .values()
1323            .any(|set| set.contains(&cmd.instrument_id));
1324
1325        if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1326            if let Some(client_id) = cmd.client_id.as_ref()
1327                && self.external_clients.contains(client_id)
1328            {
1329                return Ok(());
1330            }
1331
1332            if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1333                let deltas_cmd = UnsubscribeBookDeltas::new(
1334                    cmd.instrument_id,
1335                    cmd.client_id,
1336                    cmd.venue,
1337                    UUID4::new(),
1338                    cmd.ts_init,
1339                    Some(cmd.command_id),
1340                    cmd.params.clone(),
1341                );
1342                client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1343            }
1344        }
1345
1346        Ok(())
1347    }
1348
1349    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1350        let bar_type = cmd.bar_type;
1351
1352        // Don't remove aggregator if other exact-topic subscribers still exist
1353        let topic = switchboard::get_bars_topic(bar_type.standard());
1354        if msgbus::exact_subscriber_count_bars(topic) > 0 {
1355            return Ok(());
1356        }
1357
1358        if self.bar_aggregators.contains_key(&bar_type.standard())
1359            && let Err(e) = self.stop_bar_aggregator(bar_type)
1360        {
1361            log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1362        }
1363
1364        // After stopping a composite, check if the source aggregator is now orphaned
1365        if bar_type.is_composite() {
1366            let source_type = bar_type.composite();
1367            let source_topic = switchboard::get_bars_topic(source_type);
1368            if msgbus::exact_subscriber_count_bars(source_topic) == 0
1369                && self.bar_aggregators.contains_key(&source_type)
1370                && let Err(e) = self.stop_bar_aggregator(source_type)
1371            {
1372                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1373            }
1374        }
1375
1376        Ok(())
1377    }
1378
1379    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1380        let Some(updater) = self.book_updaters.get(instrument_id) else {
1381            return;
1382        };
1383
1384        // Check which internal subscriptions still exist
1385        let has_deltas = self.book_deltas_subs.contains(instrument_id);
1386        let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1387
1388        let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1389        let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1390        let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1391        let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1392
1393        // Unsubscribe from topics that no longer have subscriptions
1394        if !has_deltas {
1395            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1396        }
1397
1398        if !has_depth10 {
1399            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1400        }
1401
1402        // Remove BookUpdater only when no subscriptions remain
1403        if !has_deltas && !has_depth10 {
1404            self.book_updaters.remove(instrument_id);
1405            log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1406        }
1407    }
1408
1409    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1410        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1411            let topic = switchboard::get_book_snapshots_topic(
1412                *instrument_id,
1413                snapshotter.snap_info.interval_ms,
1414            );
1415
1416            // Check remaining snapshot subscriptions, if none then remove snapshotter
1417            if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1418                let timer_name = snapshotter.timer_name;
1419                self.book_snapshotters.remove(instrument_id);
1420                let mut clock = self.clock.borrow_mut();
1421                if clock.timer_exists(&timer_name) {
1422                    clock.cancel_timer(&timer_name);
1423                }
1424                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1425            }
1426        }
1427    }
1428
1429    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1430
1431    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1432        let mut cache = self.cache.as_ref().borrow_mut();
1433        if let Err(e) = cache.add_instrument(instrument) {
1434            log_error_on_cache_insert(&e);
1435        }
1436    }
1437
1438    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1439        // TODO: Improve by adding bulk update methods to cache and database
1440        let mut cache = self.cache.as_ref().borrow_mut();
1441        for instrument in instruments {
1442            if let Err(e) = cache.add_instrument(instrument.clone()) {
1443                log_error_on_cache_insert(&e);
1444            }
1445        }
1446    }
1447
1448    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1449        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1450            log_error_on_cache_insert(&e);
1451        }
1452    }
1453
1454    fn handle_trades(&self, trades: &[TradeTick]) {
1455        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1456            log_error_on_cache_insert(&e);
1457        }
1458    }
1459
1460    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1461        if let Err(e) = self
1462            .cache
1463            .as_ref()
1464            .borrow_mut()
1465            .add_funding_rates(funding_rates)
1466        {
1467            log_error_on_cache_insert(&e);
1468        }
1469    }
1470
1471    fn handle_bars(&self, bars: &[Bar]) {
1472        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1473            log_error_on_cache_insert(&e);
1474        }
1475    }
1476
1477    fn handle_book_response(&self, book: &OrderBook) {
1478        log::debug!("Adding order book {} to cache", book.instrument_id);
1479
1480        if let Err(e) = self
1481            .cache
1482            .as_ref()
1483            .borrow_mut()
1484            .add_order_book(book.clone())
1485        {
1486            log_error_on_cache_insert(&e);
1487        }
1488    }
1489
1490    // -- INTERNAL --------------------------------------------------------------------------------
1491
1492    #[allow(clippy::too_many_arguments)]
1493    fn setup_book_updater(
1494        &mut self,
1495        instrument_id: &InstrumentId,
1496        book_type: BookType,
1497        only_deltas: bool,
1498        managed: bool,
1499    ) -> anyhow::Result<()> {
1500        let mut cache = self.cache.borrow_mut();
1501        if managed && !cache.has_order_book(instrument_id) {
1502            let book = OrderBook::new(*instrument_id, book_type);
1503            log::debug!("Created {book}");
1504            cache.add_order_book(book)?;
1505        }
1506
1507        // Reuse existing BookUpdater or create a new one
1508        let updater = self
1509            .book_updaters
1510            .entry(*instrument_id)
1511            .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1512            .clone();
1513
1514        // Subscribe to deltas (typed router handles duplicates)
1515        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1516        let deltas_handler = TypedHandler::new(updater.clone());
1517        msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1518
1519        // Subscribe to depth10 if not only_deltas
1520        if !only_deltas {
1521            let topic = switchboard::get_book_depth10_topic(*instrument_id);
1522            let depth_handler = TypedHandler::new(updater);
1523            msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1524        }
1525
1526        Ok(())
1527    }
1528
1529    fn create_bar_aggregator(
1530        &mut self,
1531        instrument: &InstrumentAny,
1532        bar_type: BarType,
1533    ) -> Box<dyn BarAggregator> {
1534        let cache = self.cache.clone();
1535
1536        let handler = move |bar: Bar| {
1537            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1538                log_error_on_cache_insert(&e);
1539            }
1540
1541            let topic = switchboard::get_bars_topic(bar.bar_type);
1542            msgbus::publish_bar(topic, &bar);
1543        };
1544
1545        let clock = self.clock.clone();
1546        let config = self.config.clone();
1547
1548        let price_precision = instrument.price_precision();
1549        let size_precision = instrument.size_precision();
1550
1551        if bar_type.spec().is_time_aggregated() {
1552            // Get time_bars_origin_offset from config
1553            let time_bars_origin_offset = config
1554                .time_bars_origins
1555                .get(&bar_type.spec().aggregation)
1556                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1557
1558            Box::new(TimeBarAggregator::new(
1559                bar_type,
1560                price_precision,
1561                size_precision,
1562                clock,
1563                handler,
1564                config.time_bars_build_with_no_updates,
1565                config.time_bars_timestamp_on_close,
1566                config.time_bars_interval_type,
1567                time_bars_origin_offset,
1568                config.time_bars_build_delay,
1569                config.time_bars_skip_first_non_full_bar,
1570            ))
1571        } else {
1572            match bar_type.spec().aggregation {
1573                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1574                    bar_type,
1575                    price_precision,
1576                    size_precision,
1577                    handler,
1578                )) as Box<dyn BarAggregator>,
1579                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1580                    bar_type,
1581                    price_precision,
1582                    size_precision,
1583                    handler,
1584                )) as Box<dyn BarAggregator>,
1585                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1586                    bar_type,
1587                    price_precision,
1588                    size_precision,
1589                    handler,
1590                )) as Box<dyn BarAggregator>,
1591                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1592                    bar_type,
1593                    price_precision,
1594                    size_precision,
1595                    handler,
1596                )) as Box<dyn BarAggregator>,
1597                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1598                    bar_type,
1599                    price_precision,
1600                    size_precision,
1601                    handler,
1602                )) as Box<dyn BarAggregator>,
1603                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1604                    bar_type,
1605                    price_precision,
1606                    size_precision,
1607                    handler,
1608                )) as Box<dyn BarAggregator>,
1609                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1610                    bar_type,
1611                    price_precision,
1612                    size_precision,
1613                    handler,
1614                )) as Box<dyn BarAggregator>,
1615                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1616                    bar_type,
1617                    price_precision,
1618                    size_precision,
1619                    handler,
1620                )) as Box<dyn BarAggregator>,
1621                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1622                    bar_type,
1623                    price_precision,
1624                    size_precision,
1625                    handler,
1626                )) as Box<dyn BarAggregator>,
1627                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1628                    bar_type,
1629                    price_precision,
1630                    size_precision,
1631                    instrument.price_increment(),
1632                    handler,
1633                )) as Box<dyn BarAggregator>,
1634                _ => panic!(
1635                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1636                    bar_type.spec().aggregation
1637                ),
1638            }
1639        }
1640    }
1641
1642    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1643        // Get the instrument for this bar type
1644        let instrument = {
1645            let cache = self.cache.borrow();
1646            cache
1647                .instrument(&bar_type.instrument_id())
1648                .ok_or_else(|| {
1649                    anyhow::anyhow!(
1650                        "Cannot start bar aggregation: no instrument found for {}",
1651                        bar_type.instrument_id(),
1652                    )
1653                })?
1654                .clone()
1655        };
1656
1657        // Use standard form of bar type as key
1658        let bar_key = bar_type.standard();
1659
1660        // Create or retrieve aggregator in Rc<RefCell>
1661        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1662            rc.clone()
1663        } else {
1664            let agg = self.create_bar_aggregator(&instrument, bar_type);
1665            let rc = Rc::new(RefCell::new(agg));
1666            self.bar_aggregators.insert(bar_key, rc.clone());
1667            rc
1668        };
1669
1670        // Subscribe to underlying data topics
1671        let mut subscriptions = Vec::new();
1672
1673        if bar_type.is_composite() {
1674            let topic = switchboard::get_bars_topic(bar_type.composite());
1675            let handler = TypedHandler::new(BarBarHandler::new(aggregator.clone(), bar_key));
1676            msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
1677            subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
1678        } else if bar_type.spec().price_type == PriceType::Last {
1679            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1680            let handler = TypedHandler::new(BarTradeHandler::new(aggregator.clone(), bar_key));
1681            msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
1682            subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
1683        } else {
1684            // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
1685            if matches!(
1686                bar_type.spec().aggregation,
1687                BarAggregation::TickImbalance
1688                    | BarAggregation::VolumeImbalance
1689                    | BarAggregation::ValueImbalance
1690                    | BarAggregation::TickRuns
1691                    | BarAggregation::VolumeRuns
1692                    | BarAggregation::ValueRuns
1693            ) {
1694                log::warn!(
1695                    "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
1696                     data with `aggressor_side`, but `price_type` is not LAST so it will receive \
1697                     quote data: bars will not emit correctly",
1698                );
1699            }
1700
1701            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1702            let handler = TypedHandler::new(BarQuoteHandler::new(aggregator.clone(), bar_key));
1703            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
1704            subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
1705        }
1706
1707        self.bar_aggregator_handlers.insert(bar_key, subscriptions);
1708
1709        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
1710        self.setup_bar_aggregator(bar_type, false)?;
1711
1712        aggregator.borrow_mut().set_is_running(true);
1713
1714        Ok(())
1715    }
1716
1717    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
1718    ///
1719    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
1720    fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1721        let bar_key = bar_type.standard();
1722        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1723            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1724        })?;
1725
1726        // Set historical mode and handler
1727        let handler: Box<dyn FnMut(Bar)> = if historical {
1728            // Historical handler - process_historical equivalent
1729            let cache = self.cache.clone();
1730            Box::new(move |bar: Bar| {
1731                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1732                    log_error_on_cache_insert(&e);
1733                }
1734                // In historical mode, bars are processed but not published to message bus
1735            })
1736        } else {
1737            // Regular handler - process equivalent
1738            let cache = self.cache.clone();
1739            Box::new(move |bar: Bar| {
1740                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1741                    log_error_on_cache_insert(&e);
1742                }
1743                let topic = switchboard::get_bars_topic(bar.bar_type);
1744                msgbus::publish_bar(topic, &bar);
1745            })
1746        };
1747
1748        aggregator
1749            .borrow_mut()
1750            .set_historical_mode(historical, handler);
1751
1752        // For TimeBarAggregator, set clock and start timer
1753        if bar_type.spec().is_time_aggregated() {
1754            use nautilus_common::clock::TestClock;
1755
1756            if historical {
1757                // Each aggregator gets its own independent clock
1758                let test_clock = Rc::new(RefCell::new(TestClock::new()));
1759                aggregator.borrow_mut().set_clock(test_clock);
1760                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
1761                // Store weak reference so start_timer can use it when called later
1762                let aggregator_weak = Rc::downgrade(aggregator);
1763                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1764            } else {
1765                aggregator.borrow_mut().set_clock(self.clock.clone());
1766                aggregator
1767                    .borrow_mut()
1768                    .start_timer(Some(aggregator.clone()));
1769            }
1770        }
1771
1772        Ok(())
1773    }
1774
1775    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1776        let aggregator = self
1777            .bar_aggregators
1778            .remove(&bar_type.standard())
1779            .ok_or_else(|| {
1780                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1781            })?;
1782
1783        aggregator.borrow_mut().stop();
1784
1785        // Unsubscribe any registered message handlers
1786        let bar_key = bar_type.standard();
1787        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1788            for sub in subs {
1789                match sub {
1790                    BarAggregatorSubscription::Bar { topic, handler } => {
1791                        msgbus::unsubscribe_bars(topic.into(), &handler);
1792                    }
1793                    BarAggregatorSubscription::Trade { topic, handler } => {
1794                        msgbus::unsubscribe_trades(topic.into(), &handler);
1795                    }
1796                    BarAggregatorSubscription::Quote { topic, handler } => {
1797                        msgbus::unsubscribe_quotes(topic.into(), &handler);
1798                    }
1799                }
1800            }
1801        }
1802
1803        Ok(())
1804    }
1805}
1806
1807#[inline(always)]
1808fn log_error_on_cache_insert<T: Display>(e: &T) {
1809    log::error!("Error on cache insert: {e}");
1810}
1811
1812#[inline(always)]
1813fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
1814    if data.is_empty() {
1815        let name = type_name::<T>();
1816        let short_name = name.rsplit("::").next().unwrap_or(name);
1817        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
1818        return true;
1819    }
1820    false
1821}