Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    rc::Rc,
23    sync::Arc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{
32    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
33};
34use nautilus_model::{
35    data::{
36        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
37        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
38        close::InstrumentClose,
39        option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
40    },
41    enums::BookType,
42    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
43    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
44    instruments::{InstrumentAny, SyntheticInstrument},
45    orderbook::OrderBook,
46};
47use serde::{Deserialize, Serialize};
48use ustr::Ustr;
49
50use super::{
51    Actor,
52    indicators::{Indicators, SharedActorIndicator},
53    registry::{get_actor_unchecked, try_get_actor_unchecked},
54};
55#[cfg(feature = "defi")]
56use crate::defi;
57#[cfg(feature = "defi")]
58#[allow(unused_imports)]
59use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
60use crate::{
61    cache::{Cache, CacheApi},
62    clock::{Clock, ClockApi},
63    component::Component,
64    enums::{ComponentState, ComponentTrigger},
65    logging::{CMD, RECV, REQ, SEND},
66    messages::{
67        data::{
68            BarsResponse, BookDeltasResponse, BookDepthResponse, BookResponse, CustomDataResponse,
69            DataCommand, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
70            QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth, RequestBookSnapshot,
71            RequestCommand, RequestCustomData, RequestFundingRates, RequestInstrument,
72            RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
73            SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
74            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
75            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
76            SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
77            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
78            UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
79            UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
80            UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
81            UnsubscribeOptionChain, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
82            is_parent_subscription,
83        },
84        system::ShutdownSystem,
85    },
86    msgbus::{
87        self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
88        switchboard::{
89            MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
90            get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
91            get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
92            get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
93            get_option_chain_topic, get_option_greeks_topic, get_order_canceled_topic,
94            get_order_filled_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
95        },
96    },
97    signal::Signal,
98    timer::{TimeEvent, TimeEventCallback},
99};
100
101/// Common configuration for [`DataActor`] based components.
102#[derive(Debug, Clone, Deserialize, Serialize)]
103#[serde(default, deny_unknown_fields)]
104#[cfg_attr(
105    feature = "python",
106    pyo3::pyclass(
107        module = "nautilus_trader.core.nautilus_pyo3.common",
108        subclass,
109        from_py_object
110    )
111)]
112#[cfg_attr(
113    feature = "python",
114    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
115)]
116pub struct DataActorConfig {
117    /// The custom identifier for the Actor.
118    pub actor_id: Option<ActorId>,
119    /// If events should be logged.
120    pub log_events: bool,
121    /// If commands should be logged.
122    pub log_commands: bool,
123}
124
125impl Default for DataActorConfig {
126    fn default() -> Self {
127        Self {
128            actor_id: None,
129            log_events: true,
130            log_commands: true,
131        }
132    }
133}
134
135/// Configuration for creating actors from importable paths.
136#[derive(Debug, Clone, Deserialize, Serialize)]
137#[serde(deny_unknown_fields)]
138#[cfg_attr(
139    feature = "python",
140    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
141)]
142#[cfg_attr(
143    feature = "python",
144    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
145)]
146pub struct ImportableActorConfig {
147    /// The fully qualified name of the Actor class.
148    pub actor_path: String,
149    /// The fully qualified name of the Actor config class.
150    pub config_path: String,
151    /// The actor configuration as a dictionary.
152    pub config: HashMap<String, serde_json::Value>,
153}
154
155type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
156
157/// Explicit native-only access for data actor runtime state.
158///
159/// Normal actor and strategy code should use facade methods such as
160/// [`DataActor::clock`] and [`DataActor::cache`]. Import this trait only from
161/// Rust code compiled into the same native binary as the engine, when a
162/// performance-sensitive path or host integration needs access below the facade
163/// API.
164///
165/// Do not import this trait in strategy code intended to run through Python or
166/// the plug-in authoring surface. Native borrows, `Rc<RefCell<_>>`, and core
167/// references do not cross those boundaries.
168pub trait DataActorNative {
169    /// Returns the actor core.
170    fn core(&self) -> &DataActorCore;
171
172    /// Returns the mutable actor core.
173    fn core_mut(&mut self) -> &mut DataActorCore;
174
175    /// Returns the mutable clock borrow for the actor.
176    ///
177    /// # Panics
178    ///
179    /// Panics if the actor has not been registered with a trader.
180    fn clock_mut(&mut self) -> RefMut<'_, dyn Clock> {
181        let core = self.core_mut();
182        core.clock
183            .as_ref()
184            .unwrap_or_else(|| {
185                panic!(
186                    "DataActor {} must be registered before calling `clock_mut()` - trader_id: {:?}",
187                    core.actor_id, core.trader_id
188                )
189            })
190            .borrow_mut()
191    }
192
193    /// Returns a clone of the reference-counted clock.
194    ///
195    /// # Panics
196    ///
197    /// Panics if the actor has not yet been registered.
198    fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
199        self.core()
200            .clock
201            .as_ref()
202            .expect("DataActor must be registered before accessing clock")
203            .clone()
204    }
205
206    /// Returns a read-only cache borrow.
207    ///
208    /// # Panics
209    ///
210    /// Panics if the actor has not yet been registered.
211    fn cache_ref(&self) -> Ref<'_, Cache> {
212        self.core()
213            .cache
214            .as_ref()
215            .expect("DataActor must be registered before accessing cache")
216            .borrow()
217    }
218
219    /// Returns a clone of the reference-counted cache.
220    ///
221    /// # Panics
222    ///
223    /// Panics if the actor has not yet been registered.
224    fn cache_rc(&self) -> Rc<RefCell<Cache>> {
225        self.core()
226            .cache
227            .as_ref()
228            .expect("DataActor must be registered before accessing cache")
229            .clone()
230    }
231}
232
233/// Defines lifecycle callbacks, data handlers, and subscription/request
234/// methods for data actors.
235///
236/// Default methods that read or mutate native runtime state carry explicit
237/// [`DataActorNative`] bounds. Data actor implementations that only need
238/// core-free callbacks can implement this trait with their own [`Component`]
239/// implementation, while runtime-registered actors keep using native wiring.
240pub trait DataActor: Component {
241    /// Returns the actor ID.
242    fn actor_id(&self) -> ActorId
243    where
244        Self: DataActorNative,
245    {
246        self.core().actor_id()
247    }
248
249    /// Returns the trader ID this actor is registered to.
250    fn trader_id(&self) -> Option<TraderId>
251    where
252        Self: DataActorNative,
253    {
254        self.core().trader_id()
255    }
256
257    /// Returns whether the actor is registered with a trader.
258    fn is_registered(&self) -> bool
259    where
260        Self: DataActorNative,
261    {
262        self.core().is_registered()
263    }
264
265    /// Returns the actor configuration.
266    fn config(&self) -> &DataActorConfig
267    where
268        Self: DataActorNative,
269    {
270        &self.core().config
271    }
272
273    /// Actions to be performed when the actor state is saved.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if saving the actor state fails.
278    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
279        Ok(IndexMap::new())
280    }
281
282    /// Actions to be performed when the actor state is loaded.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if loading the actor state fails.
287    #[allow(unused_variables)]
288    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
289        Ok(())
290    }
291
292    /// Actions to be performed on start.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if starting the actor fails.
297    fn on_start(&mut self) -> anyhow::Result<()> {
298        log::warn!(
299            "The `on_start` handler was called when not overridden, \
300            it's expected that any actions required when starting the actor \
301            occur here, such as subscribing/requesting data"
302        );
303        Ok(())
304    }
305
306    /// Actions to be performed on stop.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if stopping the actor fails.
311    fn on_stop(&mut self) -> anyhow::Result<()> {
312        log::warn!(
313            "The `on_stop` handler was called when not overridden, \
314            it's expected that any actions required when stopping the actor \
315            occur here, such as unsubscribing from data",
316        );
317        Ok(())
318    }
319
320    /// Actions to be performed on resume.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if resuming the actor fails.
325    fn on_resume(&mut self) -> anyhow::Result<()> {
326        log::warn!(
327            "The `on_resume` handler was called when not overridden, \
328            it's expected that any actions required when resuming the actor \
329            following a stop occur here"
330        );
331        Ok(())
332    }
333
334    /// Actions to be performed on reset.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if resetting the actor fails.
339    fn on_reset(&mut self) -> anyhow::Result<()> {
340        log::warn!(
341            "The `on_reset` handler was called when not overridden, \
342            it's expected that any actions required when resetting the actor \
343            occur here, such as resetting indicators and other state"
344        );
345        Ok(())
346    }
347
348    /// Actions to be performed on dispose.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if disposing the actor fails.
353    fn on_dispose(&mut self) -> anyhow::Result<()> {
354        Ok(())
355    }
356
357    /// Actions to be performed on degrade.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if degrading the actor fails.
362    fn on_degrade(&mut self) -> anyhow::Result<()> {
363        Ok(())
364    }
365
366    /// Actions to be performed on fault.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if faulting the actor fails.
371    fn on_fault(&mut self) -> anyhow::Result<()> {
372        Ok(())
373    }
374
375    /// Actions to be performed when receiving a time event.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if handling the time event fails.
380    #[allow(unused_variables)]
381    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
382        Ok(())
383    }
384
385    /// Actions to be performed when receiving custom data.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if handling the data fails.
390    #[allow(unused_variables)]
391    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
392        Ok(())
393    }
394
395    /// Actions to be performed when receiving a signal.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if handling the signal fails.
400    #[allow(unused_variables)]
401    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
402        Ok(())
403    }
404
405    /// Actions to be performed when receiving an instrument.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if handling the instrument fails.
410    #[allow(unused_variables)]
411    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
412        Ok(())
413    }
414
415    /// Actions to be performed when receiving order book deltas.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if handling the book deltas fails.
420    #[allow(unused_variables)]
421    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
422        Ok(())
423    }
424
425    /// Actions to be performed when receiving an order book.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if handling the book fails.
430    #[allow(unused_variables)]
431    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
432        Ok(())
433    }
434
435    /// Actions to be performed when receiving a quote.
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if handling the quote fails.
440    #[allow(unused_variables)]
441    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
442        Ok(())
443    }
444
445    /// Actions to be performed when receiving a trade.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if handling the trade fails.
450    #[allow(unused_variables)]
451    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
452        Ok(())
453    }
454
455    /// Actions to be performed when receiving a bar.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if handling the bar fails.
460    #[allow(unused_variables)]
461    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
462        Ok(())
463    }
464
465    /// Actions to be performed when receiving a mark price update.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if handling the mark price update fails.
470    #[allow(unused_variables)]
471    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
472        Ok(())
473    }
474
475    /// Actions to be performed when receiving an index price update.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if handling the index price update fails.
480    #[allow(unused_variables)]
481    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
482        Ok(())
483    }
484
485    /// Actions to be performed when receiving a funding rate update.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if handling the funding rate update fails.
490    #[allow(unused_variables)]
491    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
492        Ok(())
493    }
494
495    /// Actions to be performed when receiving exchange-provided option greeks.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if handling the option greeks fails.
500    #[allow(unused_variables)]
501    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
502        Ok(())
503    }
504
505    /// Actions to be performed when receiving an option chain slice snapshot.
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if handling the option chain slice fails.
510    #[allow(unused_variables)]
511    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
512        Ok(())
513    }
514
515    /// Actions to be performed when receiving an instrument status update.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if handling the instrument status update fails.
520    #[allow(unused_variables)]
521    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
522        Ok(())
523    }
524
525    /// Actions to be performed when receiving an instrument close update.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if handling the instrument close update fails.
530    #[allow(unused_variables)]
531    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
532        Ok(())
533    }
534
535    /// Actions to be performed when receiving an order filled event.
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if handling the order filled event fails.
540    #[allow(unused_variables)]
541    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
542        Ok(())
543    }
544
545    /// Actions to be performed when receiving an order canceled event.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if handling the order canceled event fails.
550    #[allow(unused_variables)]
551    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
552        Ok(())
553    }
554
555    #[cfg(feature = "defi")]
556    /// Actions to be performed when receiving a block.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if handling the block fails.
561    #[allow(unused_variables)]
562    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
563        Ok(())
564    }
565
566    #[cfg(feature = "defi")]
567    /// Actions to be performed when receiving a pool.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if handling the pool fails.
572    #[allow(unused_variables)]
573    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
574        Ok(())
575    }
576
577    #[cfg(feature = "defi")]
578    /// Actions to be performed when receiving a pool swap.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if handling the pool swap fails.
583    #[allow(unused_variables)]
584    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
585        Ok(())
586    }
587
588    #[cfg(feature = "defi")]
589    /// Actions to be performed when receiving a pool liquidity update.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if handling the pool liquidity update fails.
594    #[allow(unused_variables)]
595    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
596        Ok(())
597    }
598
599    #[cfg(feature = "defi")]
600    /// Actions to be performed when receiving a pool fee collect event.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if handling the pool fee collect fails.
605    #[allow(unused_variables)]
606    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
607        Ok(())
608    }
609
610    #[cfg(feature = "defi")]
611    /// Actions to be performed when receiving a pool flash event.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error if handling the pool flash fails.
616    #[allow(unused_variables)]
617    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
618        Ok(())
619    }
620
621    /// Actions to be performed when receiving historical data.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if handling the historical data fails.
626    #[allow(unused_variables)]
627    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
628        Ok(())
629    }
630
631    /// Actions to be performed when receiving historical book deltas.
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if handling the historical book deltas fails.
636    #[allow(unused_variables)]
637    fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
638        Ok(())
639    }
640
641    /// Actions to be performed when receiving historical book depth.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if handling the historical book depth fails.
646    #[allow(unused_variables)]
647    fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
648        Ok(())
649    }
650
651    /// Actions to be performed when receiving historical quotes.
652    ///
653    /// # Errors
654    ///
655    /// Returns an error if handling the historical quotes fails.
656    #[allow(unused_variables)]
657    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
658        Ok(())
659    }
660
661    /// Actions to be performed when receiving historical trades.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if handling the historical trades fails.
666    #[allow(unused_variables)]
667    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
668        Ok(())
669    }
670
671    /// Actions to be performed when receiving historical bars.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if handling the historical bars fails.
676    #[allow(unused_variables)]
677    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
678        Ok(())
679    }
680
681    /// Actions to be performed when receiving historical mark prices.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if handling the historical mark prices fails.
686    #[allow(unused_variables)]
687    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
688        Ok(())
689    }
690
691    /// Actions to be performed when receiving historical index prices.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if handling the historical index prices fails.
696    #[allow(unused_variables)]
697    fn on_historical_index_prices(
698        &mut self,
699        index_prices: &[IndexPriceUpdate],
700    ) -> anyhow::Result<()> {
701        Ok(())
702    }
703
704    /// Actions to be performed when receiving historical funding rates.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if handling the historical funding rates fails.
709    #[allow(unused_variables)]
710    fn on_historical_funding_rates(
711        &mut self,
712        funding_rates: &[FundingRateUpdate],
713    ) -> anyhow::Result<()> {
714        Ok(())
715    }
716
717    /// Returns the user-facing clock API.
718    fn clock(&self) -> ClockApi<'_>
719    where
720        Self: DataActorNative,
721    {
722        self.core().clock_api()
723    }
724
725    /// Returns the user-facing cache API.
726    fn cache(&self) -> CacheApi<'_>
727    where
728        Self: DataActorNative,
729    {
730        self.core().cache_api()
731    }
732
733    /// Sends a shutdown command to the system with an optional reason.
734    ///
735    /// # Panics
736    ///
737    /// Panics if the actor is not registered or has no trader ID.
738    fn shutdown_system(&self, reason: Option<String>)
739    where
740        Self: DataActorNative,
741    {
742        self.core().shutdown_system(reason);
743    }
744
745    /// Publishes `data` on the message bus under the topic derived from `data_type`.
746    ///
747    /// `data_type` is kept as an explicit parameter to allow callers to override the
748    /// routing topic from the payload's intrinsic type.
749    ///
750    /// # Panics
751    ///
752    /// Panics if the actor is not registered with a trader.
753    fn publish_data(&self, data_type: &DataType, data: &CustomData)
754    where
755        Self: DataActorNative,
756    {
757        self.core().publish_data(data_type, data);
758    }
759
760    /// Publishes a [`Signal`] constructed from `name` and `value`.
761    ///
762    /// # Panics
763    ///
764    /// Panics if the actor is not registered with a trader.
765    fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos)
766    where
767        Self: DataActorNative,
768    {
769        self.core().publish_signal(name, value, ts_event);
770    }
771
772    // panics-doc-ok
773    /// Adds the `synthetic` instrument to the cache.
774    ///
775    /// # Errors
776    ///
777    /// Returns an error if a synthetic with the same ID already exists, or if the
778    /// backing cache fails to persist it.
779    ///
780    /// # Panics
781    ///
782    /// Panics if the actor is not registered with a trader.
783    fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()>
784    where
785        Self: DataActorNative,
786    {
787        self.core().add_synthetic(synthetic)
788    }
789
790    // panics-doc-ok
791    /// Updates the `synthetic` instrument in the cache, replacing the existing entry.
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if no synthetic with the same ID already exists, or if the
796    /// backing cache fails to persist the replacement.
797    ///
798    /// # Panics
799    ///
800    /// Panics if the actor is not registered with a trader.
801    fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()>
802    where
803        Self: DataActorNative,
804    {
805        self.core().update_synthetic(synthetic)
806    }
807
808    /// Handles a received time event.
809    fn handle_time_event(&mut self, event: &TimeEvent) {
810        log_received(&event);
811
812        if self.not_running() {
813            log_not_running(&event);
814            return;
815        }
816
817        if let Err(e) = DataActor::on_time_event(self, event) {
818            log_error(&e);
819        }
820    }
821
822    /// Handles a received custom data point.
823    fn handle_data(&mut self, data: &CustomData) {
824        log_received(&data);
825
826        if self.not_running() {
827            log_not_running(&data);
828            return;
829        }
830
831        if let Err(e) = self.on_data(data) {
832            log_error(&e);
833        }
834    }
835
836    /// Handles a received signal.
837    fn handle_signal(&mut self, signal: &Signal) {
838        log_received(&signal);
839
840        if self.not_running() {
841            log_not_running(&signal);
842            return;
843        }
844
845        if let Err(e) = self.on_signal(signal) {
846            log_error(&e);
847        }
848    }
849
850    /// Handles a received instrument.
851    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
852        log_received(&instrument);
853
854        if self.not_running() {
855            log_not_running(&instrument);
856            return;
857        }
858
859        if let Err(e) = self.on_instrument(instrument) {
860            log_error(&e);
861        }
862    }
863
864    /// Handles received order book deltas.
865    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
866        log_received(&deltas);
867
868        if self.not_running() {
869            log_not_running(&deltas);
870            return;
871        }
872
873        if let Err(e) = self.on_book_deltas(deltas) {
874            log_error(&e);
875        }
876    }
877
878    /// Handles a received order book reference.
879    fn handle_book(&mut self, book: &OrderBook) {
880        log_received(&book);
881
882        if self.not_running() {
883            log_not_running(&book);
884            return;
885        }
886
887        if let Err(e) = self.on_book(book) {
888            log_error(&e);
889        }
890    }
891
892    /// Handles a received quote.
893    fn handle_quote(&mut self, quote: &QuoteTick)
894    where
895        Self: DataActorNative,
896    {
897        log_received(&quote);
898
899        if let Err(e) = self.core().handle_indicators_for_quote(quote) {
900            log_error(&e);
901            return;
902        }
903
904        if self.not_running() {
905            log_not_running(&quote);
906            return;
907        }
908
909        if let Err(e) = self.on_quote(quote) {
910            log_error(&e);
911        }
912    }
913
914    /// Handles a received trade.
915    fn handle_trade(&mut self, trade: &TradeTick)
916    where
917        Self: DataActorNative,
918    {
919        log_received(&trade);
920
921        if let Err(e) = self.core().handle_indicators_for_trade(trade) {
922            log_error(&e);
923            return;
924        }
925
926        if self.not_running() {
927            log_not_running(&trade);
928            return;
929        }
930
931        if let Err(e) = self.on_trade(trade) {
932            log_error(&e);
933        }
934    }
935
936    /// Handles a receiving bar.
937    fn handle_bar(&mut self, bar: &Bar)
938    where
939        Self: DataActorNative,
940    {
941        log_received(&bar);
942
943        if let Err(e) = self.core().handle_indicators_for_bar(bar) {
944            log_error(&e);
945            return;
946        }
947
948        if self.not_running() {
949            log_not_running(&bar);
950            return;
951        }
952
953        if let Err(e) = self.on_bar(bar) {
954            log_error(&e);
955        }
956    }
957
958    /// Handles a received mark price update.
959    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
960        log_received(&mark_price);
961
962        if self.not_running() {
963            log_not_running(&mark_price);
964            return;
965        }
966
967        if let Err(e) = self.on_mark_price(mark_price) {
968            log_error(&e);
969        }
970    }
971
972    /// Handles a received index price update.
973    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
974        log_received(&index_price);
975
976        if self.not_running() {
977            log_not_running(&index_price);
978            return;
979        }
980
981        if let Err(e) = self.on_index_price(index_price) {
982            log_error(&e);
983        }
984    }
985
986    /// Handles a received funding rate update.
987    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
988        log_received(&funding_rate);
989
990        if self.not_running() {
991            log_not_running(&funding_rate);
992            return;
993        }
994
995        if let Err(e) = self.on_funding_rate(funding_rate) {
996            log_error(&e);
997        }
998    }
999
1000    /// Handles a received option greeks update.
1001    fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
1002        log_received(&greeks);
1003
1004        if self.not_running() {
1005            log_not_running(&greeks);
1006            return;
1007        }
1008
1009        if let Err(e) = self.on_option_greeks(greeks) {
1010            log_error(&e);
1011        }
1012    }
1013
1014    /// Handles a received option chain slice snapshot.
1015    fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
1016        log_received(&slice);
1017
1018        if self.not_running() {
1019            log_not_running(&slice);
1020            return;
1021        }
1022
1023        if let Err(e) = self.on_option_chain(slice) {
1024            log_error(&e);
1025        }
1026    }
1027
1028    /// Handles a received instrument status.
1029    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
1030        log_received(&status);
1031
1032        if self.not_running() {
1033            log_not_running(&status);
1034            return;
1035        }
1036
1037        if let Err(e) = self.on_instrument_status(status) {
1038            log_error(&e);
1039        }
1040    }
1041
1042    /// Handles a received instrument close.
1043    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
1044        log_received(&close);
1045
1046        if self.not_running() {
1047            log_not_running(&close);
1048            return;
1049        }
1050
1051        if let Err(e) = self.on_instrument_close(close) {
1052            log_error(&e);
1053        }
1054    }
1055
1056    /// Handles a received order filled event.
1057    fn handle_order_filled(&mut self, event: &OrderFilled)
1058    where
1059        Self: DataActorNative,
1060    {
1061        log_received(&event);
1062
1063        // Check for double-handling: if the event's strategy_id matches this actor's id,
1064        // it means a Strategy is receiving its own fill event through both automatic
1065        // subscription and manual subscribe_order_fills, so skip the manual handler.
1066        if event.strategy_id.inner() == self.core().actor_id().inner() {
1067            return;
1068        }
1069
1070        if self.not_running() {
1071            log_not_running(&event);
1072            return;
1073        }
1074
1075        if let Err(e) = self.on_order_filled(event) {
1076            log_error(&e);
1077        }
1078    }
1079
1080    /// Handles a received order canceled event.
1081    fn handle_order_canceled(&mut self, event: &OrderCanceled)
1082    where
1083        Self: DataActorNative,
1084    {
1085        log_received(&event);
1086
1087        // Check for double-handling: if the event's strategy_id matches this actor's id,
1088        // it means a Strategy is receiving its own cancel event through both automatic
1089        // subscription and manual subscribe_order_cancels, so skip the manual handler.
1090        if event.strategy_id.inner() == self.core().actor_id().inner() {
1091            return;
1092        }
1093
1094        if self.not_running() {
1095            log_not_running(&event);
1096            return;
1097        }
1098
1099        if let Err(e) = self.on_order_canceled(event) {
1100            log_error(&e);
1101        }
1102    }
1103
1104    #[cfg(feature = "defi")]
1105    /// Handles a received block.
1106    fn handle_block(&mut self, block: &Block) {
1107        log_received(&block);
1108
1109        if self.not_running() {
1110            log_not_running(&block);
1111            return;
1112        }
1113
1114        if let Err(e) = self.on_block(block) {
1115            log_error(&e);
1116        }
1117    }
1118
1119    #[cfg(feature = "defi")]
1120    /// Handles a received pool definition update.
1121    fn handle_pool(&mut self, pool: &Pool) {
1122        log_received(&pool);
1123
1124        if self.not_running() {
1125            log_not_running(&pool);
1126            return;
1127        }
1128
1129        if let Err(e) = self.on_pool(pool) {
1130            log_error(&e);
1131        }
1132    }
1133
1134    #[cfg(feature = "defi")]
1135    /// Handles a received pool swap.
1136    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
1137        log_received(&swap);
1138
1139        if self.not_running() {
1140            log_not_running(&swap);
1141            return;
1142        }
1143
1144        if let Err(e) = self.on_pool_swap(swap) {
1145            log_error(&e);
1146        }
1147    }
1148
1149    #[cfg(feature = "defi")]
1150    /// Handles a received pool liquidity update.
1151    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
1152        log_received(&update);
1153
1154        if self.not_running() {
1155            log_not_running(&update);
1156            return;
1157        }
1158
1159        if let Err(e) = self.on_pool_liquidity_update(update) {
1160            log_error(&e);
1161        }
1162    }
1163
1164    #[cfg(feature = "defi")]
1165    /// Handles a received pool fee collect.
1166    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
1167        log_received(&collect);
1168
1169        if self.not_running() {
1170            log_not_running(&collect);
1171            return;
1172        }
1173
1174        if let Err(e) = self.on_pool_fee_collect(collect) {
1175            log_error(&e);
1176        }
1177    }
1178
1179    #[cfg(feature = "defi")]
1180    /// Handles a received pool flash event.
1181    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
1182        log_received(&flash);
1183
1184        if self.not_running() {
1185            log_not_running(&flash);
1186            return;
1187        }
1188
1189        if let Err(e) = self.on_pool_flash(flash) {
1190            log_error(&e);
1191        }
1192    }
1193
1194    /// Handles received historical data.
1195    fn handle_historical_data(&mut self, data: &dyn Any) {
1196        log_received(&data);
1197
1198        if let Err(e) = self.on_historical_data(data) {
1199            log_error(&e);
1200        }
1201    }
1202
1203    /// Handles a data response.
1204    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
1205        log_received(&resp);
1206
1207        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
1208            log_error(&e);
1209        }
1210    }
1211
1212    /// Handles an instrument response.
1213    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
1214        log_received(&resp);
1215
1216        if let Err(e) = self.on_instrument(&resp.data) {
1217            log_error(&e);
1218        }
1219    }
1220
1221    /// Handles an instruments response.
1222    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
1223        log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
1224        log::trace!("{RECV} {resp:?}");
1225
1226        for inst in &resp.data {
1227            if let Err(e) = self.on_instrument(inst) {
1228                log_error(&e);
1229            }
1230        }
1231    }
1232
1233    /// Handles a book response.
1234    fn handle_book_response(&mut self, resp: &BookResponse) {
1235        log_received(&resp);
1236
1237        if let Err(e) = self.on_book(&resp.data) {
1238            log_error(&e);
1239        }
1240    }
1241
1242    /// Handles a book deltas response.
1243    fn handle_book_deltas_response(&mut self, resp: &BookDeltasResponse) {
1244        log_received_bulk("BookDeltasResponse", &resp.correlation_id, resp.data.len());
1245        log::trace!("{RECV} {resp:?}");
1246
1247        if let Err(e) = self.on_historical_book_deltas(&resp.data) {
1248            log_error(&e);
1249        }
1250    }
1251
1252    /// Handles a book depth response.
1253    fn handle_book_depth_response(&mut self, resp: &BookDepthResponse) {
1254        log_received_bulk("BookDepthResponse", &resp.correlation_id, resp.data.len());
1255        log::trace!("{RECV} {resp:?}");
1256
1257        if let Err(e) = self.on_historical_book_depth(&resp.data) {
1258            log_error(&e);
1259        }
1260    }
1261
1262    /// Handles a quotes response.
1263    fn handle_quotes_response(&mut self, resp: &QuotesResponse)
1264    where
1265        Self: DataActorNative,
1266    {
1267        log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
1268        log::trace!("{RECV} {resp:?}");
1269
1270        if let Err(e) = self.core().handle_indicators_for_quotes(&resp.data) {
1271            log_error(&e);
1272            return;
1273        }
1274
1275        if let Err(e) = self.on_historical_quotes(&resp.data) {
1276            log_error(&e);
1277        }
1278    }
1279
1280    /// Handles a trades response.
1281    fn handle_trades_response(&mut self, resp: &TradesResponse)
1282    where
1283        Self: DataActorNative,
1284    {
1285        log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1286        log::trace!("{RECV} {resp:?}");
1287
1288        if let Err(e) = self.core().handle_indicators_for_trades(&resp.data) {
1289            log_error(&e);
1290            return;
1291        }
1292
1293        if let Err(e) = self.on_historical_trades(&resp.data) {
1294            log_error(&e);
1295        }
1296    }
1297
1298    /// Handles a bars response.
1299    fn handle_bars_response(&mut self, resp: &BarsResponse)
1300    where
1301        Self: DataActorNative,
1302    {
1303        log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1304        log::trace!("{RECV} {resp:?}");
1305
1306        if let Err(e) = self.core().handle_indicators_for_bars(&resp.data) {
1307            log_error(&e);
1308            return;
1309        }
1310
1311        if let Err(e) = self.on_historical_bars(&resp.data) {
1312            log_error(&e);
1313        }
1314    }
1315
1316    /// Handles a funding rates response.
1317    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1318        log_received_bulk(
1319            "FundingRatesResponse",
1320            &resp.correlation_id,
1321            resp.data.len(),
1322        );
1323        log::trace!("{RECV} {resp:?}");
1324
1325        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1326            log_error(&e);
1327        }
1328    }
1329
1330    /// Subscribe to streaming `data_type` data.
1331    fn subscribe_data(
1332        &mut self,
1333        data_type: DataType,
1334        client_id: Option<ClientId>,
1335        params: Option<Params>,
1336    ) where
1337        Self: DataActorNative,
1338        Self: 'static + Debug + Sized,
1339    {
1340        let actor_id = self.core().actor_id().inner();
1341        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1342            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1343        });
1344
1345        DataActorCore::subscribe_data(self.core_mut(), handler, data_type, client_id, params);
1346    }
1347
1348    /// Subscribe to [`Signal`] data by `name`.
1349    ///
1350    /// An empty `name` subscribes to every signal.
1351    ///
1352    /// # Parameters
1353    ///
1354    /// - `name`: signal name to subscribe to.
1355    /// - `priority`: optional dispatch priority. Pass `None` for default
1356    ///   ordering (by pattern then handler ID). Pass `Some(p)` when actors
1357    ///   sharing a signal need deterministic ordering: higher-priority
1358    ///   handlers receive the message before lower-priority handlers.
1359    ///
1360    /// Re-subscribing does not update an existing priority; call
1361    /// [`unsubscribe_signal`](Self::unsubscribe_signal) first.
1362    fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1363    where
1364        Self: DataActorNative,
1365        Self: 'static + Debug + Sized,
1366    {
1367        let actor_id = self.core().actor_id().inner();
1368        // Signals are published as `CustomData` wrapping a `Signal`; downcast
1369        // the inner value so subscribers receive the typed `Signal` in `on_signal`.
1370        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1371            if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1372                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1373                    actor.handle_signal(signal);
1374                } else {
1375                    log::error!("Actor {actor_id} not found for signal handling");
1376                }
1377            }
1378        });
1379
1380        DataActorCore::subscribe_signal(self.core_mut(), handler, name, priority);
1381    }
1382
1383    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1384    fn subscribe_quotes(
1385        &mut self,
1386        instrument_id: InstrumentId,
1387        client_id: Option<ClientId>,
1388        params: Option<Params>,
1389    ) where
1390        Self: DataActorNative,
1391        Self: 'static + Debug + Sized,
1392    {
1393        let actor_id = self.core().actor_id().inner();
1394        let topic = get_quotes_topic(instrument_id);
1395
1396        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1397            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1398                actor.handle_quote(quote);
1399            } else {
1400                log::error!("Actor {actor_id} not found for quote handling");
1401            }
1402        });
1403
1404        DataActorCore::subscribe_quotes(
1405            self.core_mut(),
1406            topic,
1407            handler,
1408            instrument_id,
1409            client_id,
1410            params,
1411        );
1412    }
1413
1414    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1415    fn subscribe_instruments(
1416        &mut self,
1417        venue: Venue,
1418        client_id: Option<ClientId>,
1419        params: Option<Params>,
1420    ) where
1421        Self: DataActorNative,
1422        Self: 'static + Debug + Sized,
1423    {
1424        let actor_id = self.core().actor_id().inner();
1425        let pattern = get_instruments_pattern(venue);
1426
1427        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1428            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1429                actor.handle_instrument(instrument);
1430            } else {
1431                log::error!("Actor {actor_id} not found for instruments handling");
1432            }
1433        });
1434
1435        DataActorCore::subscribe_instruments(
1436            self.core_mut(),
1437            pattern,
1438            handler,
1439            venue,
1440            client_id,
1441            params,
1442        );
1443    }
1444
1445    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1446    fn subscribe_instrument(
1447        &mut self,
1448        instrument_id: InstrumentId,
1449        client_id: Option<ClientId>,
1450        params: Option<Params>,
1451    ) where
1452        Self: DataActorNative,
1453        Self: 'static + Debug + Sized,
1454    {
1455        let actor_id = self.core().actor_id().inner();
1456        let topic = get_instrument_topic(instrument_id);
1457
1458        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1459            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1460                actor.handle_instrument(instrument);
1461            } else {
1462                log::error!("Actor {actor_id} not found for instrument handling");
1463            }
1464        });
1465
1466        DataActorCore::subscribe_instrument(
1467            self.core_mut(),
1468            topic,
1469            handler,
1470            instrument_id,
1471            client_id,
1472            params,
1473        );
1474    }
1475
1476    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1477    fn subscribe_book_deltas(
1478        &mut self,
1479        instrument_id: InstrumentId,
1480        book_type: BookType,
1481        depth: Option<NonZeroUsize>,
1482        client_id: Option<ClientId>,
1483        managed: bool,
1484        params: Option<Params>,
1485    ) where
1486        Self: DataActorNative,
1487        Self: 'static + Debug + Sized,
1488    {
1489        let actor_id = self.core().actor_id().inner();
1490        let is_parent = is_parent_subscription(params.as_ref());
1491        let pattern = if is_parent {
1492            get_book_deltas_pattern(instrument_id)
1493        } else {
1494            get_book_deltas_topic(instrument_id).into()
1495        };
1496
1497        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1498            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1499        });
1500
1501        DataActorCore::subscribe_book_deltas(
1502            self.core_mut(),
1503            pattern,
1504            handler,
1505            instrument_id,
1506            book_type,
1507            depth,
1508            client_id,
1509            managed,
1510            params,
1511        );
1512    }
1513
1514    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1515    fn subscribe_book_at_interval(
1516        &mut self,
1517        instrument_id: InstrumentId,
1518        book_type: BookType,
1519        depth: Option<NonZeroUsize>,
1520        interval_ms: NonZeroUsize,
1521        client_id: Option<ClientId>,
1522        params: Option<Params>,
1523    ) where
1524        Self: DataActorNative,
1525        Self: 'static + Debug + Sized,
1526    {
1527        let actor_id = self.core().actor_id().inner();
1528        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1529
1530        let handler = TypedHandler::from(move |book: &OrderBook| {
1531            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1532        });
1533
1534        DataActorCore::subscribe_book_at_interval(
1535            self.core_mut(),
1536            topic,
1537            handler,
1538            instrument_id,
1539            book_type,
1540            depth,
1541            interval_ms,
1542            client_id,
1543            params,
1544        );
1545    }
1546
1547    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1548    fn subscribe_trades(
1549        &mut self,
1550        instrument_id: InstrumentId,
1551        client_id: Option<ClientId>,
1552        params: Option<Params>,
1553    ) where
1554        Self: DataActorNative,
1555        Self: 'static + Debug + Sized,
1556    {
1557        let actor_id = self.core().actor_id().inner();
1558        let topic = get_trades_topic(instrument_id);
1559
1560        let handler = TypedHandler::from(move |trade: &TradeTick| {
1561            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1562        });
1563
1564        DataActorCore::subscribe_trades(
1565            self.core_mut(),
1566            topic,
1567            handler,
1568            instrument_id,
1569            client_id,
1570            params,
1571        );
1572    }
1573
1574    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1575    fn subscribe_bars(
1576        &mut self,
1577        bar_type: BarType,
1578        client_id: Option<ClientId>,
1579        params: Option<Params>,
1580    ) where
1581        Self: DataActorNative,
1582        Self: 'static + Debug + Sized,
1583    {
1584        let actor_id = self.core().actor_id().inner();
1585        let topic = get_bars_topic(bar_type);
1586
1587        let handler = TypedHandler::from(move |bar: &Bar| {
1588            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1589        });
1590
1591        DataActorCore::subscribe_bars(self.core_mut(), topic, handler, bar_type, client_id, params);
1592    }
1593
1594    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1595    fn subscribe_mark_prices(
1596        &mut self,
1597        instrument_id: InstrumentId,
1598        client_id: Option<ClientId>,
1599        params: Option<Params>,
1600    ) where
1601        Self: DataActorNative,
1602        Self: 'static + Debug + Sized,
1603    {
1604        let actor_id = self.core().actor_id().inner();
1605        let topic = get_mark_price_topic(instrument_id);
1606
1607        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1608            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1609        });
1610
1611        DataActorCore::subscribe_mark_prices(
1612            self.core_mut(),
1613            topic,
1614            handler,
1615            instrument_id,
1616            client_id,
1617            params,
1618        );
1619    }
1620
1621    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1622    fn subscribe_index_prices(
1623        &mut self,
1624        instrument_id: InstrumentId,
1625        client_id: Option<ClientId>,
1626        params: Option<Params>,
1627    ) where
1628        Self: DataActorNative,
1629        Self: 'static + Debug + Sized,
1630    {
1631        let actor_id = self.core().actor_id().inner();
1632        let topic = get_index_price_topic(instrument_id);
1633
1634        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1635            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1636        });
1637
1638        DataActorCore::subscribe_index_prices(
1639            self.core_mut(),
1640            topic,
1641            handler,
1642            instrument_id,
1643            client_id,
1644            params,
1645        );
1646    }
1647
1648    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1649    fn subscribe_funding_rates(
1650        &mut self,
1651        instrument_id: InstrumentId,
1652        client_id: Option<ClientId>,
1653        params: Option<Params>,
1654    ) where
1655        Self: DataActorNative,
1656        Self: 'static + Debug + Sized,
1657    {
1658        let actor_id = self.core().actor_id().inner();
1659        let topic = get_funding_rate_topic(instrument_id);
1660
1661        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1662            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1663        });
1664
1665        DataActorCore::subscribe_funding_rates(
1666            self.core_mut(),
1667            topic,
1668            handler,
1669            instrument_id,
1670            client_id,
1671            params,
1672        );
1673    }
1674
1675    /// Subscribe to streaming [`OptionGreeks`] data for the `instrument_id`.
1676    fn subscribe_option_greeks(
1677        &mut self,
1678        instrument_id: InstrumentId,
1679        client_id: Option<ClientId>,
1680        params: Option<Params>,
1681    ) where
1682        Self: DataActorNative,
1683        Self: 'static + Debug + Sized,
1684    {
1685        let actor_id = self.core().actor_id().inner();
1686        let topic = get_option_greeks_topic(instrument_id);
1687
1688        let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1689            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1690                actor.handle_option_greeks(option_greeks);
1691            } else {
1692                log::error!("Actor {actor_id} not found for option greeks handling");
1693            }
1694        });
1695
1696        DataActorCore::subscribe_option_greeks(
1697            self.core_mut(),
1698            topic,
1699            handler,
1700            instrument_id,
1701            client_id,
1702            params,
1703        );
1704    }
1705
1706    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1707    fn subscribe_instrument_status(
1708        &mut self,
1709        instrument_id: InstrumentId,
1710        client_id: Option<ClientId>,
1711        params: Option<Params>,
1712    ) where
1713        Self: DataActorNative,
1714        Self: 'static + Debug + Sized,
1715    {
1716        let actor_id = self.core().actor_id().inner();
1717        let topic = get_instrument_status_topic(instrument_id);
1718
1719        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1720            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1721        });
1722
1723        DataActorCore::subscribe_instrument_status(
1724            self.core_mut(),
1725            topic,
1726            handler,
1727            instrument_id,
1728            client_id,
1729            params,
1730        );
1731    }
1732
1733    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1734    fn subscribe_instrument_close(
1735        &mut self,
1736        instrument_id: InstrumentId,
1737        client_id: Option<ClientId>,
1738        params: Option<Params>,
1739    ) where
1740        Self: DataActorNative,
1741        Self: 'static + Debug + Sized,
1742    {
1743        let actor_id = self.core().actor_id().inner();
1744        let topic = get_instrument_close_topic(instrument_id);
1745
1746        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1747            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1748        });
1749
1750        DataActorCore::subscribe_instrument_close(
1751            self.core_mut(),
1752            topic,
1753            handler,
1754            instrument_id,
1755            client_id,
1756            params,
1757        );
1758    }
1759
1760    /// Subscribe to streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1761    ///
1762    /// The ATM price is always derived from the exchange-provided forward price
1763    /// embedded in each option greeks/ticker update.
1764    fn subscribe_option_chain(
1765        &mut self,
1766        series_id: OptionSeriesId,
1767        strike_range: StrikeRange,
1768        snapshot_interval_ms: Option<u64>,
1769        client_id: Option<ClientId>,
1770        params: Option<Params>,
1771    ) where
1772        Self: DataActorNative,
1773        Self: 'static + Debug + Sized,
1774    {
1775        let actor_id = self.core().actor_id().inner();
1776        let topic = get_option_chain_topic(series_id);
1777
1778        let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1779            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1780                actor.handle_option_chain(slice);
1781            } else {
1782                log::error!("Actor {actor_id} not found for option chain handling");
1783            }
1784        });
1785
1786        DataActorCore::subscribe_option_chain(
1787            self.core_mut(),
1788            topic,
1789            handler,
1790            series_id,
1791            strike_range,
1792            snapshot_interval_ms,
1793            client_id,
1794            params,
1795        );
1796    }
1797
1798    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1799    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1800    where
1801        Self: DataActorNative,
1802        Self: 'static + Debug + Sized,
1803    {
1804        let actor_id = self.core().actor_id().inner();
1805        let topic = get_order_filled_topic(instrument_id);
1806
1807        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1808            if let OrderEventAny::Filled(filled) = event {
1809                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1810            }
1811        });
1812
1813        DataActorCore::subscribe_order_fills(self.core_mut(), topic, handler);
1814    }
1815
1816    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1817    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1818    where
1819        Self: DataActorNative,
1820        Self: 'static + Debug + Sized,
1821    {
1822        let actor_id = self.core().actor_id().inner();
1823        let topic = get_order_canceled_topic(instrument_id);
1824
1825        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1826            if let OrderEventAny::Canceled(canceled) = event {
1827                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1828            }
1829        });
1830
1831        DataActorCore::subscribe_order_cancels(self.core_mut(), topic, handler);
1832    }
1833
1834    #[cfg(feature = "defi")]
1835    /// Subscribe to streaming [`Block`] data for the `chain`.
1836    fn subscribe_blocks(
1837        &mut self,
1838        chain: Blockchain,
1839        client_id: Option<ClientId>,
1840        params: Option<Params>,
1841    ) where
1842        Self: DataActorNative,
1843        Self: 'static + Debug + Sized,
1844    {
1845        let actor_id = self.core().actor_id().inner();
1846        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1847
1848        let handler = TypedHandler::from(move |block: &Block| {
1849            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1850        });
1851
1852        DataActorCore::subscribe_blocks(self.core_mut(), topic, handler, chain, client_id, params);
1853    }
1854
1855    #[cfg(feature = "defi")]
1856    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1857    fn subscribe_pool(
1858        &mut self,
1859        instrument_id: InstrumentId,
1860        client_id: Option<ClientId>,
1861        params: Option<Params>,
1862    ) where
1863        Self: DataActorNative,
1864        Self: 'static + Debug + Sized,
1865    {
1866        let actor_id = self.core().actor_id().inner();
1867        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1868
1869        let handler = TypedHandler::from(move |pool: &Pool| {
1870            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1871        });
1872
1873        DataActorCore::subscribe_pool(
1874            self.core_mut(),
1875            topic,
1876            handler,
1877            instrument_id,
1878            client_id,
1879            params,
1880        );
1881    }
1882
1883    #[cfg(feature = "defi")]
1884    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1885    fn subscribe_pool_swaps(
1886        &mut self,
1887        instrument_id: InstrumentId,
1888        client_id: Option<ClientId>,
1889        params: Option<Params>,
1890    ) where
1891        Self: DataActorNative,
1892        Self: 'static + Debug + Sized,
1893    {
1894        let actor_id = self.core().actor_id().inner();
1895        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1896
1897        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1898            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1899        });
1900
1901        DataActorCore::subscribe_pool_swaps(
1902            self.core_mut(),
1903            topic,
1904            handler,
1905            instrument_id,
1906            client_id,
1907            params,
1908        );
1909    }
1910
1911    #[cfg(feature = "defi")]
1912    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1913    fn subscribe_pool_liquidity_updates(
1914        &mut self,
1915        instrument_id: InstrumentId,
1916        client_id: Option<ClientId>,
1917        params: Option<Params>,
1918    ) where
1919        Self: DataActorNative,
1920        Self: 'static + Debug + Sized,
1921    {
1922        let actor_id = self.core().actor_id().inner();
1923        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1924
1925        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1926            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1927        });
1928
1929        DataActorCore::subscribe_pool_liquidity_updates(
1930            self.core_mut(),
1931            topic,
1932            handler,
1933            instrument_id,
1934            client_id,
1935            params,
1936        );
1937    }
1938
1939    #[cfg(feature = "defi")]
1940    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1941    fn subscribe_pool_fee_collects(
1942        &mut self,
1943        instrument_id: InstrumentId,
1944        client_id: Option<ClientId>,
1945        params: Option<Params>,
1946    ) where
1947        Self: DataActorNative,
1948        Self: 'static + Debug + Sized,
1949    {
1950        let actor_id = self.core().actor_id().inner();
1951        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1952
1953        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1954            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1955        });
1956
1957        DataActorCore::subscribe_pool_fee_collects(
1958            self.core_mut(),
1959            topic,
1960            handler,
1961            instrument_id,
1962            client_id,
1963            params,
1964        );
1965    }
1966
1967    #[cfg(feature = "defi")]
1968    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1969    fn subscribe_pool_flash_events(
1970        &mut self,
1971        instrument_id: InstrumentId,
1972        client_id: Option<ClientId>,
1973        params: Option<Params>,
1974    ) where
1975        Self: DataActorNative,
1976        Self: 'static + Debug + Sized,
1977    {
1978        let actor_id = self.core().actor_id().inner();
1979        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1980
1981        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1982            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1983        });
1984
1985        DataActorCore::subscribe_pool_flash_events(
1986            self.core_mut(),
1987            topic,
1988            handler,
1989            instrument_id,
1990            client_id,
1991            params,
1992        );
1993    }
1994
1995    /// Unsubscribe from streaming `data_type` data.
1996    fn unsubscribe_data(
1997        &mut self,
1998        data_type: DataType,
1999        client_id: Option<ClientId>,
2000        params: Option<Params>,
2001    ) where
2002        Self: DataActorNative,
2003        Self: 'static + Debug + Sized,
2004    {
2005        DataActorCore::unsubscribe_data(self.core_mut(), data_type, client_id, params);
2006    }
2007
2008    /// Unsubscribe from [`Signal`] data by `name`.
2009    fn unsubscribe_signal(&mut self, name: &str)
2010    where
2011        Self: DataActorNative,
2012        Self: 'static + Debug + Sized,
2013    {
2014        DataActorCore::unsubscribe_signal(self.core_mut(), name);
2015    }
2016
2017    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
2018    fn unsubscribe_instruments(
2019        &mut self,
2020        venue: Venue,
2021        client_id: Option<ClientId>,
2022        params: Option<Params>,
2023    ) where
2024        Self: DataActorNative,
2025        Self: 'static + Debug + Sized,
2026    {
2027        DataActorCore::unsubscribe_instruments(self.core_mut(), venue, client_id, params);
2028    }
2029
2030    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
2031    fn unsubscribe_instrument(
2032        &mut self,
2033        instrument_id: InstrumentId,
2034        client_id: Option<ClientId>,
2035        params: Option<Params>,
2036    ) where
2037        Self: DataActorNative,
2038        Self: 'static + Debug + Sized,
2039    {
2040        DataActorCore::unsubscribe_instrument(self.core_mut(), instrument_id, client_id, params);
2041    }
2042
2043    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
2044    fn unsubscribe_book_deltas(
2045        &mut self,
2046        instrument_id: InstrumentId,
2047        client_id: Option<ClientId>,
2048        params: Option<Params>,
2049    ) where
2050        Self: DataActorNative,
2051        Self: 'static + Debug + Sized,
2052    {
2053        DataActorCore::unsubscribe_book_deltas(self.core_mut(), instrument_id, client_id, params);
2054    }
2055
2056    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
2057    fn unsubscribe_book_at_interval(
2058        &mut self,
2059        instrument_id: InstrumentId,
2060        interval_ms: NonZeroUsize,
2061        client_id: Option<ClientId>,
2062        params: Option<Params>,
2063    ) where
2064        Self: DataActorNative,
2065        Self: 'static + Debug + Sized,
2066    {
2067        DataActorCore::unsubscribe_book_at_interval(
2068            self.core_mut(),
2069            instrument_id,
2070            interval_ms,
2071            client_id,
2072            params,
2073        );
2074    }
2075
2076    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
2077    fn unsubscribe_quotes(
2078        &mut self,
2079        instrument_id: InstrumentId,
2080        client_id: Option<ClientId>,
2081        params: Option<Params>,
2082    ) where
2083        Self: DataActorNative,
2084        Self: 'static + Debug + Sized,
2085    {
2086        DataActorCore::unsubscribe_quotes(self.core_mut(), instrument_id, client_id, params);
2087    }
2088
2089    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
2090    fn unsubscribe_trades(
2091        &mut self,
2092        instrument_id: InstrumentId,
2093        client_id: Option<ClientId>,
2094        params: Option<Params>,
2095    ) where
2096        Self: DataActorNative,
2097        Self: 'static + Debug + Sized,
2098    {
2099        DataActorCore::unsubscribe_trades(self.core_mut(), instrument_id, client_id, params);
2100    }
2101
2102    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
2103    fn unsubscribe_bars(
2104        &mut self,
2105        bar_type: BarType,
2106        client_id: Option<ClientId>,
2107        params: Option<Params>,
2108    ) where
2109        Self: DataActorNative,
2110        Self: 'static + Debug + Sized,
2111    {
2112        DataActorCore::unsubscribe_bars(self.core_mut(), bar_type, client_id, params);
2113    }
2114
2115    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
2116    fn unsubscribe_mark_prices(
2117        &mut self,
2118        instrument_id: InstrumentId,
2119        client_id: Option<ClientId>,
2120        params: Option<Params>,
2121    ) where
2122        Self: DataActorNative,
2123        Self: 'static + Debug + Sized,
2124    {
2125        DataActorCore::unsubscribe_mark_prices(self.core_mut(), instrument_id, client_id, params);
2126    }
2127
2128    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
2129    fn unsubscribe_index_prices(
2130        &mut self,
2131        instrument_id: InstrumentId,
2132        client_id: Option<ClientId>,
2133        params: Option<Params>,
2134    ) where
2135        Self: DataActorNative,
2136        Self: 'static + Debug + Sized,
2137    {
2138        DataActorCore::unsubscribe_index_prices(self.core_mut(), instrument_id, client_id, params);
2139    }
2140
2141    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
2142    fn unsubscribe_funding_rates(
2143        &mut self,
2144        instrument_id: InstrumentId,
2145        client_id: Option<ClientId>,
2146        params: Option<Params>,
2147    ) where
2148        Self: DataActorNative,
2149        Self: 'static + Debug + Sized,
2150    {
2151        DataActorCore::unsubscribe_funding_rates(self.core_mut(), instrument_id, client_id, params);
2152    }
2153
2154    /// Unsubscribe from streaming [`OptionGreeks`] data for the `instrument_id`.
2155    fn unsubscribe_option_greeks(
2156        &mut self,
2157        instrument_id: InstrumentId,
2158        client_id: Option<ClientId>,
2159        params: Option<Params>,
2160    ) where
2161        Self: DataActorNative,
2162        Self: 'static + Debug + Sized,
2163    {
2164        DataActorCore::unsubscribe_option_greeks(self.core_mut(), instrument_id, client_id, params);
2165    }
2166
2167    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
2168    fn unsubscribe_instrument_status(
2169        &mut self,
2170        instrument_id: InstrumentId,
2171        client_id: Option<ClientId>,
2172        params: Option<Params>,
2173    ) where
2174        Self: DataActorNative,
2175        Self: 'static + Debug + Sized,
2176    {
2177        DataActorCore::unsubscribe_instrument_status(
2178            self.core_mut(),
2179            instrument_id,
2180            client_id,
2181            params,
2182        );
2183    }
2184
2185    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
2186    fn unsubscribe_instrument_close(
2187        &mut self,
2188        instrument_id: InstrumentId,
2189        client_id: Option<ClientId>,
2190        params: Option<Params>,
2191    ) where
2192        Self: DataActorNative,
2193        Self: 'static + Debug + Sized,
2194    {
2195        DataActorCore::unsubscribe_instrument_close(
2196            self.core_mut(),
2197            instrument_id,
2198            client_id,
2199            params,
2200        );
2201    }
2202
2203    /// Unsubscribe from streaming [`OptionChainSlice`] snapshots for the option `series_id`.
2204    fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
2205    where
2206        Self: DataActorNative,
2207        Self: 'static + Debug + Sized,
2208    {
2209        DataActorCore::unsubscribe_option_chain(self.core_mut(), series_id, client_id);
2210    }
2211
2212    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
2213    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
2214    where
2215        Self: DataActorNative,
2216        Self: 'static + Debug + Sized,
2217    {
2218        DataActorCore::unsubscribe_order_fills(self.core_mut(), instrument_id);
2219    }
2220
2221    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
2222    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
2223    where
2224        Self: DataActorNative,
2225        Self: 'static + Debug + Sized,
2226    {
2227        DataActorCore::unsubscribe_order_cancels(self.core_mut(), instrument_id);
2228    }
2229
2230    #[cfg(feature = "defi")]
2231    /// Unsubscribe from streaming [`Block`] data for the `chain`.
2232    fn unsubscribe_blocks(
2233        &mut self,
2234        chain: Blockchain,
2235        client_id: Option<ClientId>,
2236        params: Option<Params>,
2237    ) where
2238        Self: DataActorNative,
2239        Self: 'static + Debug + Sized,
2240    {
2241        DataActorCore::unsubscribe_blocks(self.core_mut(), chain, client_id, params);
2242    }
2243
2244    #[cfg(feature = "defi")]
2245    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
2246    fn unsubscribe_pool(
2247        &mut self,
2248        instrument_id: InstrumentId,
2249        client_id: Option<ClientId>,
2250        params: Option<Params>,
2251    ) where
2252        Self: DataActorNative,
2253        Self: 'static + Debug + Sized,
2254    {
2255        DataActorCore::unsubscribe_pool(self.core_mut(), instrument_id, client_id, params);
2256    }
2257
2258    #[cfg(feature = "defi")]
2259    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
2260    fn unsubscribe_pool_swaps(
2261        &mut self,
2262        instrument_id: InstrumentId,
2263        client_id: Option<ClientId>,
2264        params: Option<Params>,
2265    ) where
2266        Self: DataActorNative,
2267        Self: 'static + Debug + Sized,
2268    {
2269        DataActorCore::unsubscribe_pool_swaps(self.core_mut(), instrument_id, client_id, params);
2270    }
2271
2272    #[cfg(feature = "defi")]
2273    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
2274    fn unsubscribe_pool_liquidity_updates(
2275        &mut self,
2276        instrument_id: InstrumentId,
2277        client_id: Option<ClientId>,
2278        params: Option<Params>,
2279    ) where
2280        Self: DataActorNative,
2281        Self: 'static + Debug + Sized,
2282    {
2283        DataActorCore::unsubscribe_pool_liquidity_updates(
2284            self.core_mut(),
2285            instrument_id,
2286            client_id,
2287            params,
2288        );
2289    }
2290
2291    #[cfg(feature = "defi")]
2292    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
2293    fn unsubscribe_pool_fee_collects(
2294        &mut self,
2295        instrument_id: InstrumentId,
2296        client_id: Option<ClientId>,
2297        params: Option<Params>,
2298    ) where
2299        Self: DataActorNative,
2300        Self: 'static + Debug + Sized,
2301    {
2302        DataActorCore::unsubscribe_pool_fee_collects(
2303            self.core_mut(),
2304            instrument_id,
2305            client_id,
2306            params,
2307        );
2308    }
2309
2310    #[cfg(feature = "defi")]
2311    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
2312    fn unsubscribe_pool_flash_events(
2313        &mut self,
2314        instrument_id: InstrumentId,
2315        client_id: Option<ClientId>,
2316        params: Option<Params>,
2317    ) where
2318        Self: DataActorNative,
2319        Self: 'static + Debug + Sized,
2320    {
2321        DataActorCore::unsubscribe_pool_flash_events(
2322            self.core_mut(),
2323            instrument_id,
2324            client_id,
2325            params,
2326        );
2327    }
2328
2329    /// Request historical custom data of the given `data_type`.
2330    ///
2331    /// # Errors
2332    ///
2333    /// Returns an error if input parameters are invalid.
2334    fn request_data(
2335        &mut self,
2336        data_type: DataType,
2337        client_id: ClientId,
2338        start: Option<DateTime<Utc>>,
2339        end: Option<DateTime<Utc>>,
2340        limit: Option<NonZeroUsize>,
2341        params: Option<Params>,
2342    ) -> anyhow::Result<UUID4>
2343    where
2344        Self: DataActorNative,
2345        Self: 'static + Debug + Sized,
2346    {
2347        let actor_id = self.core().actor_id().inner();
2348        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
2349            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
2350        });
2351
2352        DataActorCore::request_data(
2353            self.core_mut(),
2354            data_type,
2355            client_id,
2356            start,
2357            end,
2358            limit,
2359            params,
2360            handler,
2361        )
2362    }
2363
2364    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
2365    ///
2366    /// # Errors
2367    ///
2368    /// Returns an error if input parameters are invalid.
2369    fn request_instrument(
2370        &mut self,
2371        instrument_id: InstrumentId,
2372        start: Option<DateTime<Utc>>,
2373        end: Option<DateTime<Utc>>,
2374        client_id: Option<ClientId>,
2375        params: Option<Params>,
2376    ) -> anyhow::Result<UUID4>
2377    where
2378        Self: DataActorNative,
2379        Self: 'static + Debug + Sized,
2380    {
2381        let actor_id = self.core().actor_id().inner();
2382        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
2383            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
2384        });
2385
2386        DataActorCore::request_instrument(
2387            self.core_mut(),
2388            instrument_id,
2389            start,
2390            end,
2391            client_id,
2392            params,
2393            handler,
2394        )
2395    }
2396
2397    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
2398    ///
2399    /// # Errors
2400    ///
2401    /// Returns an error if input parameters are invalid.
2402    fn request_instruments(
2403        &mut self,
2404        venue: Option<Venue>,
2405        start: Option<DateTime<Utc>>,
2406        end: Option<DateTime<Utc>>,
2407        client_id: Option<ClientId>,
2408        params: Option<Params>,
2409    ) -> anyhow::Result<UUID4>
2410    where
2411        Self: DataActorNative,
2412        Self: 'static + Debug + Sized,
2413    {
2414        let actor_id = self.core().actor_id().inner();
2415        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
2416            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
2417        });
2418
2419        DataActorCore::request_instruments(
2420            self.core_mut(),
2421            venue,
2422            start,
2423            end,
2424            client_id,
2425            params,
2426            handler,
2427        )
2428    }
2429
2430    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
2431    ///
2432    /// # Errors
2433    ///
2434    /// Returns an error if input parameters are invalid.
2435    fn request_book_snapshot(
2436        &mut self,
2437        instrument_id: InstrumentId,
2438        depth: Option<NonZeroUsize>,
2439        client_id: Option<ClientId>,
2440        params: Option<Params>,
2441    ) -> anyhow::Result<UUID4>
2442    where
2443        Self: DataActorNative,
2444        Self: 'static + Debug + Sized,
2445    {
2446        let actor_id = self.core().actor_id().inner();
2447        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2448            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2449        });
2450
2451        DataActorCore::request_book_snapshot(
2452            self.core_mut(),
2453            instrument_id,
2454            depth,
2455            client_id,
2456            params,
2457            handler,
2458        )
2459    }
2460
2461    /// Request historical [`OrderBookDelta`] data for the given `instrument_id`.
2462    ///
2463    /// # Errors
2464    ///
2465    /// Returns an error if input parameters are invalid.
2466    fn request_book_deltas(
2467        &mut self,
2468        instrument_id: InstrumentId,
2469        start: Option<DateTime<Utc>>,
2470        end: Option<DateTime<Utc>>,
2471        limit: Option<NonZeroUsize>,
2472        client_id: Option<ClientId>,
2473        params: Option<Params>,
2474    ) -> anyhow::Result<UUID4>
2475    where
2476        Self: DataActorNative,
2477        Self: 'static + Debug + Sized,
2478    {
2479        let actor_id = self.core().actor_id().inner();
2480        let handler = ShareableMessageHandler::from_typed(move |resp: &BookDeltasResponse| {
2481            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas_response(resp);
2482        });
2483
2484        DataActorCore::request_book_deltas(
2485            self.core_mut(),
2486            instrument_id,
2487            start,
2488            end,
2489            limit,
2490            client_id,
2491            params,
2492            handler,
2493        )
2494    }
2495
2496    /// Request historical [`OrderBookDepth10`] data for the given `instrument_id`.
2497    ///
2498    /// # Errors
2499    ///
2500    /// Returns an error if input parameters are invalid.
2501    #[expect(clippy::too_many_arguments)]
2502    fn request_book_depth(
2503        &mut self,
2504        instrument_id: InstrumentId,
2505        start: Option<DateTime<Utc>>,
2506        end: Option<DateTime<Utc>>,
2507        limit: Option<NonZeroUsize>,
2508        depth: Option<NonZeroUsize>,
2509        client_id: Option<ClientId>,
2510        params: Option<Params>,
2511    ) -> anyhow::Result<UUID4>
2512    where
2513        Self: DataActorNative,
2514        Self: 'static + Debug + Sized,
2515    {
2516        let actor_id = self.core().actor_id().inner();
2517        let handler = ShareableMessageHandler::from_typed(move |resp: &BookDepthResponse| {
2518            get_actor_unchecked::<Self>(&actor_id).handle_book_depth_response(resp);
2519        });
2520
2521        DataActorCore::request_book_depth(
2522            self.core_mut(),
2523            instrument_id,
2524            start,
2525            end,
2526            limit,
2527            depth,
2528            client_id,
2529            params,
2530            handler,
2531        )
2532    }
2533
2534    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
2535    ///
2536    /// # Errors
2537    ///
2538    /// Returns an error if input parameters are invalid.
2539    fn request_quotes(
2540        &mut self,
2541        instrument_id: InstrumentId,
2542        start: Option<DateTime<Utc>>,
2543        end: Option<DateTime<Utc>>,
2544        limit: Option<NonZeroUsize>,
2545        client_id: Option<ClientId>,
2546        params: Option<Params>,
2547    ) -> anyhow::Result<UUID4>
2548    where
2549        Self: DataActorNative,
2550        Self: 'static + Debug + Sized,
2551    {
2552        let actor_id = self.core().actor_id().inner();
2553        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2554            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2555        });
2556
2557        DataActorCore::request_quotes(
2558            self.core_mut(),
2559            instrument_id,
2560            start,
2561            end,
2562            limit,
2563            client_id,
2564            params,
2565            handler,
2566        )
2567    }
2568
2569    /// Request historical [`TradeTick`] data for the given `instrument_id`.
2570    ///
2571    /// # Errors
2572    ///
2573    /// Returns an error if input parameters are invalid.
2574    fn request_trades(
2575        &mut self,
2576        instrument_id: InstrumentId,
2577        start: Option<DateTime<Utc>>,
2578        end: Option<DateTime<Utc>>,
2579        limit: Option<NonZeroUsize>,
2580        client_id: Option<ClientId>,
2581        params: Option<Params>,
2582    ) -> anyhow::Result<UUID4>
2583    where
2584        Self: DataActorNative,
2585        Self: 'static + Debug + Sized,
2586    {
2587        let actor_id = self.core().actor_id().inner();
2588        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2589            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2590        });
2591
2592        DataActorCore::request_trades(
2593            self.core_mut(),
2594            instrument_id,
2595            start,
2596            end,
2597            limit,
2598            client_id,
2599            params,
2600            handler,
2601        )
2602    }
2603
2604    /// Request historical [`Bar`] data for the given `bar_type`.
2605    ///
2606    /// # Errors
2607    ///
2608    /// Returns an error if input parameters are invalid.
2609    fn request_bars(
2610        &mut self,
2611        bar_type: BarType,
2612        start: Option<DateTime<Utc>>,
2613        end: Option<DateTime<Utc>>,
2614        limit: Option<NonZeroUsize>,
2615        client_id: Option<ClientId>,
2616        params: Option<Params>,
2617    ) -> anyhow::Result<UUID4>
2618    where
2619        Self: DataActorNative,
2620        Self: 'static + Debug + Sized,
2621    {
2622        let actor_id = self.core().actor_id().inner();
2623        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2624            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2625        });
2626
2627        DataActorCore::request_bars(
2628            self.core_mut(),
2629            bar_type,
2630            start,
2631            end,
2632            limit,
2633            client_id,
2634            params,
2635            handler,
2636        )
2637    }
2638
2639    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
2640    ///
2641    /// # Errors
2642    ///
2643    /// Returns an error if input parameters are invalid.
2644    fn request_funding_rates(
2645        &mut self,
2646        instrument_id: InstrumentId,
2647        start: Option<DateTime<Utc>>,
2648        end: Option<DateTime<Utc>>,
2649        limit: Option<NonZeroUsize>,
2650        client_id: Option<ClientId>,
2651        params: Option<Params>,
2652    ) -> anyhow::Result<UUID4>
2653    where
2654        Self: DataActorNative,
2655        Self: 'static + Debug + Sized,
2656    {
2657        let actor_id = self.core().actor_id().inner();
2658        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2659            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2660        });
2661
2662        DataActorCore::request_funding_rates(
2663            self.core_mut(),
2664            instrument_id,
2665            start,
2666            end,
2667            limit,
2668            client_id,
2669            params,
2670            handler,
2671        )
2672    }
2673}
2674
2675// Blanket implementation: any DataActor automatically implements Actor
2676impl<T> Actor for T
2677where
2678    T: DataActor + DataActorNative + Debug + 'static,
2679{
2680    fn id(&self) -> Ustr {
2681        self.core().actor_id.inner()
2682    }
2683
2684    #[allow(unused_variables)]
2685    fn handle(&mut self, msg: &dyn Any) {
2686        // Default empty implementation - concrete actors can override if needed
2687    }
2688
2689    fn as_any(&self) -> &dyn Any {
2690        self
2691    }
2692}
2693
2694// Blanket implementation: any DataActor automatically implements Component
2695impl<T> Component for T
2696where
2697    T: DataActor + DataActorNative + Debug + 'static,
2698{
2699    fn component_id(&self) -> ComponentId {
2700        ComponentId::new(self.core().actor_id.inner().as_str())
2701    }
2702
2703    fn state(&self) -> ComponentState {
2704        self.core().state
2705    }
2706
2707    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2708        let core = self.core_mut();
2709        core.state = core.state.transition(&trigger)?;
2710        log::info!(
2711            component = core.actor_id.inner().as_str();
2712            "{}",
2713            core.state.variant_name()
2714        );
2715        Ok(())
2716    }
2717
2718    fn register(
2719        &mut self,
2720        trader_id: TraderId,
2721        clock: Rc<RefCell<dyn Clock>>,
2722        cache: Rc<RefCell<Cache>>,
2723    ) -> anyhow::Result<()> {
2724        DataActorCore::register(self.core_mut(), trader_id, clock.clone(), cache)?;
2725
2726        // Register default time event handler for this actor
2727        let actor_id = self.core().actor_id().inner();
2728        let callback = TimeEventCallback::from(move |event: TimeEvent| {
2729            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2730                actor.handle_time_event(&event);
2731            } else {
2732                log::error!("Actor {actor_id} not found for time event handling");
2733            }
2734        });
2735
2736        clock.borrow_mut().register_default_handler(callback);
2737
2738        self.initialize()
2739    }
2740
2741    fn on_start(&mut self) -> anyhow::Result<()> {
2742        DataActor::on_start(self)
2743    }
2744
2745    fn on_stop(&mut self) -> anyhow::Result<()> {
2746        DataActor::on_stop(self)
2747    }
2748
2749    fn on_resume(&mut self) -> anyhow::Result<()> {
2750        DataActor::on_resume(self)
2751    }
2752
2753    fn on_degrade(&mut self) -> anyhow::Result<()> {
2754        DataActor::on_degrade(self)
2755    }
2756
2757    fn on_fault(&mut self) -> anyhow::Result<()> {
2758        DataActor::on_fault(self)
2759    }
2760
2761    fn on_reset(&mut self) -> anyhow::Result<()> {
2762        DataActor::on_reset(self)
2763    }
2764
2765    fn on_dispose(&mut self) -> anyhow::Result<()> {
2766        DataActor::on_dispose(self)
2767    }
2768}
2769
2770/// Core functionality for all actors.
2771#[derive(Clone)]
2772#[allow(
2773    dead_code,
2774    reason = "TODO: Under development (pending_requests, signal_classes)"
2775)]
2776pub struct DataActorCore {
2777    /// The actor identifier.
2778    pub actor_id: ActorId,
2779    /// The actors configuration.
2780    pub config: DataActorConfig,
2781    trader_id: Option<TraderId>,
2782    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2783    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2784    state: ComponentState,
2785    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2786    instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
2787    deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
2788    depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
2789    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2790    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2791    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2792    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2793    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2794    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2795    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2796    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2797    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2798    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2799    #[cfg(feature = "defi")]
2800    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2801    #[cfg(feature = "defi")]
2802    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2803    #[cfg(feature = "defi")]
2804    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2805    #[cfg(feature = "defi")]
2806    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2807    #[cfg(feature = "defi")]
2808    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2809    #[cfg(feature = "defi")]
2810    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2811    warning_events: AHashSet<String>, // TODO: TBD
2812    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2813    signal_classes: AHashMap<String, String>,
2814    indicators: Indicators,
2815}
2816
2817impl Debug for DataActorCore {
2818    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2819        f.debug_struct(stringify!(DataActorCore))
2820            .field("actor_id", &self.actor_id)
2821            .field("config", &self.config)
2822            .field("state", &self.state)
2823            .field("trader_id", &self.trader_id)
2824            .finish()
2825    }
2826}
2827
2828impl DataActorCore {
2829    /// Adds a subscription handler for the `topic`.
2830    ///
2831    //// Logs a warning if the actor is already subscribed to the topic.
2832    pub(crate) fn add_subscription_any(
2833        &mut self,
2834        topic: MStr<Topic>,
2835        handler: ShareableMessageHandler,
2836    ) {
2837        let pattern: MStr<Pattern> = topic.into();
2838        if self.topic_handlers.contains_key(&pattern) {
2839            log::warn!(
2840                "Actor {} attempted duplicate subscription to topic '{topic}'",
2841                self.actor_id,
2842            );
2843            return;
2844        }
2845
2846        self.topic_handlers.insert(pattern, handler.clone());
2847        msgbus::subscribe_any(pattern, handler, None);
2848    }
2849
2850    /// Removes a subscription handler for the `topic` if present.
2851    ///
2852    /// Logs a warning if the actor is not currently subscribed to the topic.
2853    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2854        let pattern: MStr<Pattern> = topic.into();
2855        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2856            msgbus::unsubscribe_any(pattern, &handler);
2857        } else {
2858            log::warn!(
2859                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2860                self.actor_id,
2861            );
2862        }
2863    }
2864
2865    pub(crate) fn add_quote_subscription(
2866        &mut self,
2867        topic: MStr<Topic>,
2868        handler: TypedHandler<QuoteTick>,
2869    ) {
2870        if self.quote_handlers.contains_key(&topic) {
2871            log::warn!(
2872                "Actor {} attempted duplicate quote subscription to '{topic}'",
2873                self.actor_id
2874            );
2875            return;
2876        }
2877        self.quote_handlers.insert(topic, handler.clone());
2878        msgbus::subscribe_quotes(topic.into(), handler, None);
2879    }
2880
2881    #[allow(dead_code)]
2882    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2883        if let Some(handler) = self.quote_handlers.remove(&topic) {
2884            msgbus::unsubscribe_quotes(topic.into(), &handler);
2885        }
2886    }
2887
2888    pub(crate) fn add_trade_subscription(
2889        &mut self,
2890        topic: MStr<Topic>,
2891        handler: TypedHandler<TradeTick>,
2892    ) {
2893        if self.trade_handlers.contains_key(&topic) {
2894            log::warn!(
2895                "Actor {} attempted duplicate trade subscription to '{topic}'",
2896                self.actor_id
2897            );
2898            return;
2899        }
2900        self.trade_handlers.insert(topic, handler.clone());
2901        msgbus::subscribe_trades(topic.into(), handler, None);
2902    }
2903
2904    #[allow(dead_code)]
2905    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2906        if let Some(handler) = self.trade_handlers.remove(&topic) {
2907            msgbus::unsubscribe_trades(topic.into(), &handler);
2908        }
2909    }
2910
2911    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2912        if self.bar_handlers.contains_key(&topic) {
2913            log::warn!(
2914                "Actor {} attempted duplicate bar subscription to '{topic}'",
2915                self.actor_id
2916            );
2917            return;
2918        }
2919        self.bar_handlers.insert(topic, handler.clone());
2920        msgbus::subscribe_bars(topic.into(), handler, None);
2921    }
2922
2923    #[allow(dead_code)]
2924    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2925        if let Some(handler) = self.bar_handlers.remove(&topic) {
2926            msgbus::unsubscribe_bars(topic.into(), &handler);
2927        }
2928    }
2929
2930    pub(crate) fn add_order_event_subscription(
2931        &mut self,
2932        topic: MStr<Topic>,
2933        handler: TypedHandler<OrderEventAny>,
2934    ) {
2935        if self.order_event_handlers.contains_key(&topic) {
2936            log::warn!(
2937                "Actor {} attempted duplicate order event subscription to '{topic}'",
2938                self.actor_id
2939            );
2940            return;
2941        }
2942        self.order_event_handlers.insert(topic, handler.clone());
2943        msgbus::subscribe_order_events(topic.into(), handler, None);
2944    }
2945
2946    #[allow(dead_code)]
2947    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2948        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2949            msgbus::unsubscribe_order_events(topic.into(), &handler);
2950        }
2951    }
2952
2953    pub(crate) fn add_deltas_subscription(
2954        &mut self,
2955        pattern: MStr<Pattern>,
2956        handler: TypedHandler<OrderBookDeltas>,
2957    ) {
2958        if self.deltas_handlers.contains_key(&pattern) {
2959            log::warn!(
2960                "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2961                self.actor_id
2962            );
2963            return;
2964        }
2965        self.deltas_handlers.insert(pattern, handler.clone());
2966        msgbus::subscribe_book_deltas(pattern, handler, None);
2967    }
2968
2969    #[allow(dead_code)]
2970    pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2971        if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2972            msgbus::unsubscribe_book_deltas(pattern, &handler);
2973        }
2974    }
2975
2976    #[allow(dead_code)]
2977    pub(crate) fn add_depth10_subscription(
2978        &mut self,
2979        pattern: MStr<Pattern>,
2980        handler: TypedHandler<OrderBookDepth10>,
2981    ) {
2982        if self.depth10_handlers.contains_key(&pattern) {
2983            log::warn!(
2984                "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2985                self.actor_id
2986            );
2987            return;
2988        }
2989        self.depth10_handlers.insert(pattern, handler.clone());
2990        msgbus::subscribe_book_depth10(pattern, handler, None);
2991    }
2992
2993    #[allow(dead_code)]
2994    pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2995        if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2996            msgbus::unsubscribe_book_depth10(pattern, &handler);
2997        }
2998    }
2999
3000    pub(crate) fn add_instrument_subscription(
3001        &mut self,
3002        pattern: MStr<Pattern>,
3003        handler: TypedHandler<InstrumentAny>,
3004    ) {
3005        if self.instrument_handlers.contains_key(&pattern) {
3006            log::warn!(
3007                "Actor {} attempted duplicate instrument subscription to '{pattern}'",
3008                self.actor_id
3009            );
3010            return;
3011        }
3012        self.instrument_handlers.insert(pattern, handler.clone());
3013        msgbus::subscribe_instruments(pattern, handler, None);
3014    }
3015
3016    #[allow(dead_code)]
3017    pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
3018        if let Some(handler) = self.instrument_handlers.remove(&pattern) {
3019            msgbus::unsubscribe_instruments(pattern, &handler);
3020        }
3021    }
3022
3023    pub(crate) fn add_instrument_close_subscription(
3024        &mut self,
3025        topic: MStr<Topic>,
3026        handler: ShareableMessageHandler,
3027    ) {
3028        let pattern: MStr<Pattern> = topic.into();
3029        if self.topic_handlers.contains_key(&pattern) {
3030            log::warn!(
3031                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
3032                self.actor_id
3033            );
3034            return;
3035        }
3036        self.topic_handlers.insert(pattern, handler.clone());
3037        msgbus::subscribe_any(pattern, handler, None);
3038    }
3039
3040    #[allow(dead_code)]
3041    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
3042        let pattern: MStr<Pattern> = topic.into();
3043        if let Some(handler) = self.topic_handlers.remove(&pattern) {
3044            msgbus::unsubscribe_any(pattern, &handler);
3045        }
3046    }
3047
3048    pub(crate) fn add_book_snapshot_subscription(
3049        &mut self,
3050        topic: MStr<Topic>,
3051        handler: TypedHandler<OrderBook>,
3052    ) {
3053        if self.book_handlers.contains_key(&topic) {
3054            log::warn!(
3055                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
3056                self.actor_id
3057            );
3058            return;
3059        }
3060        self.book_handlers.insert(topic, handler.clone());
3061        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
3062    }
3063
3064    #[allow(dead_code)]
3065    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
3066        if let Some(handler) = self.book_handlers.remove(&topic) {
3067            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
3068        }
3069    }
3070
3071    pub(crate) fn add_mark_price_subscription(
3072        &mut self,
3073        topic: MStr<Topic>,
3074        handler: TypedHandler<MarkPriceUpdate>,
3075    ) {
3076        if self.mark_price_handlers.contains_key(&topic) {
3077            log::warn!(
3078                "Actor {} attempted duplicate mark price subscription to '{topic}'",
3079                self.actor_id
3080            );
3081            return;
3082        }
3083        self.mark_price_handlers.insert(topic, handler.clone());
3084        msgbus::subscribe_mark_prices(topic.into(), handler, None);
3085    }
3086
3087    #[allow(dead_code)]
3088    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
3089        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
3090            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
3091        }
3092    }
3093
3094    pub(crate) fn add_index_price_subscription(
3095        &mut self,
3096        topic: MStr<Topic>,
3097        handler: TypedHandler<IndexPriceUpdate>,
3098    ) {
3099        if self.index_price_handlers.contains_key(&topic) {
3100            log::warn!(
3101                "Actor {} attempted duplicate index price subscription to '{topic}'",
3102                self.actor_id
3103            );
3104            return;
3105        }
3106        self.index_price_handlers.insert(topic, handler.clone());
3107        msgbus::subscribe_index_prices(topic.into(), handler, None);
3108    }
3109
3110    #[allow(dead_code)]
3111    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
3112        if let Some(handler) = self.index_price_handlers.remove(&topic) {
3113            msgbus::unsubscribe_index_prices(topic.into(), &handler);
3114        }
3115    }
3116
3117    pub(crate) fn add_funding_rate_subscription(
3118        &mut self,
3119        topic: MStr<Topic>,
3120        handler: TypedHandler<FundingRateUpdate>,
3121    ) {
3122        if self.funding_rate_handlers.contains_key(&topic) {
3123            log::warn!(
3124                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
3125                self.actor_id
3126            );
3127            return;
3128        }
3129        self.funding_rate_handlers.insert(topic, handler.clone());
3130        msgbus::subscribe_funding_rates(topic.into(), handler, None);
3131    }
3132
3133    #[allow(dead_code)]
3134    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
3135        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
3136            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
3137        }
3138    }
3139
3140    pub(crate) fn add_option_greeks_subscription(
3141        &mut self,
3142        topic: MStr<Topic>,
3143        handler: TypedHandler<OptionGreeks>,
3144    ) {
3145        if self.option_greeks_handlers.contains_key(&topic) {
3146            log::warn!(
3147                "Actor {} attempted duplicate option greeks subscription to '{topic}'",
3148                self.actor_id
3149            );
3150            return;
3151        }
3152        self.option_greeks_handlers.insert(topic, handler.clone());
3153        msgbus::subscribe_option_greeks(topic.into(), handler, None);
3154    }
3155
3156    #[allow(dead_code)]
3157    pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
3158        if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
3159            msgbus::unsubscribe_option_greeks(topic.into(), &handler);
3160        }
3161    }
3162
3163    pub(crate) fn add_option_chain_subscription(
3164        &mut self,
3165        topic: MStr<Topic>,
3166        handler: TypedHandler<OptionChainSlice>,
3167    ) {
3168        if self.option_chain_handlers.contains_key(&topic) {
3169            log::warn!(
3170                "Actor {} attempted duplicate option chain subscription to '{topic}'",
3171                self.actor_id
3172            );
3173            return;
3174        }
3175        self.option_chain_handlers.insert(topic, handler.clone());
3176        msgbus::subscribe_option_chain(topic.into(), handler, None);
3177    }
3178
3179    pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
3180        if let Some(handler) = self.option_chain_handlers.remove(&topic) {
3181            msgbus::unsubscribe_option_chain(topic.into(), &handler);
3182        }
3183    }
3184
3185    #[cfg(feature = "defi")]
3186    pub(crate) fn add_block_subscription(
3187        &mut self,
3188        topic: MStr<Topic>,
3189        handler: TypedHandler<Block>,
3190    ) {
3191        if self.block_handlers.contains_key(&topic) {
3192            log::warn!(
3193                "Actor {} attempted duplicate block subscription to '{topic}'",
3194                self.actor_id
3195            );
3196            return;
3197        }
3198        self.block_handlers.insert(topic, handler.clone());
3199        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
3200    }
3201
3202    #[cfg(feature = "defi")]
3203    #[allow(dead_code)]
3204    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
3205        if let Some(handler) = self.block_handlers.remove(&topic) {
3206            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
3207        }
3208    }
3209
3210    #[cfg(feature = "defi")]
3211    pub(crate) fn add_pool_subscription(
3212        &mut self,
3213        topic: MStr<Topic>,
3214        handler: TypedHandler<Pool>,
3215    ) {
3216        if self.pool_handlers.contains_key(&topic) {
3217            log::warn!(
3218                "Actor {} attempted duplicate pool subscription to '{topic}'",
3219                self.actor_id
3220            );
3221            return;
3222        }
3223        self.pool_handlers.insert(topic, handler.clone());
3224        msgbus::subscribe_defi_pools(topic.into(), handler, None);
3225    }
3226
3227    #[cfg(feature = "defi")]
3228    #[allow(dead_code)]
3229    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
3230        if let Some(handler) = self.pool_handlers.remove(&topic) {
3231            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
3232        }
3233    }
3234
3235    #[cfg(feature = "defi")]
3236    pub(crate) fn add_pool_swap_subscription(
3237        &mut self,
3238        topic: MStr<Topic>,
3239        handler: TypedHandler<PoolSwap>,
3240    ) {
3241        if self.pool_swap_handlers.contains_key(&topic) {
3242            log::warn!(
3243                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
3244                self.actor_id
3245            );
3246            return;
3247        }
3248        self.pool_swap_handlers.insert(topic, handler.clone());
3249        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
3250    }
3251
3252    #[cfg(feature = "defi")]
3253    #[allow(dead_code)]
3254    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
3255        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
3256            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
3257        }
3258    }
3259
3260    #[cfg(feature = "defi")]
3261    pub(crate) fn add_pool_liquidity_subscription(
3262        &mut self,
3263        topic: MStr<Topic>,
3264        handler: TypedHandler<PoolLiquidityUpdate>,
3265    ) {
3266        if self.pool_liquidity_handlers.contains_key(&topic) {
3267            log::warn!(
3268                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
3269                self.actor_id
3270            );
3271            return;
3272        }
3273        self.pool_liquidity_handlers.insert(topic, handler.clone());
3274        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
3275    }
3276
3277    #[cfg(feature = "defi")]
3278    #[allow(dead_code)]
3279    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
3280        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
3281            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
3282        }
3283    }
3284
3285    #[cfg(feature = "defi")]
3286    pub(crate) fn add_pool_collect_subscription(
3287        &mut self,
3288        topic: MStr<Topic>,
3289        handler: TypedHandler<PoolFeeCollect>,
3290    ) {
3291        if self.pool_collect_handlers.contains_key(&topic) {
3292            log::warn!(
3293                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
3294                self.actor_id
3295            );
3296            return;
3297        }
3298        self.pool_collect_handlers.insert(topic, handler.clone());
3299        msgbus::subscribe_defi_collects(topic.into(), handler, None);
3300    }
3301
3302    #[cfg(feature = "defi")]
3303    #[allow(dead_code)]
3304    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
3305        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
3306            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
3307        }
3308    }
3309
3310    #[cfg(feature = "defi")]
3311    pub(crate) fn add_pool_flash_subscription(
3312        &mut self,
3313        topic: MStr<Topic>,
3314        handler: TypedHandler<PoolFlash>,
3315    ) {
3316        if self.pool_flash_handlers.contains_key(&topic) {
3317            log::warn!(
3318                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
3319                self.actor_id
3320            );
3321            return;
3322        }
3323        self.pool_flash_handlers.insert(topic, handler.clone());
3324        msgbus::subscribe_defi_flash(topic.into(), handler, None);
3325    }
3326
3327    #[cfg(feature = "defi")]
3328    #[allow(dead_code)]
3329    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
3330        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
3331            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
3332        }
3333    }
3334
3335    /// Creates a new [`DataActorCore`] instance.
3336    pub fn new(config: DataActorConfig) -> Self {
3337        let actor_id = config
3338            .actor_id
3339            .unwrap_or_else(|| Self::default_actor_id(&config));
3340
3341        Self {
3342            actor_id,
3343            config,
3344            trader_id: None, // None until registered
3345            clock: None,     // None until registered
3346            cache: None,     // None until registered
3347            state: ComponentState::default(),
3348            topic_handlers: AHashMap::new(),
3349            instrument_handlers: AHashMap::new(),
3350            deltas_handlers: AHashMap::new(),
3351            depth10_handlers: AHashMap::new(),
3352            book_handlers: AHashMap::new(),
3353            quote_handlers: AHashMap::new(),
3354            trade_handlers: AHashMap::new(),
3355            bar_handlers: AHashMap::new(),
3356            mark_price_handlers: AHashMap::new(),
3357            index_price_handlers: AHashMap::new(),
3358            funding_rate_handlers: AHashMap::new(),
3359            option_greeks_handlers: AHashMap::new(),
3360            option_chain_handlers: AHashMap::new(),
3361            order_event_handlers: AHashMap::new(),
3362            #[cfg(feature = "defi")]
3363            block_handlers: AHashMap::new(),
3364            #[cfg(feature = "defi")]
3365            pool_handlers: AHashMap::new(),
3366            #[cfg(feature = "defi")]
3367            pool_swap_handlers: AHashMap::new(),
3368            #[cfg(feature = "defi")]
3369            pool_liquidity_handlers: AHashMap::new(),
3370            #[cfg(feature = "defi")]
3371            pool_collect_handlers: AHashMap::new(),
3372            #[cfg(feature = "defi")]
3373            pool_flash_handlers: AHashMap::new(),
3374            warning_events: AHashSet::new(),
3375            pending_requests: AHashMap::new(),
3376            signal_classes: AHashMap::new(),
3377            indicators: Indicators::default(),
3378        }
3379    }
3380
3381    /// Returns the registered indicators.
3382    #[must_use]
3383    pub fn registered_indicators(&self) -> Vec<SharedActorIndicator> {
3384        self.indicators.registered_indicators()
3385    }
3386
3387    /// Returns whether all registered indicators are initialized.
3388    ///
3389    /// # Errors
3390    ///
3391    /// Returns an error if a registered indicator cannot report readiness.
3392    pub fn indicators_initialized(&self) -> anyhow::Result<bool> {
3393        self.indicators.initialized()
3394    }
3395
3396    /// Registers an indicator to receive quote ticks for an instrument.
3397    pub fn register_indicator_for_quote_ticks(
3398        &mut self,
3399        instrument_id: InstrumentId,
3400        indicator: SharedActorIndicator,
3401    ) {
3402        self.indicators
3403            .register_indicator_for_quote_ticks(instrument_id, indicator);
3404    }
3405
3406    /// Registers an indicator to receive trade ticks for an instrument.
3407    pub fn register_indicator_for_trade_ticks(
3408        &mut self,
3409        instrument_id: InstrumentId,
3410        indicator: SharedActorIndicator,
3411    ) {
3412        self.indicators
3413            .register_indicator_for_trade_ticks(instrument_id, indicator);
3414    }
3415
3416    /// Registers an indicator to receive bars for a bar type.
3417    pub fn register_indicator_for_bars(
3418        &mut self,
3419        bar_type: BarType,
3420        indicator: SharedActorIndicator,
3421    ) {
3422        self.indicators
3423            .register_indicator_for_bars(bar_type, indicator);
3424    }
3425
3426    pub(crate) fn handle_indicators_for_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
3427        self.indicators.handle_quote(quote)
3428    }
3429
3430    pub(crate) fn handle_indicators_for_quotes(&self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
3431        self.indicators.handle_quotes(quotes)
3432    }
3433
3434    pub(crate) fn handle_indicators_for_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
3435        self.indicators.handle_trade(trade)
3436    }
3437
3438    pub(crate) fn handle_indicators_for_trades(&self, trades: &[TradeTick]) -> anyhow::Result<()> {
3439        self.indicators.handle_trades(trades)
3440    }
3441
3442    pub(crate) fn handle_indicators_for_bar(&self, bar: &Bar) -> anyhow::Result<()> {
3443        self.indicators.handle_bar(bar)
3444    }
3445
3446    pub(crate) fn handle_indicators_for_bars(&self, bars: &[Bar]) -> anyhow::Result<()> {
3447        self.indicators.handle_bars(bars)
3448    }
3449
3450    /// Returns the memory address of this instance as a hexadecimal string.
3451    #[must_use]
3452    pub fn mem_address(&self) -> String {
3453        format!("{self:p}")
3454    }
3455
3456    /// Returns the actors state.
3457    pub fn state(&self) -> ComponentState {
3458        self.state
3459    }
3460
3461    /// Returns the trader ID this actor is registered to.
3462    pub fn trader_id(&self) -> Option<TraderId> {
3463        self.trader_id
3464    }
3465
3466    /// Returns the actors ID.
3467    pub fn actor_id(&self) -> ActorId {
3468        self.actor_id
3469    }
3470
3471    fn default_actor_id(config: &DataActorConfig) -> ActorId {
3472        let memory_address = std::ptr::from_ref(config) as usize;
3473        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
3474    }
3475
3476    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
3477    pub fn timestamp_ns(&self) -> UnixNanos {
3478        self.clock_ref().timestamp_ns()
3479    }
3480
3481    fn clock_api(&self) -> ClockApi<'_> {
3482        let clock = self.clock.as_ref().unwrap_or_else(|| {
3483            panic!(
3484                "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
3485                self.actor_id, self.trader_id
3486            )
3487        });
3488        ClockApi::new(clock.as_ref())
3489    }
3490
3491    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
3492        self.clock
3493            .as_ref()
3494            .unwrap_or_else(|| {
3495                panic!(
3496                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
3497                    self.actor_id, self.trader_id
3498                )
3499            })
3500            .borrow()
3501    }
3502
3503    fn cache_api(&self) -> CacheApi<'_> {
3504        let cache = self.cache.as_ref().unwrap_or_else(|| {
3505            panic!(
3506                "DataActor {} must be registered before calling `cache()` - trader_id: {:?}",
3507                self.actor_id, self.trader_id
3508            )
3509        });
3510        CacheApi::new(cache.as_ref())
3511    }
3512
3513    /// Register the data actor with a trader.
3514    ///
3515    /// # Errors
3516    ///
3517    /// Returns an error if the actor has already been registered with a trader
3518    /// or if the provided dependencies are invalid.
3519    pub fn register(
3520        &mut self,
3521        trader_id: TraderId,
3522        clock: Rc<RefCell<dyn Clock>>,
3523        cache: Rc<RefCell<Cache>>,
3524    ) -> anyhow::Result<()> {
3525        if let Some(existing_trader_id) = self.trader_id {
3526            anyhow::bail!(
3527                "DataActor {} already registered with trader {existing_trader_id}",
3528                self.actor_id
3529            );
3530        }
3531
3532        // Validate clock by attempting to access it
3533        {
3534            let _timestamp = clock.borrow().timestamp_ns();
3535        }
3536
3537        // Validate cache by attempting to access it
3538        {
3539            let _cache_borrow = cache.borrow();
3540        }
3541
3542        self.trader_id = Some(trader_id);
3543        self.clock = Some(clock);
3544        self.cache = Some(cache);
3545
3546        // Verify complete registration
3547        if !self.is_properly_registered() {
3548            anyhow::bail!(
3549                "DataActor {} registration incomplete - validation failed",
3550                self.actor_id
3551            );
3552        }
3553
3554        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
3555        Ok(())
3556    }
3557
3558    /// Register an event type for warning log levels.
3559    pub fn register_warning_event(&mut self, event_type: &str) {
3560        self.warning_events.insert(event_type.to_string());
3561        log::debug!("Registered event type '{event_type}' for warning logs");
3562    }
3563
3564    /// Deregister an event type from warning log levels.
3565    pub fn deregister_warning_event(&mut self, event_type: &str) {
3566        self.warning_events.remove(event_type);
3567        log::debug!("Deregistered event type '{event_type}' from warning logs");
3568    }
3569
3570    pub fn is_registered(&self) -> bool {
3571        self.trader_id.is_some()
3572    }
3573
3574    pub(crate) fn check_registered(&self) {
3575        assert!(
3576            self.is_registered(),
3577            "Actor has not been registered with a Trader"
3578        );
3579    }
3580
3581    /// Validates registration state without panicking.
3582    fn is_properly_registered(&self) -> bool {
3583        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3584    }
3585
3586    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3587        if self.config.log_commands {
3588            log::info!("{CMD}{SEND} {command:?}");
3589        }
3590
3591        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3592        msgbus::send_data_command(endpoint, command);
3593    }
3594
3595    #[allow(dead_code)]
3596    fn send_data_req(&self, request: &RequestCommand) {
3597        if self.config.log_commands {
3598            log::info!("{REQ}{SEND} {request:?}");
3599        }
3600
3601        // For now, simplified approach - data requests without dynamic handlers
3602        // TODO: Implement proper dynamic dispatch for response handlers
3603        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3604        msgbus::send_any(endpoint, request.as_any());
3605    }
3606
3607    /// Sends a shutdown command to the system with an optional reason.
3608    ///
3609    /// # Panics
3610    ///
3611    /// Panics if the actor is not registered or has no trader ID.
3612    pub fn shutdown_system(&self, reason: Option<String>) {
3613        self.check_registered();
3614
3615        // Checked registered before unwrapping trader ID
3616        let command = ShutdownSystem::new(
3617            self.trader_id().unwrap(),
3618            self.actor_id.inner(),
3619            reason,
3620            UUID4::new(),
3621            self.timestamp_ns(),
3622            None, // correlation_id
3623        );
3624
3625        let topic = MessagingSwitchboard::shutdown_system_topic();
3626        msgbus::publish_any(topic, command.as_any());
3627    }
3628
3629    /// Publishes `data` on the message bus under the topic derived from `data_type`.
3630    ///
3631    /// `data_type` is kept as an explicit parameter (rather than deriving it from
3632    /// `data.data_type`) to mirror the v1 Python `publish_data(data_type, data)` API and
3633    /// to allow callers to override the routing topic from the payload's intrinsic type.
3634    ///
3635    /// # Panics
3636    ///
3637    /// Panics if the actor is not registered with a trader.
3638    pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3639        self.check_registered();
3640
3641        let topic = get_custom_topic(data_type);
3642        msgbus::publish_any(topic, data);
3643    }
3644
3645    /// Publishes a [`Signal`] constructed from `name` and `value`, wrapped in [`CustomData`]
3646    /// so it is consumed by signal subscribers and by any `CustomData`-aware pipeline
3647    /// (for example the feather persistence writer).
3648    ///
3649    /// The topic mirrors the v1 Python scheme `data.Signal<TitleName>` so subscribers
3650    /// using either a specific name or the global wildcard are both notified.
3651    /// If `ts_event` is zero then the current clock timestamp is used.
3652    ///
3653    /// # Panics
3654    ///
3655    /// Panics if the actor is not registered with a trader.
3656    pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3657        self.check_registered();
3658
3659        let now = self.timestamp_ns();
3660        let ts_event = if ts_event.as_u64() == 0 {
3661            now
3662        } else {
3663            ts_event
3664        };
3665        let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3666
3667        let data_type = DataType::new(
3668            &format!(
3669                "Signal{}",
3670                nautilus_core::string::conversions::title_case(name)
3671            ),
3672            None,
3673            None,
3674        );
3675        let data = CustomData::new(Arc::new(signal), data_type);
3676        let topic = get_custom_topic(&data.data_type);
3677        msgbus::publish_any(topic, &data);
3678    }
3679
3680    /// Adds the `synthetic` instrument to the cache.
3681    ///
3682    /// # Errors
3683    ///
3684    /// Returns an error if a synthetic with the same ID already exists, or if the
3685    /// backing cache fails to persist it. Panics if the actor is not registered
3686    /// with a trader. // panics-doc-ok
3687    pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3688        self.check_registered();
3689
3690        let cache = self.cache_rc();
3691        if cache.borrow().synthetic(&synthetic.id).is_some() {
3692            anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3693        }
3694        cache.borrow_mut().add_synthetic(synthetic)
3695    }
3696
3697    /// Updates the `synthetic` instrument in the cache, replacing the existing entry.
3698    ///
3699    /// # Errors
3700    ///
3701    /// Returns an error if no synthetic with the same ID already exists, or if the
3702    /// backing cache fails to persist the replacement. Panics if the actor is not
3703    /// registered with a trader. // panics-doc-ok
3704    pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3705        self.check_registered();
3706
3707        let cache = self.cache_rc();
3708        if cache.borrow().synthetic(&synthetic.id).is_none() {
3709            anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3710        }
3711        cache.borrow_mut().add_synthetic(synthetic)
3712    }
3713
3714    /// Helper method for registering data subscriptions from the trait.
3715    ///
3716    /// # Panics
3717    ///
3718    /// Panics if the actor is not properly registered.
3719    pub fn subscribe_data(
3720        &mut self,
3721        handler: ShareableMessageHandler,
3722        data_type: DataType,
3723        client_id: Option<ClientId>,
3724        params: Option<Params>,
3725    ) {
3726        assert!(
3727            self.is_properly_registered(),
3728            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3729            self.actor_id,
3730            self.trader_id,
3731            self.clock.is_some(),
3732            self.cache.is_some()
3733        );
3734
3735        let topic = get_custom_topic(&data_type);
3736        self.add_subscription_any(topic, handler);
3737
3738        // If no client ID specified, just subscribe to the topic
3739        if client_id.is_none() {
3740            return;
3741        }
3742
3743        let command = SubscribeCommand::Data(SubscribeCustomData {
3744            data_type,
3745            client_id,
3746            venue: None,
3747            command_id: UUID4::new(),
3748            ts_init: self.timestamp_ns(),
3749            correlation_id: None,
3750            params,
3751        });
3752
3753        self.send_data_cmd(DataCommand::Subscribe(command));
3754    }
3755
3756    /// Helper method for registering signal subscriptions from the trait.
3757    ///
3758    /// An empty `name` subscribes to every signal via the `data.Signal*` wildcard pattern.
3759    ///
3760    /// # Panics
3761    ///
3762    /// Panics if the actor is not registered with a trader.
3763    pub fn subscribe_signal(
3764        &mut self,
3765        handler: ShareableMessageHandler,
3766        name: &str,
3767        priority: Option<u32>,
3768    ) {
3769        self.check_registered();
3770
3771        let pattern = get_signal_pattern(name);
3772        if self.topic_handlers.contains_key(&pattern) {
3773            log::warn!(
3774                "Actor {} attempted duplicate signal subscription to '{pattern}'",
3775                self.actor_id,
3776            );
3777            return;
3778        }
3779        self.topic_handlers.insert(pattern, handler.clone());
3780        msgbus::subscribe_any(pattern, handler, priority);
3781    }
3782
3783    /// Helper method for registering quotes subscriptions from the trait.
3784    pub fn subscribe_quotes(
3785        &mut self,
3786        topic: MStr<Topic>,
3787        handler: TypedHandler<QuoteTick>,
3788        instrument_id: InstrumentId,
3789        client_id: Option<ClientId>,
3790        params: Option<Params>,
3791    ) {
3792        self.check_registered();
3793
3794        self.add_quote_subscription(topic, handler);
3795
3796        let command = SubscribeCommand::Quotes(SubscribeQuotes {
3797            instrument_id,
3798            client_id,
3799            venue: Some(instrument_id.venue),
3800            command_id: UUID4::new(),
3801            ts_init: self.timestamp_ns(),
3802            correlation_id: None,
3803            params,
3804        });
3805
3806        self.send_data_cmd(DataCommand::Subscribe(command));
3807    }
3808
3809    /// Helper method for registering instruments subscriptions from the trait.
3810    pub fn subscribe_instruments(
3811        &mut self,
3812        pattern: MStr<Pattern>,
3813        handler: TypedHandler<InstrumentAny>,
3814        venue: Venue,
3815        client_id: Option<ClientId>,
3816        params: Option<Params>,
3817    ) {
3818        self.check_registered();
3819
3820        self.add_instrument_subscription(pattern, handler);
3821
3822        let command = SubscribeCommand::Instruments(SubscribeInstruments {
3823            client_id,
3824            venue,
3825            command_id: UUID4::new(),
3826            ts_init: self.timestamp_ns(),
3827            correlation_id: None,
3828            params,
3829        });
3830
3831        self.send_data_cmd(DataCommand::Subscribe(command));
3832    }
3833
3834    /// Helper method for registering instrument subscriptions from the trait.
3835    pub fn subscribe_instrument(
3836        &mut self,
3837        topic: MStr<Topic>,
3838        handler: TypedHandler<InstrumentAny>,
3839        instrument_id: InstrumentId,
3840        client_id: Option<ClientId>,
3841        params: Option<Params>,
3842    ) {
3843        self.check_registered();
3844
3845        self.add_instrument_subscription(topic.into(), handler);
3846
3847        let command = SubscribeCommand::Instrument(SubscribeInstrument {
3848            instrument_id,
3849            client_id,
3850            venue: Some(instrument_id.venue),
3851            command_id: UUID4::new(),
3852            ts_init: self.timestamp_ns(),
3853            correlation_id: None,
3854            params,
3855        });
3856
3857        self.send_data_cmd(DataCommand::Subscribe(command));
3858    }
3859
3860    /// Helper method for registering book deltas subscriptions from the trait.
3861    #[expect(clippy::too_many_arguments)]
3862    pub fn subscribe_book_deltas(
3863        &mut self,
3864        pattern: MStr<Pattern>,
3865        handler: TypedHandler<OrderBookDeltas>,
3866        instrument_id: InstrumentId,
3867        book_type: BookType,
3868        depth: Option<NonZeroUsize>,
3869        client_id: Option<ClientId>,
3870        managed: bool,
3871        params: Option<Params>,
3872    ) {
3873        self.check_registered();
3874
3875        self.add_deltas_subscription(pattern, handler);
3876
3877        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3878            instrument_id,
3879            book_type,
3880            client_id,
3881            venue: Some(instrument_id.venue),
3882            command_id: UUID4::new(),
3883            ts_init: self.timestamp_ns(),
3884            depth,
3885            managed,
3886            correlation_id: None,
3887            params,
3888        });
3889
3890        self.send_data_cmd(DataCommand::Subscribe(command));
3891    }
3892
3893    /// Helper method for registering book snapshots subscriptions from the trait.
3894    #[expect(clippy::too_many_arguments)]
3895    pub fn subscribe_book_at_interval(
3896        &mut self,
3897        topic: MStr<Topic>,
3898        handler: TypedHandler<OrderBook>,
3899        instrument_id: InstrumentId,
3900        book_type: BookType,
3901        depth: Option<NonZeroUsize>,
3902        interval_ms: NonZeroUsize,
3903        client_id: Option<ClientId>,
3904        params: Option<Params>,
3905    ) {
3906        self.check_registered();
3907
3908        self.add_book_snapshot_subscription(topic, handler);
3909
3910        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3911            instrument_id,
3912            book_type,
3913            client_id,
3914            venue: Some(instrument_id.venue),
3915            command_id: UUID4::new(),
3916            ts_init: self.timestamp_ns(),
3917            depth,
3918            interval_ms,
3919            correlation_id: None,
3920            params,
3921        });
3922
3923        self.send_data_cmd(DataCommand::Subscribe(command));
3924    }
3925
3926    /// Helper method for registering trades subscriptions from the trait.
3927    pub fn subscribe_trades(
3928        &mut self,
3929        topic: MStr<Topic>,
3930        handler: TypedHandler<TradeTick>,
3931        instrument_id: InstrumentId,
3932        client_id: Option<ClientId>,
3933        params: Option<Params>,
3934    ) {
3935        self.check_registered();
3936
3937        self.add_trade_subscription(topic, handler);
3938
3939        let command = SubscribeCommand::Trades(SubscribeTrades {
3940            instrument_id,
3941            client_id,
3942            venue: Some(instrument_id.venue),
3943            command_id: UUID4::new(),
3944            ts_init: self.timestamp_ns(),
3945            correlation_id: None,
3946            params,
3947        });
3948
3949        self.send_data_cmd(DataCommand::Subscribe(command));
3950    }
3951
3952    /// Helper method for registering bars subscriptions from the trait.
3953    pub fn subscribe_bars(
3954        &mut self,
3955        topic: MStr<Topic>,
3956        handler: TypedHandler<Bar>,
3957        bar_type: BarType,
3958        client_id: Option<ClientId>,
3959        params: Option<Params>,
3960    ) {
3961        self.check_registered();
3962
3963        self.add_bar_subscription(topic, handler);
3964
3965        let command = SubscribeCommand::Bars(SubscribeBars {
3966            bar_type,
3967            client_id,
3968            venue: Some(bar_type.instrument_id().venue),
3969            command_id: UUID4::new(),
3970            ts_init: self.timestamp_ns(),
3971            correlation_id: None,
3972            params,
3973        });
3974
3975        self.send_data_cmd(DataCommand::Subscribe(command));
3976    }
3977
3978    /// Helper method for registering mark prices subscriptions from the trait.
3979    pub fn subscribe_mark_prices(
3980        &mut self,
3981        topic: MStr<Topic>,
3982        handler: TypedHandler<MarkPriceUpdate>,
3983        instrument_id: InstrumentId,
3984        client_id: Option<ClientId>,
3985        params: Option<Params>,
3986    ) {
3987        self.check_registered();
3988
3989        self.add_mark_price_subscription(topic, handler);
3990
3991        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3992            instrument_id,
3993            client_id,
3994            venue: Some(instrument_id.venue),
3995            command_id: UUID4::new(),
3996            ts_init: self.timestamp_ns(),
3997            correlation_id: None,
3998            params,
3999        });
4000
4001        self.send_data_cmd(DataCommand::Subscribe(command));
4002    }
4003
4004    /// Helper method for registering index prices subscriptions from the trait.
4005    pub fn subscribe_index_prices(
4006        &mut self,
4007        topic: MStr<Topic>,
4008        handler: TypedHandler<IndexPriceUpdate>,
4009        instrument_id: InstrumentId,
4010        client_id: Option<ClientId>,
4011        params: Option<Params>,
4012    ) {
4013        self.check_registered();
4014
4015        self.add_index_price_subscription(topic, handler);
4016
4017        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
4018            instrument_id,
4019            client_id,
4020            venue: Some(instrument_id.venue),
4021            command_id: UUID4::new(),
4022            ts_init: self.timestamp_ns(),
4023            correlation_id: None,
4024            params,
4025        });
4026
4027        self.send_data_cmd(DataCommand::Subscribe(command));
4028    }
4029
4030    /// Helper method for registering funding rates subscriptions from the trait.
4031    pub fn subscribe_funding_rates(
4032        &mut self,
4033        topic: MStr<Topic>,
4034        handler: TypedHandler<FundingRateUpdate>,
4035        instrument_id: InstrumentId,
4036        client_id: Option<ClientId>,
4037        params: Option<Params>,
4038    ) {
4039        self.check_registered();
4040
4041        self.add_funding_rate_subscription(topic, handler);
4042
4043        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
4044            instrument_id,
4045            client_id,
4046            venue: Some(instrument_id.venue),
4047            command_id: UUID4::new(),
4048            ts_init: self.timestamp_ns(),
4049            correlation_id: None,
4050            params,
4051        });
4052
4053        self.send_data_cmd(DataCommand::Subscribe(command));
4054    }
4055
4056    /// Helper method for registering option greeks subscriptions from the trait.
4057    pub fn subscribe_option_greeks(
4058        &mut self,
4059        topic: MStr<Topic>,
4060        handler: TypedHandler<OptionGreeks>,
4061        instrument_id: InstrumentId,
4062        client_id: Option<ClientId>,
4063        params: Option<Params>,
4064    ) {
4065        self.check_registered();
4066
4067        self.add_option_greeks_subscription(topic, handler);
4068
4069        let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
4070            instrument_id,
4071            client_id,
4072            venue: Some(instrument_id.venue),
4073            command_id: UUID4::new(),
4074            ts_init: self.timestamp_ns(),
4075            correlation_id: None,
4076            params,
4077        });
4078
4079        self.send_data_cmd(DataCommand::Subscribe(command));
4080    }
4081
4082    /// Helper method for registering instrument status subscriptions from the trait.
4083    pub fn subscribe_instrument_status(
4084        &mut self,
4085        topic: MStr<Topic>,
4086        handler: ShareableMessageHandler,
4087        instrument_id: InstrumentId,
4088        client_id: Option<ClientId>,
4089        params: Option<Params>,
4090    ) {
4091        self.check_registered();
4092
4093        self.add_subscription_any(topic, handler);
4094
4095        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
4096            instrument_id,
4097            client_id,
4098            venue: Some(instrument_id.venue),
4099            command_id: UUID4::new(),
4100            ts_init: self.timestamp_ns(),
4101            correlation_id: None,
4102            params,
4103        });
4104
4105        self.send_data_cmd(DataCommand::Subscribe(command));
4106    }
4107
4108    /// Helper method for registering instrument close subscriptions from the trait.
4109    pub fn subscribe_instrument_close(
4110        &mut self,
4111        topic: MStr<Topic>,
4112        handler: ShareableMessageHandler,
4113        instrument_id: InstrumentId,
4114        client_id: Option<ClientId>,
4115        params: Option<Params>,
4116    ) {
4117        self.check_registered();
4118
4119        self.add_instrument_close_subscription(topic, handler);
4120
4121        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
4122            instrument_id,
4123            client_id,
4124            venue: Some(instrument_id.venue),
4125            command_id: UUID4::new(),
4126            ts_init: self.timestamp_ns(),
4127            correlation_id: None,
4128            params,
4129        });
4130
4131        self.send_data_cmd(DataCommand::Subscribe(command));
4132    }
4133
4134    /// Helper method for subscribing to option chain snapshots from the trait.
4135    #[expect(
4136        clippy::too_many_arguments,
4137        reason = "subscription command mirrors the option chain request fields"
4138    )]
4139    pub fn subscribe_option_chain(
4140        &mut self,
4141        topic: MStr<Topic>,
4142        handler: TypedHandler<OptionChainSlice>,
4143        series_id: OptionSeriesId,
4144        strike_range: StrikeRange,
4145        snapshot_interval_ms: Option<u64>,
4146        client_id: Option<ClientId>,
4147        params: Option<Params>,
4148    ) {
4149        self.check_registered();
4150
4151        self.add_option_chain_subscription(topic, handler);
4152
4153        let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
4154            series_id,
4155            strike_range,
4156            snapshot_interval_ms,
4157            UUID4::new(),
4158            self.timestamp_ns(),
4159            client_id,
4160            Some(series_id.venue),
4161            params,
4162        ));
4163
4164        self.send_data_cmd(DataCommand::Subscribe(command));
4165    }
4166
4167    /// Helper method for registering order fills subscriptions from the trait.
4168    pub fn subscribe_order_fills(
4169        &mut self,
4170        topic: MStr<Topic>,
4171        handler: TypedHandler<OrderEventAny>,
4172    ) {
4173        self.check_registered();
4174        self.add_order_event_subscription(topic, handler);
4175    }
4176
4177    /// Helper method for registering order cancels subscriptions from the trait.
4178    pub fn subscribe_order_cancels(
4179        &mut self,
4180        topic: MStr<Topic>,
4181        handler: TypedHandler<OrderEventAny>,
4182    ) {
4183        self.check_registered();
4184        self.add_order_event_subscription(topic, handler);
4185    }
4186
4187    /// Helper method for unsubscribing from data.
4188    pub fn unsubscribe_data(
4189        &mut self,
4190        data_type: DataType,
4191        client_id: Option<ClientId>,
4192        params: Option<Params>,
4193    ) {
4194        self.check_registered();
4195
4196        let topic = get_custom_topic(&data_type);
4197        self.remove_subscription_any(topic);
4198
4199        if client_id.is_none() {
4200            return;
4201        }
4202
4203        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
4204            data_type,
4205            client_id,
4206            venue: None,
4207            command_id: UUID4::new(),
4208            ts_init: self.timestamp_ns(),
4209            correlation_id: None,
4210            params,
4211        });
4212
4213        self.send_data_cmd(DataCommand::Unsubscribe(command));
4214    }
4215
4216    /// Helper method for unsubscribing from signals.
4217    ///
4218    /// # Panics
4219    ///
4220    /// Panics if the actor is not registered with a trader.
4221    pub fn unsubscribe_signal(&mut self, name: &str) {
4222        self.check_registered();
4223
4224        let pattern = get_signal_pattern(name);
4225        if let Some(handler) = self.topic_handlers.remove(&pattern) {
4226            msgbus::unsubscribe_any(pattern, &handler);
4227        } else {
4228            log::warn!(
4229                "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
4230                self.actor_id,
4231            );
4232        }
4233    }
4234
4235    /// Helper method for unsubscribing from instruments.
4236    pub fn unsubscribe_instruments(
4237        &mut self,
4238        venue: Venue,
4239        client_id: Option<ClientId>,
4240        params: Option<Params>,
4241    ) {
4242        self.check_registered();
4243
4244        let pattern = get_instruments_pattern(venue);
4245        self.remove_instrument_subscription(pattern);
4246
4247        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
4248            client_id,
4249            venue,
4250            command_id: UUID4::new(),
4251            ts_init: self.timestamp_ns(),
4252            correlation_id: None,
4253            params,
4254        });
4255
4256        self.send_data_cmd(DataCommand::Unsubscribe(command));
4257    }
4258
4259    /// Helper method for unsubscribing from instrument.
4260    pub fn unsubscribe_instrument(
4261        &mut self,
4262        instrument_id: InstrumentId,
4263        client_id: Option<ClientId>,
4264        params: Option<Params>,
4265    ) {
4266        self.check_registered();
4267
4268        let topic = get_instrument_topic(instrument_id);
4269        self.remove_instrument_subscription(topic.into());
4270
4271        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
4272            instrument_id,
4273            client_id,
4274            venue: Some(instrument_id.venue),
4275            command_id: UUID4::new(),
4276            ts_init: self.timestamp_ns(),
4277            correlation_id: None,
4278            params,
4279        });
4280
4281        self.send_data_cmd(DataCommand::Unsubscribe(command));
4282    }
4283
4284    /// Helper method for unsubscribing from book deltas.
4285    pub fn unsubscribe_book_deltas(
4286        &mut self,
4287        instrument_id: InstrumentId,
4288        client_id: Option<ClientId>,
4289        params: Option<Params>,
4290    ) {
4291        self.check_registered();
4292
4293        let pattern = if is_parent_subscription(params.as_ref()) {
4294            get_book_deltas_pattern(instrument_id)
4295        } else {
4296            get_book_deltas_topic(instrument_id).into()
4297        };
4298        self.remove_deltas_subscription(pattern);
4299
4300        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
4301            instrument_id,
4302            client_id,
4303            venue: Some(instrument_id.venue),
4304            command_id: UUID4::new(),
4305            ts_init: self.timestamp_ns(),
4306            correlation_id: None,
4307            params,
4308        });
4309
4310        self.send_data_cmd(DataCommand::Unsubscribe(command));
4311    }
4312
4313    /// Helper method for unsubscribing from book snapshots at interval.
4314    pub fn unsubscribe_book_at_interval(
4315        &mut self,
4316        instrument_id: InstrumentId,
4317        interval_ms: NonZeroUsize,
4318        client_id: Option<ClientId>,
4319        params: Option<Params>,
4320    ) {
4321        self.check_registered();
4322
4323        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
4324        self.remove_book_snapshot_subscription(topic);
4325
4326        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
4327            instrument_id,
4328            interval_ms,
4329            client_id,
4330            venue: Some(instrument_id.venue),
4331            command_id: UUID4::new(),
4332            ts_init: self.timestamp_ns(),
4333            correlation_id: None,
4334            params,
4335        });
4336
4337        self.send_data_cmd(DataCommand::Unsubscribe(command));
4338    }
4339
4340    /// Helper method for unsubscribing from quotes.
4341    pub fn unsubscribe_quotes(
4342        &mut self,
4343        instrument_id: InstrumentId,
4344        client_id: Option<ClientId>,
4345        params: Option<Params>,
4346    ) {
4347        self.check_registered();
4348
4349        let topic = get_quotes_topic(instrument_id);
4350        self.remove_quote_subscription(topic);
4351
4352        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
4353            instrument_id,
4354            client_id,
4355            venue: Some(instrument_id.venue),
4356            command_id: UUID4::new(),
4357            ts_init: self.timestamp_ns(),
4358            correlation_id: None,
4359            params,
4360        });
4361
4362        self.send_data_cmd(DataCommand::Unsubscribe(command));
4363    }
4364
4365    /// Helper method for unsubscribing from trades.
4366    pub fn unsubscribe_trades(
4367        &mut self,
4368        instrument_id: InstrumentId,
4369        client_id: Option<ClientId>,
4370        params: Option<Params>,
4371    ) {
4372        self.check_registered();
4373
4374        let topic = get_trades_topic(instrument_id);
4375        self.remove_trade_subscription(topic);
4376
4377        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
4378            instrument_id,
4379            client_id,
4380            venue: Some(instrument_id.venue),
4381            command_id: UUID4::new(),
4382            ts_init: self.timestamp_ns(),
4383            correlation_id: None,
4384            params,
4385        });
4386
4387        self.send_data_cmd(DataCommand::Unsubscribe(command));
4388    }
4389
4390    /// Helper method for unsubscribing from bars.
4391    pub fn unsubscribe_bars(
4392        &mut self,
4393        bar_type: BarType,
4394        client_id: Option<ClientId>,
4395        params: Option<Params>,
4396    ) {
4397        self.check_registered();
4398
4399        let topic = get_bars_topic(bar_type);
4400        self.remove_bar_subscription(topic);
4401
4402        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
4403            bar_type,
4404            client_id,
4405            venue: Some(bar_type.instrument_id().venue),
4406            command_id: UUID4::new(),
4407            ts_init: self.timestamp_ns(),
4408            correlation_id: None,
4409            params,
4410        });
4411
4412        self.send_data_cmd(DataCommand::Unsubscribe(command));
4413    }
4414
4415    /// Helper method for unsubscribing from mark prices.
4416    pub fn unsubscribe_mark_prices(
4417        &mut self,
4418        instrument_id: InstrumentId,
4419        client_id: Option<ClientId>,
4420        params: Option<Params>,
4421    ) {
4422        self.check_registered();
4423
4424        let topic = get_mark_price_topic(instrument_id);
4425        self.remove_mark_price_subscription(topic);
4426
4427        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
4428            instrument_id,
4429            client_id,
4430            venue: Some(instrument_id.venue),
4431            command_id: UUID4::new(),
4432            ts_init: self.timestamp_ns(),
4433            correlation_id: None,
4434            params,
4435        });
4436
4437        self.send_data_cmd(DataCommand::Unsubscribe(command));
4438    }
4439
4440    /// Helper method for unsubscribing from index prices.
4441    pub fn unsubscribe_index_prices(
4442        &mut self,
4443        instrument_id: InstrumentId,
4444        client_id: Option<ClientId>,
4445        params: Option<Params>,
4446    ) {
4447        self.check_registered();
4448
4449        let topic = get_index_price_topic(instrument_id);
4450        self.remove_index_price_subscription(topic);
4451
4452        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
4453            instrument_id,
4454            client_id,
4455            venue: Some(instrument_id.venue),
4456            command_id: UUID4::new(),
4457            ts_init: self.timestamp_ns(),
4458            correlation_id: None,
4459            params,
4460        });
4461
4462        self.send_data_cmd(DataCommand::Unsubscribe(command));
4463    }
4464
4465    /// Helper method for unsubscribing from funding rates.
4466    pub fn unsubscribe_funding_rates(
4467        &mut self,
4468        instrument_id: InstrumentId,
4469        client_id: Option<ClientId>,
4470        params: Option<Params>,
4471    ) {
4472        self.check_registered();
4473
4474        let topic = get_funding_rate_topic(instrument_id);
4475        self.remove_funding_rate_subscription(topic);
4476
4477        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
4478            instrument_id,
4479            client_id,
4480            venue: Some(instrument_id.venue),
4481            command_id: UUID4::new(),
4482            ts_init: self.timestamp_ns(),
4483            correlation_id: None,
4484            params,
4485        });
4486
4487        self.send_data_cmd(DataCommand::Unsubscribe(command));
4488    }
4489
4490    /// Helper method for unsubscribing from option greeks.
4491    pub fn unsubscribe_option_greeks(
4492        &mut self,
4493        instrument_id: InstrumentId,
4494        client_id: Option<ClientId>,
4495        params: Option<Params>,
4496    ) {
4497        self.check_registered();
4498
4499        let topic = get_option_greeks_topic(instrument_id);
4500        self.remove_option_greeks_subscription(topic);
4501
4502        let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
4503            instrument_id,
4504            client_id,
4505            venue: Some(instrument_id.venue),
4506            command_id: UUID4::new(),
4507            ts_init: self.timestamp_ns(),
4508            correlation_id: None,
4509            params,
4510        });
4511
4512        self.send_data_cmd(DataCommand::Unsubscribe(command));
4513    }
4514
4515    /// Helper method for unsubscribing from instrument status.
4516    pub fn unsubscribe_instrument_status(
4517        &mut self,
4518        instrument_id: InstrumentId,
4519        client_id: Option<ClientId>,
4520        params: Option<Params>,
4521    ) {
4522        self.check_registered();
4523
4524        let topic = get_instrument_status_topic(instrument_id);
4525        self.remove_subscription_any(topic);
4526
4527        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
4528            instrument_id,
4529            client_id,
4530            venue: Some(instrument_id.venue),
4531            command_id: UUID4::new(),
4532            ts_init: self.timestamp_ns(),
4533            correlation_id: None,
4534            params,
4535        });
4536
4537        self.send_data_cmd(DataCommand::Unsubscribe(command));
4538    }
4539
4540    /// Helper method for unsubscribing from instrument close.
4541    pub fn unsubscribe_instrument_close(
4542        &mut self,
4543        instrument_id: InstrumentId,
4544        client_id: Option<ClientId>,
4545        params: Option<Params>,
4546    ) {
4547        self.check_registered();
4548
4549        let topic = get_instrument_close_topic(instrument_id);
4550        self.remove_instrument_close_subscription(topic);
4551
4552        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
4553            instrument_id,
4554            client_id,
4555            venue: Some(instrument_id.venue),
4556            command_id: UUID4::new(),
4557            ts_init: self.timestamp_ns(),
4558            correlation_id: None,
4559            params,
4560        });
4561
4562        self.send_data_cmd(DataCommand::Unsubscribe(command));
4563    }
4564
4565    /// Helper method for unsubscribing from option chain snapshots.
4566    pub fn unsubscribe_option_chain(
4567        &mut self,
4568        series_id: OptionSeriesId,
4569        client_id: Option<ClientId>,
4570    ) {
4571        self.check_registered();
4572
4573        let topic = get_option_chain_topic(series_id);
4574        self.remove_option_chain_subscription(topic);
4575
4576        let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4577            series_id,
4578            UUID4::new(),
4579            self.timestamp_ns(),
4580            client_id,
4581            Some(series_id.venue),
4582        ));
4583
4584        self.send_data_cmd(DataCommand::Unsubscribe(command));
4585    }
4586
4587    /// Helper method for unsubscribing from order fills.
4588    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4589        self.check_registered();
4590
4591        let topic = get_order_filled_topic(instrument_id);
4592        self.remove_order_event_subscription(topic);
4593    }
4594
4595    /// Helper method for unsubscribing from order cancels.
4596    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4597        self.check_registered();
4598
4599        let topic = get_order_canceled_topic(instrument_id);
4600        self.remove_order_event_subscription(topic);
4601    }
4602
4603    /// Helper method for requesting data.
4604    ///
4605    /// # Errors
4606    ///
4607    /// Returns an error if input parameters are invalid.
4608    #[expect(clippy::too_many_arguments)]
4609    pub fn request_data(
4610        &self,
4611        data_type: DataType,
4612        client_id: ClientId,
4613        start: Option<DateTime<Utc>>,
4614        end: Option<DateTime<Utc>>,
4615        limit: Option<NonZeroUsize>,
4616        params: Option<Params>,
4617        handler: ShareableMessageHandler,
4618    ) -> anyhow::Result<UUID4> {
4619        self.check_registered();
4620
4621        let now = self.clock_ref().utc_now();
4622        check_timestamps(now, start, end)?;
4623
4624        let request_id = UUID4::new();
4625        let command = RequestCommand::Data(RequestCustomData {
4626            client_id,
4627            data_type,
4628            start,
4629            end,
4630            limit,
4631            request_id,
4632            ts_init: self.timestamp_ns(),
4633            params,
4634        });
4635
4636        get_message_bus()
4637            .borrow_mut()
4638            .register_response_handler(command.request_id(), handler)?;
4639
4640        self.send_data_cmd(DataCommand::Request(command));
4641
4642        Ok(request_id)
4643    }
4644
4645    /// Helper method for requesting instrument.
4646    ///
4647    /// # Errors
4648    ///
4649    /// Returns an error if input parameters are invalid.
4650    pub fn request_instrument(
4651        &self,
4652        instrument_id: InstrumentId,
4653        start: Option<DateTime<Utc>>,
4654        end: Option<DateTime<Utc>>,
4655        client_id: Option<ClientId>,
4656        params: Option<Params>,
4657        handler: ShareableMessageHandler,
4658    ) -> anyhow::Result<UUID4> {
4659        self.check_registered();
4660
4661        let now = self.clock_ref().utc_now();
4662        check_timestamps(now, start, end)?;
4663
4664        let request_id = UUID4::new();
4665        let command = RequestCommand::Instrument(RequestInstrument {
4666            instrument_id,
4667            start,
4668            end,
4669            client_id,
4670            request_id,
4671            ts_init: now.into(),
4672            params,
4673        });
4674
4675        get_message_bus()
4676            .borrow_mut()
4677            .register_response_handler(command.request_id(), handler)?;
4678
4679        self.send_data_cmd(DataCommand::Request(command));
4680
4681        Ok(request_id)
4682    }
4683
4684    /// Helper method for requesting instruments.
4685    ///
4686    /// # Errors
4687    ///
4688    /// Returns an error if input parameters are invalid.
4689    pub fn request_instruments(
4690        &self,
4691        venue: Option<Venue>,
4692        start: Option<DateTime<Utc>>,
4693        end: Option<DateTime<Utc>>,
4694        client_id: Option<ClientId>,
4695        params: Option<Params>,
4696        handler: ShareableMessageHandler,
4697    ) -> anyhow::Result<UUID4> {
4698        self.check_registered();
4699
4700        let now = self.clock_ref().utc_now();
4701        check_timestamps(now, start, end)?;
4702
4703        let request_id = UUID4::new();
4704        let command = RequestCommand::Instruments(RequestInstruments {
4705            venue,
4706            start,
4707            end,
4708            client_id,
4709            request_id,
4710            ts_init: now.into(),
4711            params,
4712        });
4713
4714        get_message_bus()
4715            .borrow_mut()
4716            .register_response_handler(command.request_id(), handler)?;
4717
4718        self.send_data_cmd(DataCommand::Request(command));
4719
4720        Ok(request_id)
4721    }
4722
4723    /// Helper method for requesting book snapshot.
4724    ///
4725    /// # Errors
4726    ///
4727    /// Returns an error if input parameters are invalid.
4728    pub fn request_book_snapshot(
4729        &self,
4730        instrument_id: InstrumentId,
4731        depth: Option<NonZeroUsize>,
4732        client_id: Option<ClientId>,
4733        params: Option<Params>,
4734        handler: ShareableMessageHandler,
4735    ) -> anyhow::Result<UUID4> {
4736        self.check_registered();
4737
4738        let request_id = UUID4::new();
4739        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4740            instrument_id,
4741            depth,
4742            client_id,
4743            request_id,
4744            ts_init: self.timestamp_ns(),
4745            params,
4746        });
4747
4748        get_message_bus()
4749            .borrow_mut()
4750            .register_response_handler(command.request_id(), handler)?;
4751
4752        self.send_data_cmd(DataCommand::Request(command));
4753
4754        Ok(request_id)
4755    }
4756
4757    /// Helper method for requesting book deltas.
4758    ///
4759    /// # Errors
4760    ///
4761    /// Returns an error if input parameters are invalid.
4762    #[expect(clippy::too_many_arguments)]
4763    pub fn request_book_deltas(
4764        &self,
4765        instrument_id: InstrumentId,
4766        start: Option<DateTime<Utc>>,
4767        end: Option<DateTime<Utc>>,
4768        limit: Option<NonZeroUsize>,
4769        client_id: Option<ClientId>,
4770        params: Option<Params>,
4771        handler: ShareableMessageHandler,
4772    ) -> anyhow::Result<UUID4> {
4773        self.check_registered();
4774
4775        let now = self.clock_ref().utc_now();
4776        check_timestamps(now, start, end)?;
4777
4778        let request_id = UUID4::new();
4779        let command = RequestCommand::BookDeltas(RequestBookDeltas {
4780            instrument_id,
4781            start,
4782            end,
4783            limit,
4784            client_id,
4785            request_id,
4786            ts_init: now.into(),
4787            params,
4788        });
4789
4790        get_message_bus()
4791            .borrow_mut()
4792            .register_response_handler(command.request_id(), handler)?;
4793
4794        self.send_data_cmd(DataCommand::Request(command));
4795
4796        Ok(request_id)
4797    }
4798
4799    /// Sends a request for historical book depth.
4800    ///
4801    /// # Errors
4802    ///
4803    /// Returns an error if input parameters are invalid.
4804    #[expect(clippy::too_many_arguments)]
4805    pub fn request_book_depth(
4806        &self,
4807        instrument_id: InstrumentId,
4808        start: Option<DateTime<Utc>>,
4809        end: Option<DateTime<Utc>>,
4810        limit: Option<NonZeroUsize>,
4811        depth: Option<NonZeroUsize>,
4812        client_id: Option<ClientId>,
4813        params: Option<Params>,
4814        handler: ShareableMessageHandler,
4815    ) -> anyhow::Result<UUID4> {
4816        self.check_registered();
4817
4818        let now = self.clock_ref().utc_now();
4819        check_timestamps(now, start, end)?;
4820
4821        let request_id = UUID4::new();
4822        let command = RequestCommand::BookDepth(RequestBookDepth {
4823            instrument_id,
4824            start,
4825            end,
4826            limit,
4827            depth,
4828            client_id,
4829            request_id,
4830            ts_init: now.into(),
4831            params,
4832        });
4833
4834        get_message_bus()
4835            .borrow_mut()
4836            .register_response_handler(command.request_id(), handler)?;
4837
4838        self.send_data_cmd(DataCommand::Request(command));
4839
4840        Ok(request_id)
4841    }
4842
4843    /// Helper method for requesting quotes.
4844    ///
4845    /// # Errors
4846    ///
4847    /// Returns an error if input parameters are invalid.
4848    #[expect(clippy::too_many_arguments)]
4849    pub fn request_quotes(
4850        &self,
4851        instrument_id: InstrumentId,
4852        start: Option<DateTime<Utc>>,
4853        end: Option<DateTime<Utc>>,
4854        limit: Option<NonZeroUsize>,
4855        client_id: Option<ClientId>,
4856        params: Option<Params>,
4857        handler: ShareableMessageHandler,
4858    ) -> anyhow::Result<UUID4> {
4859        self.check_registered();
4860
4861        let now = self.clock_ref().utc_now();
4862        check_timestamps(now, start, end)?;
4863
4864        let request_id = UUID4::new();
4865        let command = RequestCommand::Quotes(RequestQuotes {
4866            instrument_id,
4867            start,
4868            end,
4869            limit,
4870            client_id,
4871            request_id,
4872            ts_init: now.into(),
4873            params,
4874        });
4875
4876        get_message_bus()
4877            .borrow_mut()
4878            .register_response_handler(command.request_id(), handler)?;
4879
4880        self.send_data_cmd(DataCommand::Request(command));
4881
4882        Ok(request_id)
4883    }
4884
4885    /// Helper method for requesting trades.
4886    ///
4887    /// # Errors
4888    ///
4889    /// Returns an error if input parameters are invalid.
4890    #[expect(clippy::too_many_arguments)]
4891    pub fn request_trades(
4892        &self,
4893        instrument_id: InstrumentId,
4894        start: Option<DateTime<Utc>>,
4895        end: Option<DateTime<Utc>>,
4896        limit: Option<NonZeroUsize>,
4897        client_id: Option<ClientId>,
4898        params: Option<Params>,
4899        handler: ShareableMessageHandler,
4900    ) -> anyhow::Result<UUID4> {
4901        self.check_registered();
4902
4903        let now = self.clock_ref().utc_now();
4904        check_timestamps(now, start, end)?;
4905
4906        let request_id = UUID4::new();
4907        let command = RequestCommand::Trades(RequestTrades {
4908            instrument_id,
4909            start,
4910            end,
4911            limit,
4912            client_id,
4913            request_id,
4914            ts_init: now.into(),
4915            params,
4916        });
4917
4918        get_message_bus()
4919            .borrow_mut()
4920            .register_response_handler(command.request_id(), handler)?;
4921
4922        self.send_data_cmd(DataCommand::Request(command));
4923
4924        Ok(request_id)
4925    }
4926
4927    /// Helper method for requesting bars.
4928    ///
4929    /// # Errors
4930    ///
4931    /// Returns an error if input parameters are invalid.
4932    #[expect(clippy::too_many_arguments)]
4933    pub fn request_bars(
4934        &self,
4935        bar_type: BarType,
4936        start: Option<DateTime<Utc>>,
4937        end: Option<DateTime<Utc>>,
4938        limit: Option<NonZeroUsize>,
4939        client_id: Option<ClientId>,
4940        params: Option<Params>,
4941        handler: ShareableMessageHandler,
4942    ) -> anyhow::Result<UUID4> {
4943        self.check_registered();
4944
4945        let now = self.clock_ref().utc_now();
4946        check_timestamps(now, start, end)?;
4947
4948        let request_id = UUID4::new();
4949        let command = RequestCommand::Bars(RequestBars {
4950            bar_type,
4951            start,
4952            end,
4953            limit,
4954            client_id,
4955            request_id,
4956            ts_init: now.into(),
4957            params,
4958        });
4959
4960        get_message_bus()
4961            .borrow_mut()
4962            .register_response_handler(command.request_id(), handler)?;
4963
4964        self.send_data_cmd(DataCommand::Request(command));
4965
4966        Ok(request_id)
4967    }
4968
4969    /// Helper method for requesting funding rates.
4970    ///
4971    /// # Errors
4972    ///
4973    /// Returns an error if input parameters are invalid.
4974    #[expect(clippy::too_many_arguments)]
4975    pub fn request_funding_rates(
4976        &self,
4977        instrument_id: InstrumentId,
4978        start: Option<DateTime<Utc>>,
4979        end: Option<DateTime<Utc>>,
4980        limit: Option<NonZeroUsize>,
4981        client_id: Option<ClientId>,
4982        params: Option<Params>,
4983        handler: ShareableMessageHandler,
4984    ) -> anyhow::Result<UUID4> {
4985        self.check_registered();
4986
4987        let now = self.clock_ref().utc_now();
4988        check_timestamps(now, start, end)?;
4989
4990        let request_id = UUID4::new();
4991        let command = RequestCommand::FundingRates(RequestFundingRates {
4992            instrument_id,
4993            start,
4994            end,
4995            limit,
4996            client_id,
4997            request_id,
4998            ts_init: now.into(),
4999            params,
5000        });
5001
5002        get_message_bus()
5003            .borrow_mut()
5004            .register_response_handler(command.request_id(), handler)?;
5005
5006        self.send_data_cmd(DataCommand::Request(command));
5007
5008        Ok(request_id)
5009    }
5010
5011    #[cfg(test)]
5012    pub fn quote_handler_count(&self) -> usize {
5013        self.quote_handlers.len()
5014    }
5015
5016    #[cfg(test)]
5017    pub fn trade_handler_count(&self) -> usize {
5018        self.trade_handlers.len()
5019    }
5020
5021    #[cfg(test)]
5022    pub fn bar_handler_count(&self) -> usize {
5023        self.bar_handlers.len()
5024    }
5025
5026    #[cfg(test)]
5027    pub fn deltas_handler_count(&self) -> usize {
5028        self.deltas_handlers.len()
5029    }
5030
5031    #[cfg(test)]
5032    pub fn has_quote_handler(&self, topic: &str) -> bool {
5033        self.quote_handlers
5034            .contains_key(&MStr::<Topic>::from(topic))
5035    }
5036
5037    #[cfg(test)]
5038    pub fn has_trade_handler(&self, topic: &str) -> bool {
5039        self.trade_handlers
5040            .contains_key(&MStr::<Topic>::from(topic))
5041    }
5042
5043    #[cfg(test)]
5044    pub fn has_bar_handler(&self, topic: &str) -> bool {
5045        self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
5046    }
5047
5048    #[cfg(test)]
5049    pub fn has_deltas_handler(&self, pattern: &str) -> bool {
5050        self.deltas_handlers
5051            .contains_key(&MStr::<Pattern>::from(pattern))
5052    }
5053}
5054
5055impl DataActorNative for DataActorCore {
5056    fn core(&self) -> &DataActorCore {
5057        self
5058    }
5059
5060    fn core_mut(&mut self) -> &mut DataActorCore {
5061        self
5062    }
5063}
5064
5065fn check_timestamps(
5066    now: DateTime<Utc>,
5067    start: Option<DateTime<Utc>>,
5068    end: Option<DateTime<Utc>>,
5069) -> anyhow::Result<()> {
5070    if let Some(start) = start {
5071        check_predicate_true(start <= now, "start was > now")?;
5072    }
5073
5074    if let Some(end) = end {
5075        check_predicate_true(end <= now, "end was > now")?;
5076    }
5077
5078    if let (Some(start), Some(end)) = (start, end) {
5079        check_predicate_true(start < end, "start was >= end")?;
5080    }
5081
5082    Ok(())
5083}
5084
5085fn log_error(e: &anyhow::Error) {
5086    log::error!("{e}");
5087}
5088
5089fn log_not_running<T>(msg: &T)
5090where
5091    T: Debug,
5092{
5093    log::trace!("Received message when not running - skipping {msg:?}");
5094}
5095
5096fn log_received<T>(msg: &T)
5097where
5098    T: Debug,
5099{
5100    log::debug!("{RECV} {msg:?}");
5101}
5102
5103fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
5104    log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
5105}