Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39        close::InstrumentClose,
40        option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41    },
42    enums::BookType,
43    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45    instruments::{InstrumentAny, SyntheticInstrument},
46    orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54    Actor,
55    registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
62use crate::{
63    cache::Cache,
64    clock::Clock,
65    component::Component,
66    enums::{ComponentState, ComponentTrigger},
67    logging::{CMD, RECV, REQ, SEND},
68    messages::{
69        data::{
70            BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
71            InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
72            RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
73            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
74            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
75            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
76            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
77            SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
78            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
79            UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
80            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
81            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82            UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
83            UnsubscribeQuotes, UnsubscribeTrades, is_parent_subscription,
84        },
85        system::ShutdownSystem,
86    },
87    msgbus::{
88        self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
89        switchboard::{
90            MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
91            get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
92            get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
93            get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
94            get_option_chain_topic, get_option_greeks_topic, get_order_cancels_topic,
95            get_order_fills_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
96        },
97    },
98    signal::Signal,
99    timer::{TimeEvent, TimeEventCallback},
100};
101
102/// Common configuration for [`DataActor`] based components.
103#[derive(Debug, Clone, Deserialize, Serialize)]
104#[serde(default, deny_unknown_fields)]
105#[cfg_attr(
106    feature = "python",
107    pyo3::pyclass(
108        module = "nautilus_trader.core.nautilus_pyo3.common",
109        subclass,
110        from_py_object
111    )
112)]
113#[cfg_attr(
114    feature = "python",
115    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
116)]
117pub struct DataActorConfig {
118    /// The custom identifier for the Actor.
119    pub actor_id: Option<ActorId>,
120    /// If events should be logged.
121    pub log_events: bool,
122    /// If commands should be logged.
123    pub log_commands: bool,
124}
125
126impl Default for DataActorConfig {
127    fn default() -> Self {
128        Self {
129            actor_id: None,
130            log_events: true,
131            log_commands: true,
132        }
133    }
134}
135
136/// Configuration for creating actors from importable paths.
137#[derive(Debug, Clone, Deserialize, Serialize)]
138#[serde(deny_unknown_fields)]
139#[cfg_attr(
140    feature = "python",
141    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
142)]
143#[cfg_attr(
144    feature = "python",
145    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
146)]
147pub struct ImportableActorConfig {
148    /// The fully qualified name of the Actor class.
149    pub actor_path: String,
150    /// The fully qualified name of the Actor config class.
151    pub config_path: String,
152    /// The actor configuration as a dictionary.
153    pub config: HashMap<String, serde_json::Value>,
154}
155
156type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
157
158pub trait DataActor:
159    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
160{
161    /// Actions to be performed when the actor state is saved.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if saving the actor state fails.
166    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
167        Ok(IndexMap::new())
168    }
169
170    /// Actions to be performed when the actor state is loaded.
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if loading the actor state fails.
175    #[allow(unused_variables)]
176    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
177        Ok(())
178    }
179
180    /// Actions to be performed on start.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if starting the actor fails.
185    fn on_start(&mut self) -> anyhow::Result<()> {
186        log::warn!(
187            "The `on_start` handler was called when not overridden, \
188            it's expected that any actions required when starting the actor \
189            occur here, such as subscribing/requesting data"
190        );
191        Ok(())
192    }
193
194    /// Actions to be performed on stop.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if stopping the actor fails.
199    fn on_stop(&mut self) -> anyhow::Result<()> {
200        log::warn!(
201            "The `on_stop` handler was called when not overridden, \
202            it's expected that any actions required when stopping the actor \
203            occur here, such as unsubscribing from data",
204        );
205        Ok(())
206    }
207
208    /// Actions to be performed on resume.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if resuming the actor fails.
213    fn on_resume(&mut self) -> anyhow::Result<()> {
214        log::warn!(
215            "The `on_resume` handler was called when not overridden, \
216            it's expected that any actions required when resuming the actor \
217            following a stop occur here"
218        );
219        Ok(())
220    }
221
222    /// Actions to be performed on reset.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if resetting the actor fails.
227    fn on_reset(&mut self) -> anyhow::Result<()> {
228        log::warn!(
229            "The `on_reset` handler was called when not overridden, \
230            it's expected that any actions required when resetting the actor \
231            occur here, such as resetting indicators and other state"
232        );
233        Ok(())
234    }
235
236    /// Actions to be performed on dispose.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if disposing the actor fails.
241    fn on_dispose(&mut self) -> anyhow::Result<()> {
242        Ok(())
243    }
244
245    /// Actions to be performed on degrade.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if degrading the actor fails.
250    fn on_degrade(&mut self) -> anyhow::Result<()> {
251        Ok(())
252    }
253
254    /// Actions to be performed on fault.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if faulting the actor fails.
259    fn on_fault(&mut self) -> anyhow::Result<()> {
260        Ok(())
261    }
262
263    /// Actions to be performed when receiving a time event.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if handling the time event fails.
268    #[allow(unused_variables)]
269    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
270        Ok(())
271    }
272
273    /// Actions to be performed when receiving custom data.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if handling the data fails.
278    #[allow(unused_variables)]
279    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
280        Ok(())
281    }
282
283    /// Actions to be performed when receiving a signal.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if handling the signal fails.
288    #[allow(unused_variables)]
289    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
290        Ok(())
291    }
292
293    /// Actions to be performed when receiving an instrument.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if handling the instrument fails.
298    #[allow(unused_variables)]
299    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
300        Ok(())
301    }
302
303    /// Actions to be performed when receiving order book deltas.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if handling the book deltas fails.
308    #[allow(unused_variables)]
309    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
310        Ok(())
311    }
312
313    /// Actions to be performed when receiving an order book.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if handling the book fails.
318    #[allow(unused_variables)]
319    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
320        Ok(())
321    }
322
323    /// Actions to be performed when receiving a quote.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if handling the quote fails.
328    #[allow(unused_variables)]
329    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
330        Ok(())
331    }
332
333    /// Actions to be performed when receiving a trade.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if handling the trade fails.
338    #[allow(unused_variables)]
339    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
340        Ok(())
341    }
342
343    /// Actions to be performed when receiving a bar.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if handling the bar fails.
348    #[allow(unused_variables)]
349    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
350        Ok(())
351    }
352
353    /// Actions to be performed when receiving a mark price update.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if handling the mark price update fails.
358    #[allow(unused_variables)]
359    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
360        Ok(())
361    }
362
363    /// Actions to be performed when receiving an index price update.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if handling the index price update fails.
368    #[allow(unused_variables)]
369    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
370        Ok(())
371    }
372
373    /// Actions to be performed when receiving a funding rate update.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if handling the funding rate update fails.
378    #[allow(unused_variables)]
379    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
380        Ok(())
381    }
382
383    /// Actions to be performed when receiving exchange-provided option greeks.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if handling the option greeks fails.
388    #[allow(unused_variables)]
389    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
390        Ok(())
391    }
392
393    /// Actions to be performed when receiving an option chain slice snapshot.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if handling the option chain slice fails.
398    #[allow(unused_variables)]
399    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
400        Ok(())
401    }
402
403    /// Actions to be performed when receiving an instrument status update.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if handling the instrument status update fails.
408    #[allow(unused_variables)]
409    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
410        Ok(())
411    }
412
413    /// Actions to be performed when receiving an instrument close update.
414    ///
415    /// # Errors
416    ///
417    /// Returns an error if handling the instrument close update fails.
418    #[allow(unused_variables)]
419    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
420        Ok(())
421    }
422
423    /// Actions to be performed when receiving an order filled event.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if handling the order filled event fails.
428    #[allow(unused_variables)]
429    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
430        Ok(())
431    }
432
433    /// Actions to be performed when receiving an order canceled event.
434    ///
435    /// # Errors
436    ///
437    /// Returns an error if handling the order canceled event fails.
438    #[allow(unused_variables)]
439    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
440        Ok(())
441    }
442
443    #[cfg(feature = "defi")]
444    /// Actions to be performed when receiving a block.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if handling the block fails.
449    #[allow(unused_variables)]
450    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
451        Ok(())
452    }
453
454    #[cfg(feature = "defi")]
455    /// Actions to be performed when receiving a pool.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if handling the pool fails.
460    #[allow(unused_variables)]
461    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
462        Ok(())
463    }
464
465    #[cfg(feature = "defi")]
466    /// Actions to be performed when receiving a pool swap.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if handling the pool swap fails.
471    #[allow(unused_variables)]
472    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
473        Ok(())
474    }
475
476    #[cfg(feature = "defi")]
477    /// Actions to be performed when receiving a pool liquidity update.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if handling the pool liquidity update fails.
482    #[allow(unused_variables)]
483    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
484        Ok(())
485    }
486
487    #[cfg(feature = "defi")]
488    /// Actions to be performed when receiving a pool fee collect event.
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if handling the pool fee collect fails.
493    #[allow(unused_variables)]
494    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
495        Ok(())
496    }
497
498    #[cfg(feature = "defi")]
499    /// Actions to be performed when receiving a pool flash event.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if handling the pool flash fails.
504    #[allow(unused_variables)]
505    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
506        Ok(())
507    }
508
509    /// Actions to be performed when receiving historical data.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if handling the historical data fails.
514    #[allow(unused_variables)]
515    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
516        Ok(())
517    }
518
519    /// Actions to be performed when receiving historical quotes.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if handling the historical quotes fails.
524    #[allow(unused_variables)]
525    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
526        Ok(())
527    }
528
529    /// Actions to be performed when receiving historical trades.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if handling the historical trades fails.
534    #[allow(unused_variables)]
535    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
536        Ok(())
537    }
538
539    /// Actions to be performed when receiving historical funding rates.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if handling the historical funding rates fails.
544    #[allow(unused_variables)]
545    fn on_historical_funding_rates(
546        &mut self,
547        funding_rates: &[FundingRateUpdate],
548    ) -> anyhow::Result<()> {
549        Ok(())
550    }
551
552    /// Actions to be performed when receiving historical bars.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if handling the historical bars fails.
557    #[allow(unused_variables)]
558    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
559        Ok(())
560    }
561
562    /// Actions to be performed when receiving historical mark prices.
563    ///
564    /// # Errors
565    ///
566    /// Returns an error if handling the historical mark prices fails.
567    #[allow(unused_variables)]
568    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
569        Ok(())
570    }
571
572    /// Actions to be performed when receiving historical index prices.
573    ///
574    /// # Errors
575    ///
576    /// Returns an error if handling the historical index prices fails.
577    #[allow(unused_variables)]
578    fn on_historical_index_prices(
579        &mut self,
580        index_prices: &[IndexPriceUpdate],
581    ) -> anyhow::Result<()> {
582        Ok(())
583    }
584
585    /// Handles a received time event.
586    fn handle_time_event(&mut self, event: &TimeEvent) {
587        log_received(&event);
588
589        if self.not_running() {
590            log_not_running(&event);
591            return;
592        }
593
594        if let Err(e) = DataActor::on_time_event(self, event) {
595            log_error(&e);
596        }
597    }
598
599    /// Handles a received custom data point.
600    fn handle_data(&mut self, data: &CustomData) {
601        log_received(&data);
602
603        if self.not_running() {
604            log_not_running(&data);
605            return;
606        }
607
608        if let Err(e) = self.on_data(data) {
609            log_error(&e);
610        }
611    }
612
613    /// Handles a received signal.
614    fn handle_signal(&mut self, signal: &Signal) {
615        log_received(&signal);
616
617        if self.not_running() {
618            log_not_running(&signal);
619            return;
620        }
621
622        if let Err(e) = self.on_signal(signal) {
623            log_error(&e);
624        }
625    }
626
627    /// Handles a received instrument.
628    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
629        log_received(&instrument);
630
631        if self.not_running() {
632            log_not_running(&instrument);
633            return;
634        }
635
636        if let Err(e) = self.on_instrument(instrument) {
637            log_error(&e);
638        }
639    }
640
641    /// Handles received order book deltas.
642    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
643        log_received(&deltas);
644
645        if self.not_running() {
646            log_not_running(&deltas);
647            return;
648        }
649
650        if let Err(e) = self.on_book_deltas(deltas) {
651            log_error(&e);
652        }
653    }
654
655    /// Handles a received order book reference.
656    fn handle_book(&mut self, book: &OrderBook) {
657        log_received(&book);
658
659        if self.not_running() {
660            log_not_running(&book);
661            return;
662        }
663
664        if let Err(e) = self.on_book(book) {
665            log_error(&e);
666        }
667    }
668
669    /// Handles a received quote.
670    fn handle_quote(&mut self, quote: &QuoteTick) {
671        log_received(&quote);
672
673        if self.not_running() {
674            log_not_running(&quote);
675            return;
676        }
677
678        if let Err(e) = self.on_quote(quote) {
679            log_error(&e);
680        }
681    }
682
683    /// Handles a received trade.
684    fn handle_trade(&mut self, trade: &TradeTick) {
685        log_received(&trade);
686
687        if self.not_running() {
688            log_not_running(&trade);
689            return;
690        }
691
692        if let Err(e) = self.on_trade(trade) {
693            log_error(&e);
694        }
695    }
696
697    /// Handles a receiving bar.
698    fn handle_bar(&mut self, bar: &Bar) {
699        log_received(&bar);
700
701        if self.not_running() {
702            log_not_running(&bar);
703            return;
704        }
705
706        if let Err(e) = self.on_bar(bar) {
707            log_error(&e);
708        }
709    }
710
711    /// Handles a received mark price update.
712    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
713        log_received(&mark_price);
714
715        if self.not_running() {
716            log_not_running(&mark_price);
717            return;
718        }
719
720        if let Err(e) = self.on_mark_price(mark_price) {
721            log_error(&e);
722        }
723    }
724
725    /// Handles a received index price update.
726    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
727        log_received(&index_price);
728
729        if self.not_running() {
730            log_not_running(&index_price);
731            return;
732        }
733
734        if let Err(e) = self.on_index_price(index_price) {
735            log_error(&e);
736        }
737    }
738
739    /// Handles a received funding rate update.
740    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
741        log_received(&funding_rate);
742
743        if self.not_running() {
744            log_not_running(&funding_rate);
745            return;
746        }
747
748        if let Err(e) = self.on_funding_rate(funding_rate) {
749            log_error(&e);
750        }
751    }
752
753    /// Handles a received option greeks update.
754    fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
755        log_received(&greeks);
756
757        if self.not_running() {
758            log_not_running(&greeks);
759            return;
760        }
761
762        if let Err(e) = self.on_option_greeks(greeks) {
763            log_error(&e);
764        }
765    }
766
767    /// Handles a received option chain slice snapshot.
768    fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
769        log_received(&slice);
770
771        if self.not_running() {
772            log_not_running(&slice);
773            return;
774        }
775
776        if let Err(e) = self.on_option_chain(slice) {
777            log_error(&e);
778        }
779    }
780
781    /// Handles a received instrument status.
782    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
783        log_received(&status);
784
785        if self.not_running() {
786            log_not_running(&status);
787            return;
788        }
789
790        if let Err(e) = self.on_instrument_status(status) {
791            log_error(&e);
792        }
793    }
794
795    /// Handles a received instrument close.
796    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
797        log_received(&close);
798
799        if self.not_running() {
800            log_not_running(&close);
801            return;
802        }
803
804        if let Err(e) = self.on_instrument_close(close) {
805            log_error(&e);
806        }
807    }
808
809    /// Handles a received order filled event.
810    fn handle_order_filled(&mut self, event: &OrderFilled) {
811        log_received(&event);
812
813        // Check for double-handling: if the event's strategy_id matches this actor's id,
814        // it means a Strategy is receiving its own fill event through both automatic
815        // subscription and manual subscribe_order_fills, so skip the manual handler.
816        if event.strategy_id.inner() == self.actor_id().inner() {
817            return;
818        }
819
820        if self.not_running() {
821            log_not_running(&event);
822            return;
823        }
824
825        if let Err(e) = self.on_order_filled(event) {
826            log_error(&e);
827        }
828    }
829
830    /// Handles a received order canceled event.
831    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
832        log_received(&event);
833
834        // Check for double-handling: if the event's strategy_id matches this actor's id,
835        // it means a Strategy is receiving its own cancel event through both automatic
836        // subscription and manual subscribe_order_cancels, so skip the manual handler.
837        if event.strategy_id.inner() == self.actor_id().inner() {
838            return;
839        }
840
841        if self.not_running() {
842            log_not_running(&event);
843            return;
844        }
845
846        if let Err(e) = self.on_order_canceled(event) {
847            log_error(&e);
848        }
849    }
850
851    #[cfg(feature = "defi")]
852    /// Handles a received block.
853    fn handle_block(&mut self, block: &Block) {
854        log_received(&block);
855
856        if self.not_running() {
857            log_not_running(&block);
858            return;
859        }
860
861        if let Err(e) = self.on_block(block) {
862            log_error(&e);
863        }
864    }
865
866    #[cfg(feature = "defi")]
867    /// Handles a received pool definition update.
868    fn handle_pool(&mut self, pool: &Pool) {
869        log_received(&pool);
870
871        if self.not_running() {
872            log_not_running(&pool);
873            return;
874        }
875
876        if let Err(e) = self.on_pool(pool) {
877            log_error(&e);
878        }
879    }
880
881    #[cfg(feature = "defi")]
882    /// Handles a received pool swap.
883    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
884        log_received(&swap);
885
886        if self.not_running() {
887            log_not_running(&swap);
888            return;
889        }
890
891        if let Err(e) = self.on_pool_swap(swap) {
892            log_error(&e);
893        }
894    }
895
896    #[cfg(feature = "defi")]
897    /// Handles a received pool liquidity update.
898    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
899        log_received(&update);
900
901        if self.not_running() {
902            log_not_running(&update);
903            return;
904        }
905
906        if let Err(e) = self.on_pool_liquidity_update(update) {
907            log_error(&e);
908        }
909    }
910
911    #[cfg(feature = "defi")]
912    /// Handles a received pool fee collect.
913    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
914        log_received(&collect);
915
916        if self.not_running() {
917            log_not_running(&collect);
918            return;
919        }
920
921        if let Err(e) = self.on_pool_fee_collect(collect) {
922            log_error(&e);
923        }
924    }
925
926    #[cfg(feature = "defi")]
927    /// Handles a received pool flash event.
928    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
929        log_received(&flash);
930
931        if self.not_running() {
932            log_not_running(&flash);
933            return;
934        }
935
936        if let Err(e) = self.on_pool_flash(flash) {
937            log_error(&e);
938        }
939    }
940
941    /// Handles received historical data.
942    fn handle_historical_data(&mut self, data: &dyn Any) {
943        log_received(&data);
944
945        if let Err(e) = self.on_historical_data(data) {
946            log_error(&e);
947        }
948    }
949
950    /// Handles a data response.
951    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
952        log_received(&resp);
953
954        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
955            log_error(&e);
956        }
957    }
958
959    /// Handles an instrument response.
960    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
961        log_received(&resp);
962
963        if let Err(e) = self.on_instrument(&resp.data) {
964            log_error(&e);
965        }
966    }
967
968    /// Handles an instruments response.
969    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
970        log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
971        log::trace!("{RECV} {resp:?}");
972
973        for inst in &resp.data {
974            if let Err(e) = self.on_instrument(inst) {
975                log_error(&e);
976            }
977        }
978    }
979
980    /// Handles a book response.
981    fn handle_book_response(&mut self, resp: &BookResponse) {
982        log_received(&resp);
983
984        if let Err(e) = self.on_book(&resp.data) {
985            log_error(&e);
986        }
987    }
988
989    /// Handles a quotes response.
990    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
991        log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
992        log::trace!("{RECV} {resp:?}");
993
994        if let Err(e) = self.on_historical_quotes(&resp.data) {
995            log_error(&e);
996        }
997    }
998
999    /// Handles a trades response.
1000    fn handle_trades_response(&mut self, resp: &TradesResponse) {
1001        log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1002        log::trace!("{RECV} {resp:?}");
1003
1004        if let Err(e) = self.on_historical_trades(&resp.data) {
1005            log_error(&e);
1006        }
1007    }
1008
1009    /// Handles a funding rates response.
1010    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1011        log_received_bulk(
1012            "FundingRatesResponse",
1013            &resp.correlation_id,
1014            resp.data.len(),
1015        );
1016        log::trace!("{RECV} {resp:?}");
1017
1018        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1019            log_error(&e);
1020        }
1021    }
1022
1023    /// Handles a bars response.
1024    fn handle_bars_response(&mut self, resp: &BarsResponse) {
1025        log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1026        log::trace!("{RECV} {resp:?}");
1027
1028        if let Err(e) = self.on_historical_bars(&resp.data) {
1029            log_error(&e);
1030        }
1031    }
1032
1033    /// Subscribe to streaming `data_type` data.
1034    fn subscribe_data(
1035        &mut self,
1036        data_type: DataType,
1037        client_id: Option<ClientId>,
1038        params: Option<Params>,
1039    ) where
1040        Self: 'static + Debug + Sized,
1041    {
1042        let actor_id = self.actor_id().inner();
1043        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1044            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1045        });
1046
1047        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1048    }
1049
1050    /// Subscribe to [`Signal`] data by `name`.
1051    ///
1052    /// An empty `name` subscribes to every signal.
1053    ///
1054    /// # Parameters
1055    ///
1056    /// - `name`: signal name to subscribe to.
1057    /// - `priority`: optional dispatch priority. Pass `None` for default
1058    ///   ordering (by pattern then handler ID). Pass `Some(p)` when actors
1059    ///   sharing a signal need deterministic ordering: higher-priority
1060    ///   handlers receive the message before lower-priority handlers.
1061    ///
1062    /// Re-subscribing does not update an existing priority; call
1063    /// [`unsubscribe_signal`](Self::unsubscribe_signal) first.
1064    fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1065    where
1066        Self: 'static + Debug + Sized,
1067    {
1068        let actor_id = self.actor_id().inner();
1069        // Signals are published as `CustomData` wrapping a `Signal`; downcast
1070        // the inner value so subscribers receive the typed `Signal` in `on_signal`.
1071        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1072            if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1073                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1074                    actor.handle_signal(signal);
1075                } else {
1076                    log::error!("Actor {actor_id} not found for signal handling");
1077                }
1078            }
1079        });
1080
1081        DataActorCore::subscribe_signal(self, handler, name, priority);
1082    }
1083
1084    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1085    fn subscribe_quotes(
1086        &mut self,
1087        instrument_id: InstrumentId,
1088        client_id: Option<ClientId>,
1089        params: Option<Params>,
1090    ) where
1091        Self: 'static + Debug + Sized,
1092    {
1093        let actor_id = self.actor_id().inner();
1094        let topic = get_quotes_topic(instrument_id);
1095
1096        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1097            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1098                actor.handle_quote(quote);
1099            } else {
1100                log::error!("Actor {actor_id} not found for quote handling");
1101            }
1102        });
1103
1104        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1105    }
1106
1107    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1108    fn subscribe_instruments(
1109        &mut self,
1110        venue: Venue,
1111        client_id: Option<ClientId>,
1112        params: Option<Params>,
1113    ) where
1114        Self: 'static + Debug + Sized,
1115    {
1116        let actor_id = self.actor_id().inner();
1117        let pattern = get_instruments_pattern(venue);
1118
1119        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1120            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1121                actor.handle_instrument(instrument);
1122            } else {
1123                log::error!("Actor {actor_id} not found for instruments handling");
1124            }
1125        });
1126
1127        DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1128    }
1129
1130    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1131    fn subscribe_instrument(
1132        &mut self,
1133        instrument_id: InstrumentId,
1134        client_id: Option<ClientId>,
1135        params: Option<Params>,
1136    ) where
1137        Self: 'static + Debug + Sized,
1138    {
1139        let actor_id = self.actor_id().inner();
1140        let topic = get_instrument_topic(instrument_id);
1141
1142        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1143            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1144                actor.handle_instrument(instrument);
1145            } else {
1146                log::error!("Actor {actor_id} not found for instrument handling");
1147            }
1148        });
1149
1150        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1151    }
1152
1153    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1154    fn subscribe_book_deltas(
1155        &mut self,
1156        instrument_id: InstrumentId,
1157        book_type: BookType,
1158        depth: Option<NonZeroUsize>,
1159        client_id: Option<ClientId>,
1160        managed: bool,
1161        params: Option<Params>,
1162    ) where
1163        Self: 'static + Debug + Sized,
1164    {
1165        let actor_id = self.actor_id().inner();
1166        let is_parent = is_parent_subscription(params.as_ref());
1167        let pattern = if is_parent {
1168            get_book_deltas_pattern(instrument_id)
1169        } else {
1170            get_book_deltas_topic(instrument_id).into()
1171        };
1172
1173        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1174            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1175        });
1176
1177        DataActorCore::subscribe_book_deltas(
1178            self,
1179            pattern,
1180            handler,
1181            instrument_id,
1182            book_type,
1183            depth,
1184            client_id,
1185            managed,
1186            params,
1187        );
1188    }
1189
1190    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1191    fn subscribe_book_at_interval(
1192        &mut self,
1193        instrument_id: InstrumentId,
1194        book_type: BookType,
1195        depth: Option<NonZeroUsize>,
1196        interval_ms: NonZeroUsize,
1197        client_id: Option<ClientId>,
1198        params: Option<Params>,
1199    ) where
1200        Self: 'static + Debug + Sized,
1201    {
1202        let actor_id = self.actor_id().inner();
1203        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1204
1205        let handler = TypedHandler::from(move |book: &OrderBook| {
1206            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1207        });
1208
1209        DataActorCore::subscribe_book_at_interval(
1210            self,
1211            topic,
1212            handler,
1213            instrument_id,
1214            book_type,
1215            depth,
1216            interval_ms,
1217            client_id,
1218            params,
1219        );
1220    }
1221
1222    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1223    fn subscribe_trades(
1224        &mut self,
1225        instrument_id: InstrumentId,
1226        client_id: Option<ClientId>,
1227        params: Option<Params>,
1228    ) where
1229        Self: 'static + Debug + Sized,
1230    {
1231        let actor_id = self.actor_id().inner();
1232        let topic = get_trades_topic(instrument_id);
1233
1234        let handler = TypedHandler::from(move |trade: &TradeTick| {
1235            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1236        });
1237
1238        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1239    }
1240
1241    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1242    fn subscribe_bars(
1243        &mut self,
1244        bar_type: BarType,
1245        client_id: Option<ClientId>,
1246        params: Option<Params>,
1247    ) where
1248        Self: 'static + Debug + Sized,
1249    {
1250        let actor_id = self.actor_id().inner();
1251        let topic = get_bars_topic(bar_type);
1252
1253        let handler = TypedHandler::from(move |bar: &Bar| {
1254            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1255        });
1256
1257        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1258    }
1259
1260    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1261    fn subscribe_mark_prices(
1262        &mut self,
1263        instrument_id: InstrumentId,
1264        client_id: Option<ClientId>,
1265        params: Option<Params>,
1266    ) where
1267        Self: 'static + Debug + Sized,
1268    {
1269        let actor_id = self.actor_id().inner();
1270        let topic = get_mark_price_topic(instrument_id);
1271
1272        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1273            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1274        });
1275
1276        DataActorCore::subscribe_mark_prices(
1277            self,
1278            topic,
1279            handler,
1280            instrument_id,
1281            client_id,
1282            params,
1283        );
1284    }
1285
1286    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1287    fn subscribe_index_prices(
1288        &mut self,
1289        instrument_id: InstrumentId,
1290        client_id: Option<ClientId>,
1291        params: Option<Params>,
1292    ) where
1293        Self: 'static + Debug + Sized,
1294    {
1295        let actor_id = self.actor_id().inner();
1296        let topic = get_index_price_topic(instrument_id);
1297
1298        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1299            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1300        });
1301
1302        DataActorCore::subscribe_index_prices(
1303            self,
1304            topic,
1305            handler,
1306            instrument_id,
1307            client_id,
1308            params,
1309        );
1310    }
1311
1312    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1313    fn subscribe_funding_rates(
1314        &mut self,
1315        instrument_id: InstrumentId,
1316        client_id: Option<ClientId>,
1317        params: Option<Params>,
1318    ) where
1319        Self: 'static + Debug + Sized,
1320    {
1321        let actor_id = self.actor_id().inner();
1322        let topic = get_funding_rate_topic(instrument_id);
1323
1324        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1325            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1326        });
1327
1328        DataActorCore::subscribe_funding_rates(
1329            self,
1330            topic,
1331            handler,
1332            instrument_id,
1333            client_id,
1334            params,
1335        );
1336    }
1337
1338    /// Subscribe to streaming [`OptionGreeks`] data for the `instrument_id`.
1339    fn subscribe_option_greeks(
1340        &mut self,
1341        instrument_id: InstrumentId,
1342        client_id: Option<ClientId>,
1343        params: Option<Params>,
1344    ) where
1345        Self: 'static + Debug + Sized,
1346    {
1347        let actor_id = self.actor_id().inner();
1348        let topic = get_option_greeks_topic(instrument_id);
1349
1350        let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1351            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1352                actor.handle_option_greeks(option_greeks);
1353            } else {
1354                log::error!("Actor {actor_id} not found for option greeks handling");
1355            }
1356        });
1357
1358        DataActorCore::subscribe_option_greeks(
1359            self,
1360            topic,
1361            handler,
1362            instrument_id,
1363            client_id,
1364            params,
1365        );
1366    }
1367
1368    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1369    fn subscribe_instrument_status(
1370        &mut self,
1371        instrument_id: InstrumentId,
1372        client_id: Option<ClientId>,
1373        params: Option<Params>,
1374    ) where
1375        Self: 'static + Debug + Sized,
1376    {
1377        let actor_id = self.actor_id().inner();
1378        let topic = get_instrument_status_topic(instrument_id);
1379
1380        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1381            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1382        });
1383
1384        DataActorCore::subscribe_instrument_status(
1385            self,
1386            topic,
1387            handler,
1388            instrument_id,
1389            client_id,
1390            params,
1391        );
1392    }
1393
1394    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1395    fn subscribe_instrument_close(
1396        &mut self,
1397        instrument_id: InstrumentId,
1398        client_id: Option<ClientId>,
1399        params: Option<Params>,
1400    ) where
1401        Self: 'static + Debug + Sized,
1402    {
1403        let actor_id = self.actor_id().inner();
1404        let topic = get_instrument_close_topic(instrument_id);
1405
1406        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1407            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1408        });
1409
1410        DataActorCore::subscribe_instrument_close(
1411            self,
1412            topic,
1413            handler,
1414            instrument_id,
1415            client_id,
1416            params,
1417        );
1418    }
1419
1420    /// Subscribe to streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1421    ///
1422    /// The ATM price is always derived from the exchange-provided forward price
1423    /// embedded in each option greeks/ticker update.
1424    fn subscribe_option_chain(
1425        &mut self,
1426        series_id: OptionSeriesId,
1427        strike_range: StrikeRange,
1428        snapshot_interval_ms: Option<u64>,
1429        client_id: Option<ClientId>,
1430        params: Option<Params>,
1431    ) where
1432        Self: 'static + Debug + Sized,
1433    {
1434        let actor_id = self.actor_id().inner();
1435        let topic = get_option_chain_topic(series_id);
1436
1437        let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1438            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1439                actor.handle_option_chain(slice);
1440            } else {
1441                log::error!("Actor {actor_id} not found for option chain handling");
1442            }
1443        });
1444
1445        DataActorCore::subscribe_option_chain(
1446            self,
1447            topic,
1448            handler,
1449            series_id,
1450            strike_range,
1451            snapshot_interval_ms,
1452            client_id,
1453            params,
1454        );
1455    }
1456
1457    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1458    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1459    where
1460        Self: 'static + Debug + Sized,
1461    {
1462        let actor_id = self.actor_id().inner();
1463        let topic = get_order_fills_topic(instrument_id);
1464
1465        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1466            if let OrderEventAny::Filled(filled) = event {
1467                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1468            }
1469        });
1470
1471        DataActorCore::subscribe_order_fills(self, topic, handler);
1472    }
1473
1474    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1475    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1476    where
1477        Self: 'static + Debug + Sized,
1478    {
1479        let actor_id = self.actor_id().inner();
1480        let topic = get_order_cancels_topic(instrument_id);
1481
1482        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1483            if let OrderEventAny::Canceled(canceled) = event {
1484                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1485            }
1486        });
1487
1488        DataActorCore::subscribe_order_cancels(self, topic, handler);
1489    }
1490
1491    #[cfg(feature = "defi")]
1492    /// Subscribe to streaming [`Block`] data for the `chain`.
1493    fn subscribe_blocks(
1494        &mut self,
1495        chain: Blockchain,
1496        client_id: Option<ClientId>,
1497        params: Option<Params>,
1498    ) where
1499        Self: 'static + Debug + Sized,
1500    {
1501        let actor_id = self.actor_id().inner();
1502        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1503
1504        let handler = TypedHandler::from(move |block: &Block| {
1505            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1506        });
1507
1508        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1509    }
1510
1511    #[cfg(feature = "defi")]
1512    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1513    fn subscribe_pool(
1514        &mut self,
1515        instrument_id: InstrumentId,
1516        client_id: Option<ClientId>,
1517        params: Option<Params>,
1518    ) where
1519        Self: 'static + Debug + Sized,
1520    {
1521        let actor_id = self.actor_id().inner();
1522        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1523
1524        let handler = TypedHandler::from(move |pool: &Pool| {
1525            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1526        });
1527
1528        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1529    }
1530
1531    #[cfg(feature = "defi")]
1532    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1533    fn subscribe_pool_swaps(
1534        &mut self,
1535        instrument_id: InstrumentId,
1536        client_id: Option<ClientId>,
1537        params: Option<Params>,
1538    ) where
1539        Self: 'static + Debug + Sized,
1540    {
1541        let actor_id = self.actor_id().inner();
1542        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1543
1544        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1545            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1546        });
1547
1548        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1549    }
1550
1551    #[cfg(feature = "defi")]
1552    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1553    fn subscribe_pool_liquidity_updates(
1554        &mut self,
1555        instrument_id: InstrumentId,
1556        client_id: Option<ClientId>,
1557        params: Option<Params>,
1558    ) where
1559        Self: 'static + Debug + Sized,
1560    {
1561        let actor_id = self.actor_id().inner();
1562        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1563
1564        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1565            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1566        });
1567
1568        DataActorCore::subscribe_pool_liquidity_updates(
1569            self,
1570            topic,
1571            handler,
1572            instrument_id,
1573            client_id,
1574            params,
1575        );
1576    }
1577
1578    #[cfg(feature = "defi")]
1579    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1580    fn subscribe_pool_fee_collects(
1581        &mut self,
1582        instrument_id: InstrumentId,
1583        client_id: Option<ClientId>,
1584        params: Option<Params>,
1585    ) where
1586        Self: 'static + Debug + Sized,
1587    {
1588        let actor_id = self.actor_id().inner();
1589        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1590
1591        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1592            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1593        });
1594
1595        DataActorCore::subscribe_pool_fee_collects(
1596            self,
1597            topic,
1598            handler,
1599            instrument_id,
1600            client_id,
1601            params,
1602        );
1603    }
1604
1605    #[cfg(feature = "defi")]
1606    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1607    fn subscribe_pool_flash_events(
1608        &mut self,
1609        instrument_id: InstrumentId,
1610        client_id: Option<ClientId>,
1611        params: Option<Params>,
1612    ) where
1613        Self: 'static + Debug + Sized,
1614    {
1615        let actor_id = self.actor_id().inner();
1616        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1617
1618        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1619            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1620        });
1621
1622        DataActorCore::subscribe_pool_flash_events(
1623            self,
1624            topic,
1625            handler,
1626            instrument_id,
1627            client_id,
1628            params,
1629        );
1630    }
1631
1632    /// Unsubscribe from streaming `data_type` data.
1633    fn unsubscribe_data(
1634        &mut self,
1635        data_type: DataType,
1636        client_id: Option<ClientId>,
1637        params: Option<Params>,
1638    ) where
1639        Self: 'static + Debug + Sized,
1640    {
1641        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1642    }
1643
1644    /// Unsubscribe from [`Signal`] data by `name`.
1645    fn unsubscribe_signal(&mut self, name: &str)
1646    where
1647        Self: 'static + Debug + Sized,
1648    {
1649        DataActorCore::unsubscribe_signal(self, name);
1650    }
1651
1652    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1653    fn unsubscribe_instruments(
1654        &mut self,
1655        venue: Venue,
1656        client_id: Option<ClientId>,
1657        params: Option<Params>,
1658    ) where
1659        Self: 'static + Debug + Sized,
1660    {
1661        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1662    }
1663
1664    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1665    fn unsubscribe_instrument(
1666        &mut self,
1667        instrument_id: InstrumentId,
1668        client_id: Option<ClientId>,
1669        params: Option<Params>,
1670    ) where
1671        Self: 'static + Debug + Sized,
1672    {
1673        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1674    }
1675
1676    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1677    fn unsubscribe_book_deltas(
1678        &mut self,
1679        instrument_id: InstrumentId,
1680        client_id: Option<ClientId>,
1681        params: Option<Params>,
1682    ) where
1683        Self: 'static + Debug + Sized,
1684    {
1685        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1686    }
1687
1688    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1689    fn unsubscribe_book_at_interval(
1690        &mut self,
1691        instrument_id: InstrumentId,
1692        interval_ms: NonZeroUsize,
1693        client_id: Option<ClientId>,
1694        params: Option<Params>,
1695    ) where
1696        Self: 'static + Debug + Sized,
1697    {
1698        DataActorCore::unsubscribe_book_at_interval(
1699            self,
1700            instrument_id,
1701            interval_ms,
1702            client_id,
1703            params,
1704        );
1705    }
1706
1707    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1708    fn unsubscribe_quotes(
1709        &mut self,
1710        instrument_id: InstrumentId,
1711        client_id: Option<ClientId>,
1712        params: Option<Params>,
1713    ) where
1714        Self: 'static + Debug + Sized,
1715    {
1716        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1717    }
1718
1719    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1720    fn unsubscribe_trades(
1721        &mut self,
1722        instrument_id: InstrumentId,
1723        client_id: Option<ClientId>,
1724        params: Option<Params>,
1725    ) where
1726        Self: 'static + Debug + Sized,
1727    {
1728        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1729    }
1730
1731    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1732    fn unsubscribe_bars(
1733        &mut self,
1734        bar_type: BarType,
1735        client_id: Option<ClientId>,
1736        params: Option<Params>,
1737    ) where
1738        Self: 'static + Debug + Sized,
1739    {
1740        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1741    }
1742
1743    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1744    fn unsubscribe_mark_prices(
1745        &mut self,
1746        instrument_id: InstrumentId,
1747        client_id: Option<ClientId>,
1748        params: Option<Params>,
1749    ) where
1750        Self: 'static + Debug + Sized,
1751    {
1752        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1753    }
1754
1755    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1756    fn unsubscribe_index_prices(
1757        &mut self,
1758        instrument_id: InstrumentId,
1759        client_id: Option<ClientId>,
1760        params: Option<Params>,
1761    ) where
1762        Self: 'static + Debug + Sized,
1763    {
1764        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1765    }
1766
1767    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1768    fn unsubscribe_funding_rates(
1769        &mut self,
1770        instrument_id: InstrumentId,
1771        client_id: Option<ClientId>,
1772        params: Option<Params>,
1773    ) where
1774        Self: 'static + Debug + Sized,
1775    {
1776        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1777    }
1778
1779    /// Unsubscribe from streaming [`OptionGreeks`] data for the `instrument_id`.
1780    fn unsubscribe_option_greeks(
1781        &mut self,
1782        instrument_id: InstrumentId,
1783        client_id: Option<ClientId>,
1784        params: Option<Params>,
1785    ) where
1786        Self: 'static + Debug + Sized,
1787    {
1788        DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1789    }
1790
1791    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1792    fn unsubscribe_instrument_status(
1793        &mut self,
1794        instrument_id: InstrumentId,
1795        client_id: Option<ClientId>,
1796        params: Option<Params>,
1797    ) where
1798        Self: 'static + Debug + Sized,
1799    {
1800        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1801    }
1802
1803    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1804    fn unsubscribe_instrument_close(
1805        &mut self,
1806        instrument_id: InstrumentId,
1807        client_id: Option<ClientId>,
1808        params: Option<Params>,
1809    ) where
1810        Self: 'static + Debug + Sized,
1811    {
1812        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1813    }
1814
1815    /// Unsubscribe from streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1816    fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1817    where
1818        Self: 'static + Debug + Sized,
1819    {
1820        DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1821    }
1822
1823    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1824    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1825    where
1826        Self: 'static + Debug + Sized,
1827    {
1828        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1829    }
1830
1831    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1832    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1833    where
1834        Self: 'static + Debug + Sized,
1835    {
1836        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1837    }
1838
1839    #[cfg(feature = "defi")]
1840    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1841    fn unsubscribe_blocks(
1842        &mut self,
1843        chain: Blockchain,
1844        client_id: Option<ClientId>,
1845        params: Option<Params>,
1846    ) where
1847        Self: 'static + Debug + Sized,
1848    {
1849        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1850    }
1851
1852    #[cfg(feature = "defi")]
1853    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1854    fn unsubscribe_pool(
1855        &mut self,
1856        instrument_id: InstrumentId,
1857        client_id: Option<ClientId>,
1858        params: Option<Params>,
1859    ) where
1860        Self: 'static + Debug + Sized,
1861    {
1862        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1863    }
1864
1865    #[cfg(feature = "defi")]
1866    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1867    fn unsubscribe_pool_swaps(
1868        &mut self,
1869        instrument_id: InstrumentId,
1870        client_id: Option<ClientId>,
1871        params: Option<Params>,
1872    ) where
1873        Self: 'static + Debug + Sized,
1874    {
1875        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1876    }
1877
1878    #[cfg(feature = "defi")]
1879    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1880    fn unsubscribe_pool_liquidity_updates(
1881        &mut self,
1882        instrument_id: InstrumentId,
1883        client_id: Option<ClientId>,
1884        params: Option<Params>,
1885    ) where
1886        Self: 'static + Debug + Sized,
1887    {
1888        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1889    }
1890
1891    #[cfg(feature = "defi")]
1892    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1893    fn unsubscribe_pool_fee_collects(
1894        &mut self,
1895        instrument_id: InstrumentId,
1896        client_id: Option<ClientId>,
1897        params: Option<Params>,
1898    ) where
1899        Self: 'static + Debug + Sized,
1900    {
1901        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1902    }
1903
1904    #[cfg(feature = "defi")]
1905    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1906    fn unsubscribe_pool_flash_events(
1907        &mut self,
1908        instrument_id: InstrumentId,
1909        client_id: Option<ClientId>,
1910        params: Option<Params>,
1911    ) where
1912        Self: 'static + Debug + Sized,
1913    {
1914        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1915    }
1916
1917    /// Request historical custom data of the given `data_type`.
1918    ///
1919    /// # Errors
1920    ///
1921    /// Returns an error if input parameters are invalid.
1922    fn request_data(
1923        &mut self,
1924        data_type: DataType,
1925        client_id: ClientId,
1926        start: Option<DateTime<Utc>>,
1927        end: Option<DateTime<Utc>>,
1928        limit: Option<NonZeroUsize>,
1929        params: Option<Params>,
1930    ) -> anyhow::Result<UUID4>
1931    where
1932        Self: 'static + Debug + Sized,
1933    {
1934        let actor_id = self.actor_id().inner();
1935        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1936            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1937        });
1938
1939        DataActorCore::request_data(
1940            self, data_type, client_id, start, end, limit, params, handler,
1941        )
1942    }
1943
1944    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1945    ///
1946    /// # Errors
1947    ///
1948    /// Returns an error if input parameters are invalid.
1949    fn request_instrument(
1950        &mut self,
1951        instrument_id: InstrumentId,
1952        start: Option<DateTime<Utc>>,
1953        end: Option<DateTime<Utc>>,
1954        client_id: Option<ClientId>,
1955        params: Option<Params>,
1956    ) -> anyhow::Result<UUID4>
1957    where
1958        Self: 'static + Debug + Sized,
1959    {
1960        let actor_id = self.actor_id().inner();
1961        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1962            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1963        });
1964
1965        DataActorCore::request_instrument(
1966            self,
1967            instrument_id,
1968            start,
1969            end,
1970            client_id,
1971            params,
1972            handler,
1973        )
1974    }
1975
1976    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1977    ///
1978    /// # Errors
1979    ///
1980    /// Returns an error if input parameters are invalid.
1981    fn request_instruments(
1982        &mut self,
1983        venue: Option<Venue>,
1984        start: Option<DateTime<Utc>>,
1985        end: Option<DateTime<Utc>>,
1986        client_id: Option<ClientId>,
1987        params: Option<Params>,
1988    ) -> anyhow::Result<UUID4>
1989    where
1990        Self: 'static + Debug + Sized,
1991    {
1992        let actor_id = self.actor_id().inner();
1993        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1994            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1995        });
1996
1997        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1998    }
1999
2000    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
2001    ///
2002    /// # Errors
2003    ///
2004    /// Returns an error if input parameters are invalid.
2005    fn request_book_snapshot(
2006        &mut self,
2007        instrument_id: InstrumentId,
2008        depth: Option<NonZeroUsize>,
2009        client_id: Option<ClientId>,
2010        params: Option<Params>,
2011    ) -> anyhow::Result<UUID4>
2012    where
2013        Self: 'static + Debug + Sized,
2014    {
2015        let actor_id = self.actor_id().inner();
2016        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2017            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2018        });
2019
2020        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
2021    }
2022
2023    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
2024    ///
2025    /// # Errors
2026    ///
2027    /// Returns an error if input parameters are invalid.
2028    fn request_quotes(
2029        &mut self,
2030        instrument_id: InstrumentId,
2031        start: Option<DateTime<Utc>>,
2032        end: Option<DateTime<Utc>>,
2033        limit: Option<NonZeroUsize>,
2034        client_id: Option<ClientId>,
2035        params: Option<Params>,
2036    ) -> anyhow::Result<UUID4>
2037    where
2038        Self: 'static + Debug + Sized,
2039    {
2040        let actor_id = self.actor_id().inner();
2041        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2042            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2043        });
2044
2045        DataActorCore::request_quotes(
2046            self,
2047            instrument_id,
2048            start,
2049            end,
2050            limit,
2051            client_id,
2052            params,
2053            handler,
2054        )
2055    }
2056
2057    /// Request historical [`TradeTick`] data for the given `instrument_id`.
2058    ///
2059    /// # Errors
2060    ///
2061    /// Returns an error if input parameters are invalid.
2062    fn request_trades(
2063        &mut self,
2064        instrument_id: InstrumentId,
2065        start: Option<DateTime<Utc>>,
2066        end: Option<DateTime<Utc>>,
2067        limit: Option<NonZeroUsize>,
2068        client_id: Option<ClientId>,
2069        params: Option<Params>,
2070    ) -> anyhow::Result<UUID4>
2071    where
2072        Self: 'static + Debug + Sized,
2073    {
2074        let actor_id = self.actor_id().inner();
2075        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2076            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2077        });
2078
2079        DataActorCore::request_trades(
2080            self,
2081            instrument_id,
2082            start,
2083            end,
2084            limit,
2085            client_id,
2086            params,
2087            handler,
2088        )
2089    }
2090
2091    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
2092    ///
2093    /// # Errors
2094    ///
2095    /// Returns an error if input parameters are invalid.
2096    fn request_funding_rates(
2097        &mut self,
2098        instrument_id: InstrumentId,
2099        start: Option<DateTime<Utc>>,
2100        end: Option<DateTime<Utc>>,
2101        limit: Option<NonZeroUsize>,
2102        client_id: Option<ClientId>,
2103        params: Option<Params>,
2104    ) -> anyhow::Result<UUID4>
2105    where
2106        Self: 'static + Debug + Sized,
2107    {
2108        let actor_id = self.actor_id().inner();
2109        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2110            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2111        });
2112
2113        DataActorCore::request_funding_rates(
2114            self,
2115            instrument_id,
2116            start,
2117            end,
2118            limit,
2119            client_id,
2120            params,
2121            handler,
2122        )
2123    }
2124
2125    /// Request historical [`Bar`] data for the given `bar_type`.
2126    ///
2127    /// # Errors
2128    ///
2129    /// Returns an error if input parameters are invalid.
2130    fn request_bars(
2131        &mut self,
2132        bar_type: BarType,
2133        start: Option<DateTime<Utc>>,
2134        end: Option<DateTime<Utc>>,
2135        limit: Option<NonZeroUsize>,
2136        client_id: Option<ClientId>,
2137        params: Option<Params>,
2138    ) -> anyhow::Result<UUID4>
2139    where
2140        Self: 'static + Debug + Sized,
2141    {
2142        let actor_id = self.actor_id().inner();
2143        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2144            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2145        });
2146
2147        DataActorCore::request_bars(
2148            self, bar_type, start, end, limit, client_id, params, handler,
2149        )
2150    }
2151}
2152
2153// Blanket implementation: any DataActor automatically implements Actor
2154impl<T> Actor for T
2155where
2156    T: DataActor + Debug + 'static,
2157{
2158    fn id(&self) -> Ustr {
2159        self.actor_id.inner()
2160    }
2161
2162    #[allow(unused_variables)]
2163    fn handle(&mut self, msg: &dyn Any) {
2164        // Default empty implementation - concrete actors can override if needed
2165    }
2166
2167    fn as_any(&self) -> &dyn Any {
2168        self
2169    }
2170}
2171
2172// Blanket implementation: any DataActor automatically implements Component
2173impl<T> Component for T
2174where
2175    T: DataActor + Debug + 'static,
2176{
2177    fn component_id(&self) -> ComponentId {
2178        ComponentId::new(self.actor_id.inner().as_str())
2179    }
2180
2181    fn state(&self) -> ComponentState {
2182        self.state
2183    }
2184
2185    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2186        self.state = self.state.transition(&trigger)?;
2187        log::info!("{}", self.state.variant_name());
2188        Ok(())
2189    }
2190
2191    fn register(
2192        &mut self,
2193        trader_id: TraderId,
2194        clock: Rc<RefCell<dyn Clock>>,
2195        cache: Rc<RefCell<Cache>>,
2196    ) -> anyhow::Result<()> {
2197        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2198
2199        // Register default time event handler for this actor
2200        let actor_id = self.actor_id().inner();
2201        let callback = TimeEventCallback::from(move |event: TimeEvent| {
2202            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2203                actor.handle_time_event(&event);
2204            } else {
2205                log::error!("Actor {actor_id} not found for time event handling");
2206            }
2207        });
2208
2209        clock.borrow_mut().register_default_handler(callback);
2210
2211        self.initialize()
2212    }
2213
2214    fn on_start(&mut self) -> anyhow::Result<()> {
2215        DataActor::on_start(self)
2216    }
2217
2218    fn on_stop(&mut self) -> anyhow::Result<()> {
2219        DataActor::on_stop(self)
2220    }
2221
2222    fn on_resume(&mut self) -> anyhow::Result<()> {
2223        DataActor::on_resume(self)
2224    }
2225
2226    fn on_degrade(&mut self) -> anyhow::Result<()> {
2227        DataActor::on_degrade(self)
2228    }
2229
2230    fn on_fault(&mut self) -> anyhow::Result<()> {
2231        DataActor::on_fault(self)
2232    }
2233
2234    fn on_reset(&mut self) -> anyhow::Result<()> {
2235        DataActor::on_reset(self)
2236    }
2237
2238    fn on_dispose(&mut self) -> anyhow::Result<()> {
2239        DataActor::on_dispose(self)
2240    }
2241}
2242
2243/// Core functionality for all actors.
2244#[derive(Clone)]
2245#[allow(
2246    dead_code,
2247    reason = "TODO: Under development (pending_requests, signal_classes)"
2248)]
2249pub struct DataActorCore {
2250    /// The actor identifier.
2251    pub actor_id: ActorId,
2252    /// The actors configuration.
2253    pub config: DataActorConfig,
2254    trader_id: Option<TraderId>,
2255    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2256    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2257    state: ComponentState,
2258    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2259    instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
2260    deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
2261    depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
2262    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2263    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2264    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2265    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2266    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2267    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2268    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2269    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2270    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2271    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2272    #[cfg(feature = "defi")]
2273    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2274    #[cfg(feature = "defi")]
2275    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2276    #[cfg(feature = "defi")]
2277    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2278    #[cfg(feature = "defi")]
2279    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2280    #[cfg(feature = "defi")]
2281    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2282    #[cfg(feature = "defi")]
2283    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2284    warning_events: AHashSet<String>, // TODO: TBD
2285    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2286    signal_classes: AHashMap<String, String>,
2287    #[cfg(feature = "indicators")]
2288    indicators: Indicators,
2289}
2290
2291impl Debug for DataActorCore {
2292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2293        f.debug_struct(stringify!(DataActorCore))
2294            .field("actor_id", &self.actor_id)
2295            .field("config", &self.config)
2296            .field("state", &self.state)
2297            .field("trader_id", &self.trader_id)
2298            .finish()
2299    }
2300}
2301
2302impl DataActorCore {
2303    /// Adds a subscription handler for the `topic`.
2304    ///
2305    //// Logs a warning if the actor is already subscribed to the topic.
2306    pub(crate) fn add_subscription_any(
2307        &mut self,
2308        topic: MStr<Topic>,
2309        handler: ShareableMessageHandler,
2310    ) {
2311        let pattern: MStr<Pattern> = topic.into();
2312        if self.topic_handlers.contains_key(&pattern) {
2313            log::warn!(
2314                "Actor {} attempted duplicate subscription to topic '{topic}'",
2315                self.actor_id,
2316            );
2317            return;
2318        }
2319
2320        self.topic_handlers.insert(pattern, handler.clone());
2321        msgbus::subscribe_any(pattern, handler, None);
2322    }
2323
2324    /// Removes a subscription handler for the `topic` if present.
2325    ///
2326    /// Logs a warning if the actor is not currently subscribed to the topic.
2327    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2328        let pattern: MStr<Pattern> = topic.into();
2329        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2330            msgbus::unsubscribe_any(pattern, &handler);
2331        } else {
2332            log::warn!(
2333                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2334                self.actor_id,
2335            );
2336        }
2337    }
2338
2339    pub(crate) fn add_quote_subscription(
2340        &mut self,
2341        topic: MStr<Topic>,
2342        handler: TypedHandler<QuoteTick>,
2343    ) {
2344        if self.quote_handlers.contains_key(&topic) {
2345            log::warn!(
2346                "Actor {} attempted duplicate quote subscription to '{topic}'",
2347                self.actor_id
2348            );
2349            return;
2350        }
2351        self.quote_handlers.insert(topic, handler.clone());
2352        msgbus::subscribe_quotes(topic.into(), handler, None);
2353    }
2354
2355    #[allow(dead_code)]
2356    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2357        if let Some(handler) = self.quote_handlers.remove(&topic) {
2358            msgbus::unsubscribe_quotes(topic.into(), &handler);
2359        }
2360    }
2361
2362    pub(crate) fn add_trade_subscription(
2363        &mut self,
2364        topic: MStr<Topic>,
2365        handler: TypedHandler<TradeTick>,
2366    ) {
2367        if self.trade_handlers.contains_key(&topic) {
2368            log::warn!(
2369                "Actor {} attempted duplicate trade subscription to '{topic}'",
2370                self.actor_id
2371            );
2372            return;
2373        }
2374        self.trade_handlers.insert(topic, handler.clone());
2375        msgbus::subscribe_trades(topic.into(), handler, None);
2376    }
2377
2378    #[allow(dead_code)]
2379    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2380        if let Some(handler) = self.trade_handlers.remove(&topic) {
2381            msgbus::unsubscribe_trades(topic.into(), &handler);
2382        }
2383    }
2384
2385    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2386        if self.bar_handlers.contains_key(&topic) {
2387            log::warn!(
2388                "Actor {} attempted duplicate bar subscription to '{topic}'",
2389                self.actor_id
2390            );
2391            return;
2392        }
2393        self.bar_handlers.insert(topic, handler.clone());
2394        msgbus::subscribe_bars(topic.into(), handler, None);
2395    }
2396
2397    #[allow(dead_code)]
2398    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2399        if let Some(handler) = self.bar_handlers.remove(&topic) {
2400            msgbus::unsubscribe_bars(topic.into(), &handler);
2401        }
2402    }
2403
2404    pub(crate) fn add_order_event_subscription(
2405        &mut self,
2406        topic: MStr<Topic>,
2407        handler: TypedHandler<OrderEventAny>,
2408    ) {
2409        if self.order_event_handlers.contains_key(&topic) {
2410            log::warn!(
2411                "Actor {} attempted duplicate order event subscription to '{topic}'",
2412                self.actor_id
2413            );
2414            return;
2415        }
2416        self.order_event_handlers.insert(topic, handler.clone());
2417        msgbus::subscribe_order_events(topic.into(), handler, None);
2418    }
2419
2420    #[allow(dead_code)]
2421    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2422        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2423            msgbus::unsubscribe_order_events(topic.into(), &handler);
2424        }
2425    }
2426
2427    pub(crate) fn add_deltas_subscription(
2428        &mut self,
2429        pattern: MStr<Pattern>,
2430        handler: TypedHandler<OrderBookDeltas>,
2431    ) {
2432        if self.deltas_handlers.contains_key(&pattern) {
2433            log::warn!(
2434                "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2435                self.actor_id
2436            );
2437            return;
2438        }
2439        self.deltas_handlers.insert(pattern, handler.clone());
2440        msgbus::subscribe_book_deltas(pattern, handler, None);
2441    }
2442
2443    #[allow(dead_code)]
2444    pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2445        if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2446            msgbus::unsubscribe_book_deltas(pattern, &handler);
2447        }
2448    }
2449
2450    #[allow(dead_code)]
2451    pub(crate) fn add_depth10_subscription(
2452        &mut self,
2453        pattern: MStr<Pattern>,
2454        handler: TypedHandler<OrderBookDepth10>,
2455    ) {
2456        if self.depth10_handlers.contains_key(&pattern) {
2457            log::warn!(
2458                "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2459                self.actor_id
2460            );
2461            return;
2462        }
2463        self.depth10_handlers.insert(pattern, handler.clone());
2464        msgbus::subscribe_book_depth10(pattern, handler, None);
2465    }
2466
2467    #[allow(dead_code)]
2468    pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2469        if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2470            msgbus::unsubscribe_book_depth10(pattern, &handler);
2471        }
2472    }
2473
2474    pub(crate) fn add_instrument_subscription(
2475        &mut self,
2476        pattern: MStr<Pattern>,
2477        handler: TypedHandler<InstrumentAny>,
2478    ) {
2479        if self.instrument_handlers.contains_key(&pattern) {
2480            log::warn!(
2481                "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2482                self.actor_id
2483            );
2484            return;
2485        }
2486        self.instrument_handlers.insert(pattern, handler.clone());
2487        msgbus::subscribe_instruments(pattern, handler, None);
2488    }
2489
2490    #[allow(dead_code)]
2491    pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2492        if let Some(handler) = self.instrument_handlers.remove(&pattern) {
2493            msgbus::unsubscribe_instruments(pattern, &handler);
2494        }
2495    }
2496
2497    pub(crate) fn add_instrument_close_subscription(
2498        &mut self,
2499        topic: MStr<Topic>,
2500        handler: ShareableMessageHandler,
2501    ) {
2502        let pattern: MStr<Pattern> = topic.into();
2503        if self.topic_handlers.contains_key(&pattern) {
2504            log::warn!(
2505                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2506                self.actor_id
2507            );
2508            return;
2509        }
2510        self.topic_handlers.insert(pattern, handler.clone());
2511        msgbus::subscribe_any(pattern, handler, None);
2512    }
2513
2514    #[allow(dead_code)]
2515    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2516        let pattern: MStr<Pattern> = topic.into();
2517        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2518            msgbus::unsubscribe_any(pattern, &handler);
2519        }
2520    }
2521
2522    pub(crate) fn add_book_snapshot_subscription(
2523        &mut self,
2524        topic: MStr<Topic>,
2525        handler: TypedHandler<OrderBook>,
2526    ) {
2527        if self.book_handlers.contains_key(&topic) {
2528            log::warn!(
2529                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2530                self.actor_id
2531            );
2532            return;
2533        }
2534        self.book_handlers.insert(topic, handler.clone());
2535        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2536    }
2537
2538    #[allow(dead_code)]
2539    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2540        if let Some(handler) = self.book_handlers.remove(&topic) {
2541            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2542        }
2543    }
2544
2545    pub(crate) fn add_mark_price_subscription(
2546        &mut self,
2547        topic: MStr<Topic>,
2548        handler: TypedHandler<MarkPriceUpdate>,
2549    ) {
2550        if self.mark_price_handlers.contains_key(&topic) {
2551            log::warn!(
2552                "Actor {} attempted duplicate mark price subscription to '{topic}'",
2553                self.actor_id
2554            );
2555            return;
2556        }
2557        self.mark_price_handlers.insert(topic, handler.clone());
2558        msgbus::subscribe_mark_prices(topic.into(), handler, None);
2559    }
2560
2561    #[allow(dead_code)]
2562    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2563        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2564            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2565        }
2566    }
2567
2568    pub(crate) fn add_index_price_subscription(
2569        &mut self,
2570        topic: MStr<Topic>,
2571        handler: TypedHandler<IndexPriceUpdate>,
2572    ) {
2573        if self.index_price_handlers.contains_key(&topic) {
2574            log::warn!(
2575                "Actor {} attempted duplicate index price subscription to '{topic}'",
2576                self.actor_id
2577            );
2578            return;
2579        }
2580        self.index_price_handlers.insert(topic, handler.clone());
2581        msgbus::subscribe_index_prices(topic.into(), handler, None);
2582    }
2583
2584    #[allow(dead_code)]
2585    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2586        if let Some(handler) = self.index_price_handlers.remove(&topic) {
2587            msgbus::unsubscribe_index_prices(topic.into(), &handler);
2588        }
2589    }
2590
2591    pub(crate) fn add_funding_rate_subscription(
2592        &mut self,
2593        topic: MStr<Topic>,
2594        handler: TypedHandler<FundingRateUpdate>,
2595    ) {
2596        if self.funding_rate_handlers.contains_key(&topic) {
2597            log::warn!(
2598                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2599                self.actor_id
2600            );
2601            return;
2602        }
2603        self.funding_rate_handlers.insert(topic, handler.clone());
2604        msgbus::subscribe_funding_rates(topic.into(), handler, None);
2605    }
2606
2607    #[allow(dead_code)]
2608    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2609        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2610            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2611        }
2612    }
2613
2614    pub(crate) fn add_option_greeks_subscription(
2615        &mut self,
2616        topic: MStr<Topic>,
2617        handler: TypedHandler<OptionGreeks>,
2618    ) {
2619        if self.option_greeks_handlers.contains_key(&topic) {
2620            log::warn!(
2621                "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2622                self.actor_id
2623            );
2624            return;
2625        }
2626        self.option_greeks_handlers.insert(topic, handler.clone());
2627        msgbus::subscribe_option_greeks(topic.into(), handler, None);
2628    }
2629
2630    #[allow(dead_code)]
2631    pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2632        if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2633            msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2634        }
2635    }
2636
2637    pub(crate) fn add_option_chain_subscription(
2638        &mut self,
2639        topic: MStr<Topic>,
2640        handler: TypedHandler<OptionChainSlice>,
2641    ) {
2642        if self.option_chain_handlers.contains_key(&topic) {
2643            log::warn!(
2644                "Actor {} attempted duplicate option chain subscription to '{topic}'",
2645                self.actor_id
2646            );
2647            return;
2648        }
2649        self.option_chain_handlers.insert(topic, handler.clone());
2650        msgbus::subscribe_option_chain(topic.into(), handler, None);
2651    }
2652
2653    pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2654        if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2655            msgbus::unsubscribe_option_chain(topic.into(), &handler);
2656        }
2657    }
2658
2659    #[cfg(feature = "defi")]
2660    pub(crate) fn add_block_subscription(
2661        &mut self,
2662        topic: MStr<Topic>,
2663        handler: TypedHandler<Block>,
2664    ) {
2665        if self.block_handlers.contains_key(&topic) {
2666            log::warn!(
2667                "Actor {} attempted duplicate block subscription to '{topic}'",
2668                self.actor_id
2669            );
2670            return;
2671        }
2672        self.block_handlers.insert(topic, handler.clone());
2673        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2674    }
2675
2676    #[cfg(feature = "defi")]
2677    #[allow(dead_code)]
2678    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2679        if let Some(handler) = self.block_handlers.remove(&topic) {
2680            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2681        }
2682    }
2683
2684    #[cfg(feature = "defi")]
2685    pub(crate) fn add_pool_subscription(
2686        &mut self,
2687        topic: MStr<Topic>,
2688        handler: TypedHandler<Pool>,
2689    ) {
2690        if self.pool_handlers.contains_key(&topic) {
2691            log::warn!(
2692                "Actor {} attempted duplicate pool subscription to '{topic}'",
2693                self.actor_id
2694            );
2695            return;
2696        }
2697        self.pool_handlers.insert(topic, handler.clone());
2698        msgbus::subscribe_defi_pools(topic.into(), handler, None);
2699    }
2700
2701    #[cfg(feature = "defi")]
2702    #[allow(dead_code)]
2703    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2704        if let Some(handler) = self.pool_handlers.remove(&topic) {
2705            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2706        }
2707    }
2708
2709    #[cfg(feature = "defi")]
2710    pub(crate) fn add_pool_swap_subscription(
2711        &mut self,
2712        topic: MStr<Topic>,
2713        handler: TypedHandler<PoolSwap>,
2714    ) {
2715        if self.pool_swap_handlers.contains_key(&topic) {
2716            log::warn!(
2717                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2718                self.actor_id
2719            );
2720            return;
2721        }
2722        self.pool_swap_handlers.insert(topic, handler.clone());
2723        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2724    }
2725
2726    #[cfg(feature = "defi")]
2727    #[allow(dead_code)]
2728    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2729        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2730            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2731        }
2732    }
2733
2734    #[cfg(feature = "defi")]
2735    pub(crate) fn add_pool_liquidity_subscription(
2736        &mut self,
2737        topic: MStr<Topic>,
2738        handler: TypedHandler<PoolLiquidityUpdate>,
2739    ) {
2740        if self.pool_liquidity_handlers.contains_key(&topic) {
2741            log::warn!(
2742                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2743                self.actor_id
2744            );
2745            return;
2746        }
2747        self.pool_liquidity_handlers.insert(topic, handler.clone());
2748        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2749    }
2750
2751    #[cfg(feature = "defi")]
2752    #[allow(dead_code)]
2753    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2754        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2755            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2756        }
2757    }
2758
2759    #[cfg(feature = "defi")]
2760    pub(crate) fn add_pool_collect_subscription(
2761        &mut self,
2762        topic: MStr<Topic>,
2763        handler: TypedHandler<PoolFeeCollect>,
2764    ) {
2765        if self.pool_collect_handlers.contains_key(&topic) {
2766            log::warn!(
2767                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2768                self.actor_id
2769            );
2770            return;
2771        }
2772        self.pool_collect_handlers.insert(topic, handler.clone());
2773        msgbus::subscribe_defi_collects(topic.into(), handler, None);
2774    }
2775
2776    #[cfg(feature = "defi")]
2777    #[allow(dead_code)]
2778    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2779        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2780            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2781        }
2782    }
2783
2784    #[cfg(feature = "defi")]
2785    pub(crate) fn add_pool_flash_subscription(
2786        &mut self,
2787        topic: MStr<Topic>,
2788        handler: TypedHandler<PoolFlash>,
2789    ) {
2790        if self.pool_flash_handlers.contains_key(&topic) {
2791            log::warn!(
2792                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2793                self.actor_id
2794            );
2795            return;
2796        }
2797        self.pool_flash_handlers.insert(topic, handler.clone());
2798        msgbus::subscribe_defi_flash(topic.into(), handler, None);
2799    }
2800
2801    #[cfg(feature = "defi")]
2802    #[allow(dead_code)]
2803    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2804        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2805            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2806        }
2807    }
2808
2809    /// Creates a new [`DataActorCore`] instance.
2810    pub fn new(config: DataActorConfig) -> Self {
2811        let actor_id = config
2812            .actor_id
2813            .unwrap_or_else(|| Self::default_actor_id(&config));
2814
2815        Self {
2816            actor_id,
2817            config,
2818            trader_id: None, // None until registered
2819            clock: None,     // None until registered
2820            cache: None,     // None until registered
2821            state: ComponentState::default(),
2822            topic_handlers: AHashMap::new(),
2823            instrument_handlers: AHashMap::new(),
2824            deltas_handlers: AHashMap::new(),
2825            depth10_handlers: AHashMap::new(),
2826            book_handlers: AHashMap::new(),
2827            quote_handlers: AHashMap::new(),
2828            trade_handlers: AHashMap::new(),
2829            bar_handlers: AHashMap::new(),
2830            mark_price_handlers: AHashMap::new(),
2831            index_price_handlers: AHashMap::new(),
2832            funding_rate_handlers: AHashMap::new(),
2833            option_greeks_handlers: AHashMap::new(),
2834            option_chain_handlers: AHashMap::new(),
2835            order_event_handlers: AHashMap::new(),
2836            #[cfg(feature = "defi")]
2837            block_handlers: AHashMap::new(),
2838            #[cfg(feature = "defi")]
2839            pool_handlers: AHashMap::new(),
2840            #[cfg(feature = "defi")]
2841            pool_swap_handlers: AHashMap::new(),
2842            #[cfg(feature = "defi")]
2843            pool_liquidity_handlers: AHashMap::new(),
2844            #[cfg(feature = "defi")]
2845            pool_collect_handlers: AHashMap::new(),
2846            #[cfg(feature = "defi")]
2847            pool_flash_handlers: AHashMap::new(),
2848            warning_events: AHashSet::new(),
2849            pending_requests: AHashMap::new(),
2850            signal_classes: AHashMap::new(),
2851            #[cfg(feature = "indicators")]
2852            indicators: Indicators::default(),
2853        }
2854    }
2855
2856    /// Returns the memory address of this instance as a hexadecimal string.
2857    #[must_use]
2858    pub fn mem_address(&self) -> String {
2859        format!("{self:p}")
2860    }
2861
2862    /// Returns the actors state.
2863    pub fn state(&self) -> ComponentState {
2864        self.state
2865    }
2866
2867    /// Returns the trader ID this actor is registered to.
2868    pub fn trader_id(&self) -> Option<TraderId> {
2869        self.trader_id
2870    }
2871
2872    /// Returns the actors ID.
2873    pub fn actor_id(&self) -> ActorId {
2874        self.actor_id
2875    }
2876
2877    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2878        let memory_address = std::ptr::from_ref(config) as usize;
2879        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2880    }
2881
2882    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2883    pub fn timestamp_ns(&self) -> UnixNanos {
2884        self.clock_ref().timestamp_ns()
2885    }
2886
2887    /// Returns the clock for the actor (if registered).
2888    ///
2889    /// # Panics
2890    ///
2891    /// Panics if the actor has not been registered with a trader.
2892    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2893        self.clock
2894            .as_ref()
2895            .unwrap_or_else(|| {
2896                panic!(
2897                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2898                    self.actor_id, self.trader_id
2899                )
2900            })
2901            .borrow_mut()
2902    }
2903
2904    /// Returns a clone of the reference-counted clock.
2905    ///
2906    /// # Panics
2907    ///
2908    /// Panics if the actor has not yet been registered (clock is `None`).
2909    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2910        self.clock
2911            .as_ref()
2912            .expect("DataActor must be registered before accessing clock")
2913            .clone()
2914    }
2915
2916    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2917        self.clock
2918            .as_ref()
2919            .unwrap_or_else(|| {
2920                panic!(
2921                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2922                    self.actor_id, self.trader_id
2923                )
2924            })
2925            .borrow()
2926    }
2927
2928    /// Returns a read-only reference to the cache.
2929    ///
2930    /// # Panics
2931    ///
2932    /// Panics if the actor has not yet been registered (cache is `None`).
2933    pub fn cache(&self) -> Ref<'_, Cache> {
2934        self.cache
2935            .as_ref()
2936            .expect("DataActor must be registered before accessing cache")
2937            .borrow()
2938    }
2939
2940    /// Returns a clone of the reference-counted cache.
2941    ///
2942    /// # Panics
2943    ///
2944    /// Panics if the actor has not yet been registered (cache is `None`).
2945    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2946        self.cache
2947            .as_ref()
2948            .expect("DataActor must be registered before accessing cache")
2949            .clone()
2950    }
2951
2952    /// Register the data actor with a trader.
2953    ///
2954    /// # Errors
2955    ///
2956    /// Returns an error if the actor has already been registered with a trader
2957    /// or if the provided dependencies are invalid.
2958    pub fn register(
2959        &mut self,
2960        trader_id: TraderId,
2961        clock: Rc<RefCell<dyn Clock>>,
2962        cache: Rc<RefCell<Cache>>,
2963    ) -> anyhow::Result<()> {
2964        if let Some(existing_trader_id) = self.trader_id {
2965            anyhow::bail!(
2966                "DataActor {} already registered with trader {existing_trader_id}",
2967                self.actor_id
2968            );
2969        }
2970
2971        // Validate clock by attempting to access it
2972        {
2973            let _timestamp = clock.borrow().timestamp_ns();
2974        }
2975
2976        // Validate cache by attempting to access it
2977        {
2978            let _cache_borrow = cache.borrow();
2979        }
2980
2981        self.trader_id = Some(trader_id);
2982        self.clock = Some(clock);
2983        self.cache = Some(cache);
2984
2985        // Verify complete registration
2986        if !self.is_properly_registered() {
2987            anyhow::bail!(
2988                "DataActor {} registration incomplete - validation failed",
2989                self.actor_id
2990            );
2991        }
2992
2993        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2994        Ok(())
2995    }
2996
2997    /// Register an event type for warning log levels.
2998    pub fn register_warning_event(&mut self, event_type: &str) {
2999        self.warning_events.insert(event_type.to_string());
3000        log::debug!("Registered event type '{event_type}' for warning logs");
3001    }
3002
3003    /// Deregister an event type from warning log levels.
3004    pub fn deregister_warning_event(&mut self, event_type: &str) {
3005        self.warning_events.remove(event_type);
3006        log::debug!("Deregistered event type '{event_type}' from warning logs");
3007    }
3008
3009    pub fn is_registered(&self) -> bool {
3010        self.trader_id.is_some()
3011    }
3012
3013    pub(crate) fn check_registered(&self) {
3014        assert!(
3015            self.is_registered(),
3016            "Actor has not been registered with a Trader"
3017        );
3018    }
3019
3020    /// Validates registration state without panicking.
3021    fn is_properly_registered(&self) -> bool {
3022        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3023    }
3024
3025    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3026        if self.config.log_commands {
3027            log::info!("{CMD}{SEND} {command:?}");
3028        }
3029
3030        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3031        msgbus::send_data_command(endpoint, command);
3032    }
3033
3034    #[allow(dead_code)]
3035    fn send_data_req(&self, request: &RequestCommand) {
3036        if self.config.log_commands {
3037            log::info!("{REQ}{SEND} {request:?}");
3038        }
3039
3040        // For now, simplified approach - data requests without dynamic handlers
3041        // TODO: Implement proper dynamic dispatch for response handlers
3042        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3043        msgbus::send_any(endpoint, request.as_any());
3044    }
3045
3046    /// Sends a shutdown command to the system with an optional reason.
3047    ///
3048    /// # Panics
3049    ///
3050    /// Panics if the actor is not registered or has no trader ID.
3051    pub fn shutdown_system(&self, reason: Option<String>) {
3052        self.check_registered();
3053
3054        // Checked registered before unwrapping trader ID
3055        let command = ShutdownSystem::new(
3056            self.trader_id().unwrap(),
3057            self.actor_id.inner(),
3058            reason,
3059            UUID4::new(),
3060            self.timestamp_ns(),
3061        );
3062
3063        let topic = MessagingSwitchboard::shutdown_system_topic();
3064        msgbus::publish_any(topic, command.as_any());
3065    }
3066
3067    /// Publishes `data` on the message bus under the topic derived from `data_type`.
3068    ///
3069    /// `data_type` is kept as an explicit parameter (rather than deriving it from
3070    /// `data.data_type`) to mirror the v1 Python `publish_data(data_type, data)` API and
3071    /// to allow callers to override the routing topic from the payload's intrinsic type.
3072    ///
3073    /// # Panics
3074    ///
3075    /// Panics if the actor is not registered with a trader.
3076    pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3077        self.check_registered();
3078
3079        let topic = get_custom_topic(data_type);
3080        msgbus::publish_any(topic, data);
3081    }
3082
3083    /// Publishes a [`Signal`] constructed from `name` and `value`, wrapped in [`CustomData`]
3084    /// so it is consumed by signal subscribers and by any `CustomData`-aware pipeline
3085    /// (for example the feather persistence writer).
3086    ///
3087    /// The topic mirrors the v1 Python scheme `data.Signal<TitleName>` so subscribers
3088    /// using either a specific name or the global wildcard are both notified.
3089    /// If `ts_event` is zero then the current clock timestamp is used.
3090    ///
3091    /// # Panics
3092    ///
3093    /// Panics if the actor is not registered with a trader.
3094    pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3095        self.check_registered();
3096
3097        let now = self.timestamp_ns();
3098        let ts_event = if ts_event.as_u64() == 0 {
3099            now
3100        } else {
3101            ts_event
3102        };
3103        let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3104
3105        let data_type = DataType::new(
3106            &format!(
3107                "Signal{}",
3108                nautilus_core::string::conversions::title_case(name)
3109            ),
3110            None,
3111            None,
3112        );
3113        let data = CustomData::new(Arc::new(signal), data_type);
3114        let topic = get_custom_topic(&data.data_type);
3115        msgbus::publish_any(topic, &data);
3116    }
3117
3118    /// Adds the `synthetic` instrument to the cache.
3119    ///
3120    /// # Errors
3121    ///
3122    /// Returns an error if a synthetic with the same ID already exists, or if the
3123    /// backing cache fails to persist it. Panics if the actor is not registered
3124    /// with a trader. // panics-doc-ok
3125    pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3126        self.check_registered();
3127
3128        let cache = self.cache_rc();
3129        if cache.borrow().synthetic(&synthetic.id).is_some() {
3130            anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3131        }
3132        cache.borrow_mut().add_synthetic(synthetic)
3133    }
3134
3135    /// Updates the `synthetic` instrument in the cache, replacing the existing entry.
3136    ///
3137    /// # Errors
3138    ///
3139    /// Returns an error if no synthetic with the same ID already exists, or if the
3140    /// backing cache fails to persist the replacement. Panics if the actor is not
3141    /// registered with a trader. // panics-doc-ok
3142    pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3143        self.check_registered();
3144
3145        let cache = self.cache_rc();
3146        if cache.borrow().synthetic(&synthetic.id).is_none() {
3147            anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3148        }
3149        cache.borrow_mut().add_synthetic(synthetic)
3150    }
3151
3152    /// Helper method for registering data subscriptions from the trait.
3153    ///
3154    /// # Panics
3155    ///
3156    /// Panics if the actor is not properly registered.
3157    pub fn subscribe_data(
3158        &mut self,
3159        handler: ShareableMessageHandler,
3160        data_type: DataType,
3161        client_id: Option<ClientId>,
3162        params: Option<Params>,
3163    ) {
3164        assert!(
3165            self.is_properly_registered(),
3166            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3167            self.actor_id,
3168            self.trader_id,
3169            self.clock.is_some(),
3170            self.cache.is_some()
3171        );
3172
3173        let topic = get_custom_topic(&data_type);
3174        self.add_subscription_any(topic, handler);
3175
3176        // If no client ID specified, just subscribe to the topic
3177        if client_id.is_none() {
3178            return;
3179        }
3180
3181        let command = SubscribeCommand::Data(SubscribeCustomData {
3182            data_type,
3183            client_id,
3184            venue: None,
3185            command_id: UUID4::new(),
3186            ts_init: self.timestamp_ns(),
3187            correlation_id: None,
3188            params,
3189        });
3190
3191        self.send_data_cmd(DataCommand::Subscribe(command));
3192    }
3193
3194    /// Helper method for registering signal subscriptions from the trait.
3195    ///
3196    /// An empty `name` subscribes to every signal via the `data.Signal*` wildcard pattern.
3197    ///
3198    /// # Panics
3199    ///
3200    /// Panics if the actor is not registered with a trader.
3201    pub fn subscribe_signal(
3202        &mut self,
3203        handler: ShareableMessageHandler,
3204        name: &str,
3205        priority: Option<u32>,
3206    ) {
3207        self.check_registered();
3208
3209        let pattern = get_signal_pattern(name);
3210        if self.topic_handlers.contains_key(&pattern) {
3211            log::warn!(
3212                "Actor {} attempted duplicate signal subscription to '{pattern}'",
3213                self.actor_id,
3214            );
3215            return;
3216        }
3217        self.topic_handlers.insert(pattern, handler.clone());
3218        msgbus::subscribe_any(pattern, handler, priority);
3219    }
3220
3221    /// Helper method for registering quotes subscriptions from the trait.
3222    pub fn subscribe_quotes(
3223        &mut self,
3224        topic: MStr<Topic>,
3225        handler: TypedHandler<QuoteTick>,
3226        instrument_id: InstrumentId,
3227        client_id: Option<ClientId>,
3228        params: Option<Params>,
3229    ) {
3230        self.check_registered();
3231
3232        self.add_quote_subscription(topic, handler);
3233
3234        let command = SubscribeCommand::Quotes(SubscribeQuotes {
3235            instrument_id,
3236            client_id,
3237            venue: Some(instrument_id.venue),
3238            command_id: UUID4::new(),
3239            ts_init: self.timestamp_ns(),
3240            correlation_id: None,
3241            params,
3242        });
3243
3244        self.send_data_cmd(DataCommand::Subscribe(command));
3245    }
3246
3247    /// Helper method for registering instruments subscriptions from the trait.
3248    pub fn subscribe_instruments(
3249        &mut self,
3250        pattern: MStr<Pattern>,
3251        handler: TypedHandler<InstrumentAny>,
3252        venue: Venue,
3253        client_id: Option<ClientId>,
3254        params: Option<Params>,
3255    ) {
3256        self.check_registered();
3257
3258        self.add_instrument_subscription(pattern, handler);
3259
3260        let command = SubscribeCommand::Instruments(SubscribeInstruments {
3261            client_id,
3262            venue,
3263            command_id: UUID4::new(),
3264            ts_init: self.timestamp_ns(),
3265            correlation_id: None,
3266            params,
3267        });
3268
3269        self.send_data_cmd(DataCommand::Subscribe(command));
3270    }
3271
3272    /// Helper method for registering instrument subscriptions from the trait.
3273    pub fn subscribe_instrument(
3274        &mut self,
3275        topic: MStr<Topic>,
3276        handler: TypedHandler<InstrumentAny>,
3277        instrument_id: InstrumentId,
3278        client_id: Option<ClientId>,
3279        params: Option<Params>,
3280    ) {
3281        self.check_registered();
3282
3283        self.add_instrument_subscription(topic.into(), handler);
3284
3285        let command = SubscribeCommand::Instrument(SubscribeInstrument {
3286            instrument_id,
3287            client_id,
3288            venue: Some(instrument_id.venue),
3289            command_id: UUID4::new(),
3290            ts_init: self.timestamp_ns(),
3291            correlation_id: None,
3292            params,
3293        });
3294
3295        self.send_data_cmd(DataCommand::Subscribe(command));
3296    }
3297
3298    /// Helper method for registering book deltas subscriptions from the trait.
3299    #[expect(clippy::too_many_arguments)]
3300    pub fn subscribe_book_deltas(
3301        &mut self,
3302        pattern: MStr<Pattern>,
3303        handler: TypedHandler<OrderBookDeltas>,
3304        instrument_id: InstrumentId,
3305        book_type: BookType,
3306        depth: Option<NonZeroUsize>,
3307        client_id: Option<ClientId>,
3308        managed: bool,
3309        params: Option<Params>,
3310    ) {
3311        self.check_registered();
3312
3313        self.add_deltas_subscription(pattern, handler);
3314
3315        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3316            instrument_id,
3317            book_type,
3318            client_id,
3319            venue: Some(instrument_id.venue),
3320            command_id: UUID4::new(),
3321            ts_init: self.timestamp_ns(),
3322            depth,
3323            managed,
3324            correlation_id: None,
3325            params,
3326        });
3327
3328        self.send_data_cmd(DataCommand::Subscribe(command));
3329    }
3330
3331    /// Helper method for registering book snapshots subscriptions from the trait.
3332    #[expect(clippy::too_many_arguments)]
3333    pub fn subscribe_book_at_interval(
3334        &mut self,
3335        topic: MStr<Topic>,
3336        handler: TypedHandler<OrderBook>,
3337        instrument_id: InstrumentId,
3338        book_type: BookType,
3339        depth: Option<NonZeroUsize>,
3340        interval_ms: NonZeroUsize,
3341        client_id: Option<ClientId>,
3342        params: Option<Params>,
3343    ) {
3344        self.check_registered();
3345
3346        self.add_book_snapshot_subscription(topic, handler);
3347
3348        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3349            instrument_id,
3350            book_type,
3351            client_id,
3352            venue: Some(instrument_id.venue),
3353            command_id: UUID4::new(),
3354            ts_init: self.timestamp_ns(),
3355            depth,
3356            interval_ms,
3357            correlation_id: None,
3358            params,
3359        });
3360
3361        self.send_data_cmd(DataCommand::Subscribe(command));
3362    }
3363
3364    /// Helper method for registering trades subscriptions from the trait.
3365    pub fn subscribe_trades(
3366        &mut self,
3367        topic: MStr<Topic>,
3368        handler: TypedHandler<TradeTick>,
3369        instrument_id: InstrumentId,
3370        client_id: Option<ClientId>,
3371        params: Option<Params>,
3372    ) {
3373        self.check_registered();
3374
3375        self.add_trade_subscription(topic, handler);
3376
3377        let command = SubscribeCommand::Trades(SubscribeTrades {
3378            instrument_id,
3379            client_id,
3380            venue: Some(instrument_id.venue),
3381            command_id: UUID4::new(),
3382            ts_init: self.timestamp_ns(),
3383            correlation_id: None,
3384            params,
3385        });
3386
3387        self.send_data_cmd(DataCommand::Subscribe(command));
3388    }
3389
3390    /// Helper method for registering bars subscriptions from the trait.
3391    pub fn subscribe_bars(
3392        &mut self,
3393        topic: MStr<Topic>,
3394        handler: TypedHandler<Bar>,
3395        bar_type: BarType,
3396        client_id: Option<ClientId>,
3397        params: Option<Params>,
3398    ) {
3399        self.check_registered();
3400
3401        self.add_bar_subscription(topic, handler);
3402
3403        let command = SubscribeCommand::Bars(SubscribeBars {
3404            bar_type,
3405            client_id,
3406            venue: Some(bar_type.instrument_id().venue),
3407            command_id: UUID4::new(),
3408            ts_init: self.timestamp_ns(),
3409            correlation_id: None,
3410            params,
3411        });
3412
3413        self.send_data_cmd(DataCommand::Subscribe(command));
3414    }
3415
3416    /// Helper method for registering mark prices subscriptions from the trait.
3417    pub fn subscribe_mark_prices(
3418        &mut self,
3419        topic: MStr<Topic>,
3420        handler: TypedHandler<MarkPriceUpdate>,
3421        instrument_id: InstrumentId,
3422        client_id: Option<ClientId>,
3423        params: Option<Params>,
3424    ) {
3425        self.check_registered();
3426
3427        self.add_mark_price_subscription(topic, handler);
3428
3429        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3430            instrument_id,
3431            client_id,
3432            venue: Some(instrument_id.venue),
3433            command_id: UUID4::new(),
3434            ts_init: self.timestamp_ns(),
3435            correlation_id: None,
3436            params,
3437        });
3438
3439        self.send_data_cmd(DataCommand::Subscribe(command));
3440    }
3441
3442    /// Helper method for registering index prices subscriptions from the trait.
3443    pub fn subscribe_index_prices(
3444        &mut self,
3445        topic: MStr<Topic>,
3446        handler: TypedHandler<IndexPriceUpdate>,
3447        instrument_id: InstrumentId,
3448        client_id: Option<ClientId>,
3449        params: Option<Params>,
3450    ) {
3451        self.check_registered();
3452
3453        self.add_index_price_subscription(topic, handler);
3454
3455        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3456            instrument_id,
3457            client_id,
3458            venue: Some(instrument_id.venue),
3459            command_id: UUID4::new(),
3460            ts_init: self.timestamp_ns(),
3461            correlation_id: None,
3462            params,
3463        });
3464
3465        self.send_data_cmd(DataCommand::Subscribe(command));
3466    }
3467
3468    /// Helper method for registering funding rates subscriptions from the trait.
3469    pub fn subscribe_funding_rates(
3470        &mut self,
3471        topic: MStr<Topic>,
3472        handler: TypedHandler<FundingRateUpdate>,
3473        instrument_id: InstrumentId,
3474        client_id: Option<ClientId>,
3475        params: Option<Params>,
3476    ) {
3477        self.check_registered();
3478
3479        self.add_funding_rate_subscription(topic, handler);
3480
3481        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3482            instrument_id,
3483            client_id,
3484            venue: Some(instrument_id.venue),
3485            command_id: UUID4::new(),
3486            ts_init: self.timestamp_ns(),
3487            correlation_id: None,
3488            params,
3489        });
3490
3491        self.send_data_cmd(DataCommand::Subscribe(command));
3492    }
3493
3494    /// Helper method for registering option greeks subscriptions from the trait.
3495    pub fn subscribe_option_greeks(
3496        &mut self,
3497        topic: MStr<Topic>,
3498        handler: TypedHandler<OptionGreeks>,
3499        instrument_id: InstrumentId,
3500        client_id: Option<ClientId>,
3501        params: Option<Params>,
3502    ) {
3503        self.check_registered();
3504
3505        self.add_option_greeks_subscription(topic, handler);
3506
3507        let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3508            instrument_id,
3509            client_id,
3510            venue: Some(instrument_id.venue),
3511            command_id: UUID4::new(),
3512            ts_init: self.timestamp_ns(),
3513            correlation_id: None,
3514            params,
3515        });
3516
3517        self.send_data_cmd(DataCommand::Subscribe(command));
3518    }
3519
3520    /// Helper method for registering instrument status subscriptions from the trait.
3521    pub fn subscribe_instrument_status(
3522        &mut self,
3523        topic: MStr<Topic>,
3524        handler: ShareableMessageHandler,
3525        instrument_id: InstrumentId,
3526        client_id: Option<ClientId>,
3527        params: Option<Params>,
3528    ) {
3529        self.check_registered();
3530
3531        self.add_subscription_any(topic, handler);
3532
3533        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3534            instrument_id,
3535            client_id,
3536            venue: Some(instrument_id.venue),
3537            command_id: UUID4::new(),
3538            ts_init: self.timestamp_ns(),
3539            correlation_id: None,
3540            params,
3541        });
3542
3543        self.send_data_cmd(DataCommand::Subscribe(command));
3544    }
3545
3546    /// Helper method for registering instrument close subscriptions from the trait.
3547    pub fn subscribe_instrument_close(
3548        &mut self,
3549        topic: MStr<Topic>,
3550        handler: ShareableMessageHandler,
3551        instrument_id: InstrumentId,
3552        client_id: Option<ClientId>,
3553        params: Option<Params>,
3554    ) {
3555        self.check_registered();
3556
3557        self.add_instrument_close_subscription(topic, handler);
3558
3559        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3560            instrument_id,
3561            client_id,
3562            venue: Some(instrument_id.venue),
3563            command_id: UUID4::new(),
3564            ts_init: self.timestamp_ns(),
3565            correlation_id: None,
3566            params,
3567        });
3568
3569        self.send_data_cmd(DataCommand::Subscribe(command));
3570    }
3571
3572    /// Helper method for subscribing to option chain snapshots from the trait.
3573    #[allow(clippy::too_many_arguments)]
3574    pub fn subscribe_option_chain(
3575        &mut self,
3576        topic: MStr<Topic>,
3577        handler: TypedHandler<OptionChainSlice>,
3578        series_id: OptionSeriesId,
3579        strike_range: StrikeRange,
3580        snapshot_interval_ms: Option<u64>,
3581        client_id: Option<ClientId>,
3582        params: Option<Params>,
3583    ) {
3584        self.check_registered();
3585
3586        self.add_option_chain_subscription(topic, handler);
3587
3588        let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3589            series_id,
3590            strike_range,
3591            snapshot_interval_ms,
3592            UUID4::new(),
3593            self.timestamp_ns(),
3594            client_id,
3595            Some(series_id.venue),
3596            params,
3597        ));
3598
3599        self.send_data_cmd(DataCommand::Subscribe(command));
3600    }
3601
3602    /// Helper method for registering order fills subscriptions from the trait.
3603    pub fn subscribe_order_fills(
3604        &mut self,
3605        topic: MStr<Topic>,
3606        handler: TypedHandler<OrderEventAny>,
3607    ) {
3608        self.check_registered();
3609        self.add_order_event_subscription(topic, handler);
3610    }
3611
3612    /// Helper method for registering order cancels subscriptions from the trait.
3613    pub fn subscribe_order_cancels(
3614        &mut self,
3615        topic: MStr<Topic>,
3616        handler: TypedHandler<OrderEventAny>,
3617    ) {
3618        self.check_registered();
3619        self.add_order_event_subscription(topic, handler);
3620    }
3621
3622    /// Helper method for unsubscribing from data.
3623    pub fn unsubscribe_data(
3624        &mut self,
3625        data_type: DataType,
3626        client_id: Option<ClientId>,
3627        params: Option<Params>,
3628    ) {
3629        self.check_registered();
3630
3631        let topic = get_custom_topic(&data_type);
3632        self.remove_subscription_any(topic);
3633
3634        if client_id.is_none() {
3635            return;
3636        }
3637
3638        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3639            data_type,
3640            client_id,
3641            venue: None,
3642            command_id: UUID4::new(),
3643            ts_init: self.timestamp_ns(),
3644            correlation_id: None,
3645            params,
3646        });
3647
3648        self.send_data_cmd(DataCommand::Unsubscribe(command));
3649    }
3650
3651    /// Helper method for unsubscribing from signals.
3652    ///
3653    /// # Panics
3654    ///
3655    /// Panics if the actor is not registered with a trader.
3656    pub fn unsubscribe_signal(&mut self, name: &str) {
3657        self.check_registered();
3658
3659        let pattern = get_signal_pattern(name);
3660        if let Some(handler) = self.topic_handlers.remove(&pattern) {
3661            msgbus::unsubscribe_any(pattern, &handler);
3662        } else {
3663            log::warn!(
3664                "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3665                self.actor_id,
3666            );
3667        }
3668    }
3669
3670    /// Helper method for unsubscribing from instruments.
3671    pub fn unsubscribe_instruments(
3672        &mut self,
3673        venue: Venue,
3674        client_id: Option<ClientId>,
3675        params: Option<Params>,
3676    ) {
3677        self.check_registered();
3678
3679        let pattern = get_instruments_pattern(venue);
3680        self.remove_instrument_subscription(pattern);
3681
3682        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3683            client_id,
3684            venue,
3685            command_id: UUID4::new(),
3686            ts_init: self.timestamp_ns(),
3687            correlation_id: None,
3688            params,
3689        });
3690
3691        self.send_data_cmd(DataCommand::Unsubscribe(command));
3692    }
3693
3694    /// Helper method for unsubscribing from instrument.
3695    pub fn unsubscribe_instrument(
3696        &mut self,
3697        instrument_id: InstrumentId,
3698        client_id: Option<ClientId>,
3699        params: Option<Params>,
3700    ) {
3701        self.check_registered();
3702
3703        let topic = get_instrument_topic(instrument_id);
3704        self.remove_instrument_subscription(topic.into());
3705
3706        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3707            instrument_id,
3708            client_id,
3709            venue: Some(instrument_id.venue),
3710            command_id: UUID4::new(),
3711            ts_init: self.timestamp_ns(),
3712            correlation_id: None,
3713            params,
3714        });
3715
3716        self.send_data_cmd(DataCommand::Unsubscribe(command));
3717    }
3718
3719    /// Helper method for unsubscribing from book deltas.
3720    pub fn unsubscribe_book_deltas(
3721        &mut self,
3722        instrument_id: InstrumentId,
3723        client_id: Option<ClientId>,
3724        params: Option<Params>,
3725    ) {
3726        self.check_registered();
3727
3728        let pattern = if is_parent_subscription(params.as_ref()) {
3729            get_book_deltas_pattern(instrument_id)
3730        } else {
3731            get_book_deltas_topic(instrument_id).into()
3732        };
3733        self.remove_deltas_subscription(pattern);
3734
3735        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3736            instrument_id,
3737            client_id,
3738            venue: Some(instrument_id.venue),
3739            command_id: UUID4::new(),
3740            ts_init: self.timestamp_ns(),
3741            correlation_id: None,
3742            params,
3743        });
3744
3745        self.send_data_cmd(DataCommand::Unsubscribe(command));
3746    }
3747
3748    /// Helper method for unsubscribing from book snapshots at interval.
3749    pub fn unsubscribe_book_at_interval(
3750        &mut self,
3751        instrument_id: InstrumentId,
3752        interval_ms: NonZeroUsize,
3753        client_id: Option<ClientId>,
3754        params: Option<Params>,
3755    ) {
3756        self.check_registered();
3757
3758        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3759        self.remove_book_snapshot_subscription(topic);
3760
3761        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3762            instrument_id,
3763            interval_ms,
3764            client_id,
3765            venue: Some(instrument_id.venue),
3766            command_id: UUID4::new(),
3767            ts_init: self.timestamp_ns(),
3768            correlation_id: None,
3769            params,
3770        });
3771
3772        self.send_data_cmd(DataCommand::Unsubscribe(command));
3773    }
3774
3775    /// Helper method for unsubscribing from quotes.
3776    pub fn unsubscribe_quotes(
3777        &mut self,
3778        instrument_id: InstrumentId,
3779        client_id: Option<ClientId>,
3780        params: Option<Params>,
3781    ) {
3782        self.check_registered();
3783
3784        let topic = get_quotes_topic(instrument_id);
3785        self.remove_quote_subscription(topic);
3786
3787        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3788            instrument_id,
3789            client_id,
3790            venue: Some(instrument_id.venue),
3791            command_id: UUID4::new(),
3792            ts_init: self.timestamp_ns(),
3793            correlation_id: None,
3794            params,
3795        });
3796
3797        self.send_data_cmd(DataCommand::Unsubscribe(command));
3798    }
3799
3800    /// Helper method for unsubscribing from trades.
3801    pub fn unsubscribe_trades(
3802        &mut self,
3803        instrument_id: InstrumentId,
3804        client_id: Option<ClientId>,
3805        params: Option<Params>,
3806    ) {
3807        self.check_registered();
3808
3809        let topic = get_trades_topic(instrument_id);
3810        self.remove_trade_subscription(topic);
3811
3812        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3813            instrument_id,
3814            client_id,
3815            venue: Some(instrument_id.venue),
3816            command_id: UUID4::new(),
3817            ts_init: self.timestamp_ns(),
3818            correlation_id: None,
3819            params,
3820        });
3821
3822        self.send_data_cmd(DataCommand::Unsubscribe(command));
3823    }
3824
3825    /// Helper method for unsubscribing from bars.
3826    pub fn unsubscribe_bars(
3827        &mut self,
3828        bar_type: BarType,
3829        client_id: Option<ClientId>,
3830        params: Option<Params>,
3831    ) {
3832        self.check_registered();
3833
3834        let topic = get_bars_topic(bar_type);
3835        self.remove_bar_subscription(topic);
3836
3837        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3838            bar_type,
3839            client_id,
3840            venue: Some(bar_type.instrument_id().venue),
3841            command_id: UUID4::new(),
3842            ts_init: self.timestamp_ns(),
3843            correlation_id: None,
3844            params,
3845        });
3846
3847        self.send_data_cmd(DataCommand::Unsubscribe(command));
3848    }
3849
3850    /// Helper method for unsubscribing from mark prices.
3851    pub fn unsubscribe_mark_prices(
3852        &mut self,
3853        instrument_id: InstrumentId,
3854        client_id: Option<ClientId>,
3855        params: Option<Params>,
3856    ) {
3857        self.check_registered();
3858
3859        let topic = get_mark_price_topic(instrument_id);
3860        self.remove_mark_price_subscription(topic);
3861
3862        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3863            instrument_id,
3864            client_id,
3865            venue: Some(instrument_id.venue),
3866            command_id: UUID4::new(),
3867            ts_init: self.timestamp_ns(),
3868            correlation_id: None,
3869            params,
3870        });
3871
3872        self.send_data_cmd(DataCommand::Unsubscribe(command));
3873    }
3874
3875    /// Helper method for unsubscribing from index prices.
3876    pub fn unsubscribe_index_prices(
3877        &mut self,
3878        instrument_id: InstrumentId,
3879        client_id: Option<ClientId>,
3880        params: Option<Params>,
3881    ) {
3882        self.check_registered();
3883
3884        let topic = get_index_price_topic(instrument_id);
3885        self.remove_index_price_subscription(topic);
3886
3887        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3888            instrument_id,
3889            client_id,
3890            venue: Some(instrument_id.venue),
3891            command_id: UUID4::new(),
3892            ts_init: self.timestamp_ns(),
3893            correlation_id: None,
3894            params,
3895        });
3896
3897        self.send_data_cmd(DataCommand::Unsubscribe(command));
3898    }
3899
3900    /// Helper method for unsubscribing from funding rates.
3901    pub fn unsubscribe_funding_rates(
3902        &mut self,
3903        instrument_id: InstrumentId,
3904        client_id: Option<ClientId>,
3905        params: Option<Params>,
3906    ) {
3907        self.check_registered();
3908
3909        let topic = get_funding_rate_topic(instrument_id);
3910        self.remove_funding_rate_subscription(topic);
3911
3912        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3913            instrument_id,
3914            client_id,
3915            venue: Some(instrument_id.venue),
3916            command_id: UUID4::new(),
3917            ts_init: self.timestamp_ns(),
3918            correlation_id: None,
3919            params,
3920        });
3921
3922        self.send_data_cmd(DataCommand::Unsubscribe(command));
3923    }
3924
3925    /// Helper method for unsubscribing from option greeks.
3926    pub fn unsubscribe_option_greeks(
3927        &mut self,
3928        instrument_id: InstrumentId,
3929        client_id: Option<ClientId>,
3930        params: Option<Params>,
3931    ) {
3932        self.check_registered();
3933
3934        let topic = get_option_greeks_topic(instrument_id);
3935        self.remove_option_greeks_subscription(topic);
3936
3937        let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3938            instrument_id,
3939            client_id,
3940            venue: Some(instrument_id.venue),
3941            command_id: UUID4::new(),
3942            ts_init: self.timestamp_ns(),
3943            correlation_id: None,
3944            params,
3945        });
3946
3947        self.send_data_cmd(DataCommand::Unsubscribe(command));
3948    }
3949
3950    /// Helper method for unsubscribing from instrument status.
3951    pub fn unsubscribe_instrument_status(
3952        &mut self,
3953        instrument_id: InstrumentId,
3954        client_id: Option<ClientId>,
3955        params: Option<Params>,
3956    ) {
3957        self.check_registered();
3958
3959        let topic = get_instrument_status_topic(instrument_id);
3960        self.remove_subscription_any(topic);
3961
3962        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3963            instrument_id,
3964            client_id,
3965            venue: Some(instrument_id.venue),
3966            command_id: UUID4::new(),
3967            ts_init: self.timestamp_ns(),
3968            correlation_id: None,
3969            params,
3970        });
3971
3972        self.send_data_cmd(DataCommand::Unsubscribe(command));
3973    }
3974
3975    /// Helper method for unsubscribing from instrument close.
3976    pub fn unsubscribe_instrument_close(
3977        &mut self,
3978        instrument_id: InstrumentId,
3979        client_id: Option<ClientId>,
3980        params: Option<Params>,
3981    ) {
3982        self.check_registered();
3983
3984        let topic = get_instrument_close_topic(instrument_id);
3985        self.remove_instrument_close_subscription(topic);
3986
3987        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3988            instrument_id,
3989            client_id,
3990            venue: Some(instrument_id.venue),
3991            command_id: UUID4::new(),
3992            ts_init: self.timestamp_ns(),
3993            correlation_id: None,
3994            params,
3995        });
3996
3997        self.send_data_cmd(DataCommand::Unsubscribe(command));
3998    }
3999
4000    /// Helper method for unsubscribing from option chain snapshots.
4001    pub fn unsubscribe_option_chain(
4002        &mut self,
4003        series_id: OptionSeriesId,
4004        client_id: Option<ClientId>,
4005    ) {
4006        self.check_registered();
4007
4008        let topic = get_option_chain_topic(series_id);
4009        self.remove_option_chain_subscription(topic);
4010
4011        let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4012            series_id,
4013            UUID4::new(),
4014            self.timestamp_ns(),
4015            client_id,
4016            Some(series_id.venue),
4017        ));
4018
4019        self.send_data_cmd(DataCommand::Unsubscribe(command));
4020    }
4021
4022    /// Helper method for unsubscribing from order fills.
4023    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4024        self.check_registered();
4025
4026        let topic = get_order_fills_topic(instrument_id);
4027        self.remove_order_event_subscription(topic);
4028    }
4029
4030    /// Helper method for unsubscribing from order cancels.
4031    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4032        self.check_registered();
4033
4034        let topic = get_order_cancels_topic(instrument_id);
4035        self.remove_order_event_subscription(topic);
4036    }
4037
4038    /// Helper method for requesting data.
4039    ///
4040    /// # Errors
4041    ///
4042    /// Returns an error if input parameters are invalid.
4043    #[expect(clippy::too_many_arguments)]
4044    pub fn request_data(
4045        &self,
4046        data_type: DataType,
4047        client_id: ClientId,
4048        start: Option<DateTime<Utc>>,
4049        end: Option<DateTime<Utc>>,
4050        limit: Option<NonZeroUsize>,
4051        params: Option<Params>,
4052        handler: ShareableMessageHandler,
4053    ) -> anyhow::Result<UUID4> {
4054        self.check_registered();
4055
4056        let now = self.clock_ref().utc_now();
4057        check_timestamps(now, start, end)?;
4058
4059        let request_id = UUID4::new();
4060        let command = RequestCommand::Data(RequestCustomData {
4061            client_id,
4062            data_type,
4063            start,
4064            end,
4065            limit,
4066            request_id,
4067            ts_init: self.timestamp_ns(),
4068            params,
4069        });
4070
4071        get_message_bus()
4072            .borrow_mut()
4073            .register_response_handler(command.request_id(), handler)?;
4074
4075        self.send_data_cmd(DataCommand::Request(command));
4076
4077        Ok(request_id)
4078    }
4079
4080    /// Helper method for requesting instrument.
4081    ///
4082    /// # Errors
4083    ///
4084    /// Returns an error if input parameters are invalid.
4085    pub fn request_instrument(
4086        &self,
4087        instrument_id: InstrumentId,
4088        start: Option<DateTime<Utc>>,
4089        end: Option<DateTime<Utc>>,
4090        client_id: Option<ClientId>,
4091        params: Option<Params>,
4092        handler: ShareableMessageHandler,
4093    ) -> anyhow::Result<UUID4> {
4094        self.check_registered();
4095
4096        let now = self.clock_ref().utc_now();
4097        check_timestamps(now, start, end)?;
4098
4099        let request_id = UUID4::new();
4100        let command = RequestCommand::Instrument(RequestInstrument {
4101            instrument_id,
4102            start,
4103            end,
4104            client_id,
4105            request_id,
4106            ts_init: now.into(),
4107            params,
4108        });
4109
4110        get_message_bus()
4111            .borrow_mut()
4112            .register_response_handler(command.request_id(), handler)?;
4113
4114        self.send_data_cmd(DataCommand::Request(command));
4115
4116        Ok(request_id)
4117    }
4118
4119    /// Helper method for requesting instruments.
4120    ///
4121    /// # Errors
4122    ///
4123    /// Returns an error if input parameters are invalid.
4124    pub fn request_instruments(
4125        &self,
4126        venue: Option<Venue>,
4127        start: Option<DateTime<Utc>>,
4128        end: Option<DateTime<Utc>>,
4129        client_id: Option<ClientId>,
4130        params: Option<Params>,
4131        handler: ShareableMessageHandler,
4132    ) -> anyhow::Result<UUID4> {
4133        self.check_registered();
4134
4135        let now = self.clock_ref().utc_now();
4136        check_timestamps(now, start, end)?;
4137
4138        let request_id = UUID4::new();
4139        let command = RequestCommand::Instruments(RequestInstruments {
4140            venue,
4141            start,
4142            end,
4143            client_id,
4144            request_id,
4145            ts_init: now.into(),
4146            params,
4147        });
4148
4149        get_message_bus()
4150            .borrow_mut()
4151            .register_response_handler(command.request_id(), handler)?;
4152
4153        self.send_data_cmd(DataCommand::Request(command));
4154
4155        Ok(request_id)
4156    }
4157
4158    /// Helper method for requesting book snapshot.
4159    ///
4160    /// # Errors
4161    ///
4162    /// Returns an error if input parameters are invalid.
4163    pub fn request_book_snapshot(
4164        &self,
4165        instrument_id: InstrumentId,
4166        depth: Option<NonZeroUsize>,
4167        client_id: Option<ClientId>,
4168        params: Option<Params>,
4169        handler: ShareableMessageHandler,
4170    ) -> anyhow::Result<UUID4> {
4171        self.check_registered();
4172
4173        let request_id = UUID4::new();
4174        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4175            instrument_id,
4176            depth,
4177            client_id,
4178            request_id,
4179            ts_init: self.timestamp_ns(),
4180            params,
4181        });
4182
4183        get_message_bus()
4184            .borrow_mut()
4185            .register_response_handler(command.request_id(), handler)?;
4186
4187        self.send_data_cmd(DataCommand::Request(command));
4188
4189        Ok(request_id)
4190    }
4191
4192    /// Helper method for requesting quotes.
4193    ///
4194    /// # Errors
4195    ///
4196    /// Returns an error if input parameters are invalid.
4197    #[expect(clippy::too_many_arguments)]
4198    pub fn request_quotes(
4199        &self,
4200        instrument_id: InstrumentId,
4201        start: Option<DateTime<Utc>>,
4202        end: Option<DateTime<Utc>>,
4203        limit: Option<NonZeroUsize>,
4204        client_id: Option<ClientId>,
4205        params: Option<Params>,
4206        handler: ShareableMessageHandler,
4207    ) -> anyhow::Result<UUID4> {
4208        self.check_registered();
4209
4210        let now = self.clock_ref().utc_now();
4211        check_timestamps(now, start, end)?;
4212
4213        let request_id = UUID4::new();
4214        let command = RequestCommand::Quotes(RequestQuotes {
4215            instrument_id,
4216            start,
4217            end,
4218            limit,
4219            client_id,
4220            request_id,
4221            ts_init: now.into(),
4222            params,
4223        });
4224
4225        get_message_bus()
4226            .borrow_mut()
4227            .register_response_handler(command.request_id(), handler)?;
4228
4229        self.send_data_cmd(DataCommand::Request(command));
4230
4231        Ok(request_id)
4232    }
4233
4234    /// Helper method for requesting trades.
4235    ///
4236    /// # Errors
4237    ///
4238    /// Returns an error if input parameters are invalid.
4239    #[expect(clippy::too_many_arguments)]
4240    pub fn request_trades(
4241        &self,
4242        instrument_id: InstrumentId,
4243        start: Option<DateTime<Utc>>,
4244        end: Option<DateTime<Utc>>,
4245        limit: Option<NonZeroUsize>,
4246        client_id: Option<ClientId>,
4247        params: Option<Params>,
4248        handler: ShareableMessageHandler,
4249    ) -> anyhow::Result<UUID4> {
4250        self.check_registered();
4251
4252        let now = self.clock_ref().utc_now();
4253        check_timestamps(now, start, end)?;
4254
4255        let request_id = UUID4::new();
4256        let command = RequestCommand::Trades(RequestTrades {
4257            instrument_id,
4258            start,
4259            end,
4260            limit,
4261            client_id,
4262            request_id,
4263            ts_init: now.into(),
4264            params,
4265        });
4266
4267        get_message_bus()
4268            .borrow_mut()
4269            .register_response_handler(command.request_id(), handler)?;
4270
4271        self.send_data_cmd(DataCommand::Request(command));
4272
4273        Ok(request_id)
4274    }
4275
4276    /// Helper method for requesting funding rates.
4277    ///
4278    /// # Errors
4279    ///
4280    /// Returns an error if input parameters are invalid.
4281    #[expect(clippy::too_many_arguments)]
4282    pub fn request_funding_rates(
4283        &self,
4284        instrument_id: InstrumentId,
4285        start: Option<DateTime<Utc>>,
4286        end: Option<DateTime<Utc>>,
4287        limit: Option<NonZeroUsize>,
4288        client_id: Option<ClientId>,
4289        params: Option<Params>,
4290        handler: ShareableMessageHandler,
4291    ) -> anyhow::Result<UUID4> {
4292        self.check_registered();
4293
4294        let now = self.clock_ref().utc_now();
4295        check_timestamps(now, start, end)?;
4296
4297        let request_id = UUID4::new();
4298        let command = RequestCommand::FundingRates(RequestFundingRates {
4299            instrument_id,
4300            start,
4301            end,
4302            limit,
4303            client_id,
4304            request_id,
4305            ts_init: now.into(),
4306            params,
4307        });
4308
4309        get_message_bus()
4310            .borrow_mut()
4311            .register_response_handler(command.request_id(), handler)?;
4312
4313        self.send_data_cmd(DataCommand::Request(command));
4314
4315        Ok(request_id)
4316    }
4317
4318    /// Helper method for requesting bars.
4319    ///
4320    /// # Errors
4321    ///
4322    /// Returns an error if input parameters are invalid.
4323    #[expect(clippy::too_many_arguments)]
4324    pub fn request_bars(
4325        &self,
4326        bar_type: BarType,
4327        start: Option<DateTime<Utc>>,
4328        end: Option<DateTime<Utc>>,
4329        limit: Option<NonZeroUsize>,
4330        client_id: Option<ClientId>,
4331        params: Option<Params>,
4332        handler: ShareableMessageHandler,
4333    ) -> anyhow::Result<UUID4> {
4334        self.check_registered();
4335
4336        let now = self.clock_ref().utc_now();
4337        check_timestamps(now, start, end)?;
4338
4339        let request_id = UUID4::new();
4340        let command = RequestCommand::Bars(RequestBars {
4341            bar_type,
4342            start,
4343            end,
4344            limit,
4345            client_id,
4346            request_id,
4347            ts_init: now.into(),
4348            params,
4349        });
4350
4351        get_message_bus()
4352            .borrow_mut()
4353            .register_response_handler(command.request_id(), handler)?;
4354
4355        self.send_data_cmd(DataCommand::Request(command));
4356
4357        Ok(request_id)
4358    }
4359
4360    #[cfg(test)]
4361    pub fn quote_handler_count(&self) -> usize {
4362        self.quote_handlers.len()
4363    }
4364
4365    #[cfg(test)]
4366    pub fn trade_handler_count(&self) -> usize {
4367        self.trade_handlers.len()
4368    }
4369
4370    #[cfg(test)]
4371    pub fn bar_handler_count(&self) -> usize {
4372        self.bar_handlers.len()
4373    }
4374
4375    #[cfg(test)]
4376    pub fn deltas_handler_count(&self) -> usize {
4377        self.deltas_handlers.len()
4378    }
4379
4380    #[cfg(test)]
4381    pub fn has_quote_handler(&self, topic: &str) -> bool {
4382        self.quote_handlers
4383            .contains_key(&MStr::<Topic>::from(topic))
4384    }
4385
4386    #[cfg(test)]
4387    pub fn has_trade_handler(&self, topic: &str) -> bool {
4388        self.trade_handlers
4389            .contains_key(&MStr::<Topic>::from(topic))
4390    }
4391
4392    #[cfg(test)]
4393    pub fn has_bar_handler(&self, topic: &str) -> bool {
4394        self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
4395    }
4396
4397    #[cfg(test)]
4398    pub fn has_deltas_handler(&self, pattern: &str) -> bool {
4399        self.deltas_handlers
4400            .contains_key(&MStr::<Pattern>::from(pattern))
4401    }
4402}
4403
4404fn check_timestamps(
4405    now: DateTime<Utc>,
4406    start: Option<DateTime<Utc>>,
4407    end: Option<DateTime<Utc>>,
4408) -> anyhow::Result<()> {
4409    if let Some(start) = start {
4410        check_predicate_true(start <= now, "start was > now")?;
4411    }
4412
4413    if let Some(end) = end {
4414        check_predicate_true(end <= now, "end was > now")?;
4415    }
4416
4417    if let (Some(start), Some(end)) = (start, end) {
4418        check_predicate_true(start < end, "start was >= end")?;
4419    }
4420
4421    Ok(())
4422}
4423
4424fn log_error(e: &anyhow::Error) {
4425    log::error!("{e}");
4426}
4427
4428fn log_not_running<T>(msg: &T)
4429where
4430    T: Debug,
4431{
4432    log::trace!("Received message when not running - skipping {msg:?}");
4433}
4434
4435fn log_received<T>(msg: &T)
4436where
4437    T: Debug,
4438{
4439    log::debug!("{RECV} {msg:?}");
4440}
4441
4442fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
4443    log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
4444}