Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36mod requests;
37mod time_range;
38
39#[cfg(feature = "defi")]
40pub mod pool;
41
42#[cfg(feature = "streaming")]
43mod streaming;
44
45use std::{
46    any::{Any, type_name},
47    cell::{Ref, RefCell},
48    collections::VecDeque,
49    fmt::{Debug, Display},
50    num::NonZeroUsize,
51    rc::Rc,
52    str::FromStr,
53};
54
55use ahash::{AHashMap, AHashSet};
56use anyhow::Context;
57pub use bar::BarAggregatorSubscription;
58use bar::{BarAggregatorKey, bar_aggregator_key};
59use book::{
60    BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
61    BookSnapshotter, BookUpdater,
62};
63pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
64use config::DataEngineConfig;
65use futures::future::join_all;
66use handlers::{
67    BAR_AGGREGATOR_PRIORITY, BarBarHandler, BarQuoteHandler, BarTradeHandler, SpreadQuoteHandler,
68};
69use indexmap::IndexMap;
70use nautilus_common::{
71    cache::Cache,
72    clock::Clock,
73    logging::{RECV, RES},
74    messages::data::{
75        BarsResponse, BookDeltasResponse, BookDepthResponse, CustomDataResponse, DataCommand,
76        DataResponse, ForwardPricesResponse, FundingRatesResponse, QuotesResponse, RequestBars,
77        RequestCommand, RequestForwardPrices, RequestJoin, RequestQuotes, RequestTrades,
78        SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
79        SubscribeCommand, SubscribeOptionChain, SubscribeQuotes, SubscribeTrades, TradesResponse,
80        UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
81        UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionChain,
82        UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades, is_parent_subscription,
83    },
84    msgbus::{
85        self, ShareableMessageHandler, TypedHandler, TypedIntoHandler,
86        switchboard::{self, MessagingSwitchboard},
87    },
88    runner::get_data_cmd_sender,
89    timer::{TimeEvent, TimeEventCallback},
90};
91use nautilus_core::{
92    Params, UUID4, UnixNanos, WeakCell,
93    correctness::{
94        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
95    },
96    datetime::{NANOSECONDS_IN_DAY, millis_to_nanos_unchecked},
97};
98#[cfg(feature = "defi")]
99use nautilus_model::defi::DefiData;
100use nautilus_model::{
101    data::{
102        Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, HasTsInit, IndexPriceUpdate,
103        InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
104        OrderBookDepth10, QuoteTick, TradeTick,
105        option_chain::{OptionGreeks, StrikeRange},
106    },
107    enums::{
108        AggregationSource, BarAggregation, BookType, InstrumentClass, MarketStatusAction,
109        OrderSide, PriceType, RecordFlag,
110    },
111    identifiers::{ClientId, InstrumentId, OptionSeriesId, Symbol, Venue},
112    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
113    orderbook::OrderBook,
114    types::{Price, Quantity},
115};
116use requests::{
117    ContinuousFutureRequest, ContinuousFutureRequestState, ContinuousFutureSegment,
118    ContinuousFutureSource, RequestBarAggregation, continuous_future_parent_request_id,
119    continuous_future_request_from_bars, continuous_future_subscription_from_bars,
120    has_continuous_future_params, request_bar_aggregation_from_params, request_params,
121    response_params,
122};
123#[cfg(feature = "streaming")]
124use streaming::CatalogMap;
125use time_range::{
126    TimeRangePipelineState, has_time_range_pipeline_params, is_time_range_pipeline_variant,
127};
128use ustr::Ustr;
129
130#[cfg(feature = "defi")]
131#[allow(unused_imports)] // Brings DeFi impl blocks into scope
132use crate::defi::engine as _;
133#[cfg(feature = "defi")]
134use crate::engine::pool::PoolUpdater;
135use crate::{
136    aggregation::{
137        BarAggregator, RenkoBarAggregator, SpreadQuoteAggregator, TickBarAggregator,
138        TickImbalanceBarAggregator, TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator,
139        ValueImbalanceBarAggregator, ValueRunsBarAggregator, VolumeBarAggregator,
140        VolumeImbalanceBarAggregator, VolumeRunsBarAggregator,
141    },
142    client::DataClientAdapter,
143    option_chains::OptionChainManager,
144};
145
/// Provides a high-performance `DataEngine` for all environments.
///
/// Holds the registered data clients, the venue routing table, and the
/// bookkeeping collections that [`DataEngine::reset`] clears.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock; `reset` and `dispose` cancel its timers.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache, exposed through `get_cache` and `cache_rc`.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Client IDs copied from `config.external_clients` at construction.
    pub(crate) external_clients: AHashSet<ClientId>,
    /// Registered (non-default) clients keyed by client ID.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when neither an explicit ID nor a venue route resolves.
    default_client: Option<DataClientAdapter>,
    /// Venue to client ID routes recorded by `register_client`.
    routing_map: IndexMap<Venue, ClientId>,
    // Order book subscription state; every `book_*` collection below is
    // emptied by `reset`.
    book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
    book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
    book_deltas_counts: IndexMap<BookDeltasKey, usize>,
    book_depth10_subs: AHashSet<InstrumentId>,
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    book_deltas_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
    book_depth10_parent_expansions: AHashMap<InstrumentId, Vec<InstrumentId>>,
    book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
    /// Bar aggregators by key; `start` re-arms timers for the time-aggregated
    /// ones and `stop` stops all of them.
    bar_aggregators: IndexMap<BarAggregatorKey, Rc<RefCell<Box<dyn BarAggregator>>>>,
    bar_aggregator_handlers: AHashMap<BarAggregatorKey, Vec<BarAggregatorSubscription>>,
    // Request, pipeline and join bookkeeping keyed by request ID; the maps from
    // `request_bar_aggregations` through `continuous_future_requests` are
    // cleared by `reset`.
    request_bar_aggregations: AHashMap<UUID4, RequestBarAggregation>,
    request_pipeline_parent_request: AHashMap<UUID4, RequestCommand>,
    request_pipeline_n_components: AHashMap<UUID4, usize>,
    request_pipeline_parent_request_id: AHashMap<UUID4, UUID4>,
    request_pipeline_responses: AHashMap<UUID4, Vec<DataResponse>>,
    time_range_pipeline_requests: AHashMap<UUID4, TimeRangePipelineState>,
    time_range_pipeline_parent_request_id: AHashMap<UUID4, UUID4>,
    parent_join_request_id: AHashMap<UUID4, UUID4>,
    pending_join_requests: AHashMap<UUID4, RequestJoin>,
    continuous_future_requests: AHashMap<UUID4, ContinuousFutureRequestState>,
    /// Continuous future subscriptions; `reset` cancels each state's timer
    /// (when one is set) before clearing the map.
    continuous_future_subscriptions: AHashMap<BarType, ContinuousFutureSubscriptionState>,
    /// Installed by `register_msgbus_handlers`; `None` until then.
    continuous_future_roller: Option<Rc<ContinuousFutureRoller>>,
    /// Spread quote aggregators; timers are started in `start` and stopped in `stop`.
    spread_quote_aggregators: AHashMap<InstrumentId, Rc<RefCell<SpreadQuoteAggregator>>>,
    spread_quote_handlers: AHashMap<InstrumentId, Vec<(InstrumentId, TypedHandler<QuoteTick>)>>,
    /// Option chain managers by series; drained and torn down by `reset`.
    option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
    option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
    /// Queue of deferred commands; emptied by `reset`.
    deferred_cmd_queue: DeferredCommandQueue,
    pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
    // Synthetic instrument feed state; all four collections are cleared by `reset`.
    synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    subscribed_synthetic_quotes: AHashSet<InstrumentId>,
    subscribed_synthetic_trades: AHashSet<InstrumentId>,
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    // Running totals exposed through the `*_count` accessors; zeroed by `reset`.
    command_count: u64,
    data_count: u64,
    request_count: u64,
    response_count: u64,
    /// Message bus priority; initialised to 10 in `new`.
    pub(crate) msgbus_priority: u32,
    /// Engine configuration supplied to (or defaulted by) `new`.
    pub(crate) config: DataEngineConfig,
    // The remaining fields only exist when the matching cargo feature is enabled.
    #[cfg(feature = "streaming")]
    catalogs: CatalogMap,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
205
/// Outcome of an order book deltas unsubscribe attempt.
///
/// NOTE(review): the code producing these variants is outside this excerpt;
/// the per-variant notes are inferred from the names and should be confirmed
/// against the unsubscribe path.
enum BookDeltasUnsubscribeResult {
    /// Presumably: no matching subscription was being tracked.
    NotSubscribed,
    /// Presumably: the subscription count was reduced but other subscribers remain.
    Decremented,
    /// Presumably: the last subscriber was removed.
    Removed,
}
211
/// Key type of `DataEngine::book_deltas_counts`: an instrument ID together
/// with an optional client ID and an optional venue.
type BookDeltasKey = (InstrumentId, Option<ClientId>, Option<Venue>);
213
214impl DataEngine {
215    /// Creates a new [`DataEngine`] instance.
216    #[must_use]
217    pub fn new(
218        clock: Rc<RefCell<dyn Clock>>,
219        cache: Rc<RefCell<Cache>>,
220        config: Option<DataEngineConfig>,
221    ) -> Self {
222        let config = config.unwrap_or_default();
223
224        let external_clients: AHashSet<ClientId> = config
225            .external_clients
226            .clone()
227            .unwrap_or_default()
228            .into_iter()
229            .collect();
230
231        Self {
232            clock,
233            cache,
234            external_clients,
235            clients: IndexMap::new(),
236            default_client: None,
237            routing_map: IndexMap::new(),
238            book_intervals: AHashMap::new(),
239            book_snapshot_counts: IndexMap::new(),
240            book_deltas_counts: IndexMap::new(),
241            book_depth10_subs: AHashSet::new(),
242            book_updaters: AHashMap::new(),
243            book_deltas_parent_expansions: AHashMap::new(),
244            book_depth10_parent_expansions: AHashMap::new(),
245            book_snapshotters: AHashMap::new(),
246            bar_aggregators: IndexMap::new(),
247            bar_aggregator_handlers: AHashMap::new(),
248            request_bar_aggregations: AHashMap::new(),
249            request_pipeline_parent_request: AHashMap::new(),
250            request_pipeline_n_components: AHashMap::new(),
251            request_pipeline_parent_request_id: AHashMap::new(),
252            request_pipeline_responses: AHashMap::new(),
253            time_range_pipeline_requests: AHashMap::new(),
254            time_range_pipeline_parent_request_id: AHashMap::new(),
255            parent_join_request_id: AHashMap::new(),
256            pending_join_requests: AHashMap::new(),
257            continuous_future_requests: AHashMap::new(),
258            continuous_future_subscriptions: AHashMap::new(),
259            continuous_future_roller: None,
260            spread_quote_aggregators: AHashMap::new(),
261            spread_quote_handlers: AHashMap::new(),
262            option_chain_managers: AHashMap::new(),
263            option_chain_instrument_index: AHashMap::new(),
264            deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
265            pending_option_chain_requests: AHashMap::new(),
266            synthetic_quote_feeds: AHashMap::new(),
267            synthetic_trade_feeds: AHashMap::new(),
268            subscribed_synthetic_quotes: AHashSet::new(),
269            subscribed_synthetic_trades: AHashSet::new(),
270            buffered_deltas_map: AHashMap::new(),
271            command_count: 0,
272            data_count: 0,
273            request_count: 0,
274            response_count: 0,
275            msgbus_priority: 10, // High-priority for built-in component
276            config,
277            #[cfg(feature = "streaming")]
278            catalogs: CatalogMap::new(),
279            #[cfg(feature = "defi")]
280            pool_updaters: AHashMap::new(),
281            #[cfg(feature = "defi")]
282            pool_updaters_pending: AHashSet::new(),
283            #[cfg(feature = "defi")]
284            pool_snapshot_pending: AHashSet::new(),
285            #[cfg(feature = "defi")]
286            pool_event_buffers: AHashMap::new(),
287        }
288    }
289
290    /// Registers all message bus handlers for the data engine.
291    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
292        let weak = WeakCell::from(Rc::downgrade(engine));
293        engine.borrow_mut().continuous_future_roller =
294            Some(Rc::new(ContinuousFutureRoller::new(engine)));
295
296        let weak1 = weak.clone();
297        msgbus::register_data_command_endpoint(
298            MessagingSwitchboard::data_engine_execute(),
299            TypedIntoHandler::from(move |cmd: DataCommand| {
300                if let Some(rc) = weak1.upgrade() {
301                    rc.borrow_mut().execute(cmd);
302                }
303            }),
304        );
305
306        msgbus::register_data_command_endpoint(
307            MessagingSwitchboard::data_engine_queue_execute(),
308            TypedIntoHandler::from(move |cmd: DataCommand| {
309                get_data_cmd_sender().clone().execute(cmd);
310            }),
311        );
312
313        // Register process handler (polymorphic - uses Any)
314        let weak2 = weak.clone();
315        msgbus::register_any(
316            MessagingSwitchboard::data_engine_process(),
317            ShareableMessageHandler::from_any(move |data: &dyn Any| {
318                if let Some(rc) = weak2.upgrade() {
319                    rc.borrow_mut().process(data);
320                }
321            }),
322        );
323
324        // Register process_data handler (typed - takes ownership)
325        let weak3 = weak.clone();
326        msgbus::register_data_endpoint(
327            MessagingSwitchboard::data_engine_process_data(),
328            TypedIntoHandler::from(move |data: Data| {
329                if let Some(rc) = weak3.upgrade() {
330                    rc.borrow_mut().process_data(data);
331                }
332            }),
333        );
334
335        // Register process_defi_data handler (typed - takes ownership)
336        #[cfg(feature = "defi")]
337        {
338            let weak4 = weak.clone();
339            msgbus::register_defi_data_endpoint(
340                MessagingSwitchboard::data_engine_process_defi_data(),
341                TypedIntoHandler::from(move |data: DefiData| {
342                    if let Some(rc) = weak4.upgrade() {
343                        rc.borrow_mut().process_defi_data(data);
344                    }
345                }),
346            );
347        }
348
349        let weak5 = weak;
350        msgbus::register_data_response_endpoint(
351            MessagingSwitchboard::data_engine_response(),
352            TypedIntoHandler::from(move |resp: DataResponse| {
353                if let Some(rc) = weak5.upgrade() {
354                    rc.borrow_mut().response(resp);
355                }
356            }),
357        );
358    }
359
    /// Returns the total count of data commands received by the engine.
    ///
    /// The counter starts at zero and is set back to zero by `reset`.
    #[must_use]
    pub const fn command_count(&self) -> u64 {
        self.command_count
    }
365
    /// Returns the total count of data stream objects received by the engine.
    ///
    /// The counter starts at zero and is set back to zero by `reset`.
    #[must_use]
    pub const fn data_count(&self) -> u64 {
        self.data_count
    }
371
    /// Increments the data count by one.
    ///
    /// Only compiled when the `defi` feature is enabled.
    #[cfg(feature = "defi")]
    pub(crate) const fn increment_data_count(&mut self) {
        self.data_count += 1;
    }
376
    /// Returns the total count of data requests received by the engine.
    ///
    /// The counter starts at zero and is set back to zero by `reset`.
    #[must_use]
    pub const fn request_count(&self) -> u64 {
        self.request_count
    }
382
    /// Returns the total count of data responses received by the engine.
    ///
    /// The counter starts at zero and is set back to zero by `reset`.
    #[must_use]
    pub const fn response_count(&self) -> u64 {
        self.response_count
    }
388
    /// Returns whether an `OptionChainManager` exists for the given series.
    ///
    /// This is a key lookup in the engine's option chain manager map, which
    /// `reset` drains.
    #[must_use]
    pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
        self.option_chain_managers.contains_key(series_id)
    }
394
    /// Returns the count of pending option-chain bootstrap requests.
    ///
    /// Measured as the number of entries in the pending option chain request
    /// map, which `reset` clears.
    #[must_use]
    pub fn pending_option_chain_request_count(&self) -> usize {
        self.pending_option_chain_requests.len()
    }
400
    /// Returns the number of request pipelines awaiting leg responses.
    ///
    /// Measured as the number of stored pipeline parent requests, which
    /// `reset` clears.
    #[must_use]
    pub fn request_pipeline_count(&self) -> usize {
        self.request_pipeline_parent_request.len()
    }
406
    /// Returns the number of time-range pipelines awaiting child responses.
    ///
    /// Measured as the number of stored time-range pipeline states, which
    /// `reset` clears.
    #[must_use]
    pub fn time_range_pipeline_count(&self) -> usize {
        self.time_range_pipeline_requests.len()
    }
412
    /// Returns the number of `RequestJoin` originals awaiting finalization.
    ///
    /// Measured as the number of entries in the pending join request map,
    /// which `reset` clears.
    #[must_use]
    pub fn pending_join_request_count(&self) -> usize {
        self.pending_join_requests.len()
    }
418
    /// Returns a read-only reference to the engine's clock.
    ///
    /// The returned guard keeps the clock's `RefCell` immutably borrowed until
    /// it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the clock is currently mutably borrowed.
    #[must_use]
    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
        self.clock.borrow()
    }
424
    /// Returns a read-only reference to the engine's cache.
    ///
    /// The returned guard keeps the cache's `RefCell` immutably borrowed until
    /// it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the cache is currently mutably borrowed.
    #[must_use]
    pub fn get_cache(&self) -> Ref<'_, Cache> {
        self.cache.borrow()
    }
430
    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
    ///
    /// This clones the `Rc` handle only; the returned value shares the same
    /// underlying cache as the engine.
    #[must_use]
    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
        Rc::clone(&self.cache)
    }
436
437    /// Registers the `client` with the engine with an optional venue `routing`.
438    ///
439    ///
440    /// # Panics
441    ///
442    /// Panics if a client with the same client ID has already been registered.
443    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
444        let client_id = client.client_id();
445
446        if let Some(default_client) = &self.default_client {
447            check_predicate_false(
448                default_client.client_id() == client.client_id(),
449                "client_id already registered as default client",
450            )
451            .expect(FAILED);
452        }
453
454        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
455
456        if let Some(routing) = routing {
457            self.routing_map.insert(routing, client_id);
458            log::debug!("Set client {client_id} routing for {routing}");
459        }
460
461        if client.venue.is_none() && self.default_client.is_none() {
462            self.default_client = Some(client);
463            log::debug!("Registered client {client_id} for default routing");
464        } else {
465            self.clients.insert(client_id, client);
466            log::debug!("Registered client {client_id}");
467        }
468    }
469
    /// Deregisters the client for the `client_id`.
    ///
    /// Only the registered clients map is consulted: the default client is not
    /// removed by this method, and routing map entries pointing at `client_id`
    /// are left in place.
    ///
    /// # Panics
    ///
    /// Panics if the client ID is not present in the registered clients map
    /// (this includes an ID that is only held as the default client).
    pub fn deregister_client(&mut self, client_id: &ClientId) {
        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);

        // `shift_remove` keeps the relative order of the remaining clients
        self.clients.shift_remove(client_id);
        log::info!("Deregistered client {client_id}");
    }
481
482    /// Registers the data `client` with the engine as the default routing client.
483    ///
484    /// When a specific venue routing cannot be found, this client will receive messages.
485    ///
486    /// # Warnings
487    ///
488    /// Any existing default routing client will be overwritten.
489    ///
490    /// # Panics
491    ///
492    /// Panics if a default client has already been registered.
493    pub fn register_default_client(&mut self, client: DataClientAdapter) {
494        check_predicate_true(
495            self.default_client.is_none(),
496            "default client already registered",
497        )
498        .expect(FAILED);
499
500        let client_id = client.client_id();
501
502        self.default_client = Some(client);
503        log::debug!("Registered default client {client_id}");
504    }
505
506    /// Starts all registered data clients and re-arms bar aggregator timers.
507    pub fn start(&mut self) {
508        for client in self.get_clients_mut() {
509            if let Err(e) = client.start() {
510                log::error!("{e}");
511            }
512        }
513
514        for aggregator in self.bar_aggregators.values() {
515            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
516                aggregator
517                    .borrow_mut()
518                    .start_timer(Some(aggregator.clone()));
519            }
520        }
521
522        for aggregator in self.spread_quote_aggregators.values() {
523            aggregator
524                .borrow_mut()
525                .start_timer(Some(aggregator.clone()));
526        }
527    }
528
529    /// Stops all registered data clients and bar aggregator timers.
530    pub fn stop(&mut self) {
531        for client in self.get_clients_mut() {
532            if let Err(e) = client.stop() {
533                log::error!("{e}");
534            }
535        }
536
537        for aggregator in self.bar_aggregators.values() {
538            aggregator.borrow_mut().stop();
539        }
540
541        for aggregator in self.spread_quote_aggregators.values() {
542            aggregator.borrow_mut().stop_timer();
543        }
544    }
545
    /// Resets all registered data clients and clears engine state.
    ///
    /// Failures from resetting a client or stopping a bar aggregator are
    /// logged, not returned. The registered clients, the default client and
    /// the routing map are not removed here. All clock timers are canceled and
    /// the four message counters are set back to zero.
    pub fn reset(&mut self) {
        for client in self.get_clients_mut() {
            if let Err(e) = client.reset() {
                log::error!("{e}");
            }
        }

        // Copy the keys out first so the map is not borrowed while each
        // aggregator is stopped through `self`
        let keys: Vec<BarAggregatorKey> = self.bar_aggregators.keys().copied().collect();
        for (bar_type, request_id) in keys {
            if let Err(e) = self.stop_bar_aggregator(bar_type, request_id) {
                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
            }
        }

        // Drop all in-flight request, pipeline and join bookkeeping
        self.request_bar_aggregations.clear();
        self.request_pipeline_parent_request.clear();
        self.request_pipeline_n_components.clear();
        self.request_pipeline_parent_request_id.clear();
        self.request_pipeline_responses.clear();
        self.time_range_pipeline_requests.clear();
        self.time_range_pipeline_parent_request_id.clear();
        self.parent_join_request_id.clear();
        self.pending_join_requests.clear();
        self.continuous_future_requests.clear();

        // Cancel each continuous future subscription's timer (when one is set)
        // before the states themselves are dropped
        for state in self.continuous_future_subscriptions.values_mut() {
            if let Some(name) = state.timer_name.take() {
                self.clock.borrow_mut().cancel_timer(&name);
            }
        }
        self.continuous_future_subscriptions.clear();

        // Same key-snapshot approach as for the bar aggregators above
        let spread_ids: Vec<InstrumentId> = self.spread_quote_aggregators.keys().copied().collect();
        for spread_id in spread_ids {
            self.stop_spread_quote_aggregator(spread_id);
        }

        // Tear down option chain managers to unregister their msgbus handlers
        let managers: Vec<_> = self.option_chain_managers.drain().collect();
        for (_, manager) in managers {
            manager.borrow_mut().teardown(&self.clock);
        }

        self.option_chain_instrument_index.clear();
        self.pending_option_chain_requests.clear();

        // Unsubscribe BookUpdaters before dropping; otherwise the typed router
        // keeps dispatching to abandoned updaters. `book_updaters` is keyed by
        // per-underlying id, so the literal per-underlying topic is the same
        // string the subscribe path used.
        let book_updaters: Vec<(InstrumentId, Rc<BookUpdater>)> =
            self.book_updaters.drain().collect();
        for (instrument_id, updater) in book_updaters {
            let deltas_topic = switchboard::get_book_deltas_topic(instrument_id);
            let depth_topic = switchboard::get_book_depth10_topic(instrument_id);
            // Both handlers wrap the same updater, one per message type
            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
        }

        self.book_deltas_parent_expansions.clear();
        self.book_depth10_parent_expansions.clear();

        self.book_deltas_counts.clear();
        self.book_depth10_subs.clear();
        self.book_intervals.clear();
        self.book_snapshot_counts.clear();
        self.book_snapshotters.clear();
        self.buffered_deltas_map.clear();

        self.synthetic_quote_feeds.clear();
        self.synthetic_trade_feeds.clear();
        self.subscribed_synthetic_quotes.clear();
        self.subscribed_synthetic_trades.clear();

        self.deferred_cmd_queue.borrow_mut().clear();

        // Cancels every timer still registered on the clock
        self.clock.borrow_mut().cancel_timers();

        self.command_count = 0;
        self.data_count = 0;
        self.request_count = 0;
        self.response_count = 0;
    }
632
633    /// Disposes the engine, stopping all clients and canceling any timers.
634    pub fn dispose(&mut self) {
635        for client in self.get_clients_mut() {
636            if let Err(e) = client.dispose() {
637                log::error!("{e}");
638            }
639        }
640
641        self.clock.borrow_mut().cancel_timers();
642    }
643
644    /// Connects all registered data clients concurrently.
645    ///
646    /// Connection failures are logged but do not prevent the node from running.
647    pub async fn connect(&mut self) {
648        let futures: Vec<_> = self
649            .get_clients_mut()
650            .into_iter()
651            .map(DataClientAdapter::connect)
652            .collect();
653
654        let results = join_all(futures).await;
655
656        for error in results.into_iter().filter_map(Result::err) {
657            log::error!("Failed to connect data client: {error}");
658        }
659    }
660
661    /// Disconnects all registered data clients concurrently.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if any client fails to disconnect.
666    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
667        let futures: Vec<_> = self
668            .get_clients_mut()
669            .into_iter()
670            .map(DataClientAdapter::disconnect)
671            .collect();
672
673        let results = join_all(futures).await;
674        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
675
676        if errors.is_empty() {
677            Ok(())
678        } else {
679            let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
680            anyhow::bail!(
681                "Failed to disconnect data clients: {}",
682                error_msgs.join("; ")
683            )
684        }
685    }
686
687    /// Returns `true` if all registered data clients are currently connected.
688    #[must_use]
689    pub fn check_connected(&self) -> bool {
690        self.get_clients()
691            .iter()
692            .all(|client| client.is_connected())
693    }
694
695    /// Returns `true` if all registered data clients are currently disconnected.
696    #[must_use]
697    pub fn check_disconnected(&self) -> bool {
698        self.get_clients()
699            .iter()
700            .all(|client| !client.is_connected())
701    }
702
703    /// Returns connection status for each registered client.
704    #[must_use]
705    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
706        self.get_clients()
707            .into_iter()
708            .map(|client| (client.client_id(), client.is_connected()))
709            .collect()
710    }
711
712    /// Returns a list of all registered client IDs, including the default client if set.
713    #[must_use]
714    pub fn registered_clients(&self) -> Vec<ClientId> {
715        self.get_clients()
716            .into_iter()
717            .map(|client| client.client_id())
718            .collect()
719    }
720
721    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
722    where
723        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
724        T: Clone,
725    {
726        self.get_clients()
727            .into_iter()
728            .flat_map(get_subs)
729            .cloned()
730            .collect()
731    }
732
733    #[must_use]
734    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
735        let (default_opt, clients_map) = (&self.default_client, &self.clients);
736        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
737
738        if let Some(default) = default_opt {
739            clients.push(default);
740        }
741
742        clients
743    }
744
745    #[must_use]
746    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
747        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
748        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
749
750        if let Some(default) = default_opt {
751            clients.push(default);
752        }
753
754        clients
755    }
756
757    pub fn get_client(
758        &mut self,
759        client_id: Option<&ClientId>,
760        venue: Option<&Venue>,
761    ) -> Option<&mut DataClientAdapter> {
762        if let Some(client_id) = client_id {
763            // Explicit ID: first look in registered clients
764            if let Some(client) = self.clients.get_mut(client_id) {
765                return Some(client);
766            }
767
768            // Then check if it matches the default client
769            if let Some(default) = self.default_client.as_mut()
770                && default.client_id() == *client_id
771            {
772                return Some(default);
773            }
774
775            // Unknown explicit client
776            return None;
777        }
778
779        if let Some(v) = venue {
780            // Route by venue if mapped client still registered
781            if let Some(client_id) = self.routing_map.get(v) {
782                return self.clients.get_mut(client_id);
783            }
784        }
785
786        // Fallback to default client
787        self.get_default_client()
788    }
789
790    /// Resolves the client for a subscribe/unsubscribe command.
791    ///
792    /// When `BACKTEST` is registered, all commands route through it regardless of
793    /// the command's `client_id` or `venue`. Request paths skip this override.
794    fn get_command_client(
795        &mut self,
796        client_id: Option<&ClientId>,
797        venue: Option<&Venue>,
798    ) -> Option<&mut DataClientAdapter> {
799        let backtest_id = ClientId::new("BACKTEST");
800        // BACKTEST may live in `clients` or as the default (venue=None branch in
801        // `register_client`)
802        if self.clients.contains_key(&backtest_id) {
803            return self.clients.get_mut(&backtest_id);
804        }
805        let default_is_backtest = self
806            .default_client
807            .as_ref()
808            .is_some_and(|c| c.client_id() == backtest_id);
809        if default_is_backtest {
810            return self.default_client.as_mut();
811        }
812        self.get_client(client_id, venue)
813    }
814
815    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
816        self.default_client.as_mut()
817    }
818
819    /// Returns all custom data types currently subscribed across all clients.
820    #[must_use]
821    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
822        self.collect_subscriptions(|client| &client.subscriptions_custom)
823    }
824
825    /// Returns all instrument IDs currently subscribed across all clients.
826    #[must_use]
827    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
828        self.collect_subscriptions(|client| &client.subscriptions_instrument)
829    }
830
831    /// Returns all instrument IDs for which book delta subscriptions exist.
832    #[must_use]
833    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
834        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
835    }
836
837    /// Returns all instrument IDs for which book depth10 subscriptions exist.
838    #[must_use]
839    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
840        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
841    }
842
843    /// Returns all instrument IDs for which book snapshot subscriptions exist.
844    #[must_use]
845    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
846        self.book_snapshot_counts
847            .keys()
848            .map(|(instrument_id, _)| *instrument_id)
849            .collect()
850    }
851
852    /// Returns all instrument IDs for which quote subscriptions exist.
853    #[must_use]
854    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
855        self.collect_subscriptions(|client| &client.subscriptions_quotes)
856    }
857
858    /// Returns all synthetic instrument IDs for which quote subscriptions exist.
859    #[must_use]
860    pub fn subscribed_synthetic_quotes(&self) -> Vec<InstrumentId> {
861        self.subscribed_synthetic_quotes.iter().copied().collect()
862    }
863
864    /// Returns all instrument IDs for which trade subscriptions exist.
865    #[must_use]
866    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
867        self.collect_subscriptions(|client| &client.subscriptions_trades)
868    }
869
870    /// Returns all synthetic instrument IDs for which trade subscriptions exist.
871    #[must_use]
872    pub fn subscribed_synthetic_trades(&self) -> Vec<InstrumentId> {
873        self.subscribed_synthetic_trades.iter().copied().collect()
874    }
875
876    /// Returns all bar types currently subscribed across all clients.
877    #[must_use]
878    pub fn subscribed_bars(&self) -> Vec<BarType> {
879        self.collect_subscriptions(|client| &client.subscriptions_bars)
880    }
881
882    /// Returns all instrument IDs for which mark price subscriptions exist.
883    #[must_use]
884    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
885        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
886    }
887
888    /// Returns all instrument IDs for which index price subscriptions exist.
889    #[must_use]
890    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
891        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
892    }
893
894    /// Returns all instrument IDs for which funding rate subscriptions exist.
895    #[must_use]
896    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
897        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
898    }
899
900    /// Returns all instrument IDs for which status subscriptions exist.
901    #[must_use]
902    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
903        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
904    }
905
906    /// Returns all instrument IDs for which instrument close subscriptions exist.
907    #[must_use]
908    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
909        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
910    }
911
912    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
913    ///
914    /// This is the final synchronous dispatch point for data commands. Runtime command producers
915    /// should send to `DataEngine.queue_execute`, which lets the runner sequence command execution
916    /// before this method runs. The engine also calls this method for child commands generated while
917    /// processing a parent command, where immediate in-engine ordering matters.
918    ///
919    /// Errors during execution are logged.
920    pub fn execute(&mut self, cmd: DataCommand) {
921        match &cmd {
922            DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
923            DataCommand::Request(_) => self.request_count += 1,
924            #[cfg(feature = "defi")]
925            DataCommand::DefiRequest(_) => self.request_count += 1,
926            #[cfg(feature = "defi")]
927            DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
928                self.command_count += 1;
929            }
930            _ => {}
931        }
932
933        if let Err(e) = match cmd {
934            DataCommand::Subscribe(c) => self.execute_subscribe(c),
935            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
936            DataCommand::Request(c) => self.execute_request(c),
937            #[cfg(feature = "defi")]
938            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
939            #[cfg(feature = "defi")]
940            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
941            #[cfg(feature = "defi")]
942            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
943            _ => {
944                log::warn!("Unhandled DataCommand variant");
945                Ok(())
946            }
947        } {
948            log::error!("{e}");
949        }
950    }
951
    /// Handles a subscribe command, updating internal state and forwarding to the client.
    ///
    /// Commands addressed to a client listed in `external_clients` are skipped. Several
    /// command kinds are handled entirely inside the engine and return before any client
    /// is resolved: book snapshots, continuous-future bars, internally aggregated bars,
    /// option chains, synthetic quotes/trades, spread quotes, and book deltas for which
    /// `subscribe_book_deltas` returns `false`.
    ///
    /// When no client can be resolved for the command, an error is logged and `Ok(())`
    /// is still returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the command targets a synthetic instrument for `Instrument`,
    /// `InstrumentStatus`, `InstrumentClose` or `OptionGreeks` data, or if one of the
    /// internal subscribe helpers fails (and, with the `streaming` feature, if
    /// pre-filling the command's start timestamp fails).
    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
        // Externally managed clients are never forwarded commands by the engine
        if let Some(client_id) = cmd.client_id()
            && self.external_clients.contains(client_id)
        {
            if self.config.debug {
                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
            }
            return Ok(());
        }

        // Update internal engine state
        // NOTE: arm order matters; guarded arms must precede the unguarded arm of the
        // same variant, and guards may call helpers that mutate engine state
        match &cmd {
            // The guard performs the subscription; a `false` result stops here, while
            // `true` falls through to the catch-all arm and on to client forwarding
            SubscribeCommand::BookDeltas(cmd) if !self.subscribe_book_deltas(cmd)? => {
                return Ok(());
            }
            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
            SubscribeCommand::BookSnapshots(cmd) => {
                // Handles client forwarding internally (forwards as BookDeltas)
                return self.subscribe_book_snapshots(cmd);
            }
            SubscribeCommand::Bars(cmd) if has_continuous_future_params(cmd.params.as_ref()) => {
                return self.subscribe_continuous_future_bars(cmd);
            }
            SubscribeCommand::Bars(cmd) => {
                self.subscribe_bars(cmd)?;
                // Internally aggregated bars are not forwarded to a client
                if cmd.bar_type.is_internally_aggregated() {
                    return Ok(());
                }
            }
            SubscribeCommand::OptionChain(cmd) => {
                self.subscribe_option_chain(cmd);
                return Ok(());
            }
            SubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
                self.subscribe_synthetic_quotes(cmd.instrument_id);
                return Ok(());
            }
            SubscribeCommand::Quotes(cmd)
                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
            {
                self.subscribe_spread_quotes(cmd);
                return Ok(());
            }
            SubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
                self.subscribe_synthetic_trades(cmd.instrument_id);
                return Ok(());
            }
            SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
            }
            SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
            }
            SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
            }
            SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
            }
            _ => {} // Do nothing else
        }

        #[cfg(feature = "streaming")]
        let cmd = self.subscribe_command_with_prefilled_start_ns(cmd)?;

        // A missing client is logged, not surfaced as an error to the caller
        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
            client.execute_subscribe(cmd);
        } else {
            log::error!(
                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
                cmd.client_id(),
                cmd.venue(),
            );
        }

        Ok(())
    }
1035
    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
    ///
    /// Commands addressed to a client listed in `external_clients` are skipped. Several
    /// command kinds are handled entirely inside the engine and return before any client
    /// is resolved: book snapshots, continuous-future bars, internally aggregated bars,
    /// option chains, synthetic quotes/trades, spread quotes, and book deltas/depth10 for
    /// which the corresponding unsubscribe helper returns `false`.
    ///
    /// The command is also not forwarded while `topic_has_remaining_subscribers` reports
    /// subscribers for its topic. When no client can be resolved, an error is logged and
    /// `Ok(())` is still returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the command targets a synthetic instrument for `Instrument`,
    /// `InstrumentStatus`, `InstrumentClose` or `OptionGreeks` data.
    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
        // Externally managed clients are never forwarded commands by the engine
        if let Some(client_id) = cmd.client_id()
            && self.external_clients.contains(client_id)
        {
            if self.config.debug {
                log::debug!(
                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
                );
            }
            return Ok(());
        }

        // NOTE: arm order matters; guarded arms must precede the unguarded arm of the
        // same variant, and guards may call helpers that mutate engine state
        match &cmd {
            // The guards perform the unsubscription; a `false` result stops here, while
            // `true` falls through to the catch-all arm and on to client forwarding
            UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
                return Ok(());
            }
            UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
                return Ok(());
            }
            UnsubscribeCommand::BookSnapshots(cmd) => {
                // Handles client forwarding internally (forwards as BookDeltas)
                self.unsubscribe_book_snapshots(cmd);
                return Ok(());
            }
            // Bars tracked as a continuous-future subscription take the dedicated path
            UnsubscribeCommand::Bars(cmd)
                if self
                    .continuous_future_subscriptions
                    .contains_key(&cmd.bar_type.standard()) =>
            {
                self.unsubscribe_continuous_future_bars(cmd);
                return Ok(());
            }
            UnsubscribeCommand::Bars(cmd) => {
                self.unsubscribe_bars(cmd);
                // Internally aggregated bars are not forwarded to a client
                if cmd.bar_type.is_internally_aggregated() {
                    return Ok(());
                }
            }
            UnsubscribeCommand::OptionChain(cmd) => {
                self.unsubscribe_option_chain(cmd);
                return Ok(());
            }
            UnsubscribeCommand::Quotes(cmd) if cmd.instrument_id.is_synthetic() => {
                self.unsubscribe_synthetic_quotes(cmd.instrument_id);
                return Ok(());
            }
            UnsubscribeCommand::Quotes(cmd)
                if self.is_spread_quote_command(cmd.instrument_id, cmd.params.as_ref()) =>
            {
                self.unsubscribe_spread_quotes(cmd);
                return Ok(());
            }
            UnsubscribeCommand::Trades(cmd) if cmd.instrument_id.is_synthetic() => {
                self.unsubscribe_synthetic_trades(cmd.instrument_id);
                return Ok(());
            }
            UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
            }
            UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!(
                    "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
                );
            }
            UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!(
                    "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
                );
            }
            UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
                anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
            }
            _ => {}
        }

        // Keep client subscribed while exact-topic subscribers remain
        if Self::topic_has_remaining_subscribers(cmd) {
            return Ok(());
        }

        // A missing client is logged, not surfaced as an error to the caller
        if let Some(client) = self.get_command_client(cmd.client_id(), cmd.venue()) {
            client.execute_unsubscribe(cmd);
        } else {
            log::error!(
                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
                cmd.client_id(),
                cmd.venue(),
            );
        }

        Ok(())
    }
1133
1134    fn topic_has_remaining_subscribers(cmd: &UnsubscribeCommand) -> bool {
1135        // Exact match only; wildcard observers must not block venue detach.
1136        // BookDeltas/Depth10 excluded: binary engine state cannot distinguish
1137        // the internal BookUpdater handler from external subscribers
1138        match cmd {
1139            UnsubscribeCommand::Quotes(c) => {
1140                let topic = switchboard::get_quotes_topic(c.instrument_id);
1141                msgbus::exact_subscriber_count_quotes(topic) > 0
1142            }
1143            UnsubscribeCommand::Trades(c) => {
1144                let topic = switchboard::get_trades_topic(c.instrument_id);
1145                msgbus::exact_subscriber_count_trades(topic) > 0
1146            }
1147            UnsubscribeCommand::MarkPrices(c) => {
1148                let topic = switchboard::get_mark_price_topic(c.instrument_id);
1149                msgbus::exact_subscriber_count_mark_prices(topic) > 0
1150            }
1151            UnsubscribeCommand::IndexPrices(c) => {
1152                let topic = switchboard::get_index_price_topic(c.instrument_id);
1153                msgbus::exact_subscriber_count_index_prices(topic) > 0
1154            }
1155            UnsubscribeCommand::FundingRates(c) => {
1156                let topic = switchboard::get_funding_rate_topic(c.instrument_id);
1157                msgbus::exact_subscriber_count_funding_rates(topic) > 0
1158            }
1159            UnsubscribeCommand::OptionGreeks(c) => {
1160                let topic = switchboard::get_option_greeks_topic(c.instrument_id);
1161                msgbus::exact_subscriber_count_option_greeks(topic) > 0
1162            }
1163            _ => false,
1164        }
1165    }
1166
1167    /// Sends a [`RequestCommand`] to a suitable data client implementation.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if no client is found for the given client ID or venue,
1172    /// or if the client fails to process the request.
1173    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1174        // Skip requests for external clients
1175        if let Some(cid) = req.client_id()
1176            && self.external_clients.contains(cid)
1177        {
1178            if self.config.debug {
1179                log::debug!("Skipping data request for external client {cid}: {req:?}");
1180            }
1181            return Ok(());
1182        }
1183
1184        if let RequestCommand::Join(join) = req {
1185            return self.handle_request_join(join);
1186        }
1187
1188        if has_continuous_future_params(request_params(&req)) {
1189            return self.execute_continuous_future_request(req);
1190        }
1191
1192        let request_id = *req.request_id();
1193        self.prepare_request_bar_aggregators(&req)?;
1194
1195        if has_time_range_pipeline_params(request_params(&req))
1196            && is_time_range_pipeline_variant(&req)
1197        {
1198            let result = self.execute_time_range_pipeline_request(req);
1199            if result.is_err() {
1200                self.cleanup_request_bar_aggregators(&request_id);
1201            }
1202            return result;
1203        }
1204
1205        #[cfg(feature = "streaming")]
1206        if self.catalogs_registered() && streaming::is_date_range_variant(&req) {
1207            let result = self.dispatch_date_range_request(req);
1208            if result.is_err() {
1209                self.cleanup_request_bar_aggregators(&request_id);
1210            }
1211            return result;
1212        }
1213
1214        let result = self.dispatch_request_to_client(req);
1215
1216        if result.is_err() {
1217            self.cleanup_request_bar_aggregators(&request_id);
1218        }
1219
1220        result.map(|_| ())
1221    }
1222
1223    pub(super) fn dispatch_request_to_client(
1224        &mut self,
1225        req: RequestCommand,
1226    ) -> anyhow::Result<ClientId> {
1227        let client_id = req.client_id().copied();
1228        let venue = req.venue().copied();
1229        let Some(client) = self.get_client(client_id.as_ref(), venue.as_ref()) else {
1230            anyhow::bail!("Cannot handle request: no client found for {client_id:?} {venue:?}");
1231        };
1232        let resolved_client_id = client.client_id();
1233
1234        match req {
1235            RequestCommand::Data(req) => client.request_data(req),
1236            RequestCommand::Instrument(req) => client.request_instrument(req),
1237            RequestCommand::Instruments(req) => client.request_instruments(req),
1238            RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
1239            RequestCommand::BookDeltas(req) => client.request_book_deltas(req),
1240            RequestCommand::BookDepth(req) => client.request_book_depth(req),
1241            RequestCommand::Quotes(req) => client.request_quotes(req),
1242            RequestCommand::Trades(req) => client.request_trades(req),
1243            RequestCommand::FundingRates(req) => client.request_funding_rates(req),
1244            RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
1245            RequestCommand::Bars(req) => client.request_bars(req),
1246            RequestCommand::Join(_) => {
1247                anyhow::bail!("RequestJoin must be handled by handle_request_join")
1248            }
1249        }?;
1250
1251        Ok(resolved_client_id)
1252    }
1253
1254    fn execute_continuous_future_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
1255        let RequestCommand::Bars(parent) = req else {
1256            anyhow::bail!("Continuous future requests require `RequestBars`");
1257        };
1258        let request_id = parent.request_id;
1259        let Some(continuous_request) = continuous_future_request_from_bars(&parent)? else {
1260            return Ok(());
1261        };
1262
1263        self.ensure_continuous_future_target_instrument(&continuous_request);
1264        self.prepare_request_bar_aggregators_from_state(
1265            request_id,
1266            &continuous_request.request_bar_aggregation,
1267        )?;
1268
1269        let response_client_id = match self.resolve_request_client_id(
1270            parent.client_id.as_ref(),
1271            Some(&continuous_request.primary_bar_type.instrument_id().venue),
1272        ) {
1273            Ok(client_id) => client_id,
1274            Err(e) => {
1275                self.cleanup_request_bar_aggregators(&request_id);
1276                return Err(e);
1277            }
1278        };
1279        let (cursor_ns, end_ns) = match self.bound_continuous_future_dates(&parent) {
1280            Ok(bounds) => bounds,
1281            Err(e) => {
1282                self.cleanup_request_bar_aggregators(&request_id);
1283                return Err(e);
1284            }
1285        };
1286
1287        self.continuous_future_requests.insert(
1288            request_id,
1289            ContinuousFutureRequestState {
1290                parent,
1291                request: continuous_request,
1292                start_ns: cursor_ns,
1293                cursor_ns,
1294                end_ns,
1295                response_client_id,
1296                data_count: 0,
1297            },
1298        );
1299
1300        if let Err(e) = self.dispatch_next_continuous_future_segment(request_id) {
1301            self.continuous_future_requests.remove(&request_id);
1302            self.cleanup_request_bar_aggregators(&request_id);
1303            return Err(e);
1304        }
1305
1306        Ok(())
1307    }
1308
1309    fn resolve_request_client_id(
1310        &mut self,
1311        client_id: Option<&ClientId>,
1312        venue: Option<&Venue>,
1313    ) -> anyhow::Result<ClientId> {
1314        self.get_client(client_id, venue)
1315            .map(|client| client.client_id())
1316            .ok_or_else(|| {
1317                anyhow::anyhow!(
1318                    "Cannot handle request: no client found for {client_id:?} {venue:?}"
1319                )
1320            })
1321    }
1322
1323    fn bound_continuous_future_dates(
1324        &self,
1325        request: &RequestBars,
1326    ) -> anyhow::Result<(UnixNanos, UnixNanos)> {
1327        let now = self.clock.borrow().timestamp_ns();
1328        let start = request
1329            .start
1330            .map(datetime_to_unix_nanos)
1331            .transpose()?
1332            .unwrap_or_default();
1333        let end = request
1334            .end
1335            .map(datetime_to_unix_nanos)
1336            .transpose()?
1337            .unwrap_or(now);
1338
1339        Ok((start.min(now), end.min(now)))
1340    }
1341
1342    fn ensure_continuous_future_target_instrument(&self, request: &ContinuousFutureRequest) {
1343        let target_id = request.primary_bar_type.instrument_id();
1344        if self.cache.borrow().instrument(&target_id).is_some() {
1345            return;
1346        }
1347
1348        let segment_id = request.first_segment_instrument_id();
1349        let segment_instrument = self.cache.borrow().instrument(&segment_id).cloned();
1350        let Some(segment_instrument) = segment_instrument else {
1351            log::warn!(
1352                "Cannot synthesize continuous future instrument {target_id}: first segment {segment_id} not in cache"
1353            );
1354            return;
1355        };
1356
1357        let InstrumentAny::FuturesContract(mut target) = segment_instrument else {
1358            log::warn!(
1359                "Cannot synthesize continuous future instrument {target_id}: segment {segment_id} is not a FuturesContract",
1360            );
1361            return;
1362        };
1363
1364        target.id = target_id;
1365        target.raw_symbol = target_id.symbol;
1366        target.activation_ns = UnixNanos::default();
1367        target.expiration_ns = UnixNanos::default();
1368
1369        if let Err(e) = self
1370            .cache
1371            .borrow_mut()
1372            .add_instrument(InstrumentAny::FuturesContract(target))
1373        {
1374            log_error_on_cache_insert(&e);
1375        }
1376    }
1377
1378    fn prepare_request_bar_aggregators_from_state(
1379        &mut self,
1380        request_id: UUID4,
1381        state: &RequestBarAggregation,
1382    ) -> anyhow::Result<()> {
1383        if !self.can_start_request_bar_aggregators(request_id, state) {
1384            anyhow::bail!(
1385                "Cannot request aggregated bars: one of the aggregators in `bar_types` is already running"
1386            );
1387        }
1388
1389        self.request_bar_aggregations
1390            .insert(request_id, state.clone());
1391
1392        if let Err(e) = self.init_request_bar_aggregators(request_id, state) {
1393            self.cleanup_request_bar_aggregators(&request_id);
1394            return Err(e);
1395        }
1396
1397        Ok(())
1398    }
1399
1400    fn dispatch_next_continuous_future_segment(&mut self, request_id: UUID4) -> anyhow::Result<()> {
1401        let Some(state) = self.continuous_future_requests.get(&request_id).cloned() else {
1402            anyhow::bail!("No active continuous future request for {request_id}");
1403        };
1404
1405        let Some(segment) = state
1406            .request
1407            .next_segment(state.cursor_ns.as_u64(), state.end_ns.as_u64())
1408        else {
1409            self.emit_empty_continuous_future_response(request_id);
1410            return Ok(());
1411        };
1412
1413        self.apply_continuous_future_adjustment(request_id, &state.request, segment.index)?;
1414        let child = self.build_continuous_future_child_request(request_id, &state, segment);
1415        if let Some(active) = self.continuous_future_requests.get_mut(&request_id) {
1416            active.cursor_ns = UnixNanos::from(segment.end_ns.saturating_add(1));
1417        }
1418
1419        self.dispatch_request_to_client(child).map(|_| ())
1420    }
1421
1422    fn apply_continuous_future_adjustment(
1423        &self,
1424        request_id: UUID4,
1425        request: &ContinuousFutureRequest,
1426        segment_index: usize,
1427    ) -> anyhow::Result<()> {
1428        let adjustment = request.adjustment_for_segment(segment_index);
1429        let key = bar_aggregator_key(request.primary_bar_type, Some(request_id));
1430        let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
1431            anyhow::anyhow!("No aggregator for continuous future request {request_id}")
1432        })?;
1433        aggregator
1434            .borrow_mut()
1435            .set_adjustment(adjustment, request.adjustment_mode);
1436
1437        Ok(())
1438    }
1439
1440    fn build_continuous_future_child_request(
1441        &self,
1442        request_id: UUID4,
1443        state: &ContinuousFutureRequestState,
1444        segment: ContinuousFutureSegment,
1445    ) -> RequestCommand {
1446        let source = state.request.source_for_segment(segment.instrument_id);
1447        let start = Some(UnixNanos::from(segment.start_ns).to_datetime_utc());
1448        let end = Some(UnixNanos::from(segment.end_ns).to_datetime_utc());
1449        let child_params = Some(
1450            state
1451                .request
1452                .child_params(state.parent.params.as_ref(), request_id),
1453        );
1454        let child_request_id = UUID4::new();
1455        let ts_init = self.clock.borrow().timestamp_ns();
1456
1457        match source {
1458            ContinuousFutureSource::Bars(bar_type) => RequestCommand::Bars(RequestBars::new(
1459                bar_type,
1460                start,
1461                end,
1462                state.parent.limit,
1463                state.parent.client_id,
1464                child_request_id,
1465                ts_init,
1466                child_params,
1467            )),
1468            ContinuousFutureSource::Trades => RequestCommand::Trades(RequestTrades::new(
1469                segment.instrument_id,
1470                start,
1471                end,
1472                state.parent.limit,
1473                state.parent.client_id,
1474                child_request_id,
1475                ts_init,
1476                child_params,
1477            )),
1478            ContinuousFutureSource::Quotes => RequestCommand::Quotes(RequestQuotes::new(
1479                segment.instrument_id,
1480                start,
1481                end,
1482                state.parent.limit,
1483                state.parent.client_id,
1484                child_request_id,
1485                ts_init,
1486                child_params,
1487            )),
1488        }
1489    }
1490
1491    fn emit_empty_continuous_future_response(&mut self, request_id: UUID4) {
1492        let Some(state) = self.continuous_future_requests.remove(&request_id) else {
1493            return;
1494        };
1495
1496        let mut params = state.parent.params.unwrap_or_default();
1497        if state.data_count != 0 {
1498            params.insert(
1499                "data_count".to_string(),
1500                serde_json::json!(state.data_count),
1501            );
1502        }
1503
1504        let response = DataResponse::Bars(BarsResponse::new(
1505            request_id,
1506            state.response_client_id,
1507            state.parent.bar_type,
1508            Vec::new(),
1509            Some(state.start_ns),
1510            Some(state.end_ns),
1511            self.clock.borrow().timestamp_ns(),
1512            Some(params),
1513        ));
1514        self.response(response);
1515    }
1516
1517    fn prepare_request_bar_aggregators(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
1518        let request_id = *req.request_id();
1519        let Some(state) = request_bar_aggregation_from_params(request_params(req))? else {
1520            return Ok(());
1521        };
1522
1523        self.prepare_request_bar_aggregators_from_state(request_id, &state)
1524    }
1525
1526    fn can_start_request_bar_aggregators(
1527        &self,
1528        request_id: UUID4,
1529        state: &RequestBarAggregation,
1530    ) -> bool {
1531        let aggregator_request_id = state.aggregator_request_id(request_id);
1532        state.bar_types.iter().all(|bar_type| {
1533            let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1534            self.bar_aggregators
1535                .get(&key)
1536                .is_none_or(|aggregator| !aggregator.borrow().is_running())
1537        })
1538    }
1539
1540    fn init_request_bar_aggregators(
1541        &mut self,
1542        request_id: UUID4,
1543        state: &RequestBarAggregation,
1544    ) -> anyhow::Result<()> {
1545        let aggregator_request_id = state.aggregator_request_id(request_id);
1546
1547        for bar_type in &state.bar_types {
1548            self.create_bar_aggregator_for_key(*bar_type, aggregator_request_id)?;
1549            self.setup_bar_aggregator(*bar_type, true, aggregator_request_id)?;
1550
1551            let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1552            if let Some(aggregator) = self.bar_aggregators.get(&key) {
1553                aggregator.borrow_mut().set_is_running(true);
1554            }
1555        }
1556
1557        self.set_request_bar_aggregator_chain_handlers(request_id, state);
1558
1559        Ok(())
1560    }
1561
1562    fn set_request_bar_aggregator_chain_handlers(
1563        &self,
1564        request_id: UUID4,
1565        state: &RequestBarAggregation,
1566    ) {
1567        let aggregator_request_id = state.aggregator_request_id(request_id);
1568
1569        for bar_type in &state.bar_types {
1570            let key = bar_aggregator_key(*bar_type, aggregator_request_id);
1571            let Some(aggregator) = self.bar_aggregators.get(&key).cloned() else {
1572                continue;
1573            };
1574
1575            let downstream: Vec<_> = state
1576                .bar_types
1577                .iter()
1578                .filter(|candidate| {
1579                    candidate.is_composite()
1580                        && candidate.composite().standard() == bar_type.standard()
1581                })
1582                .filter_map(|candidate| {
1583                    let key = bar_aggregator_key(*candidate, aggregator_request_id);
1584                    self.bar_aggregators.get(&key).cloned()
1585                })
1586                .collect();
1587            let cache = self.cache.clone();
1588            let validate_sequence = self.config.validate_data_sequence;
1589            let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
1590                process_engine_bar(&cache, validate_sequence, false, bar);
1591
1592                for aggregator in &downstream {
1593                    aggregator.borrow_mut().handle_bar(bar);
1594                }
1595            });
1596
1597            aggregator.borrow_mut().set_historical_mode(true, handler);
1598        }
1599    }
1600
1601    fn cleanup_request_bar_aggregators(&mut self, request_id: &UUID4) -> bool {
1602        let Some(state) = self.request_bar_aggregations.remove(request_id) else {
1603            return false;
1604        };
1605        let aggregator_request_id = state.aggregator_request_id(*request_id);
1606
1607        for bar_type in state.bar_types {
1608            let key = bar_aggregator_key(bar_type, aggregator_request_id);
1609            let has_live_handlers =
1610                state.update_subscriptions && self.bar_aggregator_handlers.contains_key(&key);
1611            let keep_running = if has_live_handlers {
1612                match self.setup_bar_aggregator(bar_type, false, aggregator_request_id) {
1613                    Ok(()) => true,
1614                    Err(e) => {
1615                        log::error!(
1616                            "Error starting live request bar aggregator for {bar_type}: {e}"
1617                        );
1618                        false
1619                    }
1620                }
1621            } else {
1622                false
1623            };
1624
1625            if let Some(aggregator) = self.bar_aggregators.get(&key) {
1626                aggregator.borrow_mut().set_is_running(keep_running);
1627            }
1628
1629            if !state.update_subscriptions
1630                && let Err(e) = self.stop_bar_aggregator(bar_type, aggregator_request_id)
1631            {
1632                log::error!("Error stopping request bar aggregator for {bar_type}: {e}");
1633            }
1634        }
1635
1636        true
1637    }
1638
1639    /// Processes a dynamically-typed data message.
1640    ///
1641    /// Currently supports `InstrumentAny`, funding rates, instrument status, option greeks, and
1642    /// custom data; unrecognized types are logged as errors.
1643    pub fn process(&mut self, data: &dyn Any) {
1644        self.data_count += 1;
1645        // Dynamically-typed entry point: `FundingRateUpdate`, `InstrumentStatus`, `OptionGreeks`,
1646        // and custom data are also `Data` enum variants handled in `process_data`, but can arrive
1647        // here as typed data, whereas `InstrumentAny` is not a `Data` variant.
1648        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
1649            self.handle_instrument(instrument);
1650        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
1651            self.handle_funding_rate(*funding_rate);
1652        } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
1653            self.handle_instrument_status(*status);
1654        } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
1655            self.cache.borrow_mut().add_option_greeks(*option_greeks);
1656            let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
1657            msgbus::publish_option_greeks(topic, option_greeks);
1658            self.drain_deferred_commands();
1659        } else if let Some(custom) = data.downcast_ref::<CustomData>() {
1660            self.handle_custom_data(custom);
1661        } else {
1662            log::error!("Cannot process data {data:?}, type is unrecognized");
1663        }
1664    }
1665
1666    /// Processes a `Data` enum instance, dispatching to live handlers.
1667    pub fn process_data(&mut self, data: Data) {
1668        #[cfg(feature = "defi")]
1669        let data = match data {
1670            Data::Defi(defi) => {
1671                self.process_defi_data(*defi);
1672                return;
1673            }
1674            data => data,
1675        };
1676
1677        self.data_count += 1;
1678
1679        match data {
1680            Data::Delta(delta) => self.handle_delta(delta),
1681            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
1682            Data::Depth10(depth) => self.handle_depth10(*depth),
1683            Data::Quote(quote) => {
1684                self.handle_quote(quote);
1685                self.drain_deferred_commands();
1686            }
1687            Data::Trade(trade) => self.handle_trade(trade),
1688            Data::Bar(bar) => self.handle_bar(bar),
1689            Data::MarkPriceUpdate(mark_price) => {
1690                self.handle_mark_price(mark_price);
1691                self.drain_deferred_commands();
1692            }
1693            Data::IndexPriceUpdate(index_price) => {
1694                self.handle_index_price(index_price);
1695                self.drain_deferred_commands();
1696            }
1697            Data::FundingRateUpdate(funding_rate) => {
1698                self.handle_funding_rate(funding_rate);
1699                self.drain_deferred_commands();
1700            }
1701            Data::InstrumentStatus(status) => {
1702                self.handle_instrument_status(status);
1703                self.drain_deferred_commands();
1704            }
1705            Data::OptionGreeks(greeks) => {
1706                self.cache.borrow_mut().add_option_greeks(greeks);
1707                let topic = switchboard::get_option_greeks_topic(greeks.instrument_id);
1708                msgbus::publish_option_greeks(topic, &greeks);
1709                self.drain_deferred_commands();
1710            }
1711            Data::InstrumentClose(close) => self.handle_instrument_close(close),
1712            Data::Custom(custom) => self.handle_custom_data(&custom),
1713            #[cfg(feature = "defi")]
1714            Data::Defi(_) => unreachable!("handled before market data dispatch"),
1715        }
1716    }
1717
1718    /// Processes a `Data` instance through the pipeline bus path.
1719    ///
1720    /// Pipeline mode publishes each item on the `data.pipeline.` topic family and gates cache
1721    /// writes on `disable_historical_cache`. None of the live-only side effects (synthetic
1722    /// republish, option-chain expiry, depth-derived quotes, deferred-command drains) run in this
1723    /// path.
1724    pub fn process_pipeline(&mut self, data: Data) {
1725        #[cfg(feature = "defi")]
1726        let data = match data {
1727            Data::Defi(defi) => {
1728                self.process_defi_data(*defi);
1729                return;
1730            }
1731            data => data,
1732        };
1733
1734        self.data_count += 1;
1735
1736        match data {
1737            Data::Delta(delta) => self.handle_delta_pipeline(delta),
1738            Data::Deltas(deltas) => self.handle_deltas_pipeline(&deltas.into_inner()),
1739            Data::Depth10(depth) => self.handle_depth10_pipeline(*depth),
1740            Data::Quote(quote) => self.handle_quote_pipeline(quote),
1741            Data::Trade(trade) => self.handle_trade_pipeline(trade),
1742            Data::Bar(bar) => self.handle_bar_pipeline(bar),
1743            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price_pipeline(mark_price),
1744            Data::IndexPriceUpdate(index_price) => self.handle_index_price_pipeline(index_price),
1745            Data::FundingRateUpdate(funding_rate) => {
1746                self.handle_funding_rate_pipeline(funding_rate);
1747            }
1748            Data::InstrumentStatus(status) => self.handle_instrument_status_pipeline(status),
1749            Data::OptionGreeks(greeks) => self.handle_option_greeks_pipeline(greeks),
1750            Data::InstrumentClose(close) => self.handle_instrument_close_pipeline(close),
1751            Data::Custom(custom) => self.handle_custom_data_pipeline(&custom),
1752            #[cfg(feature = "defi")]
1753            Data::Defi(_) => unreachable!("handled before market data dispatch"),
1754        }
1755    }
1756
1757    /// Processes a `DataResponse`, handling and publishing the response message.
1758    pub fn response(&mut self, mut resp: DataResponse) {
1759        if log::log_enabled!(log::Level::Debug) {
1760            let correlation_id = resp.correlation_id();
1761            match resp.record_count() {
1762                Some(count) => log::debug!(
1763                    "{RECV}{RES} {} correlation_id={correlation_id} records={count}",
1764                    resp.kind(),
1765                ),
1766                None => log::debug!(
1767                    "{RECV}{RES} {} correlation_id={correlation_id}",
1768                    resp.kind(),
1769                ),
1770            }
1771        }
1772        log::trace!("{RECV}{RES} {resp:?}");
1773
1774        self.response_count += 1;
1775
1776        resp.trim_to_bounds();
1777
1778        if let Some(parent_id) = continuous_future_parent_request_id(response_params(&resp)) {
1779            self.handle_continuous_future_child_response(parent_id, &resp);
1780            return;
1781        }
1782
1783        let Some(resp) = self.handle_request_pipeline_response(resp) else {
1784            return;
1785        };
1786
1787        if let Some(parent_id) = self
1788            .time_range_pipeline_parent_request_id
1789            .remove(resp.correlation_id())
1790        {
1791            self.handle_time_range_pipeline_child_response(parent_id, &resp);
1792            return;
1793        }
1794
1795        if self
1796            .parent_join_request_id
1797            .contains_key(resp.correlation_id())
1798        {
1799            self.finalize_request_join(resp);
1800            return;
1801        }
1802
1803        let correlation_id = *resp.correlation_id();
1804
1805        match &resp {
1806            DataResponse::Instrument(r) => {
1807                self.handle_instrument_response(r.data.clone());
1808            }
1809            DataResponse::Instruments(r) => {
1810                self.handle_instruments(&r.data);
1811            }
1812            DataResponse::Quotes(r) => {
1813                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1814                    self.handle_quotes(&r.data);
1815                }
1816            }
1817            DataResponse::Trades(r) => {
1818                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1819                    self.handle_trades(&r.data);
1820                }
1821            }
1822            DataResponse::FundingRates(r) => {
1823                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1824                    self.handle_funding_rates(&r.data);
1825                }
1826            }
1827            DataResponse::Bars(r) => {
1828                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1829                    self.handle_bars(&r.data);
1830                }
1831            }
1832            DataResponse::Book(r) => self.handle_book_response(&r.data),
1833            DataResponse::BookDeltas(r) => {
1834                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1835                    self.handle_book_deltas_response(r);
1836                }
1837            }
1838            DataResponse::BookDepth(r) => {
1839                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1840                    self.handle_book_depth_response(r);
1841                }
1842            }
1843            DataResponse::ForwardPrices(r) => {
1844                self.process_request_bar_aggregation_response(&resp);
1845                return self.handle_forward_prices_response(&correlation_id, r);
1846            }
1847            DataResponse::Data(_) => {}
1848        }
1849
1850        self.process_request_bar_aggregation_response(&resp);
1851
1852        msgbus::send_response(&correlation_id, &resp);
1853    }
1854
1855    /// Registers a parent request whose response will be rebuilt from `n_components` leg responses.
1856    pub fn new_request_pipeline(&mut self, parent: RequestCommand, n_components: usize) {
1857        let parent_id = *parent.request_id();
1858        self.request_pipeline_n_components
1859            .insert(parent_id, n_components);
1860        self.request_pipeline_parent_request
1861            .insert(parent_id, parent);
1862        self.request_pipeline_responses
1863            .insert(parent_id, Vec::with_capacity(n_components));
1864    }
1865
1866    /// Registers a leg `request_id` as a child of the pipeline keyed by `parent_id`.
1867    pub fn register_request_pipeline_leg(&mut self, leg_id: UUID4, parent_id: UUID4) {
1868        self.request_pipeline_parent_request_id
1869            .insert(leg_id, parent_id);
1870    }
1871
1872    /// Fans a leg response into its parent pipeline and emits the rebuilt response when all legs arrive.
1873    ///
1874    /// Responses whose `correlation_id` is not part of any pipeline pass through unchanged.
1875    /// While accumulating legs, returns `None` so the caller skips further response handling.
1876    fn handle_request_pipeline_response(&mut self, resp: DataResponse) -> Option<DataResponse> {
1877        let leg_id = *resp.correlation_id();
1878        let Some(parent_id) = self.request_pipeline_parent_request_id.remove(&leg_id) else {
1879            return Some(resp);
1880        };
1881
1882        let Some(buf) = self.request_pipeline_responses.get_mut(&parent_id) else {
1883            log::error!("Pipeline response buffer missing for parent {parent_id} (leg {leg_id})");
1884            return Some(resp);
1885        };
1886        buf.push(resp);
1887
1888        let expected = self.request_pipeline_n_components.get(&parent_id).copied();
1889        let received = buf.len();
1890        match expected {
1891            Some(n) if received < n => return None,
1892            Some(_) => {}
1893            None => {
1894                log::error!("Pipeline n_components missing for parent {parent_id}");
1895                return None;
1896            }
1897        }
1898
1899        let mut legs = self.request_pipeline_responses.remove(&parent_id)?;
1900        self.request_pipeline_n_components.remove(&parent_id);
1901        let parent = self.request_pipeline_parent_request.remove(&parent_id);
1902
1903        for leg in &mut legs {
1904            leg.trim_to_bounds();
1905        }
1906
1907        let (parent_start, parent_end) = parent_request_window(parent.as_ref());
1908        let rebuilt = rebuild_pipeline_response(parent_id, parent.as_ref(), legs);
1909
1910        // If the rebuild failed (mixed-variant or unsupported-variant legs), drop the
1911        // associated `RequestJoin` so its staging maps do not leak. Without this the
1912        // original join request stays in `pending_join_requests` and its
1913        // `parent_join_request_id` mapping stays live, neither of which will ever
1914        // resolve through normal flow.
1915        if rebuilt.is_none()
1916            && let Some(original_id) = self.parent_join_request_id.remove(&parent_id)
1917        {
1918            self.pending_join_requests.remove(&original_id);
1919            log::error!(
1920                "Dropped RequestJoin {original_id} because pipeline rebuild failed for dated parent {parent_id}"
1921            );
1922        }
1923
1924        let mut rebuilt = rebuilt?;
1925
1926        // Replay must run before `trim_to_bounds`, which would otherwise discard the pre-start
1927        // deltas the replay folds into the snapshot.
1928        if let DataResponse::BookDeltas(r) = &mut rebuilt {
1929            self.book_deltas_snapshot_replay(r);
1930        }
1931
1932        // Trim against the parent window only when the parent supplied one. With no
1933        // parent window the rebuilt response inherits the first leg's bounds; legs are
1934        // already trimmed against their own bounds at the top of `response()`, so a
1935        // second pass would discard data from later legs whose bounds the parent never
1936        // constrained.
1937        if parent_start.is_some() || parent_end.is_some() {
1938            rebuilt.trim_to_bounds();
1939        }
1940
1941        Some(rebuilt)
1942    }
1943
1944    // Replays a day-start snapshot forward to the request's original start: when the first delta
1945    // is an F_SNAPSHOT on a UTC day boundary, rebuilds the book from the pre-start deltas and
1946    // replaces them with one snapshot keyed at the original start, then forwards the rest.
1947    // Mirrors the Cython `_handle_order_book_deltas_snapshot_replay`.
1948    fn book_deltas_snapshot_replay(&self, resp: &mut BookDeltasResponse) {
1949        let Some(original_start_ns) = resp.start else {
1950            return;
1951        };
1952
1953        let Some(first) = resp.data.first().copied() else {
1954            return;
1955        };
1956
1957        if !RecordFlag::F_SNAPSHOT.matches(first.flags) {
1958            return;
1959        }
1960
1961        if first.ts_init.as_u64() % NANOSECONDS_IN_DAY != 0 {
1962            return;
1963        }
1964
1965        // Nothing to fast-forward when the request starts at or before the day-start snapshot
1966        if original_start_ns <= first.ts_init {
1967            return;
1968        }
1969
1970        if self
1971            .cache
1972            .borrow()
1973            .instrument(&resp.instrument_id)
1974            .is_none()
1975        {
1976            log::warn!(
1977                "Instrument {} not found in cache, skipping snapshot replay",
1978                resp.instrument_id,
1979            );
1980            return;
1981        }
1982
1983        let book_type = resp
1984            .params
1985            .as_ref()
1986            .and_then(|p| p.get_str("book_type"))
1987            .and_then(|s| BookType::from_str(s).ok())
1988            .unwrap_or(BookType::L2_MBP);
1989
1990        let mut book = OrderBook::new(resp.instrument_id, book_type);
1991        let mut before: Vec<OrderBookDelta> = Vec::new();
1992        let mut after: Vec<OrderBookDelta> = Vec::new();
1993        let mut last_applied_ts: Option<UnixNanos> = None;
1994        let mut crossed = false;
1995
1996        for delta in &resp.data {
1997            if crossed {
1998                after.push(*delta);
1999            } else {
2000                before.push(*delta);
2001                if delta.ts_init >= original_start_ns {
2002                    crossed = true;
2003                    last_applied_ts = Some(delta.ts_init);
2004                }
2005            }
2006        }
2007
2008        if !before.is_empty() {
2009            if last_applied_ts.is_none() {
2010                last_applied_ts = before.last().map(|d| d.ts_init);
2011            }
2012
2013            let batch = OrderBookDeltas::new(resp.instrument_id, before);
2014            if let Err(e) = book.apply_deltas(&batch) {
2015                log::error!(
2016                    "Failed to rebuild book for snapshot replay on {}: {e}",
2017                    resp.instrument_id,
2018                );
2019                return;
2020            }
2021        }
2022
2023        let Some(last_ts) = last_applied_ts else {
2024            return;
2025        };
2026
2027        let snapshot_ts = last_ts.max(original_start_ns);
2028        let mut new_data = book.to_deltas(snapshot_ts, snapshot_ts).deltas;
2029        new_data.extend(after);
2030        resp.data = new_data;
2031    }
2032
2033    fn handle_request_join(&mut self, req: RequestJoin) -> anyhow::Result<()> {
2034        if has_time_range_pipeline_params(req.params.as_ref()) {
2035            return self.execute_time_range_pipeline_request(RequestCommand::Join(req));
2036        }
2037
2038        let now_ns = self.clock.borrow().timestamp_ns();
2039        let now_dt = now_ns.to_datetime_utc();
2040        let zero = chrono::DateTime::<chrono::Utc>::from_timestamp_nanos(0);
2041        let start = req.start.unwrap_or(zero).min(now_dt);
2042        let end = req.end.unwrap_or(now_dt).min(now_dt);
2043        let dated = req.with_dates(Some(start), Some(end), now_ns);
2044
2045        let original_id = req.request_id;
2046        let dated_id = dated.request_id;
2047
2048        self.pending_join_requests.insert(original_id, req);
2049        self.parent_join_request_id.insert(dated_id, original_id);
2050
2051        let leg_ids: Vec<UUID4> = dated.request_ids.clone();
2052        self.new_request_pipeline(RequestCommand::Join(dated), leg_ids.len());
2053        for leg_id in leg_ids {
2054            self.register_request_pipeline_leg(leg_id, dated_id);
2055        }
2056
2057        Ok(())
2058    }
2059
2060    fn finalize_request_join(&mut self, resp: DataResponse) {
2061        let dated_id = *resp.correlation_id();
2062        let Some(original_id) = self.parent_join_request_id.remove(&dated_id) else {
2063            log::error!("parent_join_request_id missing for dated correlation {dated_id}");
2064            return;
2065        };
2066
2067        let Some(original) = self.pending_join_requests.remove(&original_id) else {
2068            log::error!("pending_join_requests missing for original {original_id}");
2069            return;
2070        };
2071
2072        let now_ns = self.clock.borrow().timestamp_ns();
2073
2074        // Empty leg responses fire each leg's callback so caller-side request
2075        // workflows clean up. Per-leg metadata is reconstructed from the
2076        // rebuilt parent response and may not match a leg's original
2077        // instrument_id/bar_type when the join spans heterogeneous legs;
2078        // tracked as a follow-up in #5 (needs an in-flight leg-request cache).
2079        for leg_request_id in &original.request_ids {
2080            let empty = empty_response_like(&resp, *leg_request_id, now_ns);
2081            msgbus::send_response(leg_request_id, &empty);
2082        }
2083
2084        // Route the final join response through the normal response path so
2085        // bounds-trim against the parent window runs and the per-variant
2086        // handlers (cache writes, request bar aggregators) fire. The pipeline
2087        // and join staging maps for this request have already been popped, so
2088        // the recursive call cannot re-enter either gate.
2089        let final_resp = rebind_response_correlation(resp, original_id);
2090        self.response(final_resp);
2091    }
2092
2093    fn process_request_bar_aggregation_response(&mut self, resp: &DataResponse) {
2094        let correlation_id = *resp.correlation_id();
2095        let Some(state) = self.request_bar_aggregations.get(&correlation_id).cloned() else {
2096            return;
2097        };
2098
2099        match resp {
2100            DataResponse::Quotes(r) => {
2101                for quote in &r.data {
2102                    self.update_request_bar_aggregators_from_quote(&state, correlation_id, *quote);
2103                }
2104            }
2105            DataResponse::Trades(r) => {
2106                for trade in &r.data {
2107                    self.update_request_bar_aggregators_from_trade(&state, correlation_id, *trade);
2108                }
2109            }
2110            DataResponse::Bars(r) => {
2111                for bar in &r.data {
2112                    self.update_request_bar_aggregators_from_bar(&state, correlation_id, *bar);
2113                }
2114            }
2115            _ => {}
2116        }
2117
2118        self.cleanup_request_bar_aggregators(&correlation_id);
2119    }
2120
2121    fn handle_continuous_future_child_response(&mut self, parent_id: UUID4, resp: &DataResponse) {
2122        if !self.continuous_future_requests.contains_key(&parent_id) {
2123            log::error!("No active continuous future request for child response {parent_id}");
2124            return;
2125        }
2126
2127        let data_count = response_params(resp)
2128            .and_then(|params| params.get("data_count"))
2129            .and_then(serde_json::Value::as_u64)
2130            .or_else(|| resp.record_count().map(|count| count as u64))
2131            .unwrap_or(0);
2132
2133        if let Some(state) = self.continuous_future_requests.get_mut(&parent_id) {
2134            state.data_count += data_count;
2135        }
2136
2137        match resp {
2138            DataResponse::Quotes(r) => {
2139                if !log_if_empty_response(&r.data, &r.instrument_id, resp.correlation_id()) {
2140                    self.handle_quotes(&r.data);
2141                }
2142            }
2143            DataResponse::Trades(r) => {
2144                if !log_if_empty_response(&r.data, &r.instrument_id, resp.correlation_id()) {
2145                    self.handle_trades(&r.data);
2146                }
2147            }
2148            DataResponse::Bars(r) => {
2149                if !log_if_empty_response(&r.data, &r.bar_type, resp.correlation_id()) {
2150                    self.handle_bars(&r.data);
2151                }
2152            }
2153            _ => {
2154                log::error!(
2155                    "Continuous future child response {parent_id} must contain quotes, trades, or bars"
2156                );
2157                return;
2158            }
2159        }
2160
2161        self.process_continuous_future_aggregation_response(parent_id, resp);
2162        if let Err(e) = self.dispatch_next_continuous_future_segment(parent_id) {
2163            log::error!("Error dispatching continuous future segment for {parent_id}: {e}");
2164            self.emit_empty_continuous_future_response(parent_id);
2165        }
2166    }
2167
2168    fn process_continuous_future_aggregation_response(
2169        &self,
2170        parent_id: UUID4,
2171        resp: &DataResponse,
2172    ) {
2173        let Some(state) = self.continuous_future_requests.get(&parent_id) else {
2174            return;
2175        };
2176        let primary_bar_type = state.request.primary_bar_type;
2177        let aggregator_request_id = Some(parent_id);
2178
2179        match resp {
2180            DataResponse::Quotes(r) => {
2181                for quote in &r.data {
2182                    self.update_request_bar_aggregator(
2183                        primary_bar_type,
2184                        aggregator_request_id,
2185                        |aggregator| {
2186                            aggregator.handle_quote(*quote);
2187                        },
2188                    );
2189                }
2190            }
2191            DataResponse::Trades(r) => {
2192                for trade in &r.data {
2193                    self.update_request_bar_aggregator(
2194                        primary_bar_type,
2195                        aggregator_request_id,
2196                        |aggregator| {
2197                            aggregator.handle_trade(*trade);
2198                        },
2199                    );
2200                }
2201            }
2202            DataResponse::Bars(r) => {
2203                for bar in &r.data {
2204                    self.update_request_bar_aggregator(
2205                        primary_bar_type,
2206                        aggregator_request_id,
2207                        |aggregator| {
2208                            aggregator.handle_bar(*bar);
2209                        },
2210                    );
2211                }
2212            }
2213            _ => {}
2214        }
2215    }
2216
2217    fn update_request_bar_aggregators_from_quote(
2218        &self,
2219        state: &RequestBarAggregation,
2220        request_id: UUID4,
2221        quote: QuoteTick,
2222    ) {
2223        let aggregator_request_id = state.aggregator_request_id(request_id);
2224
2225        for bar_type in &state.bar_types {
2226            if bar_type.is_composite()
2227                || bar_type.instrument_id() != quote.instrument_id
2228                || bar_type.spec().price_type == PriceType::Last
2229            {
2230                continue;
2231            }
2232
2233            self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2234                aggregator.handle_quote(quote);
2235            });
2236        }
2237    }
2238
2239    fn update_request_bar_aggregators_from_trade(
2240        &self,
2241        state: &RequestBarAggregation,
2242        request_id: UUID4,
2243        trade: TradeTick,
2244    ) {
2245        let aggregator_request_id = state.aggregator_request_id(request_id);
2246
2247        for bar_type in &state.bar_types {
2248            if bar_type.is_composite()
2249                || bar_type.instrument_id() != trade.instrument_id
2250                || bar_type.spec().price_type != PriceType::Last
2251            {
2252                continue;
2253            }
2254
2255            self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2256                aggregator.handle_trade(trade);
2257            });
2258        }
2259    }
2260
2261    fn update_request_bar_aggregators_from_bar(
2262        &self,
2263        state: &RequestBarAggregation,
2264        request_id: UUID4,
2265        bar: Bar,
2266    ) {
2267        let aggregator_request_id = state.aggregator_request_id(request_id);
2268
2269        for bar_type in &state.bar_types {
2270            if !bar_type.is_composite()
2271                || bar_type.composite().standard() != bar.bar_type.standard()
2272            {
2273                continue;
2274            }
2275
2276            self.update_request_bar_aggregator(*bar_type, aggregator_request_id, |aggregator| {
2277                aggregator.handle_bar(bar);
2278            });
2279        }
2280    }
2281
2282    fn update_request_bar_aggregator<F>(
2283        &self,
2284        bar_type: BarType,
2285        request_id: Option<UUID4>,
2286        update: F,
2287    ) where
2288        F: FnOnce(&mut dyn BarAggregator),
2289    {
2290        let key = bar_aggregator_key(bar_type, request_id);
2291        let Some(aggregator) = self.bar_aggregators.get(&key) else {
2292            log::error!("Cannot update request bar aggregator: no aggregator found for {bar_type}");
2293            return;
2294        };
2295
2296        update(aggregator.borrow_mut().as_mut());
2297    }
2298
2299    #[inline]
2300    fn pipeline_cache_writes_allowed(&self) -> bool {
2301        !self.config.disable_historical_cache
2302    }
2303
2304    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
2305        log::debug!("Handling instrument: {}", instrument.id());
2306
2307        if let Err(e) = self
2308            .cache
2309            .as_ref()
2310            .borrow_mut()
2311            .add_instrument(instrument.clone())
2312        {
2313            log_error_on_cache_insert(&e);
2314        }
2315
2316        let topic = switchboard::get_instrument_topic(instrument.id());
2317        log::debug!("Publishing instrument to topic: {topic}");
2318        msgbus::publish_instrument(topic, instrument);
2319
2320        self.update_option_chains(instrument);
2321    }
2322
2323    fn update_option_chains(&mut self, instrument: &InstrumentAny) {
2324        let Some(underlying) = instrument.underlying() else {
2325            return;
2326        };
2327        let Some(expiration_ns) = instrument.expiration_ns() else {
2328            return;
2329        };
2330        let Some(strike) = instrument.strike_price() else {
2331            return;
2332        };
2333        let Some(kind) = instrument.option_kind() else {
2334            return;
2335        };
2336
2337        let venue = instrument.id().venue;
2338        let settlement = instrument.settlement_currency().code;
2339        let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
2340
2341        // Clone Rc to release borrow on self.option_chain_managers before accessing self.clients
2342        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2343            return;
2344        };
2345
2346        let clock = self.clock.clone();
2347        let client = self.get_command_client(None, Some(&venue));
2348
2349        if manager_rc
2350            .borrow_mut()
2351            .add_instrument(instrument.id(), strike, kind, client, &clock)
2352        {
2353            self.option_chain_instrument_index
2354                .insert(instrument.id(), series_id);
2355        }
2356    }
2357
2358    fn handle_delta(&mut self, delta: OrderBookDelta) {
2359        let deltas = if self.config.buffer_deltas {
2360            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
2361                buffered_deltas.deltas.push(delta);
2362                buffered_deltas.flags = delta.flags;
2363                buffered_deltas.sequence = delta.sequence;
2364                buffered_deltas.ts_event = delta.ts_event;
2365                buffered_deltas.ts_init = delta.ts_init;
2366            } else {
2367                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
2368                self.buffered_deltas_map
2369                    .insert(delta.instrument_id, buffered_deltas);
2370            }
2371
2372            if !RecordFlag::F_LAST.matches(delta.flags) {
2373                return; // Not the last delta for event
2374            }
2375
2376            self.buffered_deltas_map
2377                .remove(&delta.instrument_id)
2378                .expect("buffered deltas exist")
2379        } else {
2380            OrderBookDeltas::new(delta.instrument_id, vec![delta])
2381        };
2382
2383        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
2384        msgbus::publish_deltas(topic, &deltas);
2385    }
2386
2387    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
2388        if self.config.buffer_deltas {
2389            let instrument_id = deltas.instrument_id;
2390
2391            for delta in deltas.deltas {
2392                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
2393                    buffered_deltas.deltas.push(delta);
2394                    buffered_deltas.flags = delta.flags;
2395                    buffered_deltas.sequence = delta.sequence;
2396                    buffered_deltas.ts_event = delta.ts_event;
2397                    buffered_deltas.ts_init = delta.ts_init;
2398                } else {
2399                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
2400                    self.buffered_deltas_map
2401                        .insert(instrument_id, buffered_deltas);
2402                }
2403
2404                if RecordFlag::F_LAST.matches(delta.flags) {
2405                    let deltas_to_publish = self
2406                        .buffered_deltas_map
2407                        .remove(&instrument_id)
2408                        .expect("buffered deltas exist");
2409                    let topic = switchboard::get_book_deltas_topic(instrument_id);
2410                    msgbus::publish_deltas(topic, &deltas_to_publish);
2411                }
2412            }
2413        } else {
2414            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
2415            msgbus::publish_deltas(topic, &deltas);
2416        }
2417    }
2418
2419    fn handle_depth10(&self, depth: OrderBookDepth10) {
2420        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
2421        msgbus::publish_depth10(topic, &depth);
2422
2423        if self.config.emit_quotes_from_book_depths
2424            && let Some(quote) = derive_quote_from_depth(&depth)
2425        {
2426            book::publish_quote_if_changed(&self.cache, quote);
2427        }
2428    }
2429
2430    fn handle_quote(&self, quote: QuoteTick) {
2431        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
2432            log_error_on_cache_insert(&e);
2433        }
2434
2435        for synthetic_quote in self.synthetic_quotes_from_quote(quote) {
2436            let topic = switchboard::get_quotes_topic(synthetic_quote.instrument_id);
2437            msgbus::publish_quote(topic, &synthetic_quote);
2438        }
2439
2440        let topic = switchboard::get_quotes_topic(quote.instrument_id);
2441        msgbus::publish_quote(topic, &quote);
2442    }
2443
2444    fn handle_trade(&self, trade: TradeTick) {
2445        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
2446            log_error_on_cache_insert(&e);
2447        }
2448
2449        for synthetic_trade in self.synthetic_trades_from_trade(trade) {
2450            let topic = switchboard::get_trades_topic(synthetic_trade.instrument_id);
2451            msgbus::publish_trade(topic, &synthetic_trade);
2452        }
2453
2454        let topic = switchboard::get_trades_topic(trade.instrument_id);
2455        msgbus::publish_trade(topic, &trade);
2456    }
2457
2458    fn synthetic_quotes_from_quote(&self, update: QuoteTick) -> Vec<QuoteTick> {
2459        let Some(synthetics) = self.synthetic_quote_feeds.get(&update.instrument_id) else {
2460            return Vec::new();
2461        };
2462
2463        synthetics
2464            .iter()
2465            .filter_map(|synthetic| self.synthetic_quote_from_update(synthetic, update))
2466            .collect()
2467    }
2468
2469    fn synthetic_quote_from_update(
2470        &self,
2471        synthetic: &SyntheticInstrument,
2472        update: QuoteTick,
2473    ) -> Option<QuoteTick> {
2474        let cache = self.cache.borrow();
2475        let mut bid_inputs = Vec::with_capacity(synthetic.components.len());
2476        let mut ask_inputs = Vec::with_capacity(synthetic.components.len());
2477
2478        for instrument_id in &synthetic.components {
2479            let (bid_price, ask_price) = if *instrument_id == update.instrument_id {
2480                (update.bid_price, update.ask_price)
2481            } else {
2482                let Some(component_quote) = cache.quote(instrument_id) else {
2483                    log::warn!(
2484                        "Cannot calculate synthetic instrument {} price, no quotes for {} yet",
2485                        synthetic.id,
2486                        instrument_id,
2487                    );
2488                    return None;
2489                };
2490                (component_quote.bid_price, component_quote.ask_price)
2491            };
2492
2493            bid_inputs.push(bid_price.as_f64());
2494            ask_inputs.push(ask_price.as_f64());
2495        }
2496        drop(cache);
2497
2498        let bid_price = match synthetic.calculate(&bid_inputs) {
2499            Ok(price) => price,
2500            Err(e) => {
2501                log::error!(
2502                    "Cannot calculate synthetic instrument {} bid price: {e}",
2503                    synthetic.id
2504                );
2505                return None;
2506            }
2507        };
2508        let ask_price = match synthetic.calculate(&ask_inputs) {
2509            Ok(price) => price,
2510            Err(e) => {
2511                log::error!(
2512                    "Cannot calculate synthetic instrument {} ask price: {e}",
2513                    synthetic.id
2514                );
2515                return None;
2516            }
2517        };
2518        let size_one = Quantity::from(1);
2519
2520        Some(QuoteTick::new(
2521            synthetic.id,
2522            bid_price,
2523            ask_price,
2524            size_one,
2525            size_one,
2526            update.ts_event,
2527            self.clock.borrow().timestamp_ns(),
2528        ))
2529    }
2530
2531    fn synthetic_trades_from_trade(&self, update: TradeTick) -> Vec<TradeTick> {
2532        let Some(synthetics) = self.synthetic_trade_feeds.get(&update.instrument_id) else {
2533            return Vec::new();
2534        };
2535
2536        synthetics
2537            .iter()
2538            .filter_map(|synthetic| self.synthetic_trade_from_update(synthetic, update))
2539            .collect()
2540    }
2541
2542    fn synthetic_trade_from_update(
2543        &self,
2544        synthetic: &SyntheticInstrument,
2545        update: TradeTick,
2546    ) -> Option<TradeTick> {
2547        let cache = self.cache.borrow();
2548        let mut inputs = Vec::with_capacity(synthetic.components.len());
2549
2550        for instrument_id in &synthetic.components {
2551            let price = if *instrument_id == update.instrument_id {
2552                update.price
2553            } else {
2554                let Some(component_trade) = cache.trade(instrument_id) else {
2555                    log::warn!(
2556                        "Cannot calculate synthetic instrument {} price, no trades for {} yet",
2557                        synthetic.id,
2558                        instrument_id,
2559                    );
2560                    return None;
2561                };
2562                component_trade.price
2563            };
2564
2565            inputs.push(price.as_f64());
2566        }
2567        drop(cache);
2568
2569        let price = match synthetic.calculate(&inputs) {
2570            Ok(price) => price,
2571            Err(e) => {
2572                log::error!(
2573                    "Cannot calculate synthetic instrument {} trade price: {e}",
2574                    synthetic.id
2575                );
2576                return None;
2577            }
2578        };
2579
2580        Some(TradeTick::new(
2581            synthetic.id,
2582            price,
2583            Quantity::from(1),
2584            update.aggressor_side,
2585            update.trade_id,
2586            update.ts_event,
2587            self.clock.borrow().timestamp_ns(),
2588        ))
2589    }
2590
2591    fn handle_bar(&self, bar: Bar) {
2592        process_engine_bar(&self.cache, self.config.validate_data_sequence, true, bar);
2593    }
2594
2595    fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
2596        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
2597            log_error_on_cache_insert(&e);
2598        }
2599
2600        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
2601        msgbus::publish_mark_price(topic, &mark_price);
2602    }
2603
2604    fn handle_index_price(&self, index_price: IndexPriceUpdate) {
2605        if let Err(e) = self
2606            .cache
2607            .as_ref()
2608            .borrow_mut()
2609            .add_index_price(index_price)
2610        {
2611            log_error_on_cache_insert(&e);
2612        }
2613
2614        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
2615        msgbus::publish_index_price(topic, &index_price);
2616    }
2617
2618    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
2619    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
2620        if let Err(e) = self
2621            .cache
2622            .as_ref()
2623            .borrow_mut()
2624            .add_funding_rate(funding_rate)
2625        {
2626            log_error_on_cache_insert(&e);
2627        }
2628
2629        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
2630        msgbus::publish_funding_rate(topic, &funding_rate);
2631    }
2632
2633    fn handle_instrument_status(&mut self, status: InstrumentStatus) {
2634        if let Err(e) = self
2635            .cache
2636            .as_ref()
2637            .borrow_mut()
2638            .add_instrument_status(status)
2639        {
2640            log_error_on_cache_insert(&e);
2641        }
2642
2643        let topic = switchboard::get_instrument_status_topic(status.instrument_id);
2644        msgbus::publish_any(topic, &status);
2645
2646        if self
2647            .option_chain_instrument_index
2648            .contains_key(&status.instrument_id)
2649            && matches!(
2650                status.action,
2651                MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
2652            )
2653        {
2654            self.expire_option_chain_instrument(status.instrument_id);
2655        }
2656    }
2657
2658    /// Removes a settled/expired instrument from its option chain manager.
2659    ///
2660    /// Looks up the owning series via the reverse index, delegates removal to
2661    /// the manager (which unregisters msgbus handlers and pushes deferred wire
2662    /// unsubscribes), then drains those commands. When the series catalog
2663    /// becomes empty, the entire manager is torn down.
2664    fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
2665        let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
2666            return;
2667        };
2668
2669        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2670            return;
2671        };
2672
2673        let series_empty = manager_rc
2674            .borrow_mut()
2675            .handle_instrument_expired(&instrument_id);
2676
2677        // Drain deferred unsubscribe commands pushed by the manager
2678        self.drain_deferred_commands();
2679
2680        log::info!(
2681            "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
2682        );
2683
2684        if series_empty {
2685            manager_rc.borrow_mut().teardown(&self.clock);
2686            self.option_chain_managers.remove(&series_id);
2687
2688            log::info!("Torn down empty option chain manager for {series_id}");
2689        }
2690    }
2691
2692    fn handle_instrument_close(&self, close: InstrumentClose) {
2693        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
2694        msgbus::publish_any(topic, &close);
2695    }
2696
2697    fn handle_custom_data(&self, custom: &CustomData) {
2698        log::debug!("Processing custom data: {}", custom.data.type_name());
2699        let topic = switchboard::get_custom_topic(&custom.data_type);
2700        msgbus::publish_any(topic, custom);
2701    }
2702
2703    fn handle_delta_pipeline(&self, delta: OrderBookDelta) {
2704        // Pipeline deltas are not buffered; replays arrive pre-batched
2705        let deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
2706        let topic = switchboard::get_pipeline_book_deltas_topic(deltas.instrument_id);
2707        msgbus::publish_deltas(topic, &deltas);
2708    }
2709
2710    fn handle_deltas_pipeline(&self, deltas: &OrderBookDeltas) {
2711        let topic = switchboard::get_pipeline_book_deltas_topic(deltas.instrument_id);
2712        msgbus::publish_deltas(topic, deltas);
2713    }
2714
2715    fn handle_depth10_pipeline(&self, depth: OrderBookDepth10) {
2716        let topic = switchboard::get_pipeline_book_depth10_topic(depth.instrument_id);
2717        msgbus::publish_depth10(topic, &depth);
2718    }
2719
2720    fn handle_quote_pipeline(&self, quote: QuoteTick) {
2721        if self.pipeline_cache_writes_allowed()
2722            && let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote)
2723        {
2724            log_error_on_cache_insert(&e);
2725        }
2726
2727        let topic = switchboard::get_pipeline_quotes_topic(quote.instrument_id);
2728        msgbus::publish_quote(topic, &quote);
2729    }
2730
2731    fn handle_trade_pipeline(&self, trade: TradeTick) {
2732        if self.pipeline_cache_writes_allowed()
2733            && let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade)
2734        {
2735            log_error_on_cache_insert(&e);
2736        }
2737
2738        let topic = switchboard::get_pipeline_trades_topic(trade.instrument_id);
2739        msgbus::publish_trade(topic, &trade);
2740    }
2741
2742    fn handle_bar_pipeline(&self, bar: Bar) {
2743        if !validate_bar_sequence(&self.cache, self.config.validate_data_sequence, &bar) {
2744            return;
2745        }
2746
2747        if self.pipeline_cache_writes_allowed()
2748            && let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar)
2749        {
2750            log_error_on_cache_insert(&e);
2751        }
2752
2753        let topic = switchboard::get_pipeline_bars_topic(bar.bar_type);
2754        msgbus::publish_bar(topic, &bar);
2755    }
2756
2757    fn handle_mark_price_pipeline(&self, mark_price: MarkPriceUpdate) {
2758        if self.pipeline_cache_writes_allowed()
2759            && let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price)
2760        {
2761            log_error_on_cache_insert(&e);
2762        }
2763
2764        let topic = switchboard::get_pipeline_mark_price_topic(mark_price.instrument_id);
2765        msgbus::publish_mark_price(topic, &mark_price);
2766    }
2767
2768    fn handle_index_price_pipeline(&self, index_price: IndexPriceUpdate) {
2769        if self.pipeline_cache_writes_allowed()
2770            && let Err(e) = self
2771                .cache
2772                .as_ref()
2773                .borrow_mut()
2774                .add_index_price(index_price)
2775        {
2776            log_error_on_cache_insert(&e);
2777        }
2778
2779        let topic = switchboard::get_pipeline_index_price_topic(index_price.instrument_id);
2780        msgbus::publish_index_price(topic, &index_price);
2781    }
2782
2783    fn handle_funding_rate_pipeline(&self, funding_rate: FundingRateUpdate) {
2784        if self.pipeline_cache_writes_allowed()
2785            && let Err(e) = self
2786                .cache
2787                .as_ref()
2788                .borrow_mut()
2789                .add_funding_rate(funding_rate)
2790        {
2791            log_error_on_cache_insert(&e);
2792        }
2793
2794        let topic = switchboard::get_pipeline_funding_rate_topic(funding_rate.instrument_id);
2795        msgbus::publish_funding_rate(topic, &funding_rate);
2796    }
2797
2798    fn handle_instrument_status_pipeline(&self, status: InstrumentStatus) {
2799        if self.pipeline_cache_writes_allowed()
2800            && let Err(e) = self
2801                .cache
2802                .as_ref()
2803                .borrow_mut()
2804                .add_instrument_status(status)
2805        {
2806            log_error_on_cache_insert(&e);
2807        }
2808
2809        let topic = switchboard::get_pipeline_instrument_status_topic(status.instrument_id);
2810        msgbus::publish_any(topic, &status);
2811    }
2812
2813    fn handle_option_greeks_pipeline(&self, greeks: OptionGreeks) {
2814        if self.pipeline_cache_writes_allowed() {
2815            self.cache.borrow_mut().add_option_greeks(greeks);
2816        }
2817
2818        let topic = switchboard::get_pipeline_option_greeks_topic(greeks.instrument_id);
2819        msgbus::publish_option_greeks(topic, &greeks);
2820    }
2821
2822    fn handle_instrument_close_pipeline(&self, close: InstrumentClose) {
2823        let topic = switchboard::get_pipeline_instrument_close_topic(close.instrument_id);
2824        msgbus::publish_any(topic, &close);
2825    }
2826
2827    fn handle_custom_data_pipeline(&self, custom: &CustomData) {
2828        log::debug!("Pipeline custom data: {}", custom.data.type_name());
2829        let topic = switchboard::get_pipeline_custom_topic(&custom.data_type);
2830        msgbus::publish_any(topic, custom);
2831    }
2832
2833    /// Drains deferred subscribe/unsubscribe commands pushed by option chain
2834    /// managers (or any other component) and executes them against the appropriate
2835    /// data client.
2836    fn drain_deferred_commands(&mut self) {
2837        // Loop because expire_series pushes Unsubscribe commands; converges in <= 3 iterations
2838        loop {
2839            let commands: VecDeque<DeferredCommand> =
2840                std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
2841
2842            if commands.is_empty() {
2843                break;
2844            }
2845
2846            for cmd in commands {
2847                match cmd {
2848                    DeferredCommand::Subscribe(sub) => {
2849                        let client = self.get_command_client(sub.client_id(), sub.venue());
2850                        if let Some(client) = client {
2851                            client.execute_subscribe(sub);
2852                        }
2853                    }
2854                    DeferredCommand::Unsubscribe(unsub) => {
2855                        let client = self.get_command_client(unsub.client_id(), unsub.venue());
2856                        if let Some(client) = client {
2857                            client.execute_unsubscribe(&unsub);
2858                        }
2859                    }
2860                    DeferredCommand::ExpireInstrument(instrument_id) => {
2861                        self.expire_option_chain_instrument(instrument_id);
2862                    }
2863                    DeferredCommand::ExpireSeries(series_id) => {
2864                        self.expire_series(series_id);
2865                    }
2866                }
2867            }
2868        }
2869    }
2870
2871    /// Proactively expires all instruments for a series and tears down the manager.
2872    ///
2873    /// `handle_instrument_expired` removes each instrument from the aggregator and pushes
2874    /// deferred unsubscribe commands. `teardown` then cancels the snapshot timer and clears
2875    /// the handler lists (the aggregator is already empty at that point).
2876    fn expire_series(&mut self, series_id: OptionSeriesId) {
2877        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
2878            return;
2879        };
2880
2881        let instrument_ids: Vec<InstrumentId> = self
2882            .option_chain_instrument_index
2883            .iter()
2884            .filter(|(_, sid)| **sid == series_id)
2885            .map(|(id, _)| *id)
2886            .collect();
2887
2888        for id in &instrument_ids {
2889            self.option_chain_instrument_index.remove(id);
2890            manager_rc.borrow_mut().handle_instrument_expired(id);
2891        }
2892
2893        manager_rc.borrow_mut().teardown(&self.clock);
2894        self.option_chain_managers.remove(&series_id);
2895
2896        log::info!("Proactively torn down expired option chain {series_id}");
2897    }
2898
2899    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<bool> {
2900        if cmd.instrument_id.is_synthetic() {
2901            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
2902        }
2903
2904        // Validate parent shape BEFORE mutating subscription state so a parse
2905        // failure leaves the engine bookkeeping unchanged.
2906        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2907
2908        let had_deltas =
2909            self.has_book_delta_subscription_key(cmd.instrument_id, cmd.client_id, cmd.venue);
2910        self.increment_book_delta_subscription(cmd.instrument_id, cmd.client_id, cmd.venue);
2911
2912        if cmd.managed {
2913            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, parent)?;
2914        }
2915
2916        Ok(!had_deltas)
2917    }
2918
2919    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
2920        if cmd.instrument_id.is_synthetic() {
2921            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
2922        }
2923
2924        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2925
2926        self.book_depth10_subs.insert(cmd.instrument_id);
2927        if cmd.managed {
2928            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
2929        }
2930
2931        Ok(())
2932    }
2933
2934    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
2935        if cmd.instrument_id.is_synthetic() {
2936            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
2937        }
2938
2939        let parent = resolve_parent_components(&cmd.instrument_id, cmd.params.as_ref())?;
2940
2941        let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
2942        let inserted = self.increment_book_snapshot_subscription(cmd, parent);
2943
2944        if inserted && !had_snapshots {
2945            // Always run setup so the depth10 handler is registered alongside
2946            // the deltas handler when this is the first snapshot for the id;
2947            // setup_book_updater is idempotent and the typed router dedups
2948            // overlapping subscribes.
2949            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, parent)?;
2950        }
2951
2952        if had_snapshots || self.has_book_delta_subscriptions(&cmd.instrument_id) {
2953            return Ok(());
2954        }
2955
2956        if let Some(client_id) = cmd.client_id.as_ref()
2957            && self.external_clients.contains(client_id)
2958        {
2959            if self.config.debug {
2960                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}");
2961            }
2962            return Ok(());
2963        }
2964
2965        log::debug!(
2966            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
2967            cmd.instrument_id,
2968            cmd.client_id,
2969            cmd.venue,
2970        );
2971
2972        if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
2973            let deltas_cmd = SubscribeBookDeltas::new(
2974                cmd.instrument_id,
2975                cmd.book_type,
2976                cmd.client_id,
2977                cmd.venue,
2978                UUID4::new(),
2979                cmd.ts_init,
2980                cmd.depth,
2981                true, // managed
2982                Some(cmd.command_id),
2983                cmd.params.clone(),
2984            );
2985            log::debug!(
2986                "Calling client.execute_subscribe for BookDeltas: {}",
2987                cmd.instrument_id
2988            );
2989            client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
2990        } else {
2991            log::error!(
2992                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
2993                cmd.client_id,
2994                cmd.venue,
2995            );
2996        }
2997
2998        Ok(())
2999    }
3000
3001    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
3002        match cmd.bar_type.aggregation_source() {
3003            AggregationSource::Internal => self.start_live_bar_aggregator(cmd)?,
3004            AggregationSource::External => {
3005                if cmd.bar_type.instrument_id().is_synthetic() {
3006                    anyhow::bail!(
3007                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
3008                    );
3009                }
3010            }
3011        }
3012
3013        Ok(())
3014    }
3015
3016    fn subscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
3017        let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
3018            log::error!(
3019                "Cannot subscribe to `QuoteTick` data for synthetic instrument {instrument_id}, not found",
3020            );
3021            return;
3022        };
3023
3024        if !self.subscribed_synthetic_quotes.insert(instrument_id) {
3025            return;
3026        }
3027
3028        for component_id in &synthetic.components {
3029            let synthetics = self.synthetic_quote_feeds.entry(*component_id).or_default();
3030            if !synthetics
3031                .iter()
3032                .any(|registered| registered.id == synthetic.id)
3033            {
3034                synthetics.push(synthetic.clone());
3035            }
3036        }
3037    }
3038
3039    fn subscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
3040        let Some(synthetic) = self.cache.borrow().synthetic(&instrument_id).cloned() else {
3041            log::error!(
3042                "Cannot subscribe to `TradeTick` data for synthetic instrument {instrument_id}, not found",
3043            );
3044            return;
3045        };
3046
3047        if !self.subscribed_synthetic_trades.insert(instrument_id) {
3048            return;
3049        }
3050
3051        for component_id in &synthetic.components {
3052            let synthetics = self.synthetic_trade_feeds.entry(*component_id).or_default();
3053            if !synthetics
3054                .iter()
3055                .any(|registered| registered.id == synthetic.id)
3056            {
3057                synthetics.push(synthetic.clone());
3058            }
3059        }
3060    }
3061
3062    fn is_spread_quote_command(
3063        &self,
3064        instrument_id: InstrumentId,
3065        params: Option<&Params>,
3066    ) -> bool {
3067        if !params
3068            .and_then(|params| params.get_bool("aggregate_spread_quotes"))
3069            .unwrap_or(false)
3070        {
3071            return false;
3072        }
3073
3074        self.cache
3075            .borrow()
3076            .instrument(&instrument_id)
3077            .is_some_and(InstrumentAny::is_spread)
3078    }
3079
3080    fn subscribe_spread_quotes(&mut self, cmd: &SubscribeQuotes) {
3081        if self
3082            .spread_quote_aggregators
3083            .contains_key(&cmd.instrument_id)
3084        {
3085            log::warn!(
3086                "SpreadQuoteAggregator for {} is currently in use, subscription can't be started",
3087                cmd.instrument_id,
3088            );
3089            return;
3090        }
3091
3092        let Some(instrument) = self.cache.borrow().instrument(&cmd.instrument_id).cloned() else {
3093            log::error!(
3094                "Cannot create spread quote aggregator: no instrument found for {}",
3095                cmd.instrument_id,
3096            );
3097            return;
3098        };
3099        let Some(legs) = spread_instrument_legs(&instrument) else {
3100            log::error!(
3101                "Cannot create spread quote aggregator: invalid spread legs for {}",
3102                cmd.instrument_id,
3103            );
3104            return;
3105        };
3106
3107        if legs.len() <= 1 {
3108            log::error!(
3109                "Cannot create spread quote aggregator: spread instrument {} should have more than one leg",
3110                cmd.instrument_id,
3111            );
3112            return;
3113        }
3114
3115        let cache = self.cache.clone();
3116        let handler = Box::new(move |quote: QuoteTick| {
3117            let exchange_endpoint = format!(
3118                "SimulatedExchange.process_new_quote.{}",
3119                quote.instrument_id.venue
3120            );
3121            let exchange_endpoint = exchange_endpoint.into();
3122            if msgbus::has_quote_endpoint(exchange_endpoint) {
3123                msgbus::send_quote(exchange_endpoint, &quote);
3124            }
3125
3126            if let Err(e) = cache.borrow_mut().add_quote(quote) {
3127                log_error_on_cache_insert(&e);
3128            }
3129            let topic = switchboard::get_quotes_topic(quote.instrument_id);
3130            msgbus::publish_quote(topic, &quote);
3131        });
3132        let aggregator = Rc::new(RefCell::new(SpreadQuoteAggregator::new(
3133            cmd.instrument_id,
3134            &legs,
3135            matches!(
3136                instrument,
3137                InstrumentAny::FuturesSpread(_) | InstrumentAny::CryptoFuturesSpread(_)
3138            ),
3139            instrument.price_precision(),
3140            instrument.size_precision(),
3141            handler,
3142            self.clock.clone(),
3143            false,
3144            spread_quote_update_interval_seconds(cmd.params.as_ref()),
3145            cmd.params
3146                .as_ref()
3147                .and_then(|params| params.get_u64("quote_build_delay"))
3148                .unwrap_or(0),
3149            None,
3150            None,
3151        )));
3152
3153        let mut handlers = Vec::with_capacity(legs.len());
3154        for (leg_id, _) in &legs {
3155            let topic = switchboard::get_quotes_topic(*leg_id);
3156            let handler = TypedHandler::new(SpreadQuoteHandler::new(
3157                &aggregator,
3158                cmd.instrument_id,
3159                *leg_id,
3160            ));
3161            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(BAR_AGGREGATOR_PRIORITY));
3162            handlers.push((*leg_id, handler));
3163        }
3164
3165        aggregator
3166            .borrow_mut()
3167            .start_timer(Some(aggregator.clone()));
3168        aggregator.borrow_mut().set_running(true);
3169        self.spread_quote_aggregators
3170            .insert(cmd.instrument_id, aggregator);
3171        self.spread_quote_handlers
3172            .insert(cmd.instrument_id, handlers);
3173
3174        for (leg_id, _) in legs {
3175            let subscribe = SubscribeQuotes::new(
3176                leg_id,
3177                cmd.client_id,
3178                cmd.venue,
3179                UUID4::new(),
3180                cmd.ts_init,
3181                Some(cmd.command_id),
3182                cmd.params.clone(),
3183            );
3184            self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
3185        }
3186    }
3187
3188    fn unsubscribe_spread_quotes(&mut self, cmd: &UnsubscribeQuotes) {
3189        let Some(leg_ids) = self.stop_spread_quote_aggregator(cmd.instrument_id) else {
3190            return;
3191        };
3192
3193        for leg_id in leg_ids {
3194            let unsubscribe = UnsubscribeQuotes::new(
3195                leg_id,
3196                cmd.client_id,
3197                cmd.venue,
3198                UUID4::new(),
3199                cmd.ts_init,
3200                Some(cmd.command_id),
3201                cmd.params.clone(),
3202            );
3203            self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
3204                unsubscribe,
3205            )));
3206        }
3207    }
3208
3209    fn stop_spread_quote_aggregator(
3210        &mut self,
3211        spread_instrument_id: InstrumentId,
3212    ) -> Option<Vec<InstrumentId>> {
3213        let Some(aggregator) = self.spread_quote_aggregators.remove(&spread_instrument_id) else {
3214            log::warn!(
3215                "Cannot stop spread quote aggregator: no aggregator to stop for {spread_instrument_id}",
3216            );
3217            return None;
3218        };
3219
3220        aggregator.borrow_mut().stop_timer();
3221        aggregator.borrow_mut().set_running(false);
3222
3223        let handlers = self
3224            .spread_quote_handlers
3225            .remove(&spread_instrument_id)
3226            .unwrap_or_default();
3227        let mut leg_ids = Vec::with_capacity(handlers.len());
3228        for (leg_id, handler) in handlers {
3229            let topic = switchboard::get_quotes_topic(leg_id);
3230            msgbus::unsubscribe_quotes(topic.into(), &handler);
3231            leg_ids.push(leg_id);
3232        }
3233
3234        Some(leg_ids)
3235    }
3236
3237    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
3238        match self.decrement_book_delta_subscription(cmd.instrument_id, cmd.client_id, cmd.venue) {
3239            BookDeltasUnsubscribeResult::NotSubscribed => {
3240                log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
3241                return false;
3242            }
3243            BookDeltasUnsubscribeResult::Decremented => return false,
3244            BookDeltasUnsubscribeResult::Removed => {}
3245        }
3246
3247        self.maintain_book_updater(&cmd.instrument_id);
3248
3249        // Snapshot subscriptions reuse the deltas feed.
3250        // Keep the client subscribed until the last snapshot consumer is gone.
3251        !self.has_book_delta_subscriptions(&cmd.instrument_id)
3252            && !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
3253    }
3254
3255    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
3256        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
3257            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
3258            return false;
3259        }
3260
3261        self.book_depth10_subs.remove(&cmd.instrument_id);
3262        self.maintain_book_updater(&cmd.instrument_id);
3263
3264        true
3265    }
3266
3267    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
3268        match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
3269            BookSnapshotUnsubscribeResult::NotSubscribed => {
3270                log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
3271                return;
3272            }
3273            BookSnapshotUnsubscribeResult::Decremented => return,
3274            BookSnapshotUnsubscribeResult::Removed => {}
3275        }
3276
3277        if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
3278            return;
3279        }
3280
3281        self.maintain_book_updater(&cmd.instrument_id);
3282
3283        if self.has_book_delta_subscriptions(&cmd.instrument_id) {
3284            return;
3285        }
3286
3287        if let Some(client_id) = cmd.client_id.as_ref()
3288            && self.external_clients.contains(client_id)
3289        {
3290            return;
3291        }
3292
3293        if let Some(client) = self.get_command_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
3294            let deltas_cmd = UnsubscribeBookDeltas::new(
3295                cmd.instrument_id,
3296                cmd.client_id,
3297                cmd.venue,
3298                UUID4::new(),
3299                cmd.ts_init,
3300                Some(cmd.command_id),
3301                cmd.params.clone(),
3302            );
3303            client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
3304        }
3305    }
3306
3307    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
3308        let bar_type = cmd.bar_type;
3309
3310        // Don't remove aggregator if other exact-topic subscribers still exist
3311        let topic = switchboard::get_bars_topic(bar_type.standard());
3312        if msgbus::exact_subscriber_count_bars(topic) > 0 {
3313            return;
3314        }
3315
3316        if self
3317            .bar_aggregators
3318            .contains_key(&bar_aggregator_key(bar_type, None))
3319        {
3320            match self.stop_bar_aggregator(bar_type, None) {
3321                Ok(()) => self.unsubscribe_bar_aggregator(cmd),
3322                Err(e) => log::error!("Error stopping bar aggregator for {bar_type}: {e}"),
3323            }
3324        }
3325
3326        // After stopping a composite, check if the source aggregator is now orphaned
3327        if bar_type.is_composite() {
3328            let source_type = bar_type.composite();
3329            let source_topic = switchboard::get_bars_topic(source_type);
3330            if msgbus::exact_subscriber_count_bars(source_topic) == 0
3331                && self
3332                    .bar_aggregators
3333                    .contains_key(&bar_aggregator_key(source_type, None))
3334                && let Err(e) = self.stop_bar_aggregator(source_type, None)
3335            {
3336                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
3337            }
3338        }
3339    }
3340
3341    fn unsubscribe_synthetic_quotes(&mut self, instrument_id: InstrumentId) {
3342        if !self.subscribed_synthetic_quotes.remove(&instrument_id) {
3343            log::warn!("Cannot unsubscribe from synthetic `QuoteTick` data: not subscribed");
3344            return;
3345        }
3346
3347        self.synthetic_quote_feeds.retain(|_, synthetics| {
3348            synthetics.retain(|synthetic| synthetic.id != instrument_id);
3349            !synthetics.is_empty()
3350        });
3351    }
3352
3353    fn unsubscribe_synthetic_trades(&mut self, instrument_id: InstrumentId) {
3354        if !self.subscribed_synthetic_trades.remove(&instrument_id) {
3355            log::warn!("Cannot unsubscribe from synthetic `TradeTick` data: not subscribed");
3356            return;
3357        }
3358
3359        self.synthetic_trade_feeds.retain(|_, synthetics| {
3360            synthetics.retain(|synthetic| synthetic.id != instrument_id);
3361            !synthetics.is_empty()
3362        });
3363    }
3364
3365    fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
3366        let series_id = cmd.series_id;
3367
3368        // Handle edits to existing subscriptions by tearing down and re-setting up the OptionChainManager.
3369        if let Some(old) = self.option_chain_managers.remove(&series_id) {
3370            log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
3371            let all_ids = old.borrow().all_instrument_ids();
3372            let old_venue = old.borrow().venue();
3373            old.borrow_mut().teardown(&self.clock);
3374            self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
3375        }
3376
3377        // Drain any stale pending forward price requests for this series
3378        self.pending_option_chain_requests
3379            .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
3380
3381        // For ATM-based strike ranges, request forward prices from the adapter
3382        // to enable instant bootstrap without waiting for the first WebSocket tick.
3383        if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
3384            // Extract client_id first to avoid borrow conflicts
3385            let resolved_client_id = self
3386                .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
3387                .map(|c| c.client_id);
3388
3389            if let Some(client_id) = resolved_client_id {
3390                let request_id = UUID4::new();
3391                let ts_init = self.clock.borrow().timestamp_ns();
3392
3393                // Pick any one option instrument at this expiry from cache
3394                // to enable single-instrument forward price fetch (1 HTTP call)
3395                let sample_instrument_id = {
3396                    let cache = self.cache.borrow();
3397                    cache
3398                        .instruments(&series_id.venue, Some(&series_id.underlying))
3399                        .iter()
3400                        .find(|i| {
3401                            i.expiration_ns() == Some(series_id.expiration_ns)
3402                                && i.settlement_currency().code == series_id.settlement_currency
3403                        })
3404                        .map(|i| i.id())
3405                };
3406
3407                let request = RequestForwardPrices::new(
3408                    series_id.venue,
3409                    series_id.underlying,
3410                    sample_instrument_id,
3411                    Some(client_id),
3412                    request_id,
3413                    ts_init,
3414                    None,
3415                );
3416
3417                self.pending_option_chain_requests
3418                    .insert(request_id, cmd.clone());
3419
3420                let req_cmd = RequestCommand::ForwardPrices(request);
3421                if let Err(e) = self.execute_request(req_cmd) {
3422                    log::warn!("Failed to request forward prices for {series_id}: {e}");
3423                    let cmd = self
3424                        .pending_option_chain_requests
3425                        .remove(&request_id)
3426                        .expect("just inserted");
3427                    self.create_option_chain_manager(&cmd, None);
3428                }
3429
3430                return;
3431            }
3432        }
3433
3434        self.create_option_chain_manager(cmd, None);
3435    }
3436
3437    /// Creates and stores an `OptionChainManager` for the given subscription.
3438    fn create_option_chain_manager(
3439        &mut self,
3440        cmd: &SubscribeOptionChain,
3441        initial_atm_price: Option<Price>,
3442    ) {
3443        let series_id = cmd.series_id;
3444        let cache = self.cache.clone();
3445        let clock = self.clock.clone();
3446        let priority = self.msgbus_priority;
3447        let deferred_cmd_queue = self.deferred_cmd_queue.clone();
3448
3449        let manager_rc = {
3450            let client = self.get_command_client(cmd.client_id.as_ref(), Some(&series_id.venue));
3451            OptionChainManager::create_and_setup(
3452                series_id,
3453                &cache,
3454                cmd,
3455                &clock,
3456                priority,
3457                client,
3458                initial_atm_price,
3459                deferred_cmd_queue,
3460            )
3461        };
3462
3463        // Index all instruments for reverse lookup
3464        for id in manager_rc.borrow().all_instrument_ids() {
3465            self.option_chain_instrument_index.insert(id, series_id);
3466        }
3467
3468        self.option_chain_managers.insert(series_id, manager_rc);
3469    }
3470
3471    fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
3472        let series_id = cmd.series_id;
3473
3474        let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
3475            log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
3476            return;
3477        };
3478
3479        // Extract info before teardown
3480        let all_ids = manager_rc.borrow().all_instrument_ids();
3481        let venue = manager_rc.borrow().venue();
3482
3483        // Remove all instruments from reverse index
3484        for id in &all_ids {
3485            self.option_chain_instrument_index.remove(id);
3486        }
3487
3488        manager_rc.borrow_mut().teardown(&self.clock);
3489
3490        // Forward wire-level unsubscribes to the data client
3491        self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
3492
3493        log::info!("Unsubscribed option chain for {series_id}");
3494    }
3495
3496    /// Forwards wire-level unsubscribe commands for all option chain instruments.
3497    fn forward_option_chain_unsubscribes(
3498        &mut self,
3499        instrument_ids: &[InstrumentId],
3500        venue: Venue,
3501        client_id: Option<ClientId>,
3502    ) {
3503        let ts_init = self.clock.borrow().timestamp_ns();
3504
3505        let Some(client) = self.get_command_client(client_id.as_ref(), Some(&venue)) else {
3506            log::error!(
3507                "Cannot forward option chain unsubscribes: no client found for venue={venue}",
3508            );
3509            return;
3510        };
3511
3512        for instrument_id in instrument_ids {
3513            client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
3514                *instrument_id,
3515                client_id,
3516                Some(venue),
3517                UUID4::new(),
3518                ts_init,
3519                None,
3520                None,
3521            )));
3522            client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
3523                UnsubscribeOptionGreeks::new(
3524                    *instrument_id,
3525                    client_id,
3526                    Some(venue),
3527                    UUID4::new(),
3528                    ts_init,
3529                    None,
3530                    None,
3531                ),
3532            ));
3533            client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
3534                UnsubscribeInstrumentStatus::new(
3535                    *instrument_id,
3536                    client_id,
3537                    Some(venue),
3538                    UUID4::new(),
3539                    ts_init,
3540                    None,
3541                    None,
3542                ),
3543            ));
3544        }
3545    }
3546
3547    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
3548        // Determine which per-underlying books this subscription touched, then
3549        // for each book check whether any other active subscription still
3550        // wants it before unsubscribing/dropping the shared BookUpdater.
3551        //
3552        // The presence of a memoized expansion identifies a parent teardown.
3553        // Concrete subscriptions touch only the exact id.
3554        let is_parent = self
3555            .book_deltas_parent_expansions
3556            .contains_key(instrument_id)
3557            || self
3558                .book_depth10_parent_expansions
3559                .contains_key(instrument_id);
3560        let target_ids: Vec<InstrumentId> = if is_parent {
3561            let mut set: AHashSet<InstrumentId> = AHashSet::new();
3562
3563            if let Some(expansion) = self.book_deltas_parent_expansions.get(instrument_id) {
3564                set.extend(expansion.iter().copied());
3565            }
3566
3567            if let Some(expansion) = self.book_depth10_parent_expansions.get(instrument_id) {
3568                set.extend(expansion.iter().copied());
3569            }
3570
3571            if set.is_empty() {
3572                return;
3573            }
3574
3575            set.into_iter().collect()
3576        } else {
3577            vec![*instrument_id]
3578        };
3579
3580        if is_parent {
3581            // Each parent kind (deltas / depth10 / snapshots) writes its own
3582            // memo via setup_book_updater. Keep each memo alive while any
3583            // sibling subscription that drives the same handler kind remains
3584            // active for this parent id.
3585            let parent_still_needs_deltas = self.has_book_delta_subscriptions(instrument_id)
3586                || self.book_depth10_subs.contains(instrument_id)
3587                || self.has_book_snapshot_subscriptions(instrument_id);
3588            let parent_still_needs_depth10 = self.book_depth10_subs.contains(instrument_id)
3589                || self.has_book_snapshot_subscriptions(instrument_id);
3590
3591            if !parent_still_needs_deltas {
3592                self.book_deltas_parent_expansions.remove(instrument_id);
3593            }
3594
3595            if !parent_still_needs_depth10 {
3596                self.book_depth10_parent_expansions.remove(instrument_id);
3597            }
3598        }
3599
3600        for target_id in &target_ids {
3601            let wants_deltas = self.is_underlying_wanted_for_deltas(target_id);
3602            let wants_depth10 = self.is_underlying_wanted_for_depth10(target_id);
3603
3604            let Some(updater) = self.book_updaters.get(target_id).cloned() else {
3605                continue;
3606            };
3607
3608            let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
3609            let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater);
3610
3611            if !wants_deltas {
3612                let topic = switchboard::get_book_deltas_topic(*target_id);
3613                msgbus::unsubscribe_book_deltas(topic.into(), &deltas_handler);
3614            }
3615
3616            if !wants_depth10 {
3617                let topic = switchboard::get_book_depth10_topic(*target_id);
3618                msgbus::unsubscribe_book_depth10(topic.into(), &depth_handler);
3619            }
3620
3621            if !wants_deltas && !wants_depth10 {
3622                self.book_updaters.remove(target_id);
3623                log::debug!("Removed BookUpdater for instrument ID {target_id}");
3624            }
3625        }
3626    }
3627
3628    fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
3629        self.book_snapshot_counts
3630            .keys()
3631            .any(|(id, _)| id == instrument_id)
3632    }
3633
3634    fn has_book_delta_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
3635        self.book_deltas_counts
3636            .keys()
3637            .any(|(id, _, _)| id == instrument_id)
3638    }
3639
3640    fn has_book_delta_subscription_key(
3641        &self,
3642        instrument_id: InstrumentId,
3643        client_id: Option<ClientId>,
3644        venue: Option<Venue>,
3645    ) -> bool {
3646        self.book_deltas_counts
3647            .contains_key(&(instrument_id, client_id, venue))
3648    }
3649
3650    fn increment_book_delta_subscription(
3651        &mut self,
3652        instrument_id: InstrumentId,
3653        client_id: Option<ClientId>,
3654        venue: Option<Venue>,
3655    ) {
3656        let key = (instrument_id, client_id, venue);
3657
3658        if let Some(count) = self.book_deltas_counts.get_mut(&key) {
3659            *count += 1;
3660        } else {
3661            self.book_deltas_counts.insert(key, 1);
3662        }
3663    }
3664
3665    fn decrement_book_delta_subscription(
3666        &mut self,
3667        instrument_id: InstrumentId,
3668        client_id: Option<ClientId>,
3669        venue: Option<Venue>,
3670    ) -> BookDeltasUnsubscribeResult {
3671        let key = (instrument_id, client_id, venue);
3672
3673        let Some(count) = self.book_deltas_counts.get_mut(&key) else {
3674            return BookDeltasUnsubscribeResult::NotSubscribed;
3675        };
3676
3677        if *count > 1 {
3678            *count -= 1;
3679            return BookDeltasUnsubscribeResult::Decremented;
3680        }
3681
3682        self.book_deltas_counts.shift_remove(&key);
3683        BookDeltasUnsubscribeResult::Removed
3684    }
3685
3686    fn increment_book_snapshot_subscription(
3687        &mut self,
3688        cmd: &SubscribeBookSnapshots,
3689        parent: Option<(Ustr, InstrumentClass)>,
3690    ) -> bool {
3691        let key = (cmd.instrument_id, cmd.interval_ms);
3692
3693        if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
3694            *count += 1;
3695            return false;
3696        }
3697
3698        self.book_snapshot_counts.insert(key, 1);
3699
3700        let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
3701        {
3702            snapshot_infos.clone()
3703        } else {
3704            let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
3705            self.book_intervals
3706                .insert(cmd.interval_ms, snapshot_infos.clone());
3707            self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
3708            snapshot_infos
3709        };
3710
3711        let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
3712        let snap_info = BookSnapshotInfo {
3713            instrument_id: cmd.instrument_id,
3714            venue: cmd.instrument_id.venue,
3715            parent,
3716            topic,
3717            interval_ms: cmd.interval_ms,
3718        };
3719
3720        snapshot_infos
3721            .borrow_mut()
3722            .insert(cmd.instrument_id, snap_info);
3723
3724        true
3725    }
3726
3727    fn decrement_book_snapshot_subscription(
3728        &mut self,
3729        instrument_id: InstrumentId,
3730        interval_ms: NonZeroUsize,
3731    ) -> BookSnapshotUnsubscribeResult {
3732        let key = (instrument_id, interval_ms);
3733
3734        let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
3735            return BookSnapshotUnsubscribeResult::NotSubscribed;
3736        };
3737
3738        if *count > 1 {
3739            *count -= 1;
3740            return BookSnapshotUnsubscribeResult::Decremented;
3741        }
3742
3743        self.book_snapshot_counts.shift_remove(&key);
3744
3745        let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
3746            let mut snapshot_infos = snapshot_infos.borrow_mut();
3747            snapshot_infos.shift_remove(&instrument_id);
3748            snapshot_infos.is_empty()
3749        } else {
3750            false
3751        };
3752
3753        if remove_interval {
3754            self.book_intervals.remove(&interval_ms);
3755
3756            if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
3757                let timer_name = snapshotter.timer_name;
3758                let mut clock = self.clock.borrow_mut();
3759                if clock.timer_exists(&timer_name) {
3760                    clock.cancel_timer(&timer_name);
3761                }
3762            }
3763        }
3764
3765        BookSnapshotUnsubscribeResult::Removed
3766    }
3767
3768    fn schedule_book_snapshotter(
3769        &mut self,
3770        interval_ms: NonZeroUsize,
3771        snapshot_infos: BookSnapshotInfos,
3772    ) {
3773        let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
3774        let now_ns = self.clock.borrow().timestamp_ns().as_u64();
3775        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
3776
3777        let snapshotter = Rc::new(BookSnapshotter::new(
3778            interval_ms,
3779            snapshot_infos,
3780            self.cache.clone(),
3781        ));
3782        let timer_name = snapshotter.timer_name;
3783        let snapshotter_callback = snapshotter.clone();
3784        let callback_fn: Rc<dyn Fn(TimeEvent)> =
3785            Rc::new(move |event| snapshotter_callback.snapshot(event));
3786        let callback = TimeEventCallback::from(callback_fn);
3787
3788        self.clock
3789            .borrow_mut()
3790            .set_timer_ns(
3791                &timer_name,
3792                interval_ns,
3793                Some(start_time_ns.into()),
3794                None,
3795                Some(callback),
3796                None,
3797                None,
3798            )
3799            .expect(FAILED);
3800
3801        self.book_snapshotters.insert(interval_ms, snapshotter);
3802    }
3803
3804    fn handle_instrument_response(&self, instrument: InstrumentAny) {
3805        let mut cache = self.cache.as_ref().borrow_mut();
3806        if let Err(e) = cache.add_instrument(instrument) {
3807            log_error_on_cache_insert(&e);
3808        }
3809    }
3810
3811    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
3812        // TODO: Improve by adding bulk update methods to cache and database
3813        let mut cache = self.cache.as_ref().borrow_mut();
3814        for instrument in instruments {
3815            if let Err(e) = cache.add_instrument(instrument.clone()) {
3816                log_error_on_cache_insert(&e);
3817            }
3818        }
3819    }
3820
3821    fn handle_quotes(&self, quotes: &[QuoteTick]) {
3822        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
3823            log_error_on_cache_insert(&e);
3824        }
3825    }
3826
3827    fn handle_trades(&self, trades: &[TradeTick]) {
3828        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
3829            log_error_on_cache_insert(&e);
3830        }
3831    }
3832
3833    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
3834        if let Err(e) = self
3835            .cache
3836            .as_ref()
3837            .borrow_mut()
3838            .add_funding_rates(funding_rates)
3839        {
3840            log_error_on_cache_insert(&e);
3841        }
3842    }
3843
3844    fn handle_bars(&self, bars: &[Bar]) {
3845        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
3846            log_error_on_cache_insert(&e);
3847        }
3848    }
3849
3850    // Skip cache writes that would regress a book a `BookUpdater` is maintaining.
3851    // Unmanaged subscriptions don't install a `BookUpdater`, so they don't gate writes.
3852    fn cache_is_owned_by_live_subscription(&self, instrument_id: &InstrumentId) -> bool {
3853        self.book_updaters.contains_key(instrument_id)
3854    }
3855
3856    fn handle_book_response(&self, book: &OrderBook) {
3857        if self.cache_is_owned_by_live_subscription(&book.instrument_id) {
3858            log::debug!(
3859                "Skipping cache write for order book {}: live subscription owns the book",
3860                book.instrument_id,
3861            );
3862            return;
3863        }
3864
3865        log::debug!("Adding order book {} to cache", book.instrument_id);
3866
3867        if let Err(e) = self
3868            .cache
3869            .as_ref()
3870            .borrow_mut()
3871            .add_order_book(book.clone())
3872        {
3873            log_error_on_cache_insert(&e);
3874        }
3875    }
3876
3877    fn handle_book_deltas_response(&self, resp: &BookDeltasResponse) {
3878        if !self.cache_is_owned_by_live_subscription(&resp.instrument_id) {
3879            let mut cache = self.cache.as_ref().borrow_mut();
3880            if let Some(book) = cache.order_book_mut(&resp.instrument_id) {
3881                for delta in &resp.data {
3882                    if let Err(e) = book.apply_delta(delta) {
3883                        log::error!("Failed to apply historical delta to cache: {e}");
3884                    }
3885                }
3886            } else {
3887                log::debug!(
3888                    "Skipping cache write for {} historical deltas on {}: no cache book yet",
3889                    resp.data.len(),
3890                    resp.instrument_id,
3891                );
3892            }
3893        }
3894
3895        // Group deltas by `F_LAST` so each published batch preserves the original event
3896        // boundary and metadata (timestamps and sequence from the closing delta), matching
3897        // the live `handle_delta` buffering semantic. Collapsing the whole response into
3898        // one batch would surface a synthetic event with the trailing delta's flags only.
3899        if resp.data.is_empty() {
3900            return;
3901        }
3902
3903        let topic = switchboard::get_pipeline_book_deltas_topic(resp.instrument_id);
3904        let mut frame: Vec<OrderBookDelta> = Vec::new();
3905
3906        for delta in &resp.data {
3907            frame.push(*delta);
3908            if RecordFlag::F_LAST.matches(delta.flags) {
3909                let batch = OrderBookDeltas::new(resp.instrument_id, std::mem::take(&mut frame));
3910                msgbus::publish_deltas(topic, &batch);
3911            }
3912        }
3913
3914        if !frame.is_empty() {
3915            let batch = OrderBookDeltas::new(resp.instrument_id, frame);
3916            msgbus::publish_deltas(topic, &batch);
3917        }
3918    }
3919
3920    fn handle_book_depth_response(&self, resp: &BookDepthResponse) {
3921        let topic = switchboard::get_pipeline_book_depth10_topic(resp.instrument_id);
3922
3923        for depth in &resp.data {
3924            msgbus::publish_depth10(topic, depth);
3925        }
3926    }
3927
3928    /// Handles a `ForwardPricesResponse` by extracting the forward price
3929    /// for the pending option chain and creating the manager with instant bootstrap.
3930    fn handle_forward_prices_response(
3931        &mut self,
3932        correlation_id: &UUID4,
3933        resp: &ForwardPricesResponse,
3934    ) {
3935        let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
3936            log::debug!(
3937                "No pending option chain request for correlation_id={correlation_id}, ignoring"
3938            );
3939            return;
3940        };
3941
3942        let series_id = cmd.series_id;
3943
3944        // Find a forward price that matches an instrument in this series.
3945        // We look up each forward price instrument in the cache to match by expiry and currency.
3946        let cache = self.cache.borrow();
3947        let mut best_price: Option<Price> = None;
3948
3949        for fp in &resp.data {
3950            // Check if any cached instrument with this id belongs to our series
3951            if let Some(instrument) = cache.instrument(&fp.instrument_id)
3952                && let Some(expiration) = instrument.expiration_ns()
3953                && expiration == series_id.expiration_ns
3954                && instrument.settlement_currency().code == series_id.settlement_currency
3955            {
3956                match Price::from_decimal(fp.forward_price) {
3957                    Ok(price) => best_price = Some(price),
3958                    Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
3959                }
3960                break;
3961            }
3962        }
3963        drop(cache);
3964
3965        if let Some(price) = best_price {
3966            log::info!("Forward price for {series_id}: {price} (instant bootstrap)");
3967        } else {
3968            log::info!(
3969                "No matching forward price found for {series_id}, will bootstrap from live data",
3970            );
3971        }
3972
3973        self.create_option_chain_manager(&cmd, best_price);
3974    }
3975
3976    fn setup_book_updater(
3977        &mut self,
3978        instrument_id: &InstrumentId,
3979        book_type: BookType,
3980        only_deltas: bool,
3981        parent: Option<(Ustr, InstrumentClass)>,
3982    ) -> anyhow::Result<()> {
3983        // One BookUpdater per cache book (keyed by per-underlying id), shared
3984        // across overlapping subscriptions. Parent subs are expanded into
3985        // their underlyings here; the expansion is memoized so unsubscribe
3986        // mirrors the exact set even if the cache composition changes later.
3987        let target_ids: Vec<InstrumentId> = if let Some((root, class)) = parent {
3988            self.cache
3989                .borrow()
3990                .instruments_by_parent(&instrument_id.venue, &root, class)
3991                .iter()
3992                .map(|i| i.id())
3993                .collect()
3994        } else {
3995            vec![*instrument_id]
3996        };
3997
3998        if parent.is_some() {
3999            self.book_deltas_parent_expansions
4000                .insert(*instrument_id, target_ids.clone());
4001
4002            if !only_deltas {
4003                self.book_depth10_parent_expansions
4004                    .insert(*instrument_id, target_ids.clone());
4005            }
4006        }
4007
4008        {
4009            let mut cache = self.cache.borrow_mut();
4010            for target_id in &target_ids {
4011                if !cache.has_order_book(target_id) {
4012                    let book = OrderBook::new(*target_id, book_type);
4013                    log::debug!("Created {book}");
4014                    cache.add_order_book(book)?;
4015                }
4016            }
4017        }
4018
4019        for target_id in &target_ids {
4020            let updater = self
4021                .book_updaters
4022                .entry(*target_id)
4023                .or_insert_with(|| {
4024                    Rc::new(BookUpdater::new(
4025                        target_id,
4026                        self.cache.clone(),
4027                        self.config.emit_quotes_from_book,
4028                    ))
4029                })
4030                .clone();
4031
4032            // Subscribe handler to the literal per-underlying topic. The
4033            // typed router dedups (pattern, handler_id) pairs, so overlapping
4034            // composite + exact subscriptions register exactly one handler
4035            // entry per book and a single delta apply per publish.
4036            let deltas_topic = switchboard::get_book_deltas_topic(*target_id);
4037            let deltas_handler = TypedHandler::new(updater.clone());
4038            msgbus::subscribe_book_deltas(
4039                deltas_topic.into(),
4040                deltas_handler,
4041                Some(self.msgbus_priority),
4042            );
4043
4044            if !only_deltas {
4045                let depth_topic = switchboard::get_book_depth10_topic(*target_id);
4046                let depth_handler = TypedHandler::new(updater);
4047                msgbus::subscribe_book_depth10(
4048                    depth_topic.into(),
4049                    depth_handler,
4050                    Some(self.msgbus_priority),
4051                );
4052            }
4053        }
4054
4055        Ok(())
4056    }
4057
4058    fn is_underlying_wanted_for_deltas(&self, target_id: &InstrumentId) -> bool {
4059        // Any of {deltas, depth10, snapshots} subs causes setup_book_updater to
4060        // subscribe the deltas handler (depth10/snapshots use only_deltas=false),
4061        // so all three keep the per-underlying deltas handler alive.
4062        if self.has_book_delta_subscriptions(target_id)
4063            || self.book_depth10_subs.contains(target_id)
4064            || self.has_book_snapshot_subscriptions(target_id)
4065        {
4066            return true;
4067        }
4068        self.book_deltas_parent_expansions
4069            .values()
4070            .any(|expansion| expansion.contains(target_id))
4071    }
4072
4073    fn is_underlying_wanted_for_depth10(&self, target_id: &InstrumentId) -> bool {
4074        // Snapshots use only_deltas=false, so they drive the depth10 handler
4075        // as well as the deltas handler.
4076        if self.book_depth10_subs.contains(target_id)
4077            || self.has_book_snapshot_subscriptions(target_id)
4078        {
4079            return true;
4080        }
4081        self.book_depth10_parent_expansions
4082            .values()
4083            .any(|expansion| expansion.contains(target_id))
4084    }
4085
4086    fn create_bar_aggregator(
4087        &self,
4088        instrument: &InstrumentAny,
4089        bar_type: BarType,
4090    ) -> Box<dyn BarAggregator> {
4091        let cache = self.cache.clone();
4092        let validate_sequence = self.config.validate_data_sequence;
4093
4094        let handler = move |bar: Bar| {
4095            process_engine_bar(&cache, validate_sequence, true, bar);
4096        };
4097
4098        let clock = self.clock.clone();
4099        let config = self.config.clone();
4100
4101        let price_precision = instrument.price_precision();
4102        let size_precision = instrument.size_precision();
4103
4104        if bar_type.spec().is_time_aggregated() {
4105            let time_bars_origin_offset = config
4106                .time_bars_origin_offset
4107                .get(&bar_type.spec().aggregation)
4108                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
4109
4110            Box::new(TimeBarAggregator::new(
4111                bar_type,
4112                price_precision,
4113                size_precision,
4114                clock,
4115                handler,
4116                config.time_bars_build_with_no_updates,
4117                config.time_bars_timestamp_on_close,
4118                config.time_bars_interval_type,
4119                time_bars_origin_offset,
4120                config.time_bars_build_delay,
4121                config.time_bars_skip_first_non_full_bar,
4122            ))
4123        } else {
4124            match bar_type.spec().aggregation {
4125                BarAggregation::Tick => Box::new(TickBarAggregator::new(
4126                    bar_type,
4127                    price_precision,
4128                    size_precision,
4129                    handler,
4130                )) as Box<dyn BarAggregator>,
4131                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
4132                    bar_type,
4133                    price_precision,
4134                    size_precision,
4135                    handler,
4136                )) as Box<dyn BarAggregator>,
4137                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
4138                    bar_type,
4139                    price_precision,
4140                    size_precision,
4141                    handler,
4142                )) as Box<dyn BarAggregator>,
4143                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
4144                    bar_type,
4145                    price_precision,
4146                    size_precision,
4147                    handler,
4148                )) as Box<dyn BarAggregator>,
4149                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
4150                    bar_type,
4151                    price_precision,
4152                    size_precision,
4153                    handler,
4154                )) as Box<dyn BarAggregator>,
4155                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
4156                    bar_type,
4157                    price_precision,
4158                    size_precision,
4159                    handler,
4160                )) as Box<dyn BarAggregator>,
4161                BarAggregation::Value => Box::new(ValueBarAggregator::new(
4162                    bar_type,
4163                    price_precision,
4164                    size_precision,
4165                    handler,
4166                )) as Box<dyn BarAggregator>,
4167                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
4168                    bar_type,
4169                    price_precision,
4170                    size_precision,
4171                    handler,
4172                )) as Box<dyn BarAggregator>,
4173                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
4174                    bar_type,
4175                    price_precision,
4176                    size_precision,
4177                    handler,
4178                )) as Box<dyn BarAggregator>,
4179                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
4180                    bar_type,
4181                    price_precision,
4182                    size_precision,
4183                    instrument.price_increment(),
4184                    handler,
4185                )) as Box<dyn BarAggregator>,
4186                other => unreachable!(
4187                    "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
4188                ),
4189            }
4190        }
4191    }
4192
4193    fn create_bar_aggregator_for_key(
4194        &mut self,
4195        bar_type: BarType,
4196        request_id: Option<UUID4>,
4197    ) -> anyhow::Result<()> {
4198        let key = bar_aggregator_key(bar_type, request_id);
4199        if self.bar_aggregators.contains_key(&key) {
4200            return Ok(());
4201        }
4202
4203        let instrument = {
4204            let cache = self.cache.borrow();
4205            cache
4206                .instrument(&bar_type.instrument_id())
4207                .ok_or_else(|| {
4208                    anyhow::anyhow!(
4209                        "Cannot start bar aggregation: no instrument found for {}",
4210                        bar_type.instrument_id(),
4211                    )
4212                })?
4213                .clone()
4214        };
4215        let aggregator = self.create_bar_aggregator(&instrument, bar_type);
4216        self.bar_aggregators
4217            .insert(key, Rc::new(RefCell::new(aggregator)));
4218
4219        Ok(())
4220    }
4221
4222    fn start_live_bar_aggregator(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
4223        let key = bar_aggregator_key(cmd.bar_type, None);
4224
4225        if self
4226            .bar_aggregators
4227            .get(&key)
4228            .is_some_and(|aggregator| aggregator.borrow().is_running())
4229            && self.bar_aggregator_handlers.contains_key(&key)
4230        {
4231            log::warn!(
4232                "Aggregator for {} is currently in use, subscription can't be started",
4233                cmd.bar_type,
4234            );
4235            return Ok(());
4236        }
4237
4238        self.start_bar_aggregator(cmd.bar_type, None)?;
4239        self.subscribe_bar_aggregator(cmd);
4240
4241        Ok(())
4242    }
4243
4244    fn start_bar_aggregator(
4245        &mut self,
4246        bar_type: BarType,
4247        request_id: Option<UUID4>,
4248    ) -> anyhow::Result<()> {
4249        let key = bar_aggregator_key(bar_type, request_id);
4250        let bar_type_std = bar_type.standard();
4251
4252        self.create_bar_aggregator_for_key(bar_type, request_id)?;
4253        let aggregator = self
4254            .bar_aggregators
4255            .get(&key)
4256            .ok_or_else(|| anyhow::anyhow!("Cannot start bar aggregation for {bar_type}"))?
4257            .clone();
4258        let defer_live_activation = request_id.is_none()
4259            && aggregator.borrow().is_running()
4260            && !self.bar_aggregator_handlers.contains_key(&key);
4261
4262        if !self.bar_aggregator_handlers.contains_key(&key) {
4263            // Subscribe to underlying data topics
4264            let mut subscriptions = Vec::new();
4265
4266            if bar_type.is_composite() {
4267                let topic = switchboard::get_bars_topic(bar_type.composite());
4268                let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_type_std));
4269                msgbus::subscribe_bars(topic.into(), handler.clone(), None);
4270                subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
4271            } else if bar_type.spec().price_type == PriceType::Last {
4272                let topic = switchboard::get_trades_topic(bar_type.instrument_id());
4273                let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_type_std));
4274                msgbus::subscribe_trades(
4275                    topic.into(),
4276                    handler.clone(),
4277                    Some(BAR_AGGREGATOR_PRIORITY),
4278                );
4279                subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
4280            } else {
4281                // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
4282                if matches!(
4283                    bar_type.spec().aggregation,
4284                    BarAggregation::TickImbalance
4285                        | BarAggregation::VolumeImbalance
4286                        | BarAggregation::ValueImbalance
4287                        | BarAggregation::TickRuns
4288                        | BarAggregation::VolumeRuns
4289                        | BarAggregation::ValueRuns
4290                ) {
4291                    log::warn!(
4292                        "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
4293                         data with `aggressor_side`, but `price_type` is not LAST so it will receive \
4294                         quote data: bars will not emit correctly",
4295                    );
4296                }
4297
4298                let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
4299                let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_type_std));
4300                msgbus::subscribe_quotes(
4301                    topic.into(),
4302                    handler.clone(),
4303                    Some(BAR_AGGREGATOR_PRIORITY),
4304                );
4305                subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
4306            }
4307
4308            self.bar_aggregator_handlers.insert(key, subscriptions);
4309        }
4310
4311        if defer_live_activation {
4312            return Ok(());
4313        }
4314
4315        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
4316        self.setup_bar_aggregator(bar_type, false, request_id)?;
4317
4318        aggregator.borrow_mut().set_is_running(true);
4319
4320        Ok(())
4321    }
4322
4323    fn subscribe_bar_aggregator(&mut self, cmd: &SubscribeBars) {
4324        let key = bar_aggregator_key(cmd.bar_type, None);
4325        if !self.bar_aggregators.contains_key(&key) {
4326            log::error!(
4327                "Cannot subscribe bar aggregator: no aggregator found for {}",
4328                cmd.bar_type,
4329            );
4330            return;
4331        }
4332
4333        if cmd.bar_type.is_composite() {
4334            let composite_bar_type = cmd.bar_type.composite();
4335            if composite_bar_type.is_externally_aggregated() {
4336                let subscribe = SubscribeBars::new(
4337                    composite_bar_type,
4338                    cmd.client_id,
4339                    cmd.venue,
4340                    UUID4::new(),
4341                    cmd.ts_init,
4342                    Some(cmd.command_id),
4343                    cmd.params.clone(),
4344                );
4345                self.execute(DataCommand::Subscribe(SubscribeCommand::Bars(subscribe)));
4346            }
4347        } else if cmd.bar_type.spec().price_type == PriceType::Last {
4348            let subscribe = SubscribeTrades::new(
4349                cmd.bar_type.instrument_id(),
4350                cmd.client_id,
4351                cmd.venue,
4352                UUID4::new(),
4353                cmd.ts_init,
4354                Some(cmd.command_id),
4355                cmd.params.clone(),
4356            );
4357            self.execute(DataCommand::Subscribe(SubscribeCommand::Trades(subscribe)));
4358        } else {
4359            let subscribe = SubscribeQuotes::new(
4360                cmd.bar_type.instrument_id(),
4361                cmd.client_id,
4362                cmd.venue,
4363                UUID4::new(),
4364                cmd.ts_init,
4365                Some(cmd.command_id),
4366                cmd.params.clone(),
4367            );
4368            self.execute(DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe)));
4369        }
4370    }
4371
4372    /// Sets up a bar aggregator, matching Cython `_setup_bar_aggregator` logic.
4373    ///
4374    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
4375    fn setup_bar_aggregator(
4376        &self,
4377        bar_type: BarType,
4378        historical: bool,
4379        request_id: Option<UUID4>,
4380    ) -> anyhow::Result<()> {
4381        let key = bar_aggregator_key(bar_type, request_id);
4382        let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
4383            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
4384        })?;
4385
4386        // Set historical mode and handler
4387        let cache = self.cache.clone();
4388        let validate_sequence = self.config.validate_data_sequence;
4389        let publish = !historical;
4390        let handler: Box<dyn FnMut(Bar)> = Box::new(move |bar: Bar| {
4391            process_engine_bar(&cache, validate_sequence, publish, bar);
4392        });
4393
4394        aggregator
4395            .borrow_mut()
4396            .set_historical_mode(historical, handler);
4397
4398        // For TimeBarAggregator, set clock and start timer
4399        if bar_type.spec().is_time_aggregated() {
4400            use nautilus_common::clock::TestClock;
4401
4402            if historical {
4403                // Each aggregator gets its own independent clock
4404                let test_clock = Rc::new(RefCell::new(TestClock::new()));
4405                aggregator.borrow_mut().set_clock(test_clock);
4406                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
4407                // Store weak reference so start_timer can use it when called later
4408                let aggregator_weak = Rc::downgrade(aggregator);
4409                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
4410            } else {
4411                aggregator.borrow_mut().set_clock(self.clock.clone());
4412                aggregator
4413                    .borrow_mut()
4414                    .start_timer(Some(aggregator.clone()));
4415            }
4416        }
4417
4418        Ok(())
4419    }
4420
4421    fn unsubscribe_bar_aggregator(&mut self, cmd: &UnsubscribeBars) {
4422        if cmd.bar_type.is_composite() {
4423            let composite_bar_type = cmd.bar_type.composite();
4424            if composite_bar_type.is_externally_aggregated() {
4425                let unsubscribe = UnsubscribeBars::new(
4426                    composite_bar_type,
4427                    cmd.client_id,
4428                    cmd.venue,
4429                    UUID4::new(),
4430                    cmd.ts_init,
4431                    Some(cmd.command_id),
4432                    cmd.params.clone(),
4433                );
4434                self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Bars(
4435                    unsubscribe,
4436                )));
4437            }
4438        } else if cmd.bar_type.spec().price_type == PriceType::Last {
4439            let unsubscribe = UnsubscribeTrades::new(
4440                cmd.bar_type.instrument_id(),
4441                cmd.client_id,
4442                cmd.venue,
4443                UUID4::new(),
4444                cmd.ts_init,
4445                Some(cmd.command_id),
4446                cmd.params.clone(),
4447            );
4448            self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Trades(
4449                unsubscribe,
4450            )));
4451        } else {
4452            let unsubscribe = UnsubscribeQuotes::new(
4453                cmd.bar_type.instrument_id(),
4454                cmd.client_id,
4455                cmd.venue,
4456                UUID4::new(),
4457                cmd.ts_init,
4458                Some(cmd.command_id),
4459                cmd.params.clone(),
4460            );
4461            self.execute(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
4462                unsubscribe,
4463            )));
4464        }
4465    }
4466
4467    fn stop_bar_aggregator(
4468        &mut self,
4469        bar_type: BarType,
4470        request_id: Option<UUID4>,
4471    ) -> anyhow::Result<()> {
4472        let key = bar_aggregator_key(bar_type, request_id);
4473        let aggregator = self.bar_aggregators.shift_remove(&key).ok_or_else(|| {
4474            anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
4475        })?;
4476
4477        aggregator.borrow_mut().stop();
4478
4479        // Unsubscribe any registered message handlers
4480        if let Some(subs) = self.bar_aggregator_handlers.remove(&key) {
4481            for sub in subs {
4482                match sub {
4483                    BarAggregatorSubscription::Bar { topic, handler } => {
4484                        msgbus::unsubscribe_bars(topic.into(), &handler);
4485                    }
4486                    BarAggregatorSubscription::Trade { topic, handler } => {
4487                        msgbus::unsubscribe_trades(topic.into(), &handler);
4488                    }
4489                    BarAggregatorSubscription::Quote { topic, handler } => {
4490                        msgbus::unsubscribe_quotes(topic.into(), &handler);
4491                    }
4492                }
4493            }
4494        }
4495
4496        Ok(())
4497    }
4498
4499    fn subscribe_continuous_future_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
4500        let target_bar_type = cmd.bar_type;
4501        let target_key = target_bar_type.standard();
4502
4503        if !target_bar_type.is_internally_aggregated() {
4504            anyhow::bail!(
4505                "Continuous future bar subscriptions require an internally aggregated target, was {target_bar_type}"
4506            );
4507        }
4508
4509        if self.continuous_future_roller.is_none() {
4510            anyhow::bail!(
4511                "Cannot subscribe continuous future bars for {target_bar_type}: roller is not initialized; ensure `register_msgbus_handlers` runs before subscribing"
4512            );
4513        }
4514
4515        let request = continuous_future_subscription_from_bars(cmd)?.ok_or_else(|| {
4516            anyhow::anyhow!(
4517                "Continuous future bar subscription requires `continuous_future_transitions`, was {cmd:?}"
4518            )
4519        })?;
4520
4521        self.ensure_continuous_future_target_instrument(&request);
4522
4523        if self
4524            .continuous_future_subscriptions
4525            .contains_key(&target_key)
4526        {
4527            log::warn!("Continuous future bars already subscribed for {target_bar_type}");
4528            return Ok(());
4529        }
4530
4531        let aggregator_key = bar_aggregator_key(target_bar_type, None);
4532        if let Some(aggregator) = self.bar_aggregators.get(&aggregator_key)
4533            && aggregator.borrow().is_running()
4534        {
4535            log::warn!(
4536                "Aggregator for {target_bar_type} is currently in use, continuous future subscription can't be started"
4537            );
4538            return Ok(());
4539        }
4540
4541        self.create_bar_aggregator_for_key(target_bar_type, None)?;
4542        self.setup_bar_aggregator(target_bar_type, false, None)?;
4543
4544        let now_ns = self.clock.borrow().timestamp_ns().as_u64();
4545        let Some(segment) = request.next_segment(now_ns, now_ns) else {
4546            log::error!("Cannot determine active continuous future segment for {target_bar_type}");
4547            if let Err(e) = self.stop_bar_aggregator(target_bar_type, None) {
4548                log::error!(
4549                    "Error rolling back continuous future aggregator for {target_bar_type}: {e}"
4550                );
4551            }
4552            return Ok(());
4553        };
4554
4555        self.apply_continuous_future_subscription_adjustment(&request, segment.index)?;
4556        let source = request.source_for_segment(segment.instrument_id);
4557        let source_subscription =
4558            self.subscribe_continuous_future_source(target_bar_type, source, segment.instrument_id);
4559
4560        if let Some(aggregator) = self.bar_aggregators.get(&aggregator_key) {
4561            aggregator.borrow_mut().set_is_running(true);
4562        }
4563
4564        let next_transition_index =
4565            (segment.index < request.transitions.len()).then_some(segment.index);
4566
4567        self.continuous_future_subscriptions.insert(
4568            target_key,
4569            ContinuousFutureSubscriptionState {
4570                target_bar_type,
4571                client_id: cmd.client_id,
4572                venue: cmd.venue,
4573                command_id: cmd.command_id,
4574                params: cmd.params.clone(),
4575                request,
4576                active_segment_instrument_id: segment.instrument_id,
4577                active_source: source,
4578                active_source_subscription: Some(source_subscription),
4579                next_transition_index,
4580                timer_name: None,
4581            },
4582        );
4583
4584        let child_cmd = self.build_continuous_future_subscribe_command(
4585            &target_key,
4586            source,
4587            segment.instrument_id,
4588            cmd.command_id,
4589            cmd.ts_init,
4590            true,
4591        );
4592
4593        if let Some(child) = child_cmd {
4594            self.execute(child);
4595        }
4596
4597        self.schedule_continuous_future_transition(target_key);
4598
4599        Ok(())
4600    }
4601
4602    fn unsubscribe_continuous_future_bars(&mut self, cmd: &UnsubscribeBars) {
4603        let target_key = cmd.bar_type.standard();
4604        let Some(mut state) = self.continuous_future_subscriptions.remove(&target_key) else {
4605            log::warn!(
4606                "Cannot unsubscribe continuous future bars: no subscription state for {target_key}"
4607            );
4608            return;
4609        };
4610
4611        if let Some(name) = state.timer_name.take() {
4612            self.clock.borrow_mut().cancel_timer(&name);
4613        }
4614
4615        let ts_init = self.clock.borrow().timestamp_ns();
4616        let segment_instrument_id = state.active_segment_instrument_id;
4617        let source = state.active_source;
4618        let source_subscription = state.active_source_subscription.take();
4619        let client_id = state.client_id;
4620        let venue = state.venue;
4621        let params = state.params.clone();
4622        let target_bar_type = state.target_bar_type;
4623        drop(state);
4624
4625        if let Some(subscription) = source_subscription {
4626            self.unsubscribe_continuous_future_source(target_bar_type, subscription);
4627        }
4628
4629        let child_cmd = build_continuous_future_unsubscribe_command(
4630            source,
4631            segment_instrument_id,
4632            client_id,
4633            venue,
4634            params.as_ref(),
4635            cmd.command_id,
4636            ts_init,
4637        );
4638        self.execute(child_cmd);
4639
4640        if let Err(e) = self.stop_bar_aggregator(target_bar_type, None) {
4641            log::error!("Error stopping continuous future aggregator for {target_bar_type}: {e}");
4642        }
4643    }
4644
    /// Handles a roll timer event for a continuous-future bar subscription.
    ///
    /// The event name encodes the target bar type and the transition index.
    /// Stale or unexpected events (unparsable name, unknown subscription, a
    /// timer name that is not the one currently scheduled, or a transition
    /// index that does not match the next pending one) are ignored.
    ///
    /// For a valid event the engine advances the state to the next segment,
    /// detaches and unsubscribes the previous segment's source, applies the
    /// adjustment for the new segment to the aggregator, attaches and
    /// subscribes the new segment's source, and schedules the next transition.
    fn handle_continuous_future_subscription_transition(&mut self, event: &TimeEvent) {
        let event_name = event.name.as_str();
        let Some((target_key, transition_index)) = parse_transition_timer_name(event_name) else {
            log::warn!(
                "Ignoring continuous future transition event with unparsable name {event_name}"
            );
            return;
        };

        let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) else {
            log::warn!(
                "Ignoring continuous future transition event {event_name}: no subscription state for {target_key}"
            );
            return;
        };

        // Only the timer that is currently recorded on the state may drive a roll;
        // anything else is silently dropped.
        if state.timer_name.as_deref() != Some(event_name) {
            return;
        }
        state.timer_name = None;

        let Some(next_index) = state.next_transition_index else {
            return;
        };

        // The index encoded in the timer name must match the pending transition
        // and must address an existing transition row.
        if next_index != transition_index || next_index >= state.request.transitions.len() {
            return;
        }

        // Advance the state to the post-roll segment. `new_segment_index` doubles as the
        // index of the following transition (if one exists) and as the segment index
        // handed to the adjustment lookup below.
        let prev_segment_instrument_id = state.active_segment_instrument_id;
        let next_segment_instrument_id = state.request.transitions[next_index].post_instrument_id;
        let new_segment_index = next_index + 1;
        state.active_segment_instrument_id = next_segment_instrument_id;
        state.next_transition_index =
            (new_segment_index < state.request.transitions.len()).then_some(new_segment_index);

        // Copy out everything needed later so the mutable borrow of the state map
        // ends before calling back into `self`.
        let old_source = state.active_source;
        let old_source_subscription = state.active_source_subscription.take();
        let client_id = state.client_id;
        let venue = state.venue;
        let params = state.params.clone();
        let command_id = state.command_id;
        let target_bar_type = state.target_bar_type;

        let ts_init = self.clock.borrow().timestamp_ns();

        // Detach the previous segment's handler from the message bus.
        if let Some(subscription) = old_source_subscription {
            self.unsubscribe_continuous_future_source(target_bar_type, subscription);
        }

        // Unsubscribe the previous segment; the stored parent command id is passed
        // as the child's correlation id.
        let unsub_child = build_continuous_future_unsubscribe_command(
            old_source,
            prev_segment_instrument_id,
            client_id,
            venue,
            params.as_ref(),
            command_id,
            ts_init,
        );
        self.execute(unsub_child);

        // NOTE(review): on failure this returns after the old source was already
        // detached and the state advanced, leaving the subscription without an
        // active source and without a scheduled timer — confirm this is intended.
        if let Err(e) = self
            .apply_continuous_future_subscription_adjustment_for(target_bar_type, new_segment_index)
        {
            log::error!("Error applying continuous future adjustment for {target_bar_type}: {e}");
            return;
        }

        // Re-read the state: the calls above needed `&mut self`, so the earlier
        // borrow could not be kept alive.
        let new_source = {
            let Some(state) = self.continuous_future_subscriptions.get(&target_key) else {
                return;
            };
            state.request.source_for_segment(next_segment_instrument_id)
        };
        let new_subscription = self.subscribe_continuous_future_source(
            target_bar_type,
            new_source,
            next_segment_instrument_id,
        );

        if let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) {
            state.active_source = new_source;
            state.active_source_subscription = Some(new_subscription);
        }

        // Subscribe the new segment's source.
        let sub_child = self.build_continuous_future_subscribe_command(
            &target_key,
            new_source,
            next_segment_instrument_id,
            command_id,
            ts_init,
            true,
        );

        if let Some(child) = sub_child {
            self.execute(child);
        }

        // Arm the timer for the following transition, if any remains.
        self.schedule_continuous_future_transition(target_key);
    }
4745
4746    fn apply_continuous_future_subscription_adjustment(
4747        &self,
4748        request: &ContinuousFutureRequest,
4749        segment_index: usize,
4750    ) -> anyhow::Result<()> {
4751        let key = bar_aggregator_key(request.primary_bar_type, None);
4752        let aggregator = self.bar_aggregators.get(&key).ok_or_else(|| {
4753            anyhow::anyhow!(
4754                "No live aggregator for continuous future subscription {}",
4755                request.primary_bar_type
4756            )
4757        })?;
4758        let adjustment = request.adjustment_for_segment(segment_index);
4759        aggregator
4760            .borrow_mut()
4761            .set_adjustment(adjustment, request.adjustment_mode);
4762        Ok(())
4763    }
4764
4765    fn apply_continuous_future_subscription_adjustment_for(
4766        &self,
4767        target_bar_type: BarType,
4768        segment_index: usize,
4769    ) -> anyhow::Result<()> {
4770        let Some(state) = self
4771            .continuous_future_subscriptions
4772            .get(&target_bar_type.standard())
4773        else {
4774            anyhow::bail!("No continuous future subscription state for {target_bar_type}");
4775        };
4776        self.apply_continuous_future_subscription_adjustment(&state.request, segment_index)
4777    }
4778
4779    fn subscribe_continuous_future_source(
4780        &mut self,
4781        target_bar_type: BarType,
4782        source: ContinuousFutureSource,
4783        segment_instrument_id: InstrumentId,
4784    ) -> BarAggregatorSubscription {
4785        let key = bar_aggregator_key(target_bar_type, None);
4786        let aggregator = self
4787            .bar_aggregators
4788            .get(&key)
4789            .cloned()
4790            .expect("aggregator was created before subscribe_continuous_future_source");
4791
4792        let subscription = match source {
4793            ContinuousFutureSource::Bars(source_bar_type) => {
4794                let topic = switchboard::get_bars_topic(source_bar_type);
4795                let handler =
4796                    TypedHandler::new(BarBarHandler::new(&aggregator, target_bar_type.standard()));
4797                msgbus::subscribe_bars(topic.into(), handler.clone(), None);
4798                BarAggregatorSubscription::Bar { topic, handler }
4799            }
4800            ContinuousFutureSource::Trades => {
4801                let topic = switchboard::get_trades_topic(segment_instrument_id);
4802                let handler = TypedHandler::new(BarTradeHandler::new(
4803                    &aggregator,
4804                    target_bar_type.standard(),
4805                ));
4806                msgbus::subscribe_trades(
4807                    topic.into(),
4808                    handler.clone(),
4809                    Some(BAR_AGGREGATOR_PRIORITY),
4810                );
4811                BarAggregatorSubscription::Trade { topic, handler }
4812            }
4813            ContinuousFutureSource::Quotes => {
4814                let topic = switchboard::get_quotes_topic(segment_instrument_id);
4815                let handler = TypedHandler::new(BarQuoteHandler::new(
4816                    &aggregator,
4817                    target_bar_type.standard(),
4818                ));
4819                msgbus::subscribe_quotes(
4820                    topic.into(),
4821                    handler.clone(),
4822                    Some(BAR_AGGREGATOR_PRIORITY),
4823                );
4824                BarAggregatorSubscription::Quote { topic, handler }
4825            }
4826        };
4827
4828        self.bar_aggregator_handlers
4829            .entry(key)
4830            .or_default()
4831            .push(subscription.clone());
4832
4833        subscription
4834    }
4835
4836    fn unsubscribe_continuous_future_source(
4837        &mut self,
4838        target_bar_type: BarType,
4839        subscription: BarAggregatorSubscription,
4840    ) {
4841        let key = bar_aggregator_key(target_bar_type, None);
4842        if let Some(subs) = self.bar_aggregator_handlers.get_mut(&key) {
4843            subs.retain(|registered| !same_subscription(registered, &subscription));
4844        }
4845
4846        match subscription {
4847            BarAggregatorSubscription::Bar { topic, handler } => {
4848                msgbus::unsubscribe_bars(topic.into(), &handler);
4849            }
4850            BarAggregatorSubscription::Trade { topic, handler } => {
4851                msgbus::unsubscribe_trades(topic.into(), &handler);
4852            }
4853            BarAggregatorSubscription::Quote { topic, handler } => {
4854                msgbus::unsubscribe_quotes(topic.into(), &handler);
4855            }
4856        }
4857    }
4858
4859    fn build_continuous_future_subscribe_command(
4860        &self,
4861        target_key: &BarType,
4862        source: ContinuousFutureSource,
4863        segment_instrument_id: InstrumentId,
4864        command_id: UUID4,
4865        ts_init: UnixNanos,
4866        subscribe: bool,
4867    ) -> Option<DataCommand> {
4868        let state = self.continuous_future_subscriptions.get(target_key)?;
4869
4870        if !subscribe {
4871            return Some(build_continuous_future_unsubscribe_command(
4872                source,
4873                segment_instrument_id,
4874                state.client_id,
4875                state.venue,
4876                state.params.as_ref(),
4877                command_id,
4878                ts_init,
4879            ));
4880        }
4881
4882        let child_params = state
4883            .request
4884            .child_params(state.params.as_ref(), command_id);
4885
4886        Some(build_continuous_future_subscribe_inner(
4887            source,
4888            segment_instrument_id,
4889            state.client_id,
4890            state.venue,
4891            child_params,
4892            command_id,
4893            ts_init,
4894        ))
4895    }
4896
4897    fn schedule_continuous_future_transition(&mut self, target_key: BarType) {
4898        let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) else {
4899            return;
4900        };
4901
4902        if let Some(name) = state.timer_name.take() {
4903            self.clock.borrow_mut().cancel_timer(&name);
4904        }
4905
4906        let Some(transition_index) = state.next_transition_index else {
4907            return;
4908        };
4909        let Some(row) = state.request.transitions.get(transition_index) else {
4910            return;
4911        };
4912        let transition_ns = row.transition_time_ns;
4913        let timer_name = format!("continuous-future-roll:{target_key}:{transition_index}");
4914
4915        let Some(roller) = self.continuous_future_roller.clone() else {
4916            log::error!(
4917                "Cannot schedule continuous future transition timer for {target_key}: roller not initialized"
4918            );
4919            return;
4920        };
4921
4922        let callback_fn: Rc<dyn Fn(TimeEvent)> =
4923            Rc::new(move |event| roller.handle_transition(&event));
4924        let callback = TimeEventCallback::from(callback_fn);
4925
4926        if let Err(e) = self.clock.borrow_mut().set_time_alert_ns(
4927            &timer_name,
4928            UnixNanos::from(transition_ns),
4929            Some(callback),
4930            Some(true),
4931        ) {
4932            log::error!("Failed to schedule continuous future transition {timer_name}: {e}");
4933            return;
4934        }
4935
4936        if let Some(state) = self.continuous_future_subscriptions.get_mut(&target_key) {
4937            state.timer_name = Some(timer_name);
4938        }
4939    }
4940}
4941
4942// Resolves parent expansion components for a book subscription command.
4943//
4944// Returns Ok(Some((root, class))) when params carries PARAMS_IS_PARENT=true and
4945// the instrument_id parses as a recognised <root>.<class> shape; Ok(None) for
4946// concrete (non-parent) subscriptions; Err when the caller asserts a parent
4947// subscription but the id cannot be parsed, so subscribe entries can reject up
4948// front before touching state.
4949fn resolve_parent_components(
4950    instrument_id: &InstrumentId,
4951    params: Option<&Params>,
4952) -> anyhow::Result<Option<(Ustr, InstrumentClass)>> {
4953    if !is_parent_subscription(params) {
4954        return Ok(None);
4955    }
4956    let Some((root, class)) = instrument_id.parse_parent_components() else {
4957        anyhow::bail!(
4958            "Cannot expand parent subscription for {instrument_id}: \
4959             symbol does not parse as `<root>.<class>` with a recognised class suffix"
4960        );
4961    };
4962    Ok(Some((Ustr::from(root), class)))
4963}
4964
4965fn spread_quote_update_interval_seconds(params: Option<&Params>) -> Option<u64> {
4966    match params.and_then(|params| params.get("update_interval_seconds")) {
4967        Some(value) if value.is_null() => None,
4968        Some(value) => value.as_u64().filter(|interval| *interval > 0),
4969        None => Some(1),
4970    }
4971}
4972
4973const GENERIC_SPREAD_ID_SEPARATOR: &str = "___";
4974
4975fn spread_instrument_legs(instrument: &InstrumentAny) -> Option<Vec<(InstrumentId, i64)>> {
4976    if !instrument.is_spread() {
4977        return None;
4978    }
4979
4980    let instrument_id = instrument.id();
4981    let symbol = instrument_id.symbol.as_str();
4982    if !symbol.contains(GENERIC_SPREAD_ID_SEPARATOR) {
4983        return Some(vec![(instrument_id, 1)]);
4984    }
4985
4986    symbol
4987        .split(GENERIC_SPREAD_ID_SEPARATOR)
4988        .map(|component| parse_spread_leg(component, instrument_id.venue))
4989        .collect()
4990}
4991
4992fn parse_spread_leg(component: &str, venue: Venue) -> Option<(InstrumentId, i64)> {
4993    if let Some(rest) = component.strip_prefix("((") {
4994        let (ratio, symbol) = rest.split_once("))")?;
4995        return parse_spread_leg_parts(ratio, symbol, venue, -1);
4996    }
4997
4998    let rest = component.strip_prefix('(')?;
4999    let (ratio, symbol) = rest.split_once(')')?;
5000    parse_spread_leg_parts(ratio, symbol, venue, 1)
5001}
5002
5003fn parse_spread_leg_parts(
5004    ratio: &str,
5005    symbol: &str,
5006    venue: Venue,
5007    sign: i64,
5008) -> Option<(InstrumentId, i64)> {
5009    if symbol.is_empty() {
5010        return None;
5011    }
5012
5013    let ratio = ratio.parse::<i64>().ok()?.checked_mul(sign)?;
5014    if ratio == 0 {
5015        return None;
5016    }
5017
5018    Some((InstrumentId::new(Symbol::new(symbol), venue), ratio))
5019}
5020
5021#[inline(always)]
5022fn log_error_on_cache_insert<T: Display>(e: &T) {
5023    log::error!("Error on cache insert: {e}");
5024}
5025
5026/// Routes continuous-future transition timer events back to the engine.
5027///
5028/// The clock owns the timer's callback closure; the closure must be able to
5029/// call back into the engine without creating an Rc cycle. The roller holds a
5030/// weak reference to the engine and upgrades on each fire.
5031#[derive(Debug)]
5032struct ContinuousFutureRoller {
5033    engine: WeakCell<DataEngine>,
5034}
5035
5036impl ContinuousFutureRoller {
5037    fn new(engine: &Rc<RefCell<DataEngine>>) -> Self {
5038        Self {
5039            engine: WeakCell::from(Rc::downgrade(engine)),
5040        }
5041    }
5042
5043    fn handle_transition(&self, event: &TimeEvent) {
5044        if let Some(engine) = self.engine.upgrade() {
5045            engine
5046                .borrow_mut()
5047                .handle_continuous_future_subscription_transition(event);
5048        }
5049    }
5050}
5051
/// Per-subscription bookkeeping for a live continuous-future bar subscription.
///
/// Entries are stored in the engine's `continuous_future_subscriptions` map and
/// track which segment is currently feeding the aggregator and when the next
/// roll is due.
#[derive(Debug)]
struct ContinuousFutureSubscriptionState {
    // Bar type of the aggregated series; used to key the bar aggregator and its handlers.
    target_bar_type: BarType,
    // Client forwarded to child subscribe/unsubscribe commands.
    client_id: Option<ClientId>,
    // Venue passed to the child command builders (they currently ignore it and
    // use the segment instrument's venue instead).
    venue: Option<Venue>,
    // ID of the originating subscribe command; passed as the correlation id of
    // child commands built on a roll.
    command_id: UUID4,
    // Params of the originating subscribe command.
    params: Option<Params>,
    // Request describing the transitions, sources and adjustments of the series.
    request: ContinuousFutureRequest,
    // Instrument of the segment currently feeding the aggregator.
    active_segment_instrument_id: InstrumentId,
    // Data source (bars, trades or quotes) used for the active segment.
    active_source: ContinuousFutureSource,
    // Message-bus subscription feeding the aggregator; `None` while detached.
    active_source_subscription: Option<BarAggregatorSubscription>,
    // Index into `request.transitions` of the next roll; `None` once no transition remains.
    next_transition_index: Option<usize>,
    // Name of the currently scheduled roll timer; `None` when no timer is pending.
    timer_name: Option<String>,
}
5066
5067fn same_subscription(a: &BarAggregatorSubscription, b: &BarAggregatorSubscription) -> bool {
5068    match (a, b) {
5069        (
5070            BarAggregatorSubscription::Bar { handler: h1, .. },
5071            BarAggregatorSubscription::Bar { handler: h2, .. },
5072        ) => h1.id() == h2.id(),
5073        (
5074            BarAggregatorSubscription::Trade { handler: h1, .. },
5075            BarAggregatorSubscription::Trade { handler: h2, .. },
5076        ) => h1.id() == h2.id(),
5077        (
5078            BarAggregatorSubscription::Quote { handler: h1, .. },
5079            BarAggregatorSubscription::Quote { handler: h2, .. },
5080        ) => h1.id() == h2.id(),
5081        _ => false,
5082    }
5083}
5084
5085fn parse_transition_timer_name(name: &str) -> Option<(BarType, usize)> {
5086    let rest = name.strip_prefix("continuous-future-roll:")?;
5087    let (target, index) = rest.rsplit_once(':')?;
5088    let bar_type = BarType::from_str(target).ok()?;
5089    let index = index.parse::<usize>().ok()?;
5090    Some((bar_type, index))
5091}
5092
5093fn build_continuous_future_subscribe_inner(
5094    source: ContinuousFutureSource,
5095    segment_instrument_id: InstrumentId,
5096    client_id: Option<ClientId>,
5097    _venue: Option<Venue>,
5098    child_params: Params,
5099    correlation_id: UUID4,
5100    ts_init: UnixNanos,
5101) -> DataCommand {
5102    let command_id = UUID4::new();
5103    let child_venue = Some(segment_instrument_id.venue);
5104
5105    match source {
5106        ContinuousFutureSource::Bars(source_bar_type) => {
5107            DataCommand::Subscribe(SubscribeCommand::Bars(SubscribeBars::new(
5108                source_bar_type,
5109                client_id,
5110                child_venue,
5111                command_id,
5112                ts_init,
5113                Some(correlation_id),
5114                Some(child_params),
5115            )))
5116        }
5117        ContinuousFutureSource::Trades => {
5118            DataCommand::Subscribe(SubscribeCommand::Trades(SubscribeTrades::new(
5119                segment_instrument_id,
5120                client_id,
5121                child_venue,
5122                command_id,
5123                ts_init,
5124                Some(correlation_id),
5125                Some(child_params),
5126            )))
5127        }
5128        ContinuousFutureSource::Quotes => {
5129            DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
5130                segment_instrument_id,
5131                client_id,
5132                child_venue,
5133                command_id,
5134                ts_init,
5135                Some(correlation_id),
5136                Some(child_params),
5137            )))
5138        }
5139    }
5140}
5141
5142fn build_continuous_future_unsubscribe_command(
5143    source: ContinuousFutureSource,
5144    segment_instrument_id: InstrumentId,
5145    client_id: Option<ClientId>,
5146    _venue: Option<Venue>,
5147    parent_params: Option<&Params>,
5148    correlation_id: UUID4,
5149    ts_init: UnixNanos,
5150) -> DataCommand {
5151    let mut child_params = parent_params.cloned().unwrap_or_default();
5152    child_params.shift_remove("continuous_future_transitions");
5153    child_params.shift_remove("continuous_future_adjustment_mode");
5154    child_params.shift_remove("last_post_instrument_id");
5155    child_params.shift_remove("first_pre_instrument_id");
5156    child_params.shift_remove("bar_types");
5157    let command_id = UUID4::new();
5158    let child_venue = Some(segment_instrument_id.venue);
5159
5160    match source {
5161        ContinuousFutureSource::Bars(source_bar_type) => {
5162            DataCommand::Unsubscribe(UnsubscribeCommand::Bars(UnsubscribeBars::new(
5163                source_bar_type,
5164                client_id,
5165                child_venue,
5166                command_id,
5167                ts_init,
5168                Some(correlation_id),
5169                Some(child_params),
5170            )))
5171        }
5172        ContinuousFutureSource::Trades => {
5173            DataCommand::Unsubscribe(UnsubscribeCommand::Trades(UnsubscribeTrades::new(
5174                segment_instrument_id,
5175                client_id,
5176                child_venue,
5177                command_id,
5178                ts_init,
5179                Some(correlation_id),
5180                Some(child_params),
5181            )))
5182        }
5183        ContinuousFutureSource::Quotes => {
5184            DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
5185                segment_instrument_id,
5186                client_id,
5187                child_venue,
5188                command_id,
5189                ts_init,
5190                Some(correlation_id),
5191                Some(child_params),
5192            )))
5193        }
5194    }
5195}
5196
5197fn datetime_to_unix_nanos(datetime: chrono::DateTime<chrono::Utc>) -> anyhow::Result<UnixNanos> {
5198    let timestamp = datetime
5199        .timestamp_nanos_opt()
5200        .ok_or_else(|| anyhow::anyhow!("datetime is outside the supported nanosecond range"))?;
5201    let timestamp = u64::try_from(timestamp)
5202        .context("datetime is before the UNIX epoch and cannot be represented as UnixNanos")?;
5203    Ok(UnixNanos::from(timestamp))
5204}
5205
5206// Top-of-book `QuoteTick` from an `OrderBookDepth10`. Returns `None` for
5207// `NoOrderSide` padding or zero size.
5208fn derive_quote_from_depth(depth: &OrderBookDepth10) -> Option<QuoteTick> {
5209    let bid = depth.bids.first()?;
5210    let ask = depth.asks.first()?;
5211
5212    if bid.side == OrderSide::NoOrderSide
5213        || ask.side == OrderSide::NoOrderSide
5214        || bid.size.raw == 0
5215        || ask.size.raw == 0
5216    {
5217        return None;
5218    }
5219
5220    Some(QuoteTick::new(
5221        depth.instrument_id,
5222        bid.price,
5223        ask.price,
5224        bid.size,
5225        ask.size,
5226        depth.ts_event,
5227        depth.ts_init,
5228    ))
5229}
5230
5231// Validates a bar against `last_bar` before writing and (optionally) publishing.
5232// Shared by `handle_bar` and aggregator-emitted bars so both honour
5233// `validate_data_sequence`.
5234fn process_engine_bar(
5235    cache: &Rc<RefCell<Cache>>,
5236    validate_sequence: bool,
5237    publish: bool,
5238    bar: Bar,
5239) {
5240    if !validate_bar_sequence(cache, validate_sequence, &bar) {
5241        return;
5242    }
5243
5244    if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
5245        log_error_on_cache_insert(&e);
5246    }
5247
5248    if publish {
5249        let topic = switchboard::get_bars_topic(bar.bar_type);
5250        msgbus::publish_bar(topic, &bar);
5251    }
5252}
5253
5254fn validate_bar_sequence(cache: &Rc<RefCell<Cache>>, validate_sequence: bool, bar: &Bar) -> bool {
5255    if !validate_sequence {
5256        return true;
5257    }
5258
5259    let Some(last_bar) = cache.as_ref().borrow().bar(&bar.bar_type).copied() else {
5260        return true;
5261    };
5262
5263    if bar.ts_event < last_bar.ts_event {
5264        log::warn!(
5265            "Bar {bar} was prior to last bar `ts_event` {}",
5266            last_bar.ts_event,
5267        );
5268        return false;
5269    }
5270
5271    if bar.ts_init < last_bar.ts_init {
5272        log::warn!(
5273            "Bar {bar} was prior to last bar `ts_init` {}",
5274            last_bar.ts_init,
5275        );
5276        return false;
5277    }
5278
5279    // Bar revision overwrite needs a `Bar.is_revision` field on the model;
5280    // not present today. Tracked under #8 in the data engine parity plan
5281    true
5282}
5283
5284#[inline(always)]
5285fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
5286    if data.is_empty() {
5287        let name = type_name::<T>();
5288        let short_name = name.rsplit("::").next().unwrap_or(name);
5289        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
5290        return true;
5291    }
5292    false
5293}
5294
5295/// Concatenates same-variant leg payloads into a single rebuilt response keyed by `parent_id`.
5296///
5297/// Returns `None` when legs are mixed-variant or empty; pipelines only group legs of the same
5298/// variant. The rebuilt response inherits `start` and `end` from the parent request when the
5299/// parent is a `RequestJoin`; otherwise leg bounds are preserved on the first leg.
5300fn rebuild_pipeline_response(
5301    parent_id: UUID4,
5302    parent: Option<&RequestCommand>,
5303    legs: Vec<DataResponse>,
5304) -> Option<DataResponse> {
5305    if legs.is_empty() {
5306        return None;
5307    }
5308
5309    let (parent_start, parent_end) = parent_request_window(parent);
5310
5311    let mut iter = legs.into_iter();
5312    let first = iter.next()?;
5313
5314    match first {
5315        DataResponse::Data(mut acc) => {
5316            let mut data = custom_response_data(&acc, parent_id)?;
5317
5318            for leg in iter {
5319                let DataResponse::Data(other) = leg else {
5320                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5321                    return None;
5322                };
5323                data.extend(custom_response_data(&other, parent_id)?);
5324            }
5325
5326            data.sort_by_key(CustomData::ts_init);
5327            acc.data = std::sync::Arc::new(data);
5328            acc.correlation_id = parent_id;
5329            if parent_start.is_some() {
5330                acc.start = parent_start;
5331            }
5332
5333            if parent_end.is_some() {
5334                acc.end = parent_end;
5335            }
5336            Some(DataResponse::Data(acc))
5337        }
5338        DataResponse::Quotes(mut acc) => {
5339            for leg in iter {
5340                let DataResponse::Quotes(other) = leg else {
5341                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5342                    return None;
5343                };
5344                acc.data.extend(other.data);
5345            }
5346            acc.data.sort_by_key(|q| q.ts_init);
5347            acc.correlation_id = parent_id;
5348            if parent_start.is_some() {
5349                acc.start = parent_start;
5350            }
5351
5352            if parent_end.is_some() {
5353                acc.end = parent_end;
5354            }
5355            Some(DataResponse::Quotes(acc))
5356        }
5357        DataResponse::Trades(mut acc) => {
5358            for leg in iter {
5359                let DataResponse::Trades(other) = leg else {
5360                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5361                    return None;
5362                };
5363                acc.data.extend(other.data);
5364            }
5365            acc.data.sort_by_key(|t| t.ts_init);
5366            acc.correlation_id = parent_id;
5367            if parent_start.is_some() {
5368                acc.start = parent_start;
5369            }
5370
5371            if parent_end.is_some() {
5372                acc.end = parent_end;
5373            }
5374            Some(DataResponse::Trades(acc))
5375        }
5376        DataResponse::FundingRates(mut acc) => {
5377            for leg in iter {
5378                let DataResponse::FundingRates(other) = leg else {
5379                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5380                    return None;
5381                };
5382                acc.data.extend(other.data);
5383            }
5384            acc.data.sort_by_key(|r| r.ts_init);
5385            acc.correlation_id = parent_id;
5386            if parent_start.is_some() {
5387                acc.start = parent_start;
5388            }
5389
5390            if parent_end.is_some() {
5391                acc.end = parent_end;
5392            }
5393            Some(DataResponse::FundingRates(acc))
5394        }
5395        DataResponse::Bars(mut acc) => {
5396            for leg in iter {
5397                let DataResponse::Bars(other) = leg else {
5398                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5399                    return None;
5400                };
5401                acc.data.extend(other.data);
5402            }
5403            acc.data.sort_by_key(|b| b.ts_init);
5404            acc.correlation_id = parent_id;
5405            if parent_start.is_some() {
5406                acc.start = parent_start;
5407            }
5408
5409            if parent_end.is_some() {
5410                acc.end = parent_end;
5411            }
5412            Some(DataResponse::Bars(acc))
5413        }
5414        DataResponse::Instruments(mut acc) => {
5415            for leg in iter {
5416                let DataResponse::Instruments(other) = leg else {
5417                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5418                    return None;
5419                };
5420                acc.data.extend(other.data);
5421            }
5422            acc.correlation_id = parent_id;
5423            Some(DataResponse::Instruments(acc))
5424        }
5425        DataResponse::BookDeltas(mut acc) => {
5426            for leg in iter {
5427                let DataResponse::BookDeltas(other) = leg else {
5428                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5429                    return None;
5430                };
5431                acc.data.extend(other.data);
5432            }
5433            acc.data.sort_by_key(|d| d.ts_init);
5434            acc.correlation_id = parent_id;
5435            if parent_start.is_some() {
5436                acc.start = parent_start;
5437            }
5438
5439            if parent_end.is_some() {
5440                acc.end = parent_end;
5441            }
5442            Some(DataResponse::BookDeltas(acc))
5443        }
5444        DataResponse::BookDepth(mut acc) => {
5445            for leg in iter {
5446                let DataResponse::BookDepth(other) = leg else {
5447                    log::error!("Mixed-variant legs in pipeline {parent_id}");
5448                    return None;
5449                };
5450                acc.data.extend(other.data);
5451            }
5452            acc.data.sort_by_key(|d| d.ts_init);
5453            acc.correlation_id = parent_id;
5454            if parent_start.is_some() {
5455                acc.start = parent_start;
5456            }
5457
5458            if parent_end.is_some() {
5459                acc.end = parent_end;
5460            }
5461            Some(DataResponse::BookDepth(acc))
5462        }
5463        other => {
5464            // Pipelines today rebuild same-variant time-series legs. Variants
5465            // without a per-item ts_init payload (singular Book/Instrument,
5466            // ForwardPrices) cannot be concatenated and would
5467            // otherwise leak a leg-keyed response. Drop rather than forward.
5468            log::error!(
5469                "Pipeline rebuild not supported for variant {} (parent {parent_id})",
5470                other.kind(),
5471            );
5472            None
5473        }
5474    }
5475}
5476
5477fn custom_response_data(resp: &CustomDataResponse, parent_id: UUID4) -> Option<Vec<CustomData>> {
5478    if let Some(data) = resp.data.as_ref().downcast_ref::<Vec<CustomData>>() {
5479        return Some(data.clone());
5480    }
5481
5482    if let Some(data) = resp.data.as_ref().downcast_ref::<CustomData>() {
5483        return Some(vec![data.clone()]);
5484    }
5485
5486    if let Some(data) = resp.data.as_ref().downcast_ref::<Vec<Data>>() {
5487        let mut custom = Vec::with_capacity(data.len());
5488        for item in data {
5489            let Data::Custom(value) = item else {
5490                log::error!("Custom data pipeline {parent_id} received non-custom data {item:?}");
5491                return None;
5492            };
5493            custom.push(value.clone());
5494        }
5495        return Some(custom);
5496    }
5497
5498    log::error!(
5499        "Custom data pipeline {parent_id} received unsupported payload for {}",
5500        resp.data_type,
5501    );
5502    None
5503}
5504
5505fn parent_request_window(
5506    parent: Option<&RequestCommand>,
5507) -> (Option<UnixNanos>, Option<UnixNanos>) {
5508    let Some(parent) = parent else {
5509        return (None, None);
5510    };
5511
5512    let (start, end) = match parent {
5513        RequestCommand::Data(cmd) => (cmd.start, cmd.end),
5514        RequestCommand::Instrument(cmd) => (cmd.start, cmd.end),
5515        RequestCommand::Instruments(cmd) => (cmd.start, cmd.end),
5516        RequestCommand::BookDeltas(cmd) => (cmd.start, cmd.end),
5517        RequestCommand::BookDepth(cmd) => (cmd.start, cmd.end),
5518        RequestCommand::Quotes(cmd) => (cmd.start, cmd.end),
5519        RequestCommand::Trades(cmd) => (cmd.start, cmd.end),
5520        RequestCommand::FundingRates(cmd) => (cmd.start, cmd.end),
5521        RequestCommand::Bars(cmd) => (cmd.start, cmd.end),
5522        RequestCommand::Join(cmd) => (cmd.start, cmd.end),
5523        RequestCommand::BookSnapshot(_) | RequestCommand::ForwardPrices(_) => return (None, None),
5524    };
5525
5526    (
5527        start.map(datetime_to_unix_nanos_or_zero),
5528        end.map(datetime_to_unix_nanos_or_zero),
5529    )
5530}
5531
5532fn datetime_to_unix_nanos_or_zero(dt: chrono::DateTime<chrono::Utc>) -> UnixNanos {
5533    UnixNanos::from(u64::try_from(dt.timestamp_nanos_opt().unwrap_or(0).max(0)).unwrap_or(0))
5534}
5535
5536fn empty_response_like(
5537    template: &DataResponse,
5538    correlation_id: UUID4,
5539    ts_init: UnixNanos,
5540) -> DataResponse {
5541    match template {
5542        DataResponse::Quotes(r) => DataResponse::Quotes(QuotesResponse::new(
5543            correlation_id,
5544            r.client_id,
5545            r.instrument_id,
5546            Vec::new(),
5547            r.start,
5548            r.end,
5549            ts_init,
5550            r.params.clone(),
5551        )),
5552        DataResponse::Trades(r) => DataResponse::Trades(TradesResponse::new(
5553            correlation_id,
5554            r.client_id,
5555            r.instrument_id,
5556            Vec::new(),
5557            r.start,
5558            r.end,
5559            ts_init,
5560            r.params.clone(),
5561        )),
5562        DataResponse::FundingRates(r) => DataResponse::FundingRates(FundingRatesResponse::new(
5563            correlation_id,
5564            r.client_id,
5565            r.instrument_id,
5566            Vec::new(),
5567            r.start,
5568            r.end,
5569            ts_init,
5570            r.params.clone(),
5571        )),
5572        DataResponse::Bars(r) => DataResponse::Bars(BarsResponse::new(
5573            correlation_id,
5574            r.client_id,
5575            r.bar_type,
5576            Vec::new(),
5577            r.start,
5578            r.end,
5579            ts_init,
5580            r.params.clone(),
5581        )),
5582        DataResponse::BookDeltas(r) => DataResponse::BookDeltas(BookDeltasResponse::new(
5583            correlation_id,
5584            r.client_id,
5585            r.instrument_id,
5586            Vec::new(),
5587            r.start,
5588            r.end,
5589            ts_init,
5590            r.params.clone(),
5591        )),
5592        DataResponse::BookDepth(r) => DataResponse::BookDepth(BookDepthResponse::new(
5593            correlation_id,
5594            r.client_id,
5595            r.instrument_id,
5596            Vec::new(),
5597            r.start,
5598            r.end,
5599            ts_init,
5600            r.params.clone(),
5601        )),
5602        other => {
5603            log::error!(
5604                "Cannot fabricate empty leg response for variant {}",
5605                other.kind(),
5606            );
5607            other.clone()
5608        }
5609    }
5610}
5611
5612fn rebind_response_correlation(mut resp: DataResponse, new_id: UUID4) -> DataResponse {
5613    match &mut resp {
5614        DataResponse::Data(r) => r.correlation_id = new_id,
5615        DataResponse::Instrument(r) => r.correlation_id = new_id,
5616        DataResponse::Instruments(r) => r.correlation_id = new_id,
5617        DataResponse::Book(r) => r.correlation_id = new_id,
5618        DataResponse::BookDeltas(r) => r.correlation_id = new_id,
5619        DataResponse::BookDepth(r) => r.correlation_id = new_id,
5620        DataResponse::Quotes(r) => r.correlation_id = new_id,
5621        DataResponse::Trades(r) => r.correlation_id = new_id,
5622        DataResponse::FundingRates(r) => r.correlation_id = new_id,
5623        DataResponse::ForwardPrices(r) => r.correlation_id = new_id,
5624        DataResponse::Bars(r) => r.correlation_id = new_id,
5625    }
5626    resp
5627}