Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39        close::InstrumentClose,
40        option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41    },
42    enums::BookType,
43    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45    instruments::{InstrumentAny, SyntheticInstrument},
46    orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54    Actor,
55    registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
62use crate::{
63    cache::Cache,
64    clock::Clock,
65    component::Component,
66    enums::{ComponentState, ComponentTrigger},
67    logging::{CMD, RECV, REQ, SEND},
68    messages::{
69        data::{
70            BarsResponse, BookDeltasResponse, BookDepthResponse, BookResponse, CustomDataResponse,
71            DataCommand, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
72            QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth, RequestBookSnapshot,
73            RequestCommand, RequestCustomData, RequestFundingRates, RequestInstrument,
74            RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
75            SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
76            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
77            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
78            SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
79            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
80            UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
81            UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
82            UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
83            UnsubscribeOptionChain, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
84            is_parent_subscription,
85        },
86        system::ShutdownSystem,
87    },
88    msgbus::{
89        self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
90        switchboard::{
91            MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
92            get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
93            get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
94            get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
95            get_option_chain_topic, get_option_greeks_topic, get_order_cancels_topic,
96            get_order_fills_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
97        },
98    },
99    signal::Signal,
100    timer::{TimeEvent, TimeEventCallback},
101};
102
103/// Common configuration for [`DataActor`] based components.
104#[derive(Debug, Clone, Deserialize, Serialize)]
105#[serde(default, deny_unknown_fields)]
106#[cfg_attr(
107    feature = "python",
108    pyo3::pyclass(
109        module = "nautilus_trader.core.nautilus_pyo3.common",
110        subclass,
111        from_py_object
112    )
113)]
114#[cfg_attr(
115    feature = "python",
116    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
117)]
118pub struct DataActorConfig {
119    /// The custom identifier for the Actor.
120    pub actor_id: Option<ActorId>,
121    /// If events should be logged.
122    pub log_events: bool,
123    /// If commands should be logged.
124    pub log_commands: bool,
125}
126
127impl Default for DataActorConfig {
128    fn default() -> Self {
129        Self {
130            actor_id: None,
131            log_events: true,
132            log_commands: true,
133        }
134    }
135}
136
137/// Configuration for creating actors from importable paths.
138#[derive(Debug, Clone, Deserialize, Serialize)]
139#[serde(deny_unknown_fields)]
140#[cfg_attr(
141    feature = "python",
142    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
143)]
144#[cfg_attr(
145    feature = "python",
146    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
147)]
148pub struct ImportableActorConfig {
149    /// The fully qualified name of the Actor class.
150    pub actor_path: String,
151    /// The fully qualified name of the Actor config class.
152    pub config_path: String,
153    /// The actor configuration as a dictionary.
154    pub config: HashMap<String, serde_json::Value>,
155}
156
157type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
158
159pub trait DataActor:
160    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
161{
162    /// Actions to be performed when the actor state is saved.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if saving the actor state fails.
167    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
168        Ok(IndexMap::new())
169    }
170
171    /// Actions to be performed when the actor state is loaded.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if loading the actor state fails.
176    #[allow(unused_variables)]
177    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
178        Ok(())
179    }
180
181    /// Actions to be performed on start.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if starting the actor fails.
186    fn on_start(&mut self) -> anyhow::Result<()> {
187        log::warn!(
188            "The `on_start` handler was called when not overridden, \
189            it's expected that any actions required when starting the actor \
190            occur here, such as subscribing/requesting data"
191        );
192        Ok(())
193    }
194
195    /// Actions to be performed on stop.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if stopping the actor fails.
200    fn on_stop(&mut self) -> anyhow::Result<()> {
201        log::warn!(
202            "The `on_stop` handler was called when not overridden, \
203            it's expected that any actions required when stopping the actor \
204            occur here, such as unsubscribing from data",
205        );
206        Ok(())
207    }
208
209    /// Actions to be performed on resume.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if resuming the actor fails.
214    fn on_resume(&mut self) -> anyhow::Result<()> {
215        log::warn!(
216            "The `on_resume` handler was called when not overridden, \
217            it's expected that any actions required when resuming the actor \
218            following a stop occur here"
219        );
220        Ok(())
221    }
222
223    /// Actions to be performed on reset.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if resetting the actor fails.
228    fn on_reset(&mut self) -> anyhow::Result<()> {
229        log::warn!(
230            "The `on_reset` handler was called when not overridden, \
231            it's expected that any actions required when resetting the actor \
232            occur here, such as resetting indicators and other state"
233        );
234        Ok(())
235    }
236
237    /// Actions to be performed on dispose.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if disposing the actor fails.
242    fn on_dispose(&mut self) -> anyhow::Result<()> {
243        Ok(())
244    }
245
246    /// Actions to be performed on degrade.
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if degrading the actor fails.
251    fn on_degrade(&mut self) -> anyhow::Result<()> {
252        Ok(())
253    }
254
255    /// Actions to be performed on fault.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if faulting the actor fails.
260    fn on_fault(&mut self) -> anyhow::Result<()> {
261        Ok(())
262    }
263
264    /// Actions to be performed when receiving a time event.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if handling the time event fails.
269    #[allow(unused_variables)]
270    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
271        Ok(())
272    }
273
274    /// Actions to be performed when receiving custom data.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if handling the data fails.
279    #[allow(unused_variables)]
280    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
281        Ok(())
282    }
283
284    /// Actions to be performed when receiving a signal.
285    ///
286    /// # Errors
287    ///
288    /// Returns an error if handling the signal fails.
289    #[allow(unused_variables)]
290    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
291        Ok(())
292    }
293
294    /// Actions to be performed when receiving an instrument.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if handling the instrument fails.
299    #[allow(unused_variables)]
300    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
301        Ok(())
302    }
303
304    /// Actions to be performed when receiving order book deltas.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if handling the book deltas fails.
309    #[allow(unused_variables)]
310    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
311        Ok(())
312    }
313
314    /// Actions to be performed when receiving an order book.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if handling the book fails.
319    #[allow(unused_variables)]
320    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
321        Ok(())
322    }
323
324    /// Actions to be performed when receiving a quote.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if handling the quote fails.
329    #[allow(unused_variables)]
330    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
331        Ok(())
332    }
333
334    /// Actions to be performed when receiving a trade.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if handling the trade fails.
339    #[allow(unused_variables)]
340    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
341        Ok(())
342    }
343
344    /// Actions to be performed when receiving a bar.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if handling the bar fails.
349    #[allow(unused_variables)]
350    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
351        Ok(())
352    }
353
354    /// Actions to be performed when receiving a mark price update.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if handling the mark price update fails.
359    #[allow(unused_variables)]
360    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
361        Ok(())
362    }
363
364    /// Actions to be performed when receiving an index price update.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if handling the index price update fails.
369    #[allow(unused_variables)]
370    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
371        Ok(())
372    }
373
374    /// Actions to be performed when receiving a funding rate update.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if handling the funding rate update fails.
379    #[allow(unused_variables)]
380    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
381        Ok(())
382    }
383
384    /// Actions to be performed when receiving exchange-provided option greeks.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if handling the option greeks fails.
389    #[allow(unused_variables)]
390    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
391        Ok(())
392    }
393
394    /// Actions to be performed when receiving an option chain slice snapshot.
395    ///
396    /// # Errors
397    ///
398    /// Returns an error if handling the option chain slice fails.
399    #[allow(unused_variables)]
400    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
401        Ok(())
402    }
403
404    /// Actions to be performed when receiving an instrument status update.
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if handling the instrument status update fails.
409    #[allow(unused_variables)]
410    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
411        Ok(())
412    }
413
414    /// Actions to be performed when receiving an instrument close update.
415    ///
416    /// # Errors
417    ///
418    /// Returns an error if handling the instrument close update fails.
419    #[allow(unused_variables)]
420    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
421        Ok(())
422    }
423
424    /// Actions to be performed when receiving an order filled event.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if handling the order filled event fails.
429    #[allow(unused_variables)]
430    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
431        Ok(())
432    }
433
434    /// Actions to be performed when receiving an order canceled event.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if handling the order canceled event fails.
439    #[allow(unused_variables)]
440    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
441        Ok(())
442    }
443
444    #[cfg(feature = "defi")]
445    /// Actions to be performed when receiving a block.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if handling the block fails.
450    #[allow(unused_variables)]
451    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
452        Ok(())
453    }
454
455    #[cfg(feature = "defi")]
456    /// Actions to be performed when receiving a pool.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if handling the pool fails.
461    #[allow(unused_variables)]
462    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
463        Ok(())
464    }
465
466    #[cfg(feature = "defi")]
467    /// Actions to be performed when receiving a pool swap.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if handling the pool swap fails.
472    #[allow(unused_variables)]
473    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
474        Ok(())
475    }
476
477    #[cfg(feature = "defi")]
478    /// Actions to be performed when receiving a pool liquidity update.
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if handling the pool liquidity update fails.
483    #[allow(unused_variables)]
484    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
485        Ok(())
486    }
487
488    #[cfg(feature = "defi")]
489    /// Actions to be performed when receiving a pool fee collect event.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if handling the pool fee collect fails.
494    #[allow(unused_variables)]
495    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
496        Ok(())
497    }
498
499    #[cfg(feature = "defi")]
500    /// Actions to be performed when receiving a pool flash event.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if handling the pool flash fails.
505    #[allow(unused_variables)]
506    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
507        Ok(())
508    }
509
510    /// Actions to be performed when receiving historical data.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if handling the historical data fails.
515    #[allow(unused_variables)]
516    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
517        Ok(())
518    }
519
520    /// Actions to be performed when receiving historical book deltas.
521    ///
522    /// # Errors
523    ///
524    /// Returns an error if handling the historical book deltas fails.
525    #[allow(unused_variables)]
526    fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
527        Ok(())
528    }
529
530    /// Actions to be performed when receiving historical book depth.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if handling the historical book depth fails.
535    #[allow(unused_variables)]
536    fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
537        Ok(())
538    }
539
540    /// Actions to be performed when receiving historical quotes.
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if handling the historical quotes fails.
545    #[allow(unused_variables)]
546    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
547        Ok(())
548    }
549
550    /// Actions to be performed when receiving historical trades.
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if handling the historical trades fails.
555    #[allow(unused_variables)]
556    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
557        Ok(())
558    }
559
560    /// Actions to be performed when receiving historical bars.
561    ///
562    /// # Errors
563    ///
564    /// Returns an error if handling the historical bars fails.
565    #[allow(unused_variables)]
566    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
567        Ok(())
568    }
569
570    /// Actions to be performed when receiving historical mark prices.
571    ///
572    /// # Errors
573    ///
574    /// Returns an error if handling the historical mark prices fails.
575    #[allow(unused_variables)]
576    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
577        Ok(())
578    }
579
580    /// Actions to be performed when receiving historical index prices.
581    ///
582    /// # Errors
583    ///
584    /// Returns an error if handling the historical index prices fails.
585    #[allow(unused_variables)]
586    fn on_historical_index_prices(
587        &mut self,
588        index_prices: &[IndexPriceUpdate],
589    ) -> anyhow::Result<()> {
590        Ok(())
591    }
592
593    /// Actions to be performed when receiving historical funding rates.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if handling the historical funding rates fails.
598    #[allow(unused_variables)]
599    fn on_historical_funding_rates(
600        &mut self,
601        funding_rates: &[FundingRateUpdate],
602    ) -> anyhow::Result<()> {
603        Ok(())
604    }
605
606    /// Handles a received time event.
607    fn handle_time_event(&mut self, event: &TimeEvent) {
608        log_received(&event);
609
610        if self.not_running() {
611            log_not_running(&event);
612            return;
613        }
614
615        if let Err(e) = DataActor::on_time_event(self, event) {
616            log_error(&e);
617        }
618    }
619
620    /// Handles a received custom data point.
621    fn handle_data(&mut self, data: &CustomData) {
622        log_received(&data);
623
624        if self.not_running() {
625            log_not_running(&data);
626            return;
627        }
628
629        if let Err(e) = self.on_data(data) {
630            log_error(&e);
631        }
632    }
633
634    /// Handles a received signal.
635    fn handle_signal(&mut self, signal: &Signal) {
636        log_received(&signal);
637
638        if self.not_running() {
639            log_not_running(&signal);
640            return;
641        }
642
643        if let Err(e) = self.on_signal(signal) {
644            log_error(&e);
645        }
646    }
647
648    /// Handles a received instrument.
649    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
650        log_received(&instrument);
651
652        if self.not_running() {
653            log_not_running(&instrument);
654            return;
655        }
656
657        if let Err(e) = self.on_instrument(instrument) {
658            log_error(&e);
659        }
660    }
661
662    /// Handles received order book deltas.
663    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
664        log_received(&deltas);
665
666        if self.not_running() {
667            log_not_running(&deltas);
668            return;
669        }
670
671        if let Err(e) = self.on_book_deltas(deltas) {
672            log_error(&e);
673        }
674    }
675
676    /// Handles a received order book reference.
677    fn handle_book(&mut self, book: &OrderBook) {
678        log_received(&book);
679
680        if self.not_running() {
681            log_not_running(&book);
682            return;
683        }
684
685        if let Err(e) = self.on_book(book) {
686            log_error(&e);
687        }
688    }
689
690    /// Handles a received quote.
691    fn handle_quote(&mut self, quote: &QuoteTick) {
692        log_received(&quote);
693
694        if self.not_running() {
695            log_not_running(&quote);
696            return;
697        }
698
699        if let Err(e) = self.on_quote(quote) {
700            log_error(&e);
701        }
702    }
703
704    /// Handles a received trade.
705    fn handle_trade(&mut self, trade: &TradeTick) {
706        log_received(&trade);
707
708        if self.not_running() {
709            log_not_running(&trade);
710            return;
711        }
712
713        if let Err(e) = self.on_trade(trade) {
714            log_error(&e);
715        }
716    }
717
718    /// Handles a receiving bar.
719    fn handle_bar(&mut self, bar: &Bar) {
720        log_received(&bar);
721
722        if self.not_running() {
723            log_not_running(&bar);
724            return;
725        }
726
727        if let Err(e) = self.on_bar(bar) {
728            log_error(&e);
729        }
730    }
731
732    /// Handles a received mark price update.
733    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
734        log_received(&mark_price);
735
736        if self.not_running() {
737            log_not_running(&mark_price);
738            return;
739        }
740
741        if let Err(e) = self.on_mark_price(mark_price) {
742            log_error(&e);
743        }
744    }
745
746    /// Handles a received index price update.
747    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
748        log_received(&index_price);
749
750        if self.not_running() {
751            log_not_running(&index_price);
752            return;
753        }
754
755        if let Err(e) = self.on_index_price(index_price) {
756            log_error(&e);
757        }
758    }
759
760    /// Handles a received funding rate update.
761    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
762        log_received(&funding_rate);
763
764        if self.not_running() {
765            log_not_running(&funding_rate);
766            return;
767        }
768
769        if let Err(e) = self.on_funding_rate(funding_rate) {
770            log_error(&e);
771        }
772    }
773
774    /// Handles a received option greeks update.
775    fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
776        log_received(&greeks);
777
778        if self.not_running() {
779            log_not_running(&greeks);
780            return;
781        }
782
783        if let Err(e) = self.on_option_greeks(greeks) {
784            log_error(&e);
785        }
786    }
787
788    /// Handles a received option chain slice snapshot.
789    fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
790        log_received(&slice);
791
792        if self.not_running() {
793            log_not_running(&slice);
794            return;
795        }
796
797        if let Err(e) = self.on_option_chain(slice) {
798            log_error(&e);
799        }
800    }
801
802    /// Handles a received instrument status.
803    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
804        log_received(&status);
805
806        if self.not_running() {
807            log_not_running(&status);
808            return;
809        }
810
811        if let Err(e) = self.on_instrument_status(status) {
812            log_error(&e);
813        }
814    }
815
816    /// Handles a received instrument close.
817    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
818        log_received(&close);
819
820        if self.not_running() {
821            log_not_running(&close);
822            return;
823        }
824
825        if let Err(e) = self.on_instrument_close(close) {
826            log_error(&e);
827        }
828    }
829
830    /// Handles a received order filled event.
831    fn handle_order_filled(&mut self, event: &OrderFilled) {
832        log_received(&event);
833
834        // Check for double-handling: if the event's strategy_id matches this actor's id,
835        // it means a Strategy is receiving its own fill event through both automatic
836        // subscription and manual subscribe_order_fills, so skip the manual handler.
837        if event.strategy_id.inner() == self.actor_id().inner() {
838            return;
839        }
840
841        if self.not_running() {
842            log_not_running(&event);
843            return;
844        }
845
846        if let Err(e) = self.on_order_filled(event) {
847            log_error(&e);
848        }
849    }
850
851    /// Handles a received order canceled event.
852    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
853        log_received(&event);
854
855        // Check for double-handling: if the event's strategy_id matches this actor's id,
856        // it means a Strategy is receiving its own cancel event through both automatic
857        // subscription and manual subscribe_order_cancels, so skip the manual handler.
858        if event.strategy_id.inner() == self.actor_id().inner() {
859            return;
860        }
861
862        if self.not_running() {
863            log_not_running(&event);
864            return;
865        }
866
867        if let Err(e) = self.on_order_canceled(event) {
868            log_error(&e);
869        }
870    }
871
872    #[cfg(feature = "defi")]
873    /// Handles a received block.
874    fn handle_block(&mut self, block: &Block) {
875        log_received(&block);
876
877        if self.not_running() {
878            log_not_running(&block);
879            return;
880        }
881
882        if let Err(e) = self.on_block(block) {
883            log_error(&e);
884        }
885    }
886
887    #[cfg(feature = "defi")]
888    /// Handles a received pool definition update.
889    fn handle_pool(&mut self, pool: &Pool) {
890        log_received(&pool);
891
892        if self.not_running() {
893            log_not_running(&pool);
894            return;
895        }
896
897        if let Err(e) = self.on_pool(pool) {
898            log_error(&e);
899        }
900    }
901
902    #[cfg(feature = "defi")]
903    /// Handles a received pool swap.
904    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
905        log_received(&swap);
906
907        if self.not_running() {
908            log_not_running(&swap);
909            return;
910        }
911
912        if let Err(e) = self.on_pool_swap(swap) {
913            log_error(&e);
914        }
915    }
916
917    #[cfg(feature = "defi")]
918    /// Handles a received pool liquidity update.
919    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
920        log_received(&update);
921
922        if self.not_running() {
923            log_not_running(&update);
924            return;
925        }
926
927        if let Err(e) = self.on_pool_liquidity_update(update) {
928            log_error(&e);
929        }
930    }
931
932    #[cfg(feature = "defi")]
933    /// Handles a received pool fee collect.
934    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
935        log_received(&collect);
936
937        if self.not_running() {
938            log_not_running(&collect);
939            return;
940        }
941
942        if let Err(e) = self.on_pool_fee_collect(collect) {
943            log_error(&e);
944        }
945    }
946
947    #[cfg(feature = "defi")]
948    /// Handles a received pool flash event.
949    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
950        log_received(&flash);
951
952        if self.not_running() {
953            log_not_running(&flash);
954            return;
955        }
956
957        if let Err(e) = self.on_pool_flash(flash) {
958            log_error(&e);
959        }
960    }
961
962    /// Handles received historical data.
963    fn handle_historical_data(&mut self, data: &dyn Any) {
964        log_received(&data);
965
966        if let Err(e) = self.on_historical_data(data) {
967            log_error(&e);
968        }
969    }
970
971    /// Handles a data response.
972    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
973        log_received(&resp);
974
975        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
976            log_error(&e);
977        }
978    }
979
980    /// Handles an instrument response.
981    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
982        log_received(&resp);
983
984        if let Err(e) = self.on_instrument(&resp.data) {
985            log_error(&e);
986        }
987    }
988
989    /// Handles an instruments response.
990    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
991        log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
992        log::trace!("{RECV} {resp:?}");
993
994        for inst in &resp.data {
995            if let Err(e) = self.on_instrument(inst) {
996                log_error(&e);
997            }
998        }
999    }
1000
1001    /// Handles a book response.
1002    fn handle_book_response(&mut self, resp: &BookResponse) {
1003        log_received(&resp);
1004
1005        if let Err(e) = self.on_book(&resp.data) {
1006            log_error(&e);
1007        }
1008    }
1009
1010    /// Handles a book deltas response.
1011    fn handle_book_deltas_response(&mut self, resp: &BookDeltasResponse) {
1012        log_received_bulk("BookDeltasResponse", &resp.correlation_id, resp.data.len());
1013        log::trace!("{RECV} {resp:?}");
1014
1015        if let Err(e) = self.on_historical_book_deltas(&resp.data) {
1016            log_error(&e);
1017        }
1018    }
1019
1020    /// Handles a book depth response.
1021    fn handle_book_depth_response(&mut self, resp: &BookDepthResponse) {
1022        log_received_bulk("BookDepthResponse", &resp.correlation_id, resp.data.len());
1023        log::trace!("{RECV} {resp:?}");
1024
1025        if let Err(e) = self.on_historical_book_depth(&resp.data) {
1026            log_error(&e);
1027        }
1028    }
1029
1030    /// Handles a quotes response.
1031    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
1032        log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
1033        log::trace!("{RECV} {resp:?}");
1034
1035        if let Err(e) = self.on_historical_quotes(&resp.data) {
1036            log_error(&e);
1037        }
1038    }
1039
1040    /// Handles a trades response.
1041    fn handle_trades_response(&mut self, resp: &TradesResponse) {
1042        log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1043        log::trace!("{RECV} {resp:?}");
1044
1045        if let Err(e) = self.on_historical_trades(&resp.data) {
1046            log_error(&e);
1047        }
1048    }
1049
1050    /// Handles a bars response.
1051    fn handle_bars_response(&mut self, resp: &BarsResponse) {
1052        log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1053        log::trace!("{RECV} {resp:?}");
1054
1055        if let Err(e) = self.on_historical_bars(&resp.data) {
1056            log_error(&e);
1057        }
1058    }
1059
1060    /// Handles a funding rates response.
1061    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1062        log_received_bulk(
1063            "FundingRatesResponse",
1064            &resp.correlation_id,
1065            resp.data.len(),
1066        );
1067        log::trace!("{RECV} {resp:?}");
1068
1069        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1070            log_error(&e);
1071        }
1072    }
1073
1074    /// Subscribe to streaming `data_type` data.
1075    fn subscribe_data(
1076        &mut self,
1077        data_type: DataType,
1078        client_id: Option<ClientId>,
1079        params: Option<Params>,
1080    ) where
1081        Self: 'static + Debug + Sized,
1082    {
1083        let actor_id = self.actor_id().inner();
1084        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1085            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1086        });
1087
1088        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1089    }
1090
1091    /// Subscribe to [`Signal`] data by `name`.
1092    ///
1093    /// An empty `name` subscribes to every signal.
1094    ///
1095    /// # Parameters
1096    ///
1097    /// - `name`: signal name to subscribe to.
1098    /// - `priority`: optional dispatch priority. Pass `None` for default
1099    ///   ordering (by pattern then handler ID). Pass `Some(p)` when actors
1100    ///   sharing a signal need deterministic ordering: higher-priority
1101    ///   handlers receive the message before lower-priority handlers.
1102    ///
1103    /// Re-subscribing does not update an existing priority; call
1104    /// [`unsubscribe_signal`](Self::unsubscribe_signal) first.
1105    fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1106    where
1107        Self: 'static + Debug + Sized,
1108    {
1109        let actor_id = self.actor_id().inner();
1110        // Signals are published as `CustomData` wrapping a `Signal`; downcast
1111        // the inner value so subscribers receive the typed `Signal` in `on_signal`.
1112        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1113            if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1114                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1115                    actor.handle_signal(signal);
1116                } else {
1117                    log::error!("Actor {actor_id} not found for signal handling");
1118                }
1119            }
1120        });
1121
1122        DataActorCore::subscribe_signal(self, handler, name, priority);
1123    }
1124
1125    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1126    fn subscribe_quotes(
1127        &mut self,
1128        instrument_id: InstrumentId,
1129        client_id: Option<ClientId>,
1130        params: Option<Params>,
1131    ) where
1132        Self: 'static + Debug + Sized,
1133    {
1134        let actor_id = self.actor_id().inner();
1135        let topic = get_quotes_topic(instrument_id);
1136
1137        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1138            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1139                actor.handle_quote(quote);
1140            } else {
1141                log::error!("Actor {actor_id} not found for quote handling");
1142            }
1143        });
1144
1145        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1146    }
1147
1148    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1149    fn subscribe_instruments(
1150        &mut self,
1151        venue: Venue,
1152        client_id: Option<ClientId>,
1153        params: Option<Params>,
1154    ) where
1155        Self: 'static + Debug + Sized,
1156    {
1157        let actor_id = self.actor_id().inner();
1158        let pattern = get_instruments_pattern(venue);
1159
1160        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1161            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1162                actor.handle_instrument(instrument);
1163            } else {
1164                log::error!("Actor {actor_id} not found for instruments handling");
1165            }
1166        });
1167
1168        DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1169    }
1170
1171    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1172    fn subscribe_instrument(
1173        &mut self,
1174        instrument_id: InstrumentId,
1175        client_id: Option<ClientId>,
1176        params: Option<Params>,
1177    ) where
1178        Self: 'static + Debug + Sized,
1179    {
1180        let actor_id = self.actor_id().inner();
1181        let topic = get_instrument_topic(instrument_id);
1182
1183        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1184            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1185                actor.handle_instrument(instrument);
1186            } else {
1187                log::error!("Actor {actor_id} not found for instrument handling");
1188            }
1189        });
1190
1191        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1192    }
1193
1194    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1195    fn subscribe_book_deltas(
1196        &mut self,
1197        instrument_id: InstrumentId,
1198        book_type: BookType,
1199        depth: Option<NonZeroUsize>,
1200        client_id: Option<ClientId>,
1201        managed: bool,
1202        params: Option<Params>,
1203    ) where
1204        Self: 'static + Debug + Sized,
1205    {
1206        let actor_id = self.actor_id().inner();
1207        let is_parent = is_parent_subscription(params.as_ref());
1208        let pattern = if is_parent {
1209            get_book_deltas_pattern(instrument_id)
1210        } else {
1211            get_book_deltas_topic(instrument_id).into()
1212        };
1213
1214        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1215            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1216        });
1217
1218        DataActorCore::subscribe_book_deltas(
1219            self,
1220            pattern,
1221            handler,
1222            instrument_id,
1223            book_type,
1224            depth,
1225            client_id,
1226            managed,
1227            params,
1228        );
1229    }
1230
1231    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1232    fn subscribe_book_at_interval(
1233        &mut self,
1234        instrument_id: InstrumentId,
1235        book_type: BookType,
1236        depth: Option<NonZeroUsize>,
1237        interval_ms: NonZeroUsize,
1238        client_id: Option<ClientId>,
1239        params: Option<Params>,
1240    ) where
1241        Self: 'static + Debug + Sized,
1242    {
1243        let actor_id = self.actor_id().inner();
1244        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1245
1246        let handler = TypedHandler::from(move |book: &OrderBook| {
1247            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1248        });
1249
1250        DataActorCore::subscribe_book_at_interval(
1251            self,
1252            topic,
1253            handler,
1254            instrument_id,
1255            book_type,
1256            depth,
1257            interval_ms,
1258            client_id,
1259            params,
1260        );
1261    }
1262
1263    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1264    fn subscribe_trades(
1265        &mut self,
1266        instrument_id: InstrumentId,
1267        client_id: Option<ClientId>,
1268        params: Option<Params>,
1269    ) where
1270        Self: 'static + Debug + Sized,
1271    {
1272        let actor_id = self.actor_id().inner();
1273        let topic = get_trades_topic(instrument_id);
1274
1275        let handler = TypedHandler::from(move |trade: &TradeTick| {
1276            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1277        });
1278
1279        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1280    }
1281
1282    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1283    fn subscribe_bars(
1284        &mut self,
1285        bar_type: BarType,
1286        client_id: Option<ClientId>,
1287        params: Option<Params>,
1288    ) where
1289        Self: 'static + Debug + Sized,
1290    {
1291        let actor_id = self.actor_id().inner();
1292        let topic = get_bars_topic(bar_type);
1293
1294        let handler = TypedHandler::from(move |bar: &Bar| {
1295            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1296        });
1297
1298        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1299    }
1300
1301    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1302    fn subscribe_mark_prices(
1303        &mut self,
1304        instrument_id: InstrumentId,
1305        client_id: Option<ClientId>,
1306        params: Option<Params>,
1307    ) where
1308        Self: 'static + Debug + Sized,
1309    {
1310        let actor_id = self.actor_id().inner();
1311        let topic = get_mark_price_topic(instrument_id);
1312
1313        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1314            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1315        });
1316
1317        DataActorCore::subscribe_mark_prices(
1318            self,
1319            topic,
1320            handler,
1321            instrument_id,
1322            client_id,
1323            params,
1324        );
1325    }
1326
1327    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1328    fn subscribe_index_prices(
1329        &mut self,
1330        instrument_id: InstrumentId,
1331        client_id: Option<ClientId>,
1332        params: Option<Params>,
1333    ) where
1334        Self: 'static + Debug + Sized,
1335    {
1336        let actor_id = self.actor_id().inner();
1337        let topic = get_index_price_topic(instrument_id);
1338
1339        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1340            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1341        });
1342
1343        DataActorCore::subscribe_index_prices(
1344            self,
1345            topic,
1346            handler,
1347            instrument_id,
1348            client_id,
1349            params,
1350        );
1351    }
1352
1353    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1354    fn subscribe_funding_rates(
1355        &mut self,
1356        instrument_id: InstrumentId,
1357        client_id: Option<ClientId>,
1358        params: Option<Params>,
1359    ) where
1360        Self: 'static + Debug + Sized,
1361    {
1362        let actor_id = self.actor_id().inner();
1363        let topic = get_funding_rate_topic(instrument_id);
1364
1365        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1366            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1367        });
1368
1369        DataActorCore::subscribe_funding_rates(
1370            self,
1371            topic,
1372            handler,
1373            instrument_id,
1374            client_id,
1375            params,
1376        );
1377    }
1378
1379    /// Subscribe to streaming [`OptionGreeks`] data for the `instrument_id`.
1380    fn subscribe_option_greeks(
1381        &mut self,
1382        instrument_id: InstrumentId,
1383        client_id: Option<ClientId>,
1384        params: Option<Params>,
1385    ) where
1386        Self: 'static + Debug + Sized,
1387    {
1388        let actor_id = self.actor_id().inner();
1389        let topic = get_option_greeks_topic(instrument_id);
1390
1391        let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1392            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1393                actor.handle_option_greeks(option_greeks);
1394            } else {
1395                log::error!("Actor {actor_id} not found for option greeks handling");
1396            }
1397        });
1398
1399        DataActorCore::subscribe_option_greeks(
1400            self,
1401            topic,
1402            handler,
1403            instrument_id,
1404            client_id,
1405            params,
1406        );
1407    }
1408
1409    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1410    fn subscribe_instrument_status(
1411        &mut self,
1412        instrument_id: InstrumentId,
1413        client_id: Option<ClientId>,
1414        params: Option<Params>,
1415    ) where
1416        Self: 'static + Debug + Sized,
1417    {
1418        let actor_id = self.actor_id().inner();
1419        let topic = get_instrument_status_topic(instrument_id);
1420
1421        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1422            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1423        });
1424
1425        DataActorCore::subscribe_instrument_status(
1426            self,
1427            topic,
1428            handler,
1429            instrument_id,
1430            client_id,
1431            params,
1432        );
1433    }
1434
1435    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1436    fn subscribe_instrument_close(
1437        &mut self,
1438        instrument_id: InstrumentId,
1439        client_id: Option<ClientId>,
1440        params: Option<Params>,
1441    ) where
1442        Self: 'static + Debug + Sized,
1443    {
1444        let actor_id = self.actor_id().inner();
1445        let topic = get_instrument_close_topic(instrument_id);
1446
1447        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1448            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1449        });
1450
1451        DataActorCore::subscribe_instrument_close(
1452            self,
1453            topic,
1454            handler,
1455            instrument_id,
1456            client_id,
1457            params,
1458        );
1459    }
1460
1461    /// Subscribe to streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1462    ///
1463    /// The ATM price is always derived from the exchange-provided forward price
1464    /// embedded in each option greeks/ticker update.
1465    fn subscribe_option_chain(
1466        &mut self,
1467        series_id: OptionSeriesId,
1468        strike_range: StrikeRange,
1469        snapshot_interval_ms: Option<u64>,
1470        client_id: Option<ClientId>,
1471        params: Option<Params>,
1472    ) where
1473        Self: 'static + Debug + Sized,
1474    {
1475        let actor_id = self.actor_id().inner();
1476        let topic = get_option_chain_topic(series_id);
1477
1478        let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1479            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1480                actor.handle_option_chain(slice);
1481            } else {
1482                log::error!("Actor {actor_id} not found for option chain handling");
1483            }
1484        });
1485
1486        DataActorCore::subscribe_option_chain(
1487            self,
1488            topic,
1489            handler,
1490            series_id,
1491            strike_range,
1492            snapshot_interval_ms,
1493            client_id,
1494            params,
1495        );
1496    }
1497
1498    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1499    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1500    where
1501        Self: 'static + Debug + Sized,
1502    {
1503        let actor_id = self.actor_id().inner();
1504        let topic = get_order_fills_topic(instrument_id);
1505
1506        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1507            if let OrderEventAny::Filled(filled) = event {
1508                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1509            }
1510        });
1511
1512        DataActorCore::subscribe_order_fills(self, topic, handler);
1513    }
1514
1515    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1516    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1517    where
1518        Self: 'static + Debug + Sized,
1519    {
1520        let actor_id = self.actor_id().inner();
1521        let topic = get_order_cancels_topic(instrument_id);
1522
1523        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1524            if let OrderEventAny::Canceled(canceled) = event {
1525                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1526            }
1527        });
1528
1529        DataActorCore::subscribe_order_cancels(self, topic, handler);
1530    }
1531
1532    #[cfg(feature = "defi")]
1533    /// Subscribe to streaming [`Block`] data for the `chain`.
1534    fn subscribe_blocks(
1535        &mut self,
1536        chain: Blockchain,
1537        client_id: Option<ClientId>,
1538        params: Option<Params>,
1539    ) where
1540        Self: 'static + Debug + Sized,
1541    {
1542        let actor_id = self.actor_id().inner();
1543        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1544
1545        let handler = TypedHandler::from(move |block: &Block| {
1546            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1547        });
1548
1549        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1550    }
1551
1552    #[cfg(feature = "defi")]
1553    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1554    fn subscribe_pool(
1555        &mut self,
1556        instrument_id: InstrumentId,
1557        client_id: Option<ClientId>,
1558        params: Option<Params>,
1559    ) where
1560        Self: 'static + Debug + Sized,
1561    {
1562        let actor_id = self.actor_id().inner();
1563        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1564
1565        let handler = TypedHandler::from(move |pool: &Pool| {
1566            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1567        });
1568
1569        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1570    }
1571
1572    #[cfg(feature = "defi")]
1573    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1574    fn subscribe_pool_swaps(
1575        &mut self,
1576        instrument_id: InstrumentId,
1577        client_id: Option<ClientId>,
1578        params: Option<Params>,
1579    ) where
1580        Self: 'static + Debug + Sized,
1581    {
1582        let actor_id = self.actor_id().inner();
1583        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1584
1585        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1586            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1587        });
1588
1589        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1590    }
1591
1592    #[cfg(feature = "defi")]
1593    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1594    fn subscribe_pool_liquidity_updates(
1595        &mut self,
1596        instrument_id: InstrumentId,
1597        client_id: Option<ClientId>,
1598        params: Option<Params>,
1599    ) where
1600        Self: 'static + Debug + Sized,
1601    {
1602        let actor_id = self.actor_id().inner();
1603        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1604
1605        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1606            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1607        });
1608
1609        DataActorCore::subscribe_pool_liquidity_updates(
1610            self,
1611            topic,
1612            handler,
1613            instrument_id,
1614            client_id,
1615            params,
1616        );
1617    }
1618
1619    #[cfg(feature = "defi")]
1620    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1621    fn subscribe_pool_fee_collects(
1622        &mut self,
1623        instrument_id: InstrumentId,
1624        client_id: Option<ClientId>,
1625        params: Option<Params>,
1626    ) where
1627        Self: 'static + Debug + Sized,
1628    {
1629        let actor_id = self.actor_id().inner();
1630        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1631
1632        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1633            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1634        });
1635
1636        DataActorCore::subscribe_pool_fee_collects(
1637            self,
1638            topic,
1639            handler,
1640            instrument_id,
1641            client_id,
1642            params,
1643        );
1644    }
1645
1646    #[cfg(feature = "defi")]
1647    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1648    fn subscribe_pool_flash_events(
1649        &mut self,
1650        instrument_id: InstrumentId,
1651        client_id: Option<ClientId>,
1652        params: Option<Params>,
1653    ) where
1654        Self: 'static + Debug + Sized,
1655    {
1656        let actor_id = self.actor_id().inner();
1657        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1658
1659        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1660            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1661        });
1662
1663        DataActorCore::subscribe_pool_flash_events(
1664            self,
1665            topic,
1666            handler,
1667            instrument_id,
1668            client_id,
1669            params,
1670        );
1671    }
1672
1673    /// Unsubscribe from streaming `data_type` data.
1674    fn unsubscribe_data(
1675        &mut self,
1676        data_type: DataType,
1677        client_id: Option<ClientId>,
1678        params: Option<Params>,
1679    ) where
1680        Self: 'static + Debug + Sized,
1681    {
1682        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1683    }
1684
1685    /// Unsubscribe from [`Signal`] data by `name`.
1686    fn unsubscribe_signal(&mut self, name: &str)
1687    where
1688        Self: 'static + Debug + Sized,
1689    {
1690        DataActorCore::unsubscribe_signal(self, name);
1691    }
1692
1693    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1694    fn unsubscribe_instruments(
1695        &mut self,
1696        venue: Venue,
1697        client_id: Option<ClientId>,
1698        params: Option<Params>,
1699    ) where
1700        Self: 'static + Debug + Sized,
1701    {
1702        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1703    }
1704
1705    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1706    fn unsubscribe_instrument(
1707        &mut self,
1708        instrument_id: InstrumentId,
1709        client_id: Option<ClientId>,
1710        params: Option<Params>,
1711    ) where
1712        Self: 'static + Debug + Sized,
1713    {
1714        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1715    }
1716
1717    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1718    fn unsubscribe_book_deltas(
1719        &mut self,
1720        instrument_id: InstrumentId,
1721        client_id: Option<ClientId>,
1722        params: Option<Params>,
1723    ) where
1724        Self: 'static + Debug + Sized,
1725    {
1726        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1727    }
1728
1729    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1730    fn unsubscribe_book_at_interval(
1731        &mut self,
1732        instrument_id: InstrumentId,
1733        interval_ms: NonZeroUsize,
1734        client_id: Option<ClientId>,
1735        params: Option<Params>,
1736    ) where
1737        Self: 'static + Debug + Sized,
1738    {
1739        DataActorCore::unsubscribe_book_at_interval(
1740            self,
1741            instrument_id,
1742            interval_ms,
1743            client_id,
1744            params,
1745        );
1746    }
1747
1748    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1749    fn unsubscribe_quotes(
1750        &mut self,
1751        instrument_id: InstrumentId,
1752        client_id: Option<ClientId>,
1753        params: Option<Params>,
1754    ) where
1755        Self: 'static + Debug + Sized,
1756    {
1757        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1758    }
1759
1760    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1761    fn unsubscribe_trades(
1762        &mut self,
1763        instrument_id: InstrumentId,
1764        client_id: Option<ClientId>,
1765        params: Option<Params>,
1766    ) where
1767        Self: 'static + Debug + Sized,
1768    {
1769        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1770    }
1771
1772    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1773    fn unsubscribe_bars(
1774        &mut self,
1775        bar_type: BarType,
1776        client_id: Option<ClientId>,
1777        params: Option<Params>,
1778    ) where
1779        Self: 'static + Debug + Sized,
1780    {
1781        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1782    }
1783
1784    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1785    fn unsubscribe_mark_prices(
1786        &mut self,
1787        instrument_id: InstrumentId,
1788        client_id: Option<ClientId>,
1789        params: Option<Params>,
1790    ) where
1791        Self: 'static + Debug + Sized,
1792    {
1793        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1794    }
1795
1796    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1797    fn unsubscribe_index_prices(
1798        &mut self,
1799        instrument_id: InstrumentId,
1800        client_id: Option<ClientId>,
1801        params: Option<Params>,
1802    ) where
1803        Self: 'static + Debug + Sized,
1804    {
1805        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1806    }
1807
1808    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1809    fn unsubscribe_funding_rates(
1810        &mut self,
1811        instrument_id: InstrumentId,
1812        client_id: Option<ClientId>,
1813        params: Option<Params>,
1814    ) where
1815        Self: 'static + Debug + Sized,
1816    {
1817        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1818    }
1819
1820    /// Unsubscribe from streaming [`OptionGreeks`] data for the `instrument_id`.
1821    fn unsubscribe_option_greeks(
1822        &mut self,
1823        instrument_id: InstrumentId,
1824        client_id: Option<ClientId>,
1825        params: Option<Params>,
1826    ) where
1827        Self: 'static + Debug + Sized,
1828    {
1829        DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1830    }
1831
1832    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1833    fn unsubscribe_instrument_status(
1834        &mut self,
1835        instrument_id: InstrumentId,
1836        client_id: Option<ClientId>,
1837        params: Option<Params>,
1838    ) where
1839        Self: 'static + Debug + Sized,
1840    {
1841        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1842    }
1843
1844    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1845    fn unsubscribe_instrument_close(
1846        &mut self,
1847        instrument_id: InstrumentId,
1848        client_id: Option<ClientId>,
1849        params: Option<Params>,
1850    ) where
1851        Self: 'static + Debug + Sized,
1852    {
1853        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1854    }
1855
1856    /// Unsubscribe from streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1857    fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1858    where
1859        Self: 'static + Debug + Sized,
1860    {
1861        DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1862    }
1863
1864    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1865    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1866    where
1867        Self: 'static + Debug + Sized,
1868    {
1869        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1870    }
1871
1872    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1873    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1874    where
1875        Self: 'static + Debug + Sized,
1876    {
1877        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1878    }
1879
1880    #[cfg(feature = "defi")]
1881    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1882    fn unsubscribe_blocks(
1883        &mut self,
1884        chain: Blockchain,
1885        client_id: Option<ClientId>,
1886        params: Option<Params>,
1887    ) where
1888        Self: 'static + Debug + Sized,
1889    {
1890        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1891    }
1892
1893    #[cfg(feature = "defi")]
1894    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1895    fn unsubscribe_pool(
1896        &mut self,
1897        instrument_id: InstrumentId,
1898        client_id: Option<ClientId>,
1899        params: Option<Params>,
1900    ) where
1901        Self: 'static + Debug + Sized,
1902    {
1903        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1904    }
1905
1906    #[cfg(feature = "defi")]
1907    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1908    fn unsubscribe_pool_swaps(
1909        &mut self,
1910        instrument_id: InstrumentId,
1911        client_id: Option<ClientId>,
1912        params: Option<Params>,
1913    ) where
1914        Self: 'static + Debug + Sized,
1915    {
1916        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1917    }
1918
1919    #[cfg(feature = "defi")]
1920    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1921    fn unsubscribe_pool_liquidity_updates(
1922        &mut self,
1923        instrument_id: InstrumentId,
1924        client_id: Option<ClientId>,
1925        params: Option<Params>,
1926    ) where
1927        Self: 'static + Debug + Sized,
1928    {
1929        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1930    }
1931
1932    #[cfg(feature = "defi")]
1933    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1934    fn unsubscribe_pool_fee_collects(
1935        &mut self,
1936        instrument_id: InstrumentId,
1937        client_id: Option<ClientId>,
1938        params: Option<Params>,
1939    ) where
1940        Self: 'static + Debug + Sized,
1941    {
1942        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1943    }
1944
1945    #[cfg(feature = "defi")]
1946    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1947    fn unsubscribe_pool_flash_events(
1948        &mut self,
1949        instrument_id: InstrumentId,
1950        client_id: Option<ClientId>,
1951        params: Option<Params>,
1952    ) where
1953        Self: 'static + Debug + Sized,
1954    {
1955        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1956    }
1957
1958    /// Request historical custom data of the given `data_type`.
1959    ///
1960    /// # Errors
1961    ///
1962    /// Returns an error if input parameters are invalid.
1963    fn request_data(
1964        &mut self,
1965        data_type: DataType,
1966        client_id: ClientId,
1967        start: Option<DateTime<Utc>>,
1968        end: Option<DateTime<Utc>>,
1969        limit: Option<NonZeroUsize>,
1970        params: Option<Params>,
1971    ) -> anyhow::Result<UUID4>
1972    where
1973        Self: 'static + Debug + Sized,
1974    {
1975        let actor_id = self.actor_id().inner();
1976        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1977            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1978        });
1979
1980        DataActorCore::request_data(
1981            self, data_type, client_id, start, end, limit, params, handler,
1982        )
1983    }
1984
1985    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1986    ///
1987    /// # Errors
1988    ///
1989    /// Returns an error if input parameters are invalid.
1990    fn request_instrument(
1991        &mut self,
1992        instrument_id: InstrumentId,
1993        start: Option<DateTime<Utc>>,
1994        end: Option<DateTime<Utc>>,
1995        client_id: Option<ClientId>,
1996        params: Option<Params>,
1997    ) -> anyhow::Result<UUID4>
1998    where
1999        Self: 'static + Debug + Sized,
2000    {
2001        let actor_id = self.actor_id().inner();
2002        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
2003            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
2004        });
2005
2006        DataActorCore::request_instrument(
2007            self,
2008            instrument_id,
2009            start,
2010            end,
2011            client_id,
2012            params,
2013            handler,
2014        )
2015    }
2016
2017    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
2018    ///
2019    /// # Errors
2020    ///
2021    /// Returns an error if input parameters are invalid.
2022    fn request_instruments(
2023        &mut self,
2024        venue: Option<Venue>,
2025        start: Option<DateTime<Utc>>,
2026        end: Option<DateTime<Utc>>,
2027        client_id: Option<ClientId>,
2028        params: Option<Params>,
2029    ) -> anyhow::Result<UUID4>
2030    where
2031        Self: 'static + Debug + Sized,
2032    {
2033        let actor_id = self.actor_id().inner();
2034        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
2035            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
2036        });
2037
2038        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
2039    }
2040
2041    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
2042    ///
2043    /// # Errors
2044    ///
2045    /// Returns an error if input parameters are invalid.
2046    fn request_book_snapshot(
2047        &mut self,
2048        instrument_id: InstrumentId,
2049        depth: Option<NonZeroUsize>,
2050        client_id: Option<ClientId>,
2051        params: Option<Params>,
2052    ) -> anyhow::Result<UUID4>
2053    where
2054        Self: 'static + Debug + Sized,
2055    {
2056        let actor_id = self.actor_id().inner();
2057        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2058            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2059        });
2060
2061        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
2062    }
2063
2064    /// Request historical [`OrderBookDelta`] data for the given `instrument_id`.
2065    ///
2066    /// # Errors
2067    ///
2068    /// Returns an error if input parameters are invalid.
2069    fn request_book_deltas(
2070        &mut self,
2071        instrument_id: InstrumentId,
2072        start: Option<DateTime<Utc>>,
2073        end: Option<DateTime<Utc>>,
2074        limit: Option<NonZeroUsize>,
2075        client_id: Option<ClientId>,
2076        params: Option<Params>,
2077    ) -> anyhow::Result<UUID4>
2078    where
2079        Self: 'static + Debug + Sized,
2080    {
2081        let actor_id = self.actor_id().inner();
2082        let handler = ShareableMessageHandler::from_typed(move |resp: &BookDeltasResponse| {
2083            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas_response(resp);
2084        });
2085
2086        DataActorCore::request_book_deltas(
2087            self,
2088            instrument_id,
2089            start,
2090            end,
2091            limit,
2092            client_id,
2093            params,
2094            handler,
2095        )
2096    }
2097
2098    /// Request historical [`OrderBookDepth10`] data for the given `instrument_id`.
2099    ///
2100    /// # Errors
2101    ///
2102    /// Returns an error if input parameters are invalid.
2103    #[expect(clippy::too_many_arguments)]
2104    fn request_book_depth(
2105        &mut self,
2106        instrument_id: InstrumentId,
2107        start: Option<DateTime<Utc>>,
2108        end: Option<DateTime<Utc>>,
2109        limit: Option<NonZeroUsize>,
2110        depth: Option<NonZeroUsize>,
2111        client_id: Option<ClientId>,
2112        params: Option<Params>,
2113    ) -> anyhow::Result<UUID4>
2114    where
2115        Self: 'static + Debug + Sized,
2116    {
2117        let actor_id = self.actor_id().inner();
2118        let handler = ShareableMessageHandler::from_typed(move |resp: &BookDepthResponse| {
2119            get_actor_unchecked::<Self>(&actor_id).handle_book_depth_response(resp);
2120        });
2121
2122        DataActorCore::request_book_depth(
2123            self,
2124            instrument_id,
2125            start,
2126            end,
2127            limit,
2128            depth,
2129            client_id,
2130            params,
2131            handler,
2132        )
2133    }
2134
2135    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
2136    ///
2137    /// # Errors
2138    ///
2139    /// Returns an error if input parameters are invalid.
2140    fn request_quotes(
2141        &mut self,
2142        instrument_id: InstrumentId,
2143        start: Option<DateTime<Utc>>,
2144        end: Option<DateTime<Utc>>,
2145        limit: Option<NonZeroUsize>,
2146        client_id: Option<ClientId>,
2147        params: Option<Params>,
2148    ) -> anyhow::Result<UUID4>
2149    where
2150        Self: 'static + Debug + Sized,
2151    {
2152        let actor_id = self.actor_id().inner();
2153        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2154            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2155        });
2156
2157        DataActorCore::request_quotes(
2158            self,
2159            instrument_id,
2160            start,
2161            end,
2162            limit,
2163            client_id,
2164            params,
2165            handler,
2166        )
2167    }
2168
2169    /// Request historical [`TradeTick`] data for the given `instrument_id`.
2170    ///
2171    /// # Errors
2172    ///
2173    /// Returns an error if input parameters are invalid.
2174    fn request_trades(
2175        &mut self,
2176        instrument_id: InstrumentId,
2177        start: Option<DateTime<Utc>>,
2178        end: Option<DateTime<Utc>>,
2179        limit: Option<NonZeroUsize>,
2180        client_id: Option<ClientId>,
2181        params: Option<Params>,
2182    ) -> anyhow::Result<UUID4>
2183    where
2184        Self: 'static + Debug + Sized,
2185    {
2186        let actor_id = self.actor_id().inner();
2187        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2188            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2189        });
2190
2191        DataActorCore::request_trades(
2192            self,
2193            instrument_id,
2194            start,
2195            end,
2196            limit,
2197            client_id,
2198            params,
2199            handler,
2200        )
2201    }
2202
2203    /// Request historical [`Bar`] data for the given `bar_type`.
2204    ///
2205    /// # Errors
2206    ///
2207    /// Returns an error if input parameters are invalid.
2208    fn request_bars(
2209        &mut self,
2210        bar_type: BarType,
2211        start: Option<DateTime<Utc>>,
2212        end: Option<DateTime<Utc>>,
2213        limit: Option<NonZeroUsize>,
2214        client_id: Option<ClientId>,
2215        params: Option<Params>,
2216    ) -> anyhow::Result<UUID4>
2217    where
2218        Self: 'static + Debug + Sized,
2219    {
2220        let actor_id = self.actor_id().inner();
2221        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2222            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2223        });
2224
2225        DataActorCore::request_bars(
2226            self, bar_type, start, end, limit, client_id, params, handler,
2227        )
2228    }
2229
2230    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
2231    ///
2232    /// # Errors
2233    ///
2234    /// Returns an error if input parameters are invalid.
2235    fn request_funding_rates(
2236        &mut self,
2237        instrument_id: InstrumentId,
2238        start: Option<DateTime<Utc>>,
2239        end: Option<DateTime<Utc>>,
2240        limit: Option<NonZeroUsize>,
2241        client_id: Option<ClientId>,
2242        params: Option<Params>,
2243    ) -> anyhow::Result<UUID4>
2244    where
2245        Self: 'static + Debug + Sized,
2246    {
2247        let actor_id = self.actor_id().inner();
2248        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2249            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2250        });
2251
2252        DataActorCore::request_funding_rates(
2253            self,
2254            instrument_id,
2255            start,
2256            end,
2257            limit,
2258            client_id,
2259            params,
2260            handler,
2261        )
2262    }
2263}
2264
2265// Blanket implementation: any DataActor automatically implements Actor
2266impl<T> Actor for T
2267where
2268    T: DataActor + Debug + 'static,
2269{
2270    fn id(&self) -> Ustr {
2271        self.actor_id.inner()
2272    }
2273
2274    #[allow(unused_variables)]
2275    fn handle(&mut self, msg: &dyn Any) {
2276        // Default empty implementation - concrete actors can override if needed
2277    }
2278
2279    fn as_any(&self) -> &dyn Any {
2280        self
2281    }
2282}
2283
2284// Blanket implementation: any DataActor automatically implements Component
2285impl<T> Component for T
2286where
2287    T: DataActor + Debug + 'static,
2288{
2289    fn component_id(&self) -> ComponentId {
2290        ComponentId::new(self.actor_id.inner().as_str())
2291    }
2292
2293    fn state(&self) -> ComponentState {
2294        self.state
2295    }
2296
2297    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2298        self.state = self.state.transition(&trigger)?;
2299        log::info!(component = self.component_id().as_str(); "{}", self.state.variant_name());
2300        Ok(())
2301    }
2302
2303    fn register(
2304        &mut self,
2305        trader_id: TraderId,
2306        clock: Rc<RefCell<dyn Clock>>,
2307        cache: Rc<RefCell<Cache>>,
2308    ) -> anyhow::Result<()> {
2309        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2310
2311        // Register default time event handler for this actor
2312        let actor_id = self.actor_id().inner();
2313        let callback = TimeEventCallback::from(move |event: TimeEvent| {
2314            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2315                actor.handle_time_event(&event);
2316            } else {
2317                log::error!("Actor {actor_id} not found for time event handling");
2318            }
2319        });
2320
2321        clock.borrow_mut().register_default_handler(callback);
2322
2323        self.initialize()
2324    }
2325
2326    fn on_start(&mut self) -> anyhow::Result<()> {
2327        DataActor::on_start(self)
2328    }
2329
2330    fn on_stop(&mut self) -> anyhow::Result<()> {
2331        DataActor::on_stop(self)
2332    }
2333
2334    fn on_resume(&mut self) -> anyhow::Result<()> {
2335        DataActor::on_resume(self)
2336    }
2337
2338    fn on_degrade(&mut self) -> anyhow::Result<()> {
2339        DataActor::on_degrade(self)
2340    }
2341
2342    fn on_fault(&mut self) -> anyhow::Result<()> {
2343        DataActor::on_fault(self)
2344    }
2345
2346    fn on_reset(&mut self) -> anyhow::Result<()> {
2347        DataActor::on_reset(self)
2348    }
2349
2350    fn on_dispose(&mut self) -> anyhow::Result<()> {
2351        DataActor::on_dispose(self)
2352    }
2353}
2354
2355/// Core functionality for all actors.
2356#[derive(Clone)]
2357#[allow(
2358    dead_code,
2359    reason = "TODO: Under development (pending_requests, signal_classes)"
2360)]
2361pub struct DataActorCore {
2362    /// The actor identifier.
2363    pub actor_id: ActorId,
2364    /// The actors configuration.
2365    pub config: DataActorConfig,
2366    trader_id: Option<TraderId>,
2367    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2368    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2369    state: ComponentState,
2370    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2371    instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
2372    deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
2373    depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
2374    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2375    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2376    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2377    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2378    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2379    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2380    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2381    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2382    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2383    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2384    #[cfg(feature = "defi")]
2385    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2386    #[cfg(feature = "defi")]
2387    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2388    #[cfg(feature = "defi")]
2389    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2390    #[cfg(feature = "defi")]
2391    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2392    #[cfg(feature = "defi")]
2393    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2394    #[cfg(feature = "defi")]
2395    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2396    warning_events: AHashSet<String>, // TODO: TBD
2397    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2398    signal_classes: AHashMap<String, String>,
2399    #[cfg(feature = "indicators")]
2400    indicators: Indicators,
2401}
2402
2403impl Debug for DataActorCore {
2404    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2405        f.debug_struct(stringify!(DataActorCore))
2406            .field("actor_id", &self.actor_id)
2407            .field("config", &self.config)
2408            .field("state", &self.state)
2409            .field("trader_id", &self.trader_id)
2410            .finish()
2411    }
2412}
2413
2414impl DataActorCore {
2415    /// Adds a subscription handler for the `topic`.
2416    ///
2417    //// Logs a warning if the actor is already subscribed to the topic.
2418    pub(crate) fn add_subscription_any(
2419        &mut self,
2420        topic: MStr<Topic>,
2421        handler: ShareableMessageHandler,
2422    ) {
2423        let pattern: MStr<Pattern> = topic.into();
2424        if self.topic_handlers.contains_key(&pattern) {
2425            log::warn!(
2426                "Actor {} attempted duplicate subscription to topic '{topic}'",
2427                self.actor_id,
2428            );
2429            return;
2430        }
2431
2432        self.topic_handlers.insert(pattern, handler.clone());
2433        msgbus::subscribe_any(pattern, handler, None);
2434    }
2435
2436    /// Removes a subscription handler for the `topic` if present.
2437    ///
2438    /// Logs a warning if the actor is not currently subscribed to the topic.
2439    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2440        let pattern: MStr<Pattern> = topic.into();
2441        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2442            msgbus::unsubscribe_any(pattern, &handler);
2443        } else {
2444            log::warn!(
2445                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2446                self.actor_id,
2447            );
2448        }
2449    }
2450
2451    pub(crate) fn add_quote_subscription(
2452        &mut self,
2453        topic: MStr<Topic>,
2454        handler: TypedHandler<QuoteTick>,
2455    ) {
2456        if self.quote_handlers.contains_key(&topic) {
2457            log::warn!(
2458                "Actor {} attempted duplicate quote subscription to '{topic}'",
2459                self.actor_id
2460            );
2461            return;
2462        }
2463        self.quote_handlers.insert(topic, handler.clone());
2464        msgbus::subscribe_quotes(topic.into(), handler, None);
2465    }
2466
2467    #[allow(dead_code)]
2468    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2469        if let Some(handler) = self.quote_handlers.remove(&topic) {
2470            msgbus::unsubscribe_quotes(topic.into(), &handler);
2471        }
2472    }
2473
2474    pub(crate) fn add_trade_subscription(
2475        &mut self,
2476        topic: MStr<Topic>,
2477        handler: TypedHandler<TradeTick>,
2478    ) {
2479        if self.trade_handlers.contains_key(&topic) {
2480            log::warn!(
2481                "Actor {} attempted duplicate trade subscription to '{topic}'",
2482                self.actor_id
2483            );
2484            return;
2485        }
2486        self.trade_handlers.insert(topic, handler.clone());
2487        msgbus::subscribe_trades(topic.into(), handler, None);
2488    }
2489
2490    #[allow(dead_code)]
2491    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2492        if let Some(handler) = self.trade_handlers.remove(&topic) {
2493            msgbus::unsubscribe_trades(topic.into(), &handler);
2494        }
2495    }
2496
2497    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2498        if self.bar_handlers.contains_key(&topic) {
2499            log::warn!(
2500                "Actor {} attempted duplicate bar subscription to '{topic}'",
2501                self.actor_id
2502            );
2503            return;
2504        }
2505        self.bar_handlers.insert(topic, handler.clone());
2506        msgbus::subscribe_bars(topic.into(), handler, None);
2507    }
2508
2509    #[allow(dead_code)]
2510    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2511        if let Some(handler) = self.bar_handlers.remove(&topic) {
2512            msgbus::unsubscribe_bars(topic.into(), &handler);
2513        }
2514    }
2515
2516    pub(crate) fn add_order_event_subscription(
2517        &mut self,
2518        topic: MStr<Topic>,
2519        handler: TypedHandler<OrderEventAny>,
2520    ) {
2521        if self.order_event_handlers.contains_key(&topic) {
2522            log::warn!(
2523                "Actor {} attempted duplicate order event subscription to '{topic}'",
2524                self.actor_id
2525            );
2526            return;
2527        }
2528        self.order_event_handlers.insert(topic, handler.clone());
2529        msgbus::subscribe_order_events(topic.into(), handler, None);
2530    }
2531
2532    #[allow(dead_code)]
2533    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2534        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2535            msgbus::unsubscribe_order_events(topic.into(), &handler);
2536        }
2537    }
2538
2539    pub(crate) fn add_deltas_subscription(
2540        &mut self,
2541        pattern: MStr<Pattern>,
2542        handler: TypedHandler<OrderBookDeltas>,
2543    ) {
2544        if self.deltas_handlers.contains_key(&pattern) {
2545            log::warn!(
2546                "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2547                self.actor_id
2548            );
2549            return;
2550        }
2551        self.deltas_handlers.insert(pattern, handler.clone());
2552        msgbus::subscribe_book_deltas(pattern, handler, None);
2553    }
2554
2555    #[allow(dead_code)]
2556    pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2557        if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2558            msgbus::unsubscribe_book_deltas(pattern, &handler);
2559        }
2560    }
2561
2562    #[allow(dead_code)]
2563    pub(crate) fn add_depth10_subscription(
2564        &mut self,
2565        pattern: MStr<Pattern>,
2566        handler: TypedHandler<OrderBookDepth10>,
2567    ) {
2568        if self.depth10_handlers.contains_key(&pattern) {
2569            log::warn!(
2570                "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2571                self.actor_id
2572            );
2573            return;
2574        }
2575        self.depth10_handlers.insert(pattern, handler.clone());
2576        msgbus::subscribe_book_depth10(pattern, handler, None);
2577    }
2578
2579    #[allow(dead_code)]
2580    pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2581        if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2582            msgbus::unsubscribe_book_depth10(pattern, &handler);
2583        }
2584    }
2585
2586    pub(crate) fn add_instrument_subscription(
2587        &mut self,
2588        pattern: MStr<Pattern>,
2589        handler: TypedHandler<InstrumentAny>,
2590    ) {
2591        if self.instrument_handlers.contains_key(&pattern) {
2592            log::warn!(
2593                "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2594                self.actor_id
2595            );
2596            return;
2597        }
2598        self.instrument_handlers.insert(pattern, handler.clone());
2599        msgbus::subscribe_instruments(pattern, handler, None);
2600    }
2601
2602    #[allow(dead_code)]
2603    pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2604        if let Some(handler) = self.instrument_handlers.remove(&pattern) {
2605            msgbus::unsubscribe_instruments(pattern, &handler);
2606        }
2607    }
2608
2609    pub(crate) fn add_instrument_close_subscription(
2610        &mut self,
2611        topic: MStr<Topic>,
2612        handler: ShareableMessageHandler,
2613    ) {
2614        let pattern: MStr<Pattern> = topic.into();
2615        if self.topic_handlers.contains_key(&pattern) {
2616            log::warn!(
2617                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2618                self.actor_id
2619            );
2620            return;
2621        }
2622        self.topic_handlers.insert(pattern, handler.clone());
2623        msgbus::subscribe_any(pattern, handler, None);
2624    }
2625
2626    #[allow(dead_code)]
2627    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2628        let pattern: MStr<Pattern> = topic.into();
2629        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2630            msgbus::unsubscribe_any(pattern, &handler);
2631        }
2632    }
2633
2634    pub(crate) fn add_book_snapshot_subscription(
2635        &mut self,
2636        topic: MStr<Topic>,
2637        handler: TypedHandler<OrderBook>,
2638    ) {
2639        if self.book_handlers.contains_key(&topic) {
2640            log::warn!(
2641                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2642                self.actor_id
2643            );
2644            return;
2645        }
2646        self.book_handlers.insert(topic, handler.clone());
2647        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2648    }
2649
2650    #[allow(dead_code)]
2651    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2652        if let Some(handler) = self.book_handlers.remove(&topic) {
2653            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2654        }
2655    }
2656
2657    pub(crate) fn add_mark_price_subscription(
2658        &mut self,
2659        topic: MStr<Topic>,
2660        handler: TypedHandler<MarkPriceUpdate>,
2661    ) {
2662        if self.mark_price_handlers.contains_key(&topic) {
2663            log::warn!(
2664                "Actor {} attempted duplicate mark price subscription to '{topic}'",
2665                self.actor_id
2666            );
2667            return;
2668        }
2669        self.mark_price_handlers.insert(topic, handler.clone());
2670        msgbus::subscribe_mark_prices(topic.into(), handler, None);
2671    }
2672
2673    #[allow(dead_code)]
2674    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2675        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2676            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2677        }
2678    }
2679
2680    pub(crate) fn add_index_price_subscription(
2681        &mut self,
2682        topic: MStr<Topic>,
2683        handler: TypedHandler<IndexPriceUpdate>,
2684    ) {
2685        if self.index_price_handlers.contains_key(&topic) {
2686            log::warn!(
2687                "Actor {} attempted duplicate index price subscription to '{topic}'",
2688                self.actor_id
2689            );
2690            return;
2691        }
2692        self.index_price_handlers.insert(topic, handler.clone());
2693        msgbus::subscribe_index_prices(topic.into(), handler, None);
2694    }
2695
2696    #[allow(dead_code)]
2697    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2698        if let Some(handler) = self.index_price_handlers.remove(&topic) {
2699            msgbus::unsubscribe_index_prices(topic.into(), &handler);
2700        }
2701    }
2702
2703    pub(crate) fn add_funding_rate_subscription(
2704        &mut self,
2705        topic: MStr<Topic>,
2706        handler: TypedHandler<FundingRateUpdate>,
2707    ) {
2708        if self.funding_rate_handlers.contains_key(&topic) {
2709            log::warn!(
2710                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2711                self.actor_id
2712            );
2713            return;
2714        }
2715        self.funding_rate_handlers.insert(topic, handler.clone());
2716        msgbus::subscribe_funding_rates(topic.into(), handler, None);
2717    }
2718
2719    #[allow(dead_code)]
2720    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2721        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2722            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2723        }
2724    }
2725
2726    pub(crate) fn add_option_greeks_subscription(
2727        &mut self,
2728        topic: MStr<Topic>,
2729        handler: TypedHandler<OptionGreeks>,
2730    ) {
2731        if self.option_greeks_handlers.contains_key(&topic) {
2732            log::warn!(
2733                "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2734                self.actor_id
2735            );
2736            return;
2737        }
2738        self.option_greeks_handlers.insert(topic, handler.clone());
2739        msgbus::subscribe_option_greeks(topic.into(), handler, None);
2740    }
2741
2742    #[allow(dead_code)]
2743    pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2744        if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2745            msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2746        }
2747    }
2748
2749    pub(crate) fn add_option_chain_subscription(
2750        &mut self,
2751        topic: MStr<Topic>,
2752        handler: TypedHandler<OptionChainSlice>,
2753    ) {
2754        if self.option_chain_handlers.contains_key(&topic) {
2755            log::warn!(
2756                "Actor {} attempted duplicate option chain subscription to '{topic}'",
2757                self.actor_id
2758            );
2759            return;
2760        }
2761        self.option_chain_handlers.insert(topic, handler.clone());
2762        msgbus::subscribe_option_chain(topic.into(), handler, None);
2763    }
2764
2765    pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2766        if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2767            msgbus::unsubscribe_option_chain(topic.into(), &handler);
2768        }
2769    }
2770
2771    #[cfg(feature = "defi")]
2772    pub(crate) fn add_block_subscription(
2773        &mut self,
2774        topic: MStr<Topic>,
2775        handler: TypedHandler<Block>,
2776    ) {
2777        if self.block_handlers.contains_key(&topic) {
2778            log::warn!(
2779                "Actor {} attempted duplicate block subscription to '{topic}'",
2780                self.actor_id
2781            );
2782            return;
2783        }
2784        self.block_handlers.insert(topic, handler.clone());
2785        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2786    }
2787
2788    #[cfg(feature = "defi")]
2789    #[allow(dead_code)]
2790    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2791        if let Some(handler) = self.block_handlers.remove(&topic) {
2792            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2793        }
2794    }
2795
2796    #[cfg(feature = "defi")]
2797    pub(crate) fn add_pool_subscription(
2798        &mut self,
2799        topic: MStr<Topic>,
2800        handler: TypedHandler<Pool>,
2801    ) {
2802        if self.pool_handlers.contains_key(&topic) {
2803            log::warn!(
2804                "Actor {} attempted duplicate pool subscription to '{topic}'",
2805                self.actor_id
2806            );
2807            return;
2808        }
2809        self.pool_handlers.insert(topic, handler.clone());
2810        msgbus::subscribe_defi_pools(topic.into(), handler, None);
2811    }
2812
2813    #[cfg(feature = "defi")]
2814    #[allow(dead_code)]
2815    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2816        if let Some(handler) = self.pool_handlers.remove(&topic) {
2817            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2818        }
2819    }
2820
2821    #[cfg(feature = "defi")]
2822    pub(crate) fn add_pool_swap_subscription(
2823        &mut self,
2824        topic: MStr<Topic>,
2825        handler: TypedHandler<PoolSwap>,
2826    ) {
2827        if self.pool_swap_handlers.contains_key(&topic) {
2828            log::warn!(
2829                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2830                self.actor_id
2831            );
2832            return;
2833        }
2834        self.pool_swap_handlers.insert(topic, handler.clone());
2835        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2836    }
2837
2838    #[cfg(feature = "defi")]
2839    #[allow(dead_code)]
2840    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2841        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2842            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2843        }
2844    }
2845
2846    #[cfg(feature = "defi")]
2847    pub(crate) fn add_pool_liquidity_subscription(
2848        &mut self,
2849        topic: MStr<Topic>,
2850        handler: TypedHandler<PoolLiquidityUpdate>,
2851    ) {
2852        if self.pool_liquidity_handlers.contains_key(&topic) {
2853            log::warn!(
2854                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2855                self.actor_id
2856            );
2857            return;
2858        }
2859        self.pool_liquidity_handlers.insert(topic, handler.clone());
2860        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2861    }
2862
2863    #[cfg(feature = "defi")]
2864    #[allow(dead_code)]
2865    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2866        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2867            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2868        }
2869    }
2870
2871    #[cfg(feature = "defi")]
2872    pub(crate) fn add_pool_collect_subscription(
2873        &mut self,
2874        topic: MStr<Topic>,
2875        handler: TypedHandler<PoolFeeCollect>,
2876    ) {
2877        if self.pool_collect_handlers.contains_key(&topic) {
2878            log::warn!(
2879                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2880                self.actor_id
2881            );
2882            return;
2883        }
2884        self.pool_collect_handlers.insert(topic, handler.clone());
2885        msgbus::subscribe_defi_collects(topic.into(), handler, None);
2886    }
2887
2888    #[cfg(feature = "defi")]
2889    #[allow(dead_code)]
2890    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2891        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2892            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2893        }
2894    }
2895
2896    #[cfg(feature = "defi")]
2897    pub(crate) fn add_pool_flash_subscription(
2898        &mut self,
2899        topic: MStr<Topic>,
2900        handler: TypedHandler<PoolFlash>,
2901    ) {
2902        if self.pool_flash_handlers.contains_key(&topic) {
2903            log::warn!(
2904                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2905                self.actor_id
2906            );
2907            return;
2908        }
2909        self.pool_flash_handlers.insert(topic, handler.clone());
2910        msgbus::subscribe_defi_flash(topic.into(), handler, None);
2911    }
2912
2913    #[cfg(feature = "defi")]
2914    #[allow(dead_code)]
2915    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2916        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2917            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2918        }
2919    }
2920
2921    /// Creates a new [`DataActorCore`] instance.
2922    pub fn new(config: DataActorConfig) -> Self {
2923        let actor_id = config
2924            .actor_id
2925            .unwrap_or_else(|| Self::default_actor_id(&config));
2926
2927        Self {
2928            actor_id,
2929            config,
2930            trader_id: None, // None until registered
2931            clock: None,     // None until registered
2932            cache: None,     // None until registered
2933            state: ComponentState::default(),
2934            topic_handlers: AHashMap::new(),
2935            instrument_handlers: AHashMap::new(),
2936            deltas_handlers: AHashMap::new(),
2937            depth10_handlers: AHashMap::new(),
2938            book_handlers: AHashMap::new(),
2939            quote_handlers: AHashMap::new(),
2940            trade_handlers: AHashMap::new(),
2941            bar_handlers: AHashMap::new(),
2942            mark_price_handlers: AHashMap::new(),
2943            index_price_handlers: AHashMap::new(),
2944            funding_rate_handlers: AHashMap::new(),
2945            option_greeks_handlers: AHashMap::new(),
2946            option_chain_handlers: AHashMap::new(),
2947            order_event_handlers: AHashMap::new(),
2948            #[cfg(feature = "defi")]
2949            block_handlers: AHashMap::new(),
2950            #[cfg(feature = "defi")]
2951            pool_handlers: AHashMap::new(),
2952            #[cfg(feature = "defi")]
2953            pool_swap_handlers: AHashMap::new(),
2954            #[cfg(feature = "defi")]
2955            pool_liquidity_handlers: AHashMap::new(),
2956            #[cfg(feature = "defi")]
2957            pool_collect_handlers: AHashMap::new(),
2958            #[cfg(feature = "defi")]
2959            pool_flash_handlers: AHashMap::new(),
2960            warning_events: AHashSet::new(),
2961            pending_requests: AHashMap::new(),
2962            signal_classes: AHashMap::new(),
2963            #[cfg(feature = "indicators")]
2964            indicators: Indicators::default(),
2965        }
2966    }
2967
2968    /// Returns the memory address of this instance as a hexadecimal string.
2969    #[must_use]
2970    pub fn mem_address(&self) -> String {
2971        format!("{self:p}")
2972    }
2973
2974    /// Returns the actors state.
2975    pub fn state(&self) -> ComponentState {
2976        self.state
2977    }
2978
2979    /// Returns the trader ID this actor is registered to.
2980    pub fn trader_id(&self) -> Option<TraderId> {
2981        self.trader_id
2982    }
2983
2984    /// Returns the actors ID.
2985    pub fn actor_id(&self) -> ActorId {
2986        self.actor_id
2987    }
2988
2989    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2990        let memory_address = std::ptr::from_ref(config) as usize;
2991        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2992    }
2993
2994    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2995    pub fn timestamp_ns(&self) -> UnixNanos {
2996        self.clock_ref().timestamp_ns()
2997    }
2998
2999    /// Returns the clock for the actor (if registered).
3000    ///
3001    /// # Panics
3002    ///
3003    /// Panics if the actor has not been registered with a trader.
3004    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
3005        self.clock
3006            .as_ref()
3007            .unwrap_or_else(|| {
3008                panic!(
3009                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
3010                    self.actor_id, self.trader_id
3011                )
3012            })
3013            .borrow_mut()
3014    }
3015
3016    /// Returns a clone of the reference-counted clock.
3017    ///
3018    /// # Panics
3019    ///
3020    /// Panics if the actor has not yet been registered (clock is `None`).
3021    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
3022        self.clock
3023            .as_ref()
3024            .expect("DataActor must be registered before accessing clock")
3025            .clone()
3026    }
3027
3028    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
3029        self.clock
3030            .as_ref()
3031            .unwrap_or_else(|| {
3032                panic!(
3033                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
3034                    self.actor_id, self.trader_id
3035                )
3036            })
3037            .borrow()
3038    }
3039
3040    /// Returns a read-only reference to the cache.
3041    ///
3042    /// # Panics
3043    ///
3044    /// Panics if the actor has not yet been registered (cache is `None`).
3045    pub fn cache(&self) -> Ref<'_, Cache> {
3046        self.cache
3047            .as_ref()
3048            .expect("DataActor must be registered before accessing cache")
3049            .borrow()
3050    }
3051
3052    /// Returns a clone of the reference-counted cache.
3053    ///
3054    /// # Panics
3055    ///
3056    /// Panics if the actor has not yet been registered (cache is `None`).
3057    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
3058        self.cache
3059            .as_ref()
3060            .expect("DataActor must be registered before accessing cache")
3061            .clone()
3062    }
3063
3064    /// Register the data actor with a trader.
3065    ///
3066    /// # Errors
3067    ///
3068    /// Returns an error if the actor has already been registered with a trader
3069    /// or if the provided dependencies are invalid.
3070    pub fn register(
3071        &mut self,
3072        trader_id: TraderId,
3073        clock: Rc<RefCell<dyn Clock>>,
3074        cache: Rc<RefCell<Cache>>,
3075    ) -> anyhow::Result<()> {
3076        if let Some(existing_trader_id) = self.trader_id {
3077            anyhow::bail!(
3078                "DataActor {} already registered with trader {existing_trader_id}",
3079                self.actor_id
3080            );
3081        }
3082
3083        // Validate clock by attempting to access it
3084        {
3085            let _timestamp = clock.borrow().timestamp_ns();
3086        }
3087
3088        // Validate cache by attempting to access it
3089        {
3090            let _cache_borrow = cache.borrow();
3091        }
3092
3093        self.trader_id = Some(trader_id);
3094        self.clock = Some(clock);
3095        self.cache = Some(cache);
3096
3097        // Verify complete registration
3098        if !self.is_properly_registered() {
3099            anyhow::bail!(
3100                "DataActor {} registration incomplete - validation failed",
3101                self.actor_id
3102            );
3103        }
3104
3105        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
3106        Ok(())
3107    }
3108
3109    /// Register an event type for warning log levels.
3110    pub fn register_warning_event(&mut self, event_type: &str) {
3111        self.warning_events.insert(event_type.to_string());
3112        log::debug!("Registered event type '{event_type}' for warning logs");
3113    }
3114
3115    /// Deregister an event type from warning log levels.
3116    pub fn deregister_warning_event(&mut self, event_type: &str) {
3117        self.warning_events.remove(event_type);
3118        log::debug!("Deregistered event type '{event_type}' from warning logs");
3119    }
3120
3121    pub fn is_registered(&self) -> bool {
3122        self.trader_id.is_some()
3123    }
3124
3125    pub(crate) fn check_registered(&self) {
3126        assert!(
3127            self.is_registered(),
3128            "Actor has not been registered with a Trader"
3129        );
3130    }
3131
3132    /// Validates registration state without panicking.
3133    fn is_properly_registered(&self) -> bool {
3134        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3135    }
3136
3137    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3138        if self.config.log_commands {
3139            log::info!("{CMD}{SEND} {command:?}");
3140        }
3141
3142        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3143        msgbus::send_data_command(endpoint, command);
3144    }
3145
3146    #[allow(dead_code)]
3147    fn send_data_req(&self, request: &RequestCommand) {
3148        if self.config.log_commands {
3149            log::info!("{REQ}{SEND} {request:?}");
3150        }
3151
3152        // For now, simplified approach - data requests without dynamic handlers
3153        // TODO: Implement proper dynamic dispatch for response handlers
3154        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3155        msgbus::send_any(endpoint, request.as_any());
3156    }
3157
3158    /// Sends a shutdown command to the system with an optional reason.
3159    ///
3160    /// # Panics
3161    ///
3162    /// Panics if the actor is not registered or has no trader ID.
3163    pub fn shutdown_system(&self, reason: Option<String>) {
3164        self.check_registered();
3165
3166        // Checked registered before unwrapping trader ID
3167        let command = ShutdownSystem::new(
3168            self.trader_id().unwrap(),
3169            self.actor_id.inner(),
3170            reason,
3171            UUID4::new(),
3172            self.timestamp_ns(),
3173            None, // correlation_id
3174        );
3175
3176        let topic = MessagingSwitchboard::shutdown_system_topic();
3177        msgbus::publish_any(topic, command.as_any());
3178    }
3179
3180    /// Publishes `data` on the message bus under the topic derived from `data_type`.
3181    ///
3182    /// `data_type` is kept as an explicit parameter (rather than deriving it from
3183    /// `data.data_type`) to mirror the v1 Python `publish_data(data_type, data)` API and
3184    /// to allow callers to override the routing topic from the payload's intrinsic type.
3185    ///
3186    /// # Panics
3187    ///
3188    /// Panics if the actor is not registered with a trader.
3189    pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3190        self.check_registered();
3191
3192        let topic = get_custom_topic(data_type);
3193        msgbus::publish_any(topic, data);
3194    }
3195
3196    /// Publishes a [`Signal`] constructed from `name` and `value`, wrapped in [`CustomData`]
3197    /// so it is consumed by signal subscribers and by any `CustomData`-aware pipeline
3198    /// (for example the feather persistence writer).
3199    ///
3200    /// The topic mirrors the v1 Python scheme `data.Signal<TitleName>` so subscribers
3201    /// using either a specific name or the global wildcard are both notified.
3202    /// If `ts_event` is zero then the current clock timestamp is used.
3203    ///
3204    /// # Panics
3205    ///
3206    /// Panics if the actor is not registered with a trader.
3207    pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3208        self.check_registered();
3209
3210        let now = self.timestamp_ns();
3211        let ts_event = if ts_event.as_u64() == 0 {
3212            now
3213        } else {
3214            ts_event
3215        };
3216        let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3217
3218        let data_type = DataType::new(
3219            &format!(
3220                "Signal{}",
3221                nautilus_core::string::conversions::title_case(name)
3222            ),
3223            None,
3224            None,
3225        );
3226        let data = CustomData::new(Arc::new(signal), data_type);
3227        let topic = get_custom_topic(&data.data_type);
3228        msgbus::publish_any(topic, &data);
3229    }
3230
3231    /// Adds the `synthetic` instrument to the cache.
3232    ///
3233    /// # Errors
3234    ///
3235    /// Returns an error if a synthetic with the same ID already exists, or if the
3236    /// backing cache fails to persist it. Panics if the actor is not registered
3237    /// with a trader. // panics-doc-ok
3238    pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3239        self.check_registered();
3240
3241        let cache = self.cache_rc();
3242        if cache.borrow().synthetic(&synthetic.id).is_some() {
3243            anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3244        }
3245        cache.borrow_mut().add_synthetic(synthetic)
3246    }
3247
3248    /// Updates the `synthetic` instrument in the cache, replacing the existing entry.
3249    ///
3250    /// # Errors
3251    ///
3252    /// Returns an error if no synthetic with the same ID already exists, or if the
3253    /// backing cache fails to persist the replacement. Panics if the actor is not
3254    /// registered with a trader. // panics-doc-ok
3255    pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3256        self.check_registered();
3257
3258        let cache = self.cache_rc();
3259        if cache.borrow().synthetic(&synthetic.id).is_none() {
3260            anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3261        }
3262        cache.borrow_mut().add_synthetic(synthetic)
3263    }
3264
3265    /// Helper method for registering data subscriptions from the trait.
3266    ///
3267    /// # Panics
3268    ///
3269    /// Panics if the actor is not properly registered.
3270    pub fn subscribe_data(
3271        &mut self,
3272        handler: ShareableMessageHandler,
3273        data_type: DataType,
3274        client_id: Option<ClientId>,
3275        params: Option<Params>,
3276    ) {
3277        assert!(
3278            self.is_properly_registered(),
3279            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3280            self.actor_id,
3281            self.trader_id,
3282            self.clock.is_some(),
3283            self.cache.is_some()
3284        );
3285
3286        let topic = get_custom_topic(&data_type);
3287        self.add_subscription_any(topic, handler);
3288
3289        // If no client ID specified, just subscribe to the topic
3290        if client_id.is_none() {
3291            return;
3292        }
3293
3294        let command = SubscribeCommand::Data(SubscribeCustomData {
3295            data_type,
3296            client_id,
3297            venue: None,
3298            command_id: UUID4::new(),
3299            ts_init: self.timestamp_ns(),
3300            correlation_id: None,
3301            params,
3302        });
3303
3304        self.send_data_cmd(DataCommand::Subscribe(command));
3305    }
3306
3307    /// Helper method for registering signal subscriptions from the trait.
3308    ///
3309    /// An empty `name` subscribes to every signal via the `data.Signal*` wildcard pattern.
3310    ///
3311    /// # Panics
3312    ///
3313    /// Panics if the actor is not registered with a trader.
3314    pub fn subscribe_signal(
3315        &mut self,
3316        handler: ShareableMessageHandler,
3317        name: &str,
3318        priority: Option<u32>,
3319    ) {
3320        self.check_registered();
3321
3322        let pattern = get_signal_pattern(name);
3323        if self.topic_handlers.contains_key(&pattern) {
3324            log::warn!(
3325                "Actor {} attempted duplicate signal subscription to '{pattern}'",
3326                self.actor_id,
3327            );
3328            return;
3329        }
3330        self.topic_handlers.insert(pattern, handler.clone());
3331        msgbus::subscribe_any(pattern, handler, priority);
3332    }
3333
3334    /// Helper method for registering quotes subscriptions from the trait.
3335    pub fn subscribe_quotes(
3336        &mut self,
3337        topic: MStr<Topic>,
3338        handler: TypedHandler<QuoteTick>,
3339        instrument_id: InstrumentId,
3340        client_id: Option<ClientId>,
3341        params: Option<Params>,
3342    ) {
3343        self.check_registered();
3344
3345        self.add_quote_subscription(topic, handler);
3346
3347        let command = SubscribeCommand::Quotes(SubscribeQuotes {
3348            instrument_id,
3349            client_id,
3350            venue: Some(instrument_id.venue),
3351            command_id: UUID4::new(),
3352            ts_init: self.timestamp_ns(),
3353            correlation_id: None,
3354            params,
3355        });
3356
3357        self.send_data_cmd(DataCommand::Subscribe(command));
3358    }
3359
3360    /// Helper method for registering instruments subscriptions from the trait.
3361    pub fn subscribe_instruments(
3362        &mut self,
3363        pattern: MStr<Pattern>,
3364        handler: TypedHandler<InstrumentAny>,
3365        venue: Venue,
3366        client_id: Option<ClientId>,
3367        params: Option<Params>,
3368    ) {
3369        self.check_registered();
3370
3371        self.add_instrument_subscription(pattern, handler);
3372
3373        let command = SubscribeCommand::Instruments(SubscribeInstruments {
3374            client_id,
3375            venue,
3376            command_id: UUID4::new(),
3377            ts_init: self.timestamp_ns(),
3378            correlation_id: None,
3379            params,
3380        });
3381
3382        self.send_data_cmd(DataCommand::Subscribe(command));
3383    }
3384
3385    /// Helper method for registering instrument subscriptions from the trait.
3386    pub fn subscribe_instrument(
3387        &mut self,
3388        topic: MStr<Topic>,
3389        handler: TypedHandler<InstrumentAny>,
3390        instrument_id: InstrumentId,
3391        client_id: Option<ClientId>,
3392        params: Option<Params>,
3393    ) {
3394        self.check_registered();
3395
3396        self.add_instrument_subscription(topic.into(), handler);
3397
3398        let command = SubscribeCommand::Instrument(SubscribeInstrument {
3399            instrument_id,
3400            client_id,
3401            venue: Some(instrument_id.venue),
3402            command_id: UUID4::new(),
3403            ts_init: self.timestamp_ns(),
3404            correlation_id: None,
3405            params,
3406        });
3407
3408        self.send_data_cmd(DataCommand::Subscribe(command));
3409    }
3410
3411    /// Helper method for registering book deltas subscriptions from the trait.
3412    #[expect(clippy::too_many_arguments)]
3413    pub fn subscribe_book_deltas(
3414        &mut self,
3415        pattern: MStr<Pattern>,
3416        handler: TypedHandler<OrderBookDeltas>,
3417        instrument_id: InstrumentId,
3418        book_type: BookType,
3419        depth: Option<NonZeroUsize>,
3420        client_id: Option<ClientId>,
3421        managed: bool,
3422        params: Option<Params>,
3423    ) {
3424        self.check_registered();
3425
3426        self.add_deltas_subscription(pattern, handler);
3427
3428        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3429            instrument_id,
3430            book_type,
3431            client_id,
3432            venue: Some(instrument_id.venue),
3433            command_id: UUID4::new(),
3434            ts_init: self.timestamp_ns(),
3435            depth,
3436            managed,
3437            correlation_id: None,
3438            params,
3439        });
3440
3441        self.send_data_cmd(DataCommand::Subscribe(command));
3442    }
3443
3444    /// Helper method for registering book snapshots subscriptions from the trait.
3445    #[expect(clippy::too_many_arguments)]
3446    pub fn subscribe_book_at_interval(
3447        &mut self,
3448        topic: MStr<Topic>,
3449        handler: TypedHandler<OrderBook>,
3450        instrument_id: InstrumentId,
3451        book_type: BookType,
3452        depth: Option<NonZeroUsize>,
3453        interval_ms: NonZeroUsize,
3454        client_id: Option<ClientId>,
3455        params: Option<Params>,
3456    ) {
3457        self.check_registered();
3458
3459        self.add_book_snapshot_subscription(topic, handler);
3460
3461        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3462            instrument_id,
3463            book_type,
3464            client_id,
3465            venue: Some(instrument_id.venue),
3466            command_id: UUID4::new(),
3467            ts_init: self.timestamp_ns(),
3468            depth,
3469            interval_ms,
3470            correlation_id: None,
3471            params,
3472        });
3473
3474        self.send_data_cmd(DataCommand::Subscribe(command));
3475    }
3476
3477    /// Helper method for registering trades subscriptions from the trait.
3478    pub fn subscribe_trades(
3479        &mut self,
3480        topic: MStr<Topic>,
3481        handler: TypedHandler<TradeTick>,
3482        instrument_id: InstrumentId,
3483        client_id: Option<ClientId>,
3484        params: Option<Params>,
3485    ) {
3486        self.check_registered();
3487
3488        self.add_trade_subscription(topic, handler);
3489
3490        let command = SubscribeCommand::Trades(SubscribeTrades {
3491            instrument_id,
3492            client_id,
3493            venue: Some(instrument_id.venue),
3494            command_id: UUID4::new(),
3495            ts_init: self.timestamp_ns(),
3496            correlation_id: None,
3497            params,
3498        });
3499
3500        self.send_data_cmd(DataCommand::Subscribe(command));
3501    }
3502
3503    /// Helper method for registering bars subscriptions from the trait.
3504    pub fn subscribe_bars(
3505        &mut self,
3506        topic: MStr<Topic>,
3507        handler: TypedHandler<Bar>,
3508        bar_type: BarType,
3509        client_id: Option<ClientId>,
3510        params: Option<Params>,
3511    ) {
3512        self.check_registered();
3513
3514        self.add_bar_subscription(topic, handler);
3515
3516        let command = SubscribeCommand::Bars(SubscribeBars {
3517            bar_type,
3518            client_id,
3519            venue: Some(bar_type.instrument_id().venue),
3520            command_id: UUID4::new(),
3521            ts_init: self.timestamp_ns(),
3522            correlation_id: None,
3523            params,
3524        });
3525
3526        self.send_data_cmd(DataCommand::Subscribe(command));
3527    }
3528
3529    /// Helper method for registering mark prices subscriptions from the trait.
3530    pub fn subscribe_mark_prices(
3531        &mut self,
3532        topic: MStr<Topic>,
3533        handler: TypedHandler<MarkPriceUpdate>,
3534        instrument_id: InstrumentId,
3535        client_id: Option<ClientId>,
3536        params: Option<Params>,
3537    ) {
3538        self.check_registered();
3539
3540        self.add_mark_price_subscription(topic, handler);
3541
3542        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3543            instrument_id,
3544            client_id,
3545            venue: Some(instrument_id.venue),
3546            command_id: UUID4::new(),
3547            ts_init: self.timestamp_ns(),
3548            correlation_id: None,
3549            params,
3550        });
3551
3552        self.send_data_cmd(DataCommand::Subscribe(command));
3553    }
3554
3555    /// Helper method for registering index prices subscriptions from the trait.
3556    pub fn subscribe_index_prices(
3557        &mut self,
3558        topic: MStr<Topic>,
3559        handler: TypedHandler<IndexPriceUpdate>,
3560        instrument_id: InstrumentId,
3561        client_id: Option<ClientId>,
3562        params: Option<Params>,
3563    ) {
3564        self.check_registered();
3565
3566        self.add_index_price_subscription(topic, handler);
3567
3568        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3569            instrument_id,
3570            client_id,
3571            venue: Some(instrument_id.venue),
3572            command_id: UUID4::new(),
3573            ts_init: self.timestamp_ns(),
3574            correlation_id: None,
3575            params,
3576        });
3577
3578        self.send_data_cmd(DataCommand::Subscribe(command));
3579    }
3580
3581    /// Helper method for registering funding rates subscriptions from the trait.
3582    pub fn subscribe_funding_rates(
3583        &mut self,
3584        topic: MStr<Topic>,
3585        handler: TypedHandler<FundingRateUpdate>,
3586        instrument_id: InstrumentId,
3587        client_id: Option<ClientId>,
3588        params: Option<Params>,
3589    ) {
3590        self.check_registered();
3591
3592        self.add_funding_rate_subscription(topic, handler);
3593
3594        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3595            instrument_id,
3596            client_id,
3597            venue: Some(instrument_id.venue),
3598            command_id: UUID4::new(),
3599            ts_init: self.timestamp_ns(),
3600            correlation_id: None,
3601            params,
3602        });
3603
3604        self.send_data_cmd(DataCommand::Subscribe(command));
3605    }
3606
3607    /// Helper method for registering option greeks subscriptions from the trait.
3608    pub fn subscribe_option_greeks(
3609        &mut self,
3610        topic: MStr<Topic>,
3611        handler: TypedHandler<OptionGreeks>,
3612        instrument_id: InstrumentId,
3613        client_id: Option<ClientId>,
3614        params: Option<Params>,
3615    ) {
3616        self.check_registered();
3617
3618        self.add_option_greeks_subscription(topic, handler);
3619
3620        let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3621            instrument_id,
3622            client_id,
3623            venue: Some(instrument_id.venue),
3624            command_id: UUID4::new(),
3625            ts_init: self.timestamp_ns(),
3626            correlation_id: None,
3627            params,
3628        });
3629
3630        self.send_data_cmd(DataCommand::Subscribe(command));
3631    }
3632
3633    /// Helper method for registering instrument status subscriptions from the trait.
3634    pub fn subscribe_instrument_status(
3635        &mut self,
3636        topic: MStr<Topic>,
3637        handler: ShareableMessageHandler,
3638        instrument_id: InstrumentId,
3639        client_id: Option<ClientId>,
3640        params: Option<Params>,
3641    ) {
3642        self.check_registered();
3643
3644        self.add_subscription_any(topic, handler);
3645
3646        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3647            instrument_id,
3648            client_id,
3649            venue: Some(instrument_id.venue),
3650            command_id: UUID4::new(),
3651            ts_init: self.timestamp_ns(),
3652            correlation_id: None,
3653            params,
3654        });
3655
3656        self.send_data_cmd(DataCommand::Subscribe(command));
3657    }
3658
3659    /// Helper method for registering instrument close subscriptions from the trait.
3660    pub fn subscribe_instrument_close(
3661        &mut self,
3662        topic: MStr<Topic>,
3663        handler: ShareableMessageHandler,
3664        instrument_id: InstrumentId,
3665        client_id: Option<ClientId>,
3666        params: Option<Params>,
3667    ) {
3668        self.check_registered();
3669
3670        self.add_instrument_close_subscription(topic, handler);
3671
3672        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3673            instrument_id,
3674            client_id,
3675            venue: Some(instrument_id.venue),
3676            command_id: UUID4::new(),
3677            ts_init: self.timestamp_ns(),
3678            correlation_id: None,
3679            params,
3680        });
3681
3682        self.send_data_cmd(DataCommand::Subscribe(command));
3683    }
3684
3685    /// Helper method for subscribing to option chain snapshots from the trait.
3686    #[allow(clippy::too_many_arguments)]
3687    pub fn subscribe_option_chain(
3688        &mut self,
3689        topic: MStr<Topic>,
3690        handler: TypedHandler<OptionChainSlice>,
3691        series_id: OptionSeriesId,
3692        strike_range: StrikeRange,
3693        snapshot_interval_ms: Option<u64>,
3694        client_id: Option<ClientId>,
3695        params: Option<Params>,
3696    ) {
3697        self.check_registered();
3698
3699        self.add_option_chain_subscription(topic, handler);
3700
3701        let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3702            series_id,
3703            strike_range,
3704            snapshot_interval_ms,
3705            UUID4::new(),
3706            self.timestamp_ns(),
3707            client_id,
3708            Some(series_id.venue),
3709            params,
3710        ));
3711
3712        self.send_data_cmd(DataCommand::Subscribe(command));
3713    }
3714
3715    /// Helper method for registering order fills subscriptions from the trait.
3716    pub fn subscribe_order_fills(
3717        &mut self,
3718        topic: MStr<Topic>,
3719        handler: TypedHandler<OrderEventAny>,
3720    ) {
3721        self.check_registered();
3722        self.add_order_event_subscription(topic, handler);
3723    }
3724
3725    /// Helper method for registering order cancels subscriptions from the trait.
3726    pub fn subscribe_order_cancels(
3727        &mut self,
3728        topic: MStr<Topic>,
3729        handler: TypedHandler<OrderEventAny>,
3730    ) {
3731        self.check_registered();
3732        self.add_order_event_subscription(topic, handler);
3733    }
3734
3735    /// Helper method for unsubscribing from data.
3736    pub fn unsubscribe_data(
3737        &mut self,
3738        data_type: DataType,
3739        client_id: Option<ClientId>,
3740        params: Option<Params>,
3741    ) {
3742        self.check_registered();
3743
3744        let topic = get_custom_topic(&data_type);
3745        self.remove_subscription_any(topic);
3746
3747        if client_id.is_none() {
3748            return;
3749        }
3750
3751        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3752            data_type,
3753            client_id,
3754            venue: None,
3755            command_id: UUID4::new(),
3756            ts_init: self.timestamp_ns(),
3757            correlation_id: None,
3758            params,
3759        });
3760
3761        self.send_data_cmd(DataCommand::Unsubscribe(command));
3762    }
3763
3764    /// Helper method for unsubscribing from signals.
3765    ///
3766    /// # Panics
3767    ///
3768    /// Panics if the actor is not registered with a trader.
3769    pub fn unsubscribe_signal(&mut self, name: &str) {
3770        self.check_registered();
3771
3772        let pattern = get_signal_pattern(name);
3773        if let Some(handler) = self.topic_handlers.remove(&pattern) {
3774            msgbus::unsubscribe_any(pattern, &handler);
3775        } else {
3776            log::warn!(
3777                "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3778                self.actor_id,
3779            );
3780        }
3781    }
3782
3783    /// Helper method for unsubscribing from instruments.
3784    pub fn unsubscribe_instruments(
3785        &mut self,
3786        venue: Venue,
3787        client_id: Option<ClientId>,
3788        params: Option<Params>,
3789    ) {
3790        self.check_registered();
3791
3792        let pattern = get_instruments_pattern(venue);
3793        self.remove_instrument_subscription(pattern);
3794
3795        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3796            client_id,
3797            venue,
3798            command_id: UUID4::new(),
3799            ts_init: self.timestamp_ns(),
3800            correlation_id: None,
3801            params,
3802        });
3803
3804        self.send_data_cmd(DataCommand::Unsubscribe(command));
3805    }
3806
3807    /// Helper method for unsubscribing from instrument.
3808    pub fn unsubscribe_instrument(
3809        &mut self,
3810        instrument_id: InstrumentId,
3811        client_id: Option<ClientId>,
3812        params: Option<Params>,
3813    ) {
3814        self.check_registered();
3815
3816        let topic = get_instrument_topic(instrument_id);
3817        self.remove_instrument_subscription(topic.into());
3818
3819        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3820            instrument_id,
3821            client_id,
3822            venue: Some(instrument_id.venue),
3823            command_id: UUID4::new(),
3824            ts_init: self.timestamp_ns(),
3825            correlation_id: None,
3826            params,
3827        });
3828
3829        self.send_data_cmd(DataCommand::Unsubscribe(command));
3830    }
3831
3832    /// Helper method for unsubscribing from book deltas.
3833    pub fn unsubscribe_book_deltas(
3834        &mut self,
3835        instrument_id: InstrumentId,
3836        client_id: Option<ClientId>,
3837        params: Option<Params>,
3838    ) {
3839        self.check_registered();
3840
3841        let pattern = if is_parent_subscription(params.as_ref()) {
3842            get_book_deltas_pattern(instrument_id)
3843        } else {
3844            get_book_deltas_topic(instrument_id).into()
3845        };
3846        self.remove_deltas_subscription(pattern);
3847
3848        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3849            instrument_id,
3850            client_id,
3851            venue: Some(instrument_id.venue),
3852            command_id: UUID4::new(),
3853            ts_init: self.timestamp_ns(),
3854            correlation_id: None,
3855            params,
3856        });
3857
3858        self.send_data_cmd(DataCommand::Unsubscribe(command));
3859    }
3860
3861    /// Helper method for unsubscribing from book snapshots at interval.
3862    pub fn unsubscribe_book_at_interval(
3863        &mut self,
3864        instrument_id: InstrumentId,
3865        interval_ms: NonZeroUsize,
3866        client_id: Option<ClientId>,
3867        params: Option<Params>,
3868    ) {
3869        self.check_registered();
3870
3871        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3872        self.remove_book_snapshot_subscription(topic);
3873
3874        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3875            instrument_id,
3876            interval_ms,
3877            client_id,
3878            venue: Some(instrument_id.venue),
3879            command_id: UUID4::new(),
3880            ts_init: self.timestamp_ns(),
3881            correlation_id: None,
3882            params,
3883        });
3884
3885        self.send_data_cmd(DataCommand::Unsubscribe(command));
3886    }
3887
3888    /// Helper method for unsubscribing from quotes.
3889    pub fn unsubscribe_quotes(
3890        &mut self,
3891        instrument_id: InstrumentId,
3892        client_id: Option<ClientId>,
3893        params: Option<Params>,
3894    ) {
3895        self.check_registered();
3896
3897        let topic = get_quotes_topic(instrument_id);
3898        self.remove_quote_subscription(topic);
3899
3900        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3901            instrument_id,
3902            client_id,
3903            venue: Some(instrument_id.venue),
3904            command_id: UUID4::new(),
3905            ts_init: self.timestamp_ns(),
3906            correlation_id: None,
3907            params,
3908        });
3909
3910        self.send_data_cmd(DataCommand::Unsubscribe(command));
3911    }
3912
3913    /// Helper method for unsubscribing from trades.
3914    pub fn unsubscribe_trades(
3915        &mut self,
3916        instrument_id: InstrumentId,
3917        client_id: Option<ClientId>,
3918        params: Option<Params>,
3919    ) {
3920        self.check_registered();
3921
3922        let topic = get_trades_topic(instrument_id);
3923        self.remove_trade_subscription(topic);
3924
3925        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3926            instrument_id,
3927            client_id,
3928            venue: Some(instrument_id.venue),
3929            command_id: UUID4::new(),
3930            ts_init: self.timestamp_ns(),
3931            correlation_id: None,
3932            params,
3933        });
3934
3935        self.send_data_cmd(DataCommand::Unsubscribe(command));
3936    }
3937
3938    /// Helper method for unsubscribing from bars.
3939    pub fn unsubscribe_bars(
3940        &mut self,
3941        bar_type: BarType,
3942        client_id: Option<ClientId>,
3943        params: Option<Params>,
3944    ) {
3945        self.check_registered();
3946
3947        let topic = get_bars_topic(bar_type);
3948        self.remove_bar_subscription(topic);
3949
3950        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3951            bar_type,
3952            client_id,
3953            venue: Some(bar_type.instrument_id().venue),
3954            command_id: UUID4::new(),
3955            ts_init: self.timestamp_ns(),
3956            correlation_id: None,
3957            params,
3958        });
3959
3960        self.send_data_cmd(DataCommand::Unsubscribe(command));
3961    }
3962
3963    /// Helper method for unsubscribing from mark prices.
3964    pub fn unsubscribe_mark_prices(
3965        &mut self,
3966        instrument_id: InstrumentId,
3967        client_id: Option<ClientId>,
3968        params: Option<Params>,
3969    ) {
3970        self.check_registered();
3971
3972        let topic = get_mark_price_topic(instrument_id);
3973        self.remove_mark_price_subscription(topic);
3974
3975        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3976            instrument_id,
3977            client_id,
3978            venue: Some(instrument_id.venue),
3979            command_id: UUID4::new(),
3980            ts_init: self.timestamp_ns(),
3981            correlation_id: None,
3982            params,
3983        });
3984
3985        self.send_data_cmd(DataCommand::Unsubscribe(command));
3986    }
3987
3988    /// Helper method for unsubscribing from index prices.
3989    pub fn unsubscribe_index_prices(
3990        &mut self,
3991        instrument_id: InstrumentId,
3992        client_id: Option<ClientId>,
3993        params: Option<Params>,
3994    ) {
3995        self.check_registered();
3996
3997        let topic = get_index_price_topic(instrument_id);
3998        self.remove_index_price_subscription(topic);
3999
4000        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
4001            instrument_id,
4002            client_id,
4003            venue: Some(instrument_id.venue),
4004            command_id: UUID4::new(),
4005            ts_init: self.timestamp_ns(),
4006            correlation_id: None,
4007            params,
4008        });
4009
4010        self.send_data_cmd(DataCommand::Unsubscribe(command));
4011    }
4012
4013    /// Helper method for unsubscribing from funding rates.
4014    pub fn unsubscribe_funding_rates(
4015        &mut self,
4016        instrument_id: InstrumentId,
4017        client_id: Option<ClientId>,
4018        params: Option<Params>,
4019    ) {
4020        self.check_registered();
4021
4022        let topic = get_funding_rate_topic(instrument_id);
4023        self.remove_funding_rate_subscription(topic);
4024
4025        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
4026            instrument_id,
4027            client_id,
4028            venue: Some(instrument_id.venue),
4029            command_id: UUID4::new(),
4030            ts_init: self.timestamp_ns(),
4031            correlation_id: None,
4032            params,
4033        });
4034
4035        self.send_data_cmd(DataCommand::Unsubscribe(command));
4036    }
4037
4038    /// Helper method for unsubscribing from option greeks.
4039    pub fn unsubscribe_option_greeks(
4040        &mut self,
4041        instrument_id: InstrumentId,
4042        client_id: Option<ClientId>,
4043        params: Option<Params>,
4044    ) {
4045        self.check_registered();
4046
4047        let topic = get_option_greeks_topic(instrument_id);
4048        self.remove_option_greeks_subscription(topic);
4049
4050        let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
4051            instrument_id,
4052            client_id,
4053            venue: Some(instrument_id.venue),
4054            command_id: UUID4::new(),
4055            ts_init: self.timestamp_ns(),
4056            correlation_id: None,
4057            params,
4058        });
4059
4060        self.send_data_cmd(DataCommand::Unsubscribe(command));
4061    }
4062
4063    /// Helper method for unsubscribing from instrument status.
4064    pub fn unsubscribe_instrument_status(
4065        &mut self,
4066        instrument_id: InstrumentId,
4067        client_id: Option<ClientId>,
4068        params: Option<Params>,
4069    ) {
4070        self.check_registered();
4071
4072        let topic = get_instrument_status_topic(instrument_id);
4073        self.remove_subscription_any(topic);
4074
4075        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
4076            instrument_id,
4077            client_id,
4078            venue: Some(instrument_id.venue),
4079            command_id: UUID4::new(),
4080            ts_init: self.timestamp_ns(),
4081            correlation_id: None,
4082            params,
4083        });
4084
4085        self.send_data_cmd(DataCommand::Unsubscribe(command));
4086    }
4087
4088    /// Helper method for unsubscribing from instrument close.
4089    pub fn unsubscribe_instrument_close(
4090        &mut self,
4091        instrument_id: InstrumentId,
4092        client_id: Option<ClientId>,
4093        params: Option<Params>,
4094    ) {
4095        self.check_registered();
4096
4097        let topic = get_instrument_close_topic(instrument_id);
4098        self.remove_instrument_close_subscription(topic);
4099
4100        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
4101            instrument_id,
4102            client_id,
4103            venue: Some(instrument_id.venue),
4104            command_id: UUID4::new(),
4105            ts_init: self.timestamp_ns(),
4106            correlation_id: None,
4107            params,
4108        });
4109
4110        self.send_data_cmd(DataCommand::Unsubscribe(command));
4111    }
4112
4113    /// Helper method for unsubscribing from option chain snapshots.
4114    pub fn unsubscribe_option_chain(
4115        &mut self,
4116        series_id: OptionSeriesId,
4117        client_id: Option<ClientId>,
4118    ) {
4119        self.check_registered();
4120
4121        let topic = get_option_chain_topic(series_id);
4122        self.remove_option_chain_subscription(topic);
4123
4124        let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4125            series_id,
4126            UUID4::new(),
4127            self.timestamp_ns(),
4128            client_id,
4129            Some(series_id.venue),
4130        ));
4131
4132        self.send_data_cmd(DataCommand::Unsubscribe(command));
4133    }
4134
4135    /// Helper method for unsubscribing from order fills.
4136    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4137        self.check_registered();
4138
4139        let topic = get_order_fills_topic(instrument_id);
4140        self.remove_order_event_subscription(topic);
4141    }
4142
4143    /// Helper method for unsubscribing from order cancels.
4144    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4145        self.check_registered();
4146
4147        let topic = get_order_cancels_topic(instrument_id);
4148        self.remove_order_event_subscription(topic);
4149    }
4150
4151    /// Helper method for requesting data.
4152    ///
4153    /// # Errors
4154    ///
4155    /// Returns an error if input parameters are invalid.
4156    #[expect(clippy::too_many_arguments)]
4157    pub fn request_data(
4158        &self,
4159        data_type: DataType,
4160        client_id: ClientId,
4161        start: Option<DateTime<Utc>>,
4162        end: Option<DateTime<Utc>>,
4163        limit: Option<NonZeroUsize>,
4164        params: Option<Params>,
4165        handler: ShareableMessageHandler,
4166    ) -> anyhow::Result<UUID4> {
4167        self.check_registered();
4168
4169        let now = self.clock_ref().utc_now();
4170        check_timestamps(now, start, end)?;
4171
4172        let request_id = UUID4::new();
4173        let command = RequestCommand::Data(RequestCustomData {
4174            client_id,
4175            data_type,
4176            start,
4177            end,
4178            limit,
4179            request_id,
4180            ts_init: self.timestamp_ns(),
4181            params,
4182        });
4183
4184        get_message_bus()
4185            .borrow_mut()
4186            .register_response_handler(command.request_id(), handler)?;
4187
4188        self.send_data_cmd(DataCommand::Request(command));
4189
4190        Ok(request_id)
4191    }
4192
4193    /// Helper method for requesting instrument.
4194    ///
4195    /// # Errors
4196    ///
4197    /// Returns an error if input parameters are invalid.
4198    pub fn request_instrument(
4199        &self,
4200        instrument_id: InstrumentId,
4201        start: Option<DateTime<Utc>>,
4202        end: Option<DateTime<Utc>>,
4203        client_id: Option<ClientId>,
4204        params: Option<Params>,
4205        handler: ShareableMessageHandler,
4206    ) -> anyhow::Result<UUID4> {
4207        self.check_registered();
4208
4209        let now = self.clock_ref().utc_now();
4210        check_timestamps(now, start, end)?;
4211
4212        let request_id = UUID4::new();
4213        let command = RequestCommand::Instrument(RequestInstrument {
4214            instrument_id,
4215            start,
4216            end,
4217            client_id,
4218            request_id,
4219            ts_init: now.into(),
4220            params,
4221        });
4222
4223        get_message_bus()
4224            .borrow_mut()
4225            .register_response_handler(command.request_id(), handler)?;
4226
4227        self.send_data_cmd(DataCommand::Request(command));
4228
4229        Ok(request_id)
4230    }
4231
4232    /// Helper method for requesting instruments.
4233    ///
4234    /// # Errors
4235    ///
4236    /// Returns an error if input parameters are invalid.
4237    pub fn request_instruments(
4238        &self,
4239        venue: Option<Venue>,
4240        start: Option<DateTime<Utc>>,
4241        end: Option<DateTime<Utc>>,
4242        client_id: Option<ClientId>,
4243        params: Option<Params>,
4244        handler: ShareableMessageHandler,
4245    ) -> anyhow::Result<UUID4> {
4246        self.check_registered();
4247
4248        let now = self.clock_ref().utc_now();
4249        check_timestamps(now, start, end)?;
4250
4251        let request_id = UUID4::new();
4252        let command = RequestCommand::Instruments(RequestInstruments {
4253            venue,
4254            start,
4255            end,
4256            client_id,
4257            request_id,
4258            ts_init: now.into(),
4259            params,
4260        });
4261
4262        get_message_bus()
4263            .borrow_mut()
4264            .register_response_handler(command.request_id(), handler)?;
4265
4266        self.send_data_cmd(DataCommand::Request(command));
4267
4268        Ok(request_id)
4269    }
4270
4271    /// Helper method for requesting book snapshot.
4272    ///
4273    /// # Errors
4274    ///
4275    /// Returns an error if input parameters are invalid.
4276    pub fn request_book_snapshot(
4277        &self,
4278        instrument_id: InstrumentId,
4279        depth: Option<NonZeroUsize>,
4280        client_id: Option<ClientId>,
4281        params: Option<Params>,
4282        handler: ShareableMessageHandler,
4283    ) -> anyhow::Result<UUID4> {
4284        self.check_registered();
4285
4286        let request_id = UUID4::new();
4287        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4288            instrument_id,
4289            depth,
4290            client_id,
4291            request_id,
4292            ts_init: self.timestamp_ns(),
4293            params,
4294        });
4295
4296        get_message_bus()
4297            .borrow_mut()
4298            .register_response_handler(command.request_id(), handler)?;
4299
4300        self.send_data_cmd(DataCommand::Request(command));
4301
4302        Ok(request_id)
4303    }
4304
4305    /// Helper method for requesting book deltas.
4306    ///
4307    /// # Errors
4308    ///
4309    /// Returns an error if input parameters are invalid.
4310    #[expect(clippy::too_many_arguments)]
4311    pub fn request_book_deltas(
4312        &self,
4313        instrument_id: InstrumentId,
4314        start: Option<DateTime<Utc>>,
4315        end: Option<DateTime<Utc>>,
4316        limit: Option<NonZeroUsize>,
4317        client_id: Option<ClientId>,
4318        params: Option<Params>,
4319        handler: ShareableMessageHandler,
4320    ) -> anyhow::Result<UUID4> {
4321        self.check_registered();
4322
4323        let now = self.clock_ref().utc_now();
4324        check_timestamps(now, start, end)?;
4325
4326        let request_id = UUID4::new();
4327        let command = RequestCommand::BookDeltas(RequestBookDeltas {
4328            instrument_id,
4329            start,
4330            end,
4331            limit,
4332            client_id,
4333            request_id,
4334            ts_init: now.into(),
4335            params,
4336        });
4337
4338        get_message_bus()
4339            .borrow_mut()
4340            .register_response_handler(command.request_id(), handler)?;
4341
4342        self.send_data_cmd(DataCommand::Request(command));
4343
4344        Ok(request_id)
4345    }
4346
4347    /// Sends a request for historical book depth.
4348    ///
4349    /// # Errors
4350    ///
4351    /// Returns an error if input parameters are invalid.
4352    #[expect(clippy::too_many_arguments)]
4353    pub fn request_book_depth(
4354        &self,
4355        instrument_id: InstrumentId,
4356        start: Option<DateTime<Utc>>,
4357        end: Option<DateTime<Utc>>,
4358        limit: Option<NonZeroUsize>,
4359        depth: Option<NonZeroUsize>,
4360        client_id: Option<ClientId>,
4361        params: Option<Params>,
4362        handler: ShareableMessageHandler,
4363    ) -> anyhow::Result<UUID4> {
4364        self.check_registered();
4365
4366        let now = self.clock_ref().utc_now();
4367        check_timestamps(now, start, end)?;
4368
4369        let request_id = UUID4::new();
4370        let command = RequestCommand::BookDepth(RequestBookDepth {
4371            instrument_id,
4372            start,
4373            end,
4374            limit,
4375            depth,
4376            client_id,
4377            request_id,
4378            ts_init: now.into(),
4379            params,
4380        });
4381
4382        get_message_bus()
4383            .borrow_mut()
4384            .register_response_handler(command.request_id(), handler)?;
4385
4386        self.send_data_cmd(DataCommand::Request(command));
4387
4388        Ok(request_id)
4389    }
4390
4391    /// Helper method for requesting quotes.
4392    ///
4393    /// # Errors
4394    ///
4395    /// Returns an error if input parameters are invalid.
4396    #[expect(clippy::too_many_arguments)]
4397    pub fn request_quotes(
4398        &self,
4399        instrument_id: InstrumentId,
4400        start: Option<DateTime<Utc>>,
4401        end: Option<DateTime<Utc>>,
4402        limit: Option<NonZeroUsize>,
4403        client_id: Option<ClientId>,
4404        params: Option<Params>,
4405        handler: ShareableMessageHandler,
4406    ) -> anyhow::Result<UUID4> {
4407        self.check_registered();
4408
4409        let now = self.clock_ref().utc_now();
4410        check_timestamps(now, start, end)?;
4411
4412        let request_id = UUID4::new();
4413        let command = RequestCommand::Quotes(RequestQuotes {
4414            instrument_id,
4415            start,
4416            end,
4417            limit,
4418            client_id,
4419            request_id,
4420            ts_init: now.into(),
4421            params,
4422        });
4423
4424        get_message_bus()
4425            .borrow_mut()
4426            .register_response_handler(command.request_id(), handler)?;
4427
4428        self.send_data_cmd(DataCommand::Request(command));
4429
4430        Ok(request_id)
4431    }
4432
4433    /// Helper method for requesting trades.
4434    ///
4435    /// # Errors
4436    ///
4437    /// Returns an error if input parameters are invalid.
4438    #[expect(clippy::too_many_arguments)]
4439    pub fn request_trades(
4440        &self,
4441        instrument_id: InstrumentId,
4442        start: Option<DateTime<Utc>>,
4443        end: Option<DateTime<Utc>>,
4444        limit: Option<NonZeroUsize>,
4445        client_id: Option<ClientId>,
4446        params: Option<Params>,
4447        handler: ShareableMessageHandler,
4448    ) -> anyhow::Result<UUID4> {
4449        self.check_registered();
4450
4451        let now = self.clock_ref().utc_now();
4452        check_timestamps(now, start, end)?;
4453
4454        let request_id = UUID4::new();
4455        let command = RequestCommand::Trades(RequestTrades {
4456            instrument_id,
4457            start,
4458            end,
4459            limit,
4460            client_id,
4461            request_id,
4462            ts_init: now.into(),
4463            params,
4464        });
4465
4466        get_message_bus()
4467            .borrow_mut()
4468            .register_response_handler(command.request_id(), handler)?;
4469
4470        self.send_data_cmd(DataCommand::Request(command));
4471
4472        Ok(request_id)
4473    }
4474
4475    /// Helper method for requesting bars.
4476    ///
4477    /// # Errors
4478    ///
4479    /// Returns an error if input parameters are invalid.
4480    #[expect(clippy::too_many_arguments)]
4481    pub fn request_bars(
4482        &self,
4483        bar_type: BarType,
4484        start: Option<DateTime<Utc>>,
4485        end: Option<DateTime<Utc>>,
4486        limit: Option<NonZeroUsize>,
4487        client_id: Option<ClientId>,
4488        params: Option<Params>,
4489        handler: ShareableMessageHandler,
4490    ) -> anyhow::Result<UUID4> {
4491        self.check_registered();
4492
4493        let now = self.clock_ref().utc_now();
4494        check_timestamps(now, start, end)?;
4495
4496        let request_id = UUID4::new();
4497        let command = RequestCommand::Bars(RequestBars {
4498            bar_type,
4499            start,
4500            end,
4501            limit,
4502            client_id,
4503            request_id,
4504            ts_init: now.into(),
4505            params,
4506        });
4507
4508        get_message_bus()
4509            .borrow_mut()
4510            .register_response_handler(command.request_id(), handler)?;
4511
4512        self.send_data_cmd(DataCommand::Request(command));
4513
4514        Ok(request_id)
4515    }
4516
4517    /// Helper method for requesting funding rates.
4518    ///
4519    /// # Errors
4520    ///
4521    /// Returns an error if input parameters are invalid.
4522    #[expect(clippy::too_many_arguments)]
4523    pub fn request_funding_rates(
4524        &self,
4525        instrument_id: InstrumentId,
4526        start: Option<DateTime<Utc>>,
4527        end: Option<DateTime<Utc>>,
4528        limit: Option<NonZeroUsize>,
4529        client_id: Option<ClientId>,
4530        params: Option<Params>,
4531        handler: ShareableMessageHandler,
4532    ) -> anyhow::Result<UUID4> {
4533        self.check_registered();
4534
4535        let now = self.clock_ref().utc_now();
4536        check_timestamps(now, start, end)?;
4537
4538        let request_id = UUID4::new();
4539        let command = RequestCommand::FundingRates(RequestFundingRates {
4540            instrument_id,
4541            start,
4542            end,
4543            limit,
4544            client_id,
4545            request_id,
4546            ts_init: now.into(),
4547            params,
4548        });
4549
4550        get_message_bus()
4551            .borrow_mut()
4552            .register_response_handler(command.request_id(), handler)?;
4553
4554        self.send_data_cmd(DataCommand::Request(command));
4555
4556        Ok(request_id)
4557    }
4558
4559    #[cfg(test)]
4560    pub fn quote_handler_count(&self) -> usize {
4561        self.quote_handlers.len()
4562    }
4563
4564    #[cfg(test)]
4565    pub fn trade_handler_count(&self) -> usize {
4566        self.trade_handlers.len()
4567    }
4568
4569    #[cfg(test)]
4570    pub fn bar_handler_count(&self) -> usize {
4571        self.bar_handlers.len()
4572    }
4573
4574    #[cfg(test)]
4575    pub fn deltas_handler_count(&self) -> usize {
4576        self.deltas_handlers.len()
4577    }
4578
4579    #[cfg(test)]
4580    pub fn has_quote_handler(&self, topic: &str) -> bool {
4581        self.quote_handlers
4582            .contains_key(&MStr::<Topic>::from(topic))
4583    }
4584
4585    #[cfg(test)]
4586    pub fn has_trade_handler(&self, topic: &str) -> bool {
4587        self.trade_handlers
4588            .contains_key(&MStr::<Topic>::from(topic))
4589    }
4590
4591    #[cfg(test)]
4592    pub fn has_bar_handler(&self, topic: &str) -> bool {
4593        self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
4594    }
4595
4596    #[cfg(test)]
4597    pub fn has_deltas_handler(&self, pattern: &str) -> bool {
4598        self.deltas_handlers
4599            .contains_key(&MStr::<Pattern>::from(pattern))
4600    }
4601}
4602
4603fn check_timestamps(
4604    now: DateTime<Utc>,
4605    start: Option<DateTime<Utc>>,
4606    end: Option<DateTime<Utc>>,
4607) -> anyhow::Result<()> {
4608    if let Some(start) = start {
4609        check_predicate_true(start <= now, "start was > now")?;
4610    }
4611
4612    if let Some(end) = end {
4613        check_predicate_true(end <= now, "end was > now")?;
4614    }
4615
4616    if let (Some(start), Some(end)) = (start, end) {
4617        check_predicate_true(start < end, "start was >= end")?;
4618    }
4619
4620    Ok(())
4621}
4622
4623fn log_error(e: &anyhow::Error) {
4624    log::error!("{e}");
4625}
4626
4627fn log_not_running<T>(msg: &T)
4628where
4629    T: Debug,
4630{
4631    log::trace!("Received message when not running - skipping {msg:?}");
4632}
4633
4634fn log_received<T>(msg: &T)
4635where
4636    T: Debug,
4637{
4638    log::debug!("{RECV} {msg:?}");
4639}
4640
4641fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
4642    log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
4643}