nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
//! Alternative implementations can be written on top of the generic engine; they
//! only need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35use std::{
36    any::Any,
37    cell::{Ref, RefCell},
38    collections::hash_map::Entry,
39    fmt::Display,
40    num::NonZeroUsize,
41    rc::Rc,
42};
43
44use ahash::{AHashMap, AHashSet};
45use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
46use config::DataEngineConfig;
47use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
48use indexmap::IndexMap;
49use nautilus_common::{
50    cache::Cache,
51    clock::Clock,
52    logging::{RECV, RES},
53    messages::data::{
54        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
55        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
56        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
57        UnsubscribeCommand,
58    },
59    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
60    timer::TimeEventCallback,
61};
62use nautilus_core::{
63    correctness::{
64        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
65    },
66    datetime::millis_to_nanos,
67};
68use nautilus_model::{
69    data::{
70        Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
71        TradeTick,
72        close::InstrumentClose,
73        prices::{IndexPriceUpdate, MarkPriceUpdate},
74    },
75    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
76    identifiers::{ClientId, InstrumentId, Venue},
77    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
78    orderbook::OrderBook,
79};
80use nautilus_persistence::backend::catalog::ParquetDataCatalog;
81use ustr::Ustr;
82
83use crate::{
84    aggregation::{
85        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
86        VolumeBarAggregator,
87    },
88    client::DataClientAdapter,
89};
90
/// Provides a high-performance `DataEngine` for all environments.
///
/// The engine owns the registered data clients, routes commands to them by client ID or
/// venue, adds incoming market data to the shared cache and publishes it on the message bus.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock; used to schedule book snapshot timers and cancelled on dispose.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache which incoming instruments, quotes, trades, bars and prices are added to.
    cache: Rc<RefCell<Cache>>,
    /// Registered (non-default) data clients keyed by client ID (insertion-ordered map).
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when no explicit client ID or venue routing resolves.
    default_client: Option<DataClientAdapter>,
    /// Client IDs taken from the config's `external_clients`; commands addressed to these
    /// IDs are not forwarded to any client.
    external_clients: AHashSet<ClientId>,
    /// Registered data catalogs keyed by name.
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routing, populated when a client is registered with a `routing`.
    routing_map: IndexMap<Venue, ClientId>,
    /// Instruments with book snapshot subscriptions, grouped by snapshot interval (ms).
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    // NOTE(review): not referenced in the visible part of this file - presumably the
    // per-instrument order book updaters; confirm against `setup_order_book`.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    /// Snapshotters created for book snapshot subscriptions, keyed by instrument ID.
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    // NOTE(review): the two bar aggregator maps are not referenced in the visible part of
    // this file - presumably managed by the bar subscription handlers; confirm.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
    // Synthetic feeds are initialized empty and not otherwise used in the visible code
    // (quote/trade handlers carry a "Handle synthetics" TODO).
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas held back per instrument until a delta flagged `F_LAST` arrives
    /// (only used when `config.buffer_deltas` is set).
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Message bus priority; initialized to 10 (high priority for a built-in component).
    msgbus_priority: u8,
    /// The engine configuration.
    config: DataEngineConfig,
}
112
113impl DataEngine {
114    /// Creates a new [`DataEngine`] instance.
115    #[must_use]
116    pub fn new(
117        clock: Rc<RefCell<dyn Clock>>,
118        cache: Rc<RefCell<Cache>>,
119        config: Option<DataEngineConfig>,
120    ) -> Self {
121        let config = config.unwrap_or_default();
122
123        let external_clients: AHashSet<ClientId> = config
124            .external_clients
125            .clone()
126            .unwrap_or_default()
127            .into_iter()
128            .collect();
129
130        Self {
131            clock,
132            cache,
133            clients: IndexMap::new(),
134            default_client: None,
135            external_clients,
136            catalogs: AHashMap::new(),
137            routing_map: IndexMap::new(),
138            book_intervals: AHashMap::new(),
139            book_updaters: AHashMap::new(),
140            book_snapshotters: AHashMap::new(),
141            bar_aggregators: AHashMap::new(),
142            bar_aggregator_handlers: AHashMap::new(),
143            _synthetic_quote_feeds: AHashMap::new(),
144            _synthetic_trade_feeds: AHashMap::new(),
145            buffered_deltas_map: AHashMap::new(),
146            msgbus_priority: 10, // High-priority for built-in component
147            config,
148        }
149    }
150
151    /// Returns a read-only reference to the engines clock.
152    #[must_use]
153    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
154        self.clock.borrow()
155    }
156
157    /// Returns a read-only reference to the engines cache.
158    #[must_use]
159    pub fn get_cache(&self) -> Ref<'_, Cache> {
160        self.cache.borrow()
161    }
162
163    /// Registers the `catalog` with the engine with an optional specific `name`.
164    ///
165    /// # Panics
166    ///
167    /// Panics if a catalog with the same `name` has already been registered.
168    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
169        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
170
171        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
172
173        self.catalogs.insert(name, catalog);
174        log::info!("Registered catalog <{name}>");
175    }
176
177    /// Registers the `client` with the engine with an optional venue `routing`.
178    ///
179    ///
180    /// # Panics
181    ///
182    /// Panics if a client with the same client ID has already been registered.
183    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
184        let client_id = client.client_id();
185
186        if let Some(default_client) = &self.default_client {
187            check_predicate_false(
188                default_client.client_id() == client.client_id(),
189                "client_id already registered as default client",
190            )
191            .expect(FAILED);
192        }
193
194        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
195
196        if let Some(routing) = routing {
197            self.routing_map.insert(routing, client_id);
198            log::info!("Set client {client_id} routing for {routing}");
199        }
200
201        if client.venue.is_none() && self.default_client.is_none() {
202            self.default_client = Some(client);
203            log::info!("Registered client {client_id} for default routing");
204        } else {
205            self.clients.insert(client_id, client);
206            log::info!("Registered client {client_id}");
207        }
208    }
209
210    /// Deregisters the client for the `client_id`.
211    ///
212    /// # Panics
213    ///
214    /// Panics if the client ID has not been registered.
215    pub fn deregister_client(&mut self, client_id: &ClientId) {
216        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
217
218        self.clients.shift_remove(client_id);
219        log::info!("Deregistered client {client_id}");
220    }
221
222    /// Registers the data `client` with the engine as the default routing client.
223    ///
224    /// When a specific venue routing cannot be found, this client will receive messages.
225    ///
226    /// # Warnings
227    ///
228    /// Any existing default routing client will be overwritten.
229    ///
230    /// # Panics
231    ///
232    /// Panics if a default client has already been registered.
233    pub fn register_default_client(&mut self, client: DataClientAdapter) {
234        check_predicate_true(
235            self.default_client.is_none(),
236            "default client already registered",
237        )
238        .expect(FAILED);
239
240        let client_id = client.client_id();
241
242        self.default_client = Some(client);
243        log::info!("Registered default client {client_id}");
244    }
245
246    /// Starts all registered data clients.
247    pub fn start(&self) {
248        for client in self.get_clients() {
249            if let Err(e) = client.start() {
250                log::error!("{e}");
251            }
252        }
253    }
254
255    /// Stops all registered data clients.
256    pub fn stop(&self) {
257        for client in self.get_clients() {
258            if let Err(e) = client.stop() {
259                log::error!("{e}");
260            }
261        }
262    }
263
264    /// Resets all registered data clients to their initial state.
265    pub fn reset(&self) {
266        for client in self.get_clients() {
267            if let Err(e) = client.reset() {
268                log::error!("{e}");
269            }
270        }
271    }
272
273    /// Disposes the engine, stopping all clients and cancelling any timers.
274    pub fn dispose(&self) {
275        for client in self.get_clients() {
276            if let Err(e) = client.dispose() {
277                log::error!("{e}");
278            }
279        }
280
281        self.clock.borrow_mut().cancel_timers();
282    }
283
284    /// Returns `true` if all registered data clients are currently connected.
285    #[must_use]
286    pub fn check_connected(&self) -> bool {
287        self.get_clients()
288            .iter()
289            .all(|client| client.is_connected())
290    }
291
292    /// Returns `true` if all registered data clients are currently disconnected.
293    #[must_use]
294    pub fn check_disconnected(&self) -> bool {
295        self.get_clients()
296            .iter()
297            .all(|client| !client.is_connected())
298    }
299
300    /// Returns a list of all registered client IDs, including the default client if set.
301    #[must_use]
302    pub fn registered_clients(&self) -> Vec<ClientId> {
303        self.get_clients()
304            .into_iter()
305            .map(|client| client.client_id())
306            .collect()
307    }
308
309    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
310
311    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
312    where
313        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
314        T: Clone,
315    {
316        self.get_clients()
317            .into_iter()
318            .flat_map(get_subs)
319            .cloned()
320            .collect()
321    }
322
323    #[must_use]
324    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
325        let (default_opt, clients_map) = (&self.default_client, &self.clients);
326        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
327
328        if let Some(default) = default_opt {
329            clients.push(default);
330        }
331
332        clients
333    }
334
335    #[must_use]
336    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
337        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
338        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
339
340        if let Some(default) = default_opt {
341            clients.push(default);
342        }
343
344        clients
345    }
346
347    pub fn get_client(
348        &mut self,
349        client_id: Option<&ClientId>,
350        venue: Option<&Venue>,
351    ) -> Option<&mut DataClientAdapter> {
352        if let Some(client_id) = client_id {
353            // Explicit ID: first look in registered clients
354            if let Some(client) = self.clients.get_mut(client_id) {
355                return Some(client);
356            }
357
358            // Then check if it matches the default client
359            if let Some(default) = self.default_client.as_mut() {
360                if default.client_id() == *client_id {
361                    return Some(default);
362                }
363            }
364
365            // Unknown explicit client
366            return None;
367        }
368
369        if let Some(v) = venue {
370            // Route by venue if mapped client still registered
371            if let Some(client_id) = self.routing_map.get(v) {
372                return self.clients.get_mut(client_id);
373            }
374        }
375
376        // Fallback to default client
377        self.get_default_client()
378    }
379
380    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
381        self.default_client.as_mut()
382    }
383
384    /// Returns all custom data types currently subscribed across all clients.
385    #[must_use]
386    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
387        self.collect_subscriptions(|client| &client.subscriptions_custom)
388    }
389
390    /// Returns all instrument IDs currently subscribed across all clients.
391    #[must_use]
392    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
393        self.collect_subscriptions(|client| &client.subscriptions_instrument)
394    }
395
396    /// Returns all instrument IDs for which book delta subscriptions exist.
397    #[must_use]
398    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
399        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
400    }
401
402    /// Returns all instrument IDs for which book snapshot subscriptions exist.
403    #[must_use]
404    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
405        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
406    }
407
408    /// Returns all instrument IDs for which quote subscriptions exist.
409    #[must_use]
410    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
411        self.collect_subscriptions(|client| &client.subscriptions_quotes)
412    }
413
414    /// Returns all instrument IDs for which trade subscriptions exist.
415    #[must_use]
416    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
417        self.collect_subscriptions(|client| &client.subscriptions_trades)
418    }
419
420    /// Returns all bar types currently subscribed across all clients.
421    #[must_use]
422    pub fn subscribed_bars(&self) -> Vec<BarType> {
423        self.collect_subscriptions(|client| &client.subscriptions_bars)
424    }
425
426    /// Returns all instrument IDs for which mark price subscriptions exist.
427    #[must_use]
428    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
429        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
430    }
431
432    /// Returns all instrument IDs for which index price subscriptions exist.
433    #[must_use]
434    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
435        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
436    }
437
438    /// Returns all instrument IDs for which status subscriptions exist.
439    #[must_use]
440    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
441        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
442    }
443
444    /// Returns all instrument IDs for which instrument close subscriptions exist.
445    #[must_use]
446    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
447        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
448    }
449
450    // -- COMMANDS --------------------------------------------------------------------------------
451
452    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
453    ///
454    /// Errors during execution are logged.
455    pub fn execute(&mut self, cmd: &DataCommand) {
456        if let Err(e) = match cmd {
457            DataCommand::Subscribe(c) => self.execute_subscribe(c),
458            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
459            DataCommand::Request(c) => self.execute_request(c),
460        } {
461            log::error!("{e}");
462        }
463    }
464
465    /// Handles a subscribe command, updating internal state and forwarding to the client.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
470    /// or if the underlying client operation fails.
471    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
472        // Update internal engine state
473        match &cmd {
474            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
475            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
476            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
477            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
478            _ => {} // Do nothing else
479        }
480
481        // Check if client declared as external
482        if let Some(client_id) = cmd.client_id() {
483            if self.external_clients.contains(client_id) {
484                return Ok(());
485            }
486        }
487
488        // Forward command to client
489        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
490            client.execute_subscribe(cmd);
491        } else {
492            log::error!(
493                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
494                cmd.client_id(),
495                cmd.venue(),
496            );
497        }
498
499        Ok(())
500    }
501
502    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the underlying client operation fails.
507    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
508        match &cmd {
509            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
510            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
511            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
512            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
513            _ => {} // Do nothing else
514        }
515
516        // Check if client declared as external
517        if let Some(client_id) = cmd.client_id() {
518            if self.external_clients.contains(client_id) {
519                return Ok(());
520            }
521        }
522
523        // Forward command to the client
524        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
525            client.execute_unsubscribe(cmd);
526        } else {
527            log::error!(
528                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
529                cmd.client_id(),
530                cmd.venue(),
531            );
532        }
533
534        Ok(())
535    }
536
537    /// Sends a [`RequestCommand`] to a suitable data client implementation.
538    ///
539    /// # Errors
540    ///
541    /// Returns an error if no client is found for the given client ID or venue,
542    /// or if the client fails to process the request.
543    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
544        // Skip requests for external clients
545        if let Some(cid) = req.client_id() {
546            if self.external_clients.contains(cid) {
547                return Ok(());
548            }
549        }
550        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
551            match req {
552                RequestCommand::Data(req) => client.request_data(req),
553                RequestCommand::Instrument(req) => client.request_instrument(req),
554                RequestCommand::Instruments(req) => client.request_instruments(req),
555                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
556                RequestCommand::Quotes(req) => client.request_quotes(req),
557                RequestCommand::Trades(req) => client.request_trades(req),
558                RequestCommand::Bars(req) => client.request_bars(req),
559            }
560        } else {
561            anyhow::bail!(
562                "Cannot handle request: no client found for {:?} {:?}",
563                req.client_id(),
564                req.venue()
565            );
566        }
567    }
568
569    /// Processes a dynamically-typed data message.
570    ///
571    /// Currently supports `InstrumentAny`; unrecognized types are logged as errors.
572    pub fn process(&mut self, data: &dyn Any) {
573        // TODO: Eventually these could be added to the `Data` enum? process here for now
574        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
575            self.handle_instrument(instrument.clone());
576        } else {
577            log::error!("Cannot process data {data:?}, type is unrecognized");
578        }
579    }
580
581    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
582    pub fn process_data(&mut self, data: Data) {
583        match data {
584            Data::Delta(delta) => self.handle_delta(delta),
585            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
586            Data::Depth10(depth) => self.handle_depth10(*depth),
587            Data::Quote(quote) => self.handle_quote(quote),
588            Data::Trade(trade) => self.handle_trade(trade),
589            Data::Bar(bar) => self.handle_bar(bar),
590            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
591            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
592            Data::InstrumentClose(close) => self.handle_instrument_close(close),
593        }
594    }
595
596    /// Processes a `DataResponse`, handling and publishing the response message.
597    pub fn response(&self, resp: DataResponse) {
598        log::debug!("{RECV}{RES} {resp:?}");
599
600        match &resp {
601            DataResponse::Instrument(resp) => {
602                self.handle_instrument_response(resp.data.clone());
603            }
604            DataResponse::Instruments(resp) => {
605                self.handle_instruments(&resp.data);
606            }
607            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
608            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
609            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
610            _ => todo!(),
611        }
612
613        msgbus::send_response(resp.correlation_id(), &resp);
614    }
615
616    // -- DATA HANDLERS ---------------------------------------------------------------------------
617
618    fn handle_instrument(&mut self, instrument: InstrumentAny) {
619        if let Err(e) = self
620            .cache
621            .as_ref()
622            .borrow_mut()
623            .add_instrument(instrument.clone())
624        {
625            log_error_on_cache_insert(&e);
626        }
627
628        let topic = switchboard::get_instrument_topic(instrument.id());
629        msgbus::publish(topic, &instrument as &dyn Any);
630    }
631
632    fn handle_delta(&mut self, delta: OrderBookDelta) {
633        let deltas = if self.config.buffer_deltas {
634            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
635                buffered_deltas.deltas.push(delta);
636            } else {
637                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
638                self.buffered_deltas_map
639                    .insert(delta.instrument_id, buffered_deltas);
640            }
641
642            if !RecordFlag::F_LAST.matches(delta.flags) {
643                return; // Not the last delta for event
644            }
645
646            // SAFETY: We know the deltas exists already
647            self.buffered_deltas_map
648                .remove(&delta.instrument_id)
649                .unwrap()
650        } else {
651            OrderBookDeltas::new(delta.instrument_id, vec![delta])
652        };
653
654        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
655        msgbus::publish(topic, &deltas as &dyn Any);
656    }
657
658    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
659        let deltas = if self.config.buffer_deltas {
660            let mut is_last_delta = false;
661            for delta in &deltas.deltas {
662                if RecordFlag::F_LAST.matches(delta.flags) {
663                    is_last_delta = true;
664                    break;
665                }
666            }
667
668            let instrument_id = deltas.instrument_id;
669
670            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
671                buffered_deltas.deltas.extend(deltas.deltas);
672            } else {
673                self.buffered_deltas_map.insert(instrument_id, deltas);
674            }
675
676            if !is_last_delta {
677                return;
678            }
679
680            // SAFETY: We know the deltas exists already
681            self.buffered_deltas_map.remove(&instrument_id).unwrap()
682        } else {
683            deltas
684        };
685
686        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
687        msgbus::publish(topic, &deltas as &dyn Any);
688    }
689
690    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
691        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
692        msgbus::publish(topic, &depth as &dyn Any);
693    }
694
695    fn handle_quote(&mut self, quote: QuoteTick) {
696        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
697            log_error_on_cache_insert(&e);
698        }
699
700        // TODO: Handle synthetics
701
702        let topic = switchboard::get_quotes_topic(quote.instrument_id);
703        msgbus::publish(topic, &quote as &dyn Any);
704    }
705
706    fn handle_trade(&mut self, trade: TradeTick) {
707        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
708            log_error_on_cache_insert(&e);
709        }
710
711        // TODO: Handle synthetics
712
713        let topic = switchboard::get_trades_topic(trade.instrument_id);
714        msgbus::publish(topic, &trade as &dyn Any);
715    }
716
717    fn handle_bar(&mut self, bar: Bar) {
718        // TODO: Handle additional bar logic
719        if self.config.validate_data_sequence {
720            if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
721                if bar.ts_event < last_bar.ts_event {
722                    log::warn!(
723                        "Bar {bar} was prior to last bar `ts_event` {}",
724                        last_bar.ts_event
725                    );
726                    return; // Bar is out of sequence
727                }
728                if bar.ts_init < last_bar.ts_init {
729                    log::warn!(
730                        "Bar {bar} was prior to last bar `ts_init` {}",
731                        last_bar.ts_init
732                    );
733                    return; // Bar is out of sequence
734                }
735                // TODO: Implement `bar.is_revision` logic
736            }
737        }
738
739        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
740            log_error_on_cache_insert(&e);
741        }
742
743        let topic = switchboard::get_bars_topic(bar.bar_type);
744        msgbus::publish(topic, &bar as &dyn Any);
745    }
746
747    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
748        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
749            log_error_on_cache_insert(&e);
750        }
751
752        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
753        msgbus::publish(topic, &mark_price as &dyn Any);
754    }
755
756    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
757        if let Err(e) = self
758            .cache
759            .as_ref()
760            .borrow_mut()
761            .add_index_price(index_price)
762        {
763            log_error_on_cache_insert(&e);
764        }
765
766        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
767        msgbus::publish(topic, &index_price as &dyn Any);
768    }
769
770    fn handle_instrument_close(&mut self, close: InstrumentClose) {
771        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
772        msgbus::publish(topic, &close as &dyn Any);
773    }
774
775    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
776
777    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
778        if cmd.instrument_id.is_synthetic() {
779            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
780        }
781
782        self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
783
784        Ok(())
785    }
786
787    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
788        if cmd.instrument_id.is_synthetic() {
789            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
790        }
791
792        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
793
794        Ok(())
795    }
796
797    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
798        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
799            return Ok(());
800        }
801
802        if cmd.instrument_id.is_synthetic() {
803            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
804        }
805
806        // Track snapshot intervals per instrument, and set up timer on first subscription
807        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
808            Entry::Vacant(e) => {
809                let mut set = AHashSet::new();
810                set.insert(cmd.instrument_id);
811                e.insert(set);
812                true
813            }
814            Entry::Occupied(mut e) => {
815                e.get_mut().insert(cmd.instrument_id);
816                false
817            }
818        };
819
820        if first_for_interval {
821            // Initialize snapshotter and schedule its timer
822            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
823            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
824
825            let snap_info = BookSnapshotInfo {
826                instrument_id: cmd.instrument_id,
827                venue: cmd.instrument_id.venue,
828                is_composite: cmd.instrument_id.symbol.is_composite(),
829                root: Ustr::from(cmd.instrument_id.symbol.root()),
830                topic,
831                interval_ms: cmd.interval_ms,
832            };
833
834            // Schedule the first snapshot at the next interval boundary
835            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
836            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
837
838            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
839            self.book_snapshotters
840                .insert(cmd.instrument_id, snapshotter.clone());
841            let timer_name = snapshotter.timer_name;
842
843            let callback =
844                TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
845
846            self.clock
847                .borrow_mut()
848                .set_timer_ns(
849                    &timer_name,
850                    interval_ns,
851                    start_time_ns.into(),
852                    None,
853                    Some(callback),
854                    None,
855                )
856                .expect(FAILED);
857        }
858
859        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
860
861        Ok(())
862    }
863
864    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
865        match cmd.bar_type.aggregation_source() {
866            AggregationSource::Internal => {
867                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
868                    self.start_bar_aggregator(cmd.bar_type)?;
869                }
870            }
871            AggregationSource::External => {
872                if cmd.bar_type.instrument_id().is_synthetic() {
873                    anyhow::bail!(
874                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
875                    );
876                }
877            }
878        }
879
880        Ok(())
881    }
882
883    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
884        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
885            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
886            return Ok(());
887        }
888
889        let topics = vec![
890            switchboard::get_book_deltas_topic(cmd.instrument_id),
891            switchboard::get_book_depth10_topic(cmd.instrument_id),
892            switchboard::get_book_snapshots_topic(cmd.instrument_id),
893        ];
894
895        self.maintain_book_updater(&cmd.instrument_id, &topics);
896        self.maintain_book_snapshotter(&cmd.instrument_id);
897
898        Ok(())
899    }
900
901    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
902        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
903            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
904            return Ok(());
905        }
906
907        let topics = vec![
908            switchboard::get_book_deltas_topic(cmd.instrument_id),
909            switchboard::get_book_depth10_topic(cmd.instrument_id),
910            switchboard::get_book_snapshots_topic(cmd.instrument_id),
911        ];
912
913        self.maintain_book_updater(&cmd.instrument_id, &topics);
914        self.maintain_book_snapshotter(&cmd.instrument_id);
915
916        Ok(())
917    }
918
919    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
920        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
921            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
922            return Ok(());
923        }
924
925        // Remove instrument from interval tracking, and drop empty intervals
926        let mut to_remove = Vec::new();
927        for (interval, set) in &mut self.book_intervals {
928            if set.remove(&cmd.instrument_id) && set.is_empty() {
929                to_remove.push(*interval);
930            }
931        }
932
933        for interval in to_remove {
934            self.book_intervals.remove(&interval);
935        }
936
937        let topics = vec![
938            switchboard::get_book_deltas_topic(cmd.instrument_id),
939            switchboard::get_book_depth10_topic(cmd.instrument_id),
940            switchboard::get_book_snapshots_topic(cmd.instrument_id),
941        ];
942
943        self.maintain_book_updater(&cmd.instrument_id, &topics);
944        self.maintain_book_snapshotter(&cmd.instrument_id);
945
946        Ok(())
947    }
948
949    /// Unsubscribe internal bar aggregator for the given bar type.
950    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
951        // If we have an internal aggregator for this bar type, stop and remove it
952        let bar_type = cmd.bar_type;
953        if self.bar_aggregators.contains_key(&bar_type.standard()) {
954            if let Err(err) = self.stop_bar_aggregator(bar_type) {
955                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
956            }
957            self.bar_aggregators.remove(&bar_type.standard());
958            log::debug!("Removed bar aggregator for {bar_type}");
959        }
960        Ok(())
961    }
962
963    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
964        if let Some(updater) = self.book_updaters.get(instrument_id) {
965            let handler = ShareableMessageHandler(updater.clone());
966
967            // Unsubscribe handler if it is the last subscriber
968            for topic in topics {
969                if msgbus::subscriptions_count(topic.as_str()) == 1
970                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
971                {
972                    log::debug!("Unsubscribing BookUpdater from {topic}");
973                    msgbus::unsubscribe_topic(*topic, handler.clone());
974                }
975            }
976
977            // Check remaining subscriptions, if none then remove updater
978            let still_subscribed = topics
979                .iter()
980                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
981            if !still_subscribed {
982                self.book_updaters.remove(instrument_id);
983                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
984            }
985        }
986    }
987
988    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
989        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
990            let topic = switchboard::get_book_snapshots_topic(*instrument_id);
991
992            // Check remaining snapshot subscriptions, if none then remove snapshotter
993            if msgbus::subscriptions_count(topic.as_str()) == 0 {
994                let timer_name = snapshotter.timer_name;
995                self.book_snapshotters.remove(instrument_id);
996                let mut clock = self.clock.borrow_mut();
997                if clock.timer_names().contains(&timer_name.as_str()) {
998                    clock.cancel_timer(&timer_name);
999                }
1000                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1001            }
1002        }
1003    }
1004
1005    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1006
1007    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1008        let mut cache = self.cache.as_ref().borrow_mut();
1009        if let Err(e) = cache.add_instrument(instrument) {
1010            log_error_on_cache_insert(&e);
1011        }
1012    }
1013
1014    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1015        // TODO: Improve by adding bulk update methods to cache and database
1016        let mut cache = self.cache.as_ref().borrow_mut();
1017        for instrument in instruments {
1018            if let Err(e) = cache.add_instrument(instrument.clone()) {
1019                log_error_on_cache_insert(&e);
1020            }
1021        }
1022    }
1023
1024    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1025        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1026            log_error_on_cache_insert(&e);
1027        }
1028    }
1029
1030    fn handle_trades(&self, trades: &[TradeTick]) {
1031        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1032            log_error_on_cache_insert(&e);
1033        }
1034    }
1035
1036    fn handle_bars(&self, bars: &[Bar]) {
1037        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1038            log_error_on_cache_insert(&e);
1039        }
1040    }
1041
1042    // -- INTERNAL --------------------------------------------------------------------------------
1043
1044    #[allow(clippy::too_many_arguments)]
1045    fn setup_order_book(
1046        &mut self,
1047        instrument_id: &InstrumentId,
1048        book_type: BookType,
1049        only_deltas: bool,
1050        managed: bool,
1051    ) -> anyhow::Result<()> {
1052        let mut cache = self.cache.borrow_mut();
1053        if managed && !cache.has_order_book(instrument_id) {
1054            let book = OrderBook::new(*instrument_id, book_type);
1055            log::debug!("Created {book}");
1056            cache.add_order_book(book)?;
1057        }
1058
1059        // Set up subscriptions
1060        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1061        self.book_updaters.insert(*instrument_id, updater.clone());
1062
1063        let handler = ShareableMessageHandler(updater);
1064
1065        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1066        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1067            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1068        }
1069
1070        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1071        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1072            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1073        }
1074
1075        Ok(())
1076    }
1077
1078    fn create_bar_aggregator(
1079        &mut self,
1080        instrument: &InstrumentAny,
1081        bar_type: BarType,
1082    ) -> Box<dyn BarAggregator> {
1083        let cache = self.cache.clone();
1084
1085        let handler = move |bar: Bar| {
1086            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1087                log_error_on_cache_insert(&e);
1088            }
1089
1090            let topic = switchboard::get_bars_topic(bar.bar_type);
1091            msgbus::publish(topic, &bar as &dyn Any);
1092        };
1093
1094        let clock = self.clock.clone();
1095        let config = self.config.clone();
1096
1097        let price_precision = instrument.price_precision();
1098        let size_precision = instrument.size_precision();
1099
1100        if bar_type.spec().is_time_aggregated() {
1101            Box::new(TimeBarAggregator::new(
1102                bar_type,
1103                price_precision,
1104                size_precision,
1105                clock,
1106                handler,
1107                false, // await_partial
1108                config.time_bars_build_with_no_updates,
1109                config.time_bars_timestamp_on_close,
1110                config.time_bars_interval_type,
1111                None,  // TODO: Implement
1112                20,    // TODO: TBD, composite bar build delay
1113                false, // TODO: skip_first_non_full_bar, make it config dependent
1114            ))
1115        } else {
1116            match bar_type.spec().aggregation {
1117                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1118                    bar_type,
1119                    price_precision,
1120                    size_precision,
1121                    handler,
1122                    false,
1123                )) as Box<dyn BarAggregator>,
1124                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1125                    bar_type,
1126                    price_precision,
1127                    size_precision,
1128                    handler,
1129                    false,
1130                )) as Box<dyn BarAggregator>,
1131                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1132                    bar_type,
1133                    price_precision,
1134                    size_precision,
1135                    handler,
1136                    false,
1137                )) as Box<dyn BarAggregator>,
1138                _ => panic!(
1139                    "Cannot create aggregator: {} aggregation not currently supported",
1140                    bar_type.spec().aggregation
1141                ),
1142            }
1143        }
1144    }
1145
1146    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1147        // Get the instrument for this bar type
1148        let instrument = {
1149            let cache = self.cache.borrow();
1150            cache
1151                .instrument(&bar_type.instrument_id())
1152                .ok_or_else(|| {
1153                    anyhow::anyhow!(
1154                        "Cannot start bar aggregation: no instrument found for {}",
1155                        bar_type.instrument_id(),
1156                    )
1157                })?
1158                .clone()
1159        };
1160
1161        // Use standard form of bar type as key
1162        let bar_key = bar_type.standard();
1163
1164        // Create or retrieve aggregator in Rc<RefCell>
1165        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1166            rc.clone()
1167        } else {
1168            let agg = self.create_bar_aggregator(&instrument, bar_type);
1169            let rc = Rc::new(RefCell::new(agg));
1170            self.bar_aggregators.insert(bar_key, rc.clone());
1171            rc
1172        };
1173
1174        // Subscribe to underlying data topics
1175        let mut handlers = Vec::new();
1176
1177        if bar_type.is_composite() {
1178            let topic = switchboard::get_bars_topic(bar_type.composite());
1179            let handler =
1180                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1181
1182            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1183                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1184            }
1185
1186            handlers.push((topic, handler));
1187        } else if bar_type.spec().price_type == PriceType::Last {
1188            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1189            let handler =
1190                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1191
1192            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1193                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1194            }
1195
1196            handlers.push((topic, handler));
1197        } else {
1198            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1199            let handler =
1200                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1201
1202            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1203                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1204            }
1205
1206            handlers.push((topic, handler));
1207        }
1208
1209        self.bar_aggregator_handlers.insert(bar_key, handlers);
1210        aggregator.borrow_mut().set_is_running(true);
1211
1212        Ok(())
1213    }
1214
1215    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1216        let aggregator = self
1217            .bar_aggregators
1218            .remove(&bar_type.standard())
1219            .ok_or_else(|| {
1220                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1221            })?;
1222
1223        aggregator.borrow_mut().stop();
1224
1225        // Unsubscribe any registered message handlers
1226        let bar_key = bar_type.standard();
1227        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1228            for (topic, handler) in subs {
1229                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1230                    msgbus::unsubscribe_topic(topic, handler);
1231                }
1232            }
1233        }
1234
1235        Ok(())
1236    }
1237}
1238
1239#[inline(always)]
1240fn log_error_on_cache_insert<T: Display>(e: &T) {
1241    log::error!("Error on cache insert: {e}");
1242}