Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39        close::InstrumentClose,
40        option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41    },
42    enums::BookType,
43    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45    instruments::InstrumentAny,
46    orderbook::OrderBook,
47};
48use ustr::Ustr;
49
50#[cfg(feature = "indicators")]
51use super::indicators::Indicators;
52use super::{
53    Actor,
54    registry::{get_actor_unchecked, try_get_actor_unchecked},
55};
56#[cfg(feature = "defi")]
57use crate::defi;
58#[cfg(feature = "defi")]
59#[allow(unused_imports)]
60use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
61use crate::{
62    cache::Cache,
63    clock::Clock,
64    component::Component,
65    enums::{ComponentState, ComponentTrigger},
66    logging::{CMD, RECV, REQ, SEND},
67    messages::{
68        data::{
69            BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
70            InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
71            RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
72            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
73            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
74            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
75            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
76            SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
77            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
78            UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
79            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
80            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
81            UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
82            UnsubscribeQuotes, UnsubscribeTrades,
83        },
84        system::ShutdownSystem,
85    },
86    msgbus::{
87        self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
88        switchboard::{
89            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
90            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
91            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
92            get_instruments_pattern, get_mark_price_topic, get_option_chain_topic,
93            get_option_greeks_topic, get_order_cancels_topic, get_order_fills_topic,
94            get_quotes_topic, get_trades_topic,
95        },
96    },
97    signal::Signal,
98    timer::{TimeEvent, TimeEventCallback},
99};
100
/// Common configuration for [`DataActor`] based components.
///
/// The [`Default`] implementation leaves `actor_id` unset and enables both logging flags.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.common",
        subclass,
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct DataActorConfig {
    /// The custom identifier for the actor, or `None` when no custom identifier is configured.
    pub actor_id: Option<ActorId>,
    /// Whether events should be logged.
    pub log_events: bool,
    /// Whether commands should be logged.
    pub log_commands: bool,
}
123
124impl Default for DataActorConfig {
125    fn default() -> Self {
126        Self {
127            actor_id: None,
128            log_events: true,
129            log_commands: true,
130        }
131    }
132}
133
/// Configuration for creating actors from importable paths.
///
/// All three fields are plain data; nothing in this type validates that the paths resolve.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct ImportableActorConfig {
    /// The fully qualified name of the Actor class.
    pub actor_path: String,
    /// The fully qualified name of the Actor config class.
    pub config_path: String,
    /// The actor configuration as a map from string keys to JSON values.
    pub config: HashMap<String, serde_json::Value>,
}
152
/// Shared, thread-safe (`Send + Sync`) callback that takes a single [`UUID4`] argument.
// NOTE(review): presumably invoked with the ID of a request once it completes — confirm
// against the call sites, which are not visible in this part of the file.
type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
154
155pub trait DataActor:
156    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
157{
158    /// Actions to be performed when the actor state is saved.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if saving the actor state fails.
163    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
164        Ok(IndexMap::new())
165    }
166
167    /// Actions to be performed when the actor state is loaded.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if loading the actor state fails.
172    #[allow(unused_variables)]
173    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
174        Ok(())
175    }
176
177    /// Actions to be performed on start.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if starting the actor fails.
182    fn on_start(&mut self) -> anyhow::Result<()> {
183        log::warn!(
184            "The `on_start` handler was called when not overridden, \
185            it's expected that any actions required when starting the actor \
186            occur here, such as subscribing/requesting data"
187        );
188        Ok(())
189    }
190
191    /// Actions to be performed on stop.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if stopping the actor fails.
196    fn on_stop(&mut self) -> anyhow::Result<()> {
197        log::warn!(
198            "The `on_stop` handler was called when not overridden, \
199            it's expected that any actions required when stopping the actor \
200            occur here, such as unsubscribing from data",
201        );
202        Ok(())
203    }
204
205    /// Actions to be performed on resume.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if resuming the actor fails.
210    fn on_resume(&mut self) -> anyhow::Result<()> {
211        log::warn!(
212            "The `on_resume` handler was called when not overridden, \
213            it's expected that any actions required when resuming the actor \
214            following a stop occur here"
215        );
216        Ok(())
217    }
218
219    /// Actions to be performed on reset.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if resetting the actor fails.
224    fn on_reset(&mut self) -> anyhow::Result<()> {
225        log::warn!(
226            "The `on_reset` handler was called when not overridden, \
227            it's expected that any actions required when resetting the actor \
228            occur here, such as resetting indicators and other state"
229        );
230        Ok(())
231    }
232
233    /// Actions to be performed on dispose.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if disposing the actor fails.
238    fn on_dispose(&mut self) -> anyhow::Result<()> {
239        Ok(())
240    }
241
242    /// Actions to be performed on degrade.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if degrading the actor fails.
247    fn on_degrade(&mut self) -> anyhow::Result<()> {
248        Ok(())
249    }
250
251    /// Actions to be performed on fault.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if faulting the actor fails.
256    fn on_fault(&mut self) -> anyhow::Result<()> {
257        Ok(())
258    }
259
260    /// Actions to be performed when receiving a time event.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if handling the time event fails.
265    #[allow(unused_variables)]
266    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
267        Ok(())
268    }
269
270    /// Actions to be performed when receiving custom data.
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if handling the data fails.
275    #[allow(unused_variables)]
276    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
277        Ok(())
278    }
279
280    /// Actions to be performed when receiving a signal.
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if handling the signal fails.
285    #[allow(unused_variables)]
286    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
287        Ok(())
288    }
289
290    /// Actions to be performed when receiving an instrument.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if handling the instrument fails.
295    #[allow(unused_variables)]
296    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
297        Ok(())
298    }
299
300    /// Actions to be performed when receiving order book deltas.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if handling the book deltas fails.
305    #[allow(unused_variables)]
306    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
307        Ok(())
308    }
309
310    /// Actions to be performed when receiving an order book.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if handling the book fails.
315    #[allow(unused_variables)]
316    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
317        Ok(())
318    }
319
320    /// Actions to be performed when receiving a quote.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if handling the quote fails.
325    #[allow(unused_variables)]
326    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
327        Ok(())
328    }
329
330    /// Actions to be performed when receiving a trade.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if handling the trade fails.
335    #[allow(unused_variables)]
336    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
337        Ok(())
338    }
339
340    /// Actions to be performed when receiving a bar.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if handling the bar fails.
345    #[allow(unused_variables)]
346    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
347        Ok(())
348    }
349
350    /// Actions to be performed when receiving a mark price update.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if handling the mark price update fails.
355    #[allow(unused_variables)]
356    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
357        Ok(())
358    }
359
360    /// Actions to be performed when receiving an index price update.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if handling the index price update fails.
365    #[allow(unused_variables)]
366    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
367        Ok(())
368    }
369
370    /// Actions to be performed when receiving a funding rate update.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if handling the funding rate update fails.
375    #[allow(unused_variables)]
376    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
377        Ok(())
378    }
379
380    /// Actions to be performed when receiving exchange-provided option greeks.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if handling the option greeks fails.
385    #[allow(unused_variables)]
386    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
387        Ok(())
388    }
389
390    /// Actions to be performed when receiving an option chain slice snapshot.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if handling the option chain slice fails.
395    #[allow(unused_variables)]
396    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
397        Ok(())
398    }
399
400    /// Actions to be performed when receiving an instrument status update.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if handling the instrument status update fails.
405    #[allow(unused_variables)]
406    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
407        Ok(())
408    }
409
410    /// Actions to be performed when receiving an instrument close update.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if handling the instrument close update fails.
415    #[allow(unused_variables)]
416    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
417        Ok(())
418    }
419
420    /// Actions to be performed when receiving an order filled event.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if handling the order filled event fails.
425    #[allow(unused_variables)]
426    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
427        Ok(())
428    }
429
430    /// Actions to be performed when receiving an order canceled event.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if handling the order canceled event fails.
435    #[allow(unused_variables)]
436    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
437        Ok(())
438    }
439
440    #[cfg(feature = "defi")]
441    /// Actions to be performed when receiving a block.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if handling the block fails.
446    #[allow(unused_variables)]
447    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
448        Ok(())
449    }
450
451    #[cfg(feature = "defi")]
452    /// Actions to be performed when receiving a pool.
453    ///
454    /// # Errors
455    ///
456    /// Returns an error if handling the pool fails.
457    #[allow(unused_variables)]
458    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
459        Ok(())
460    }
461
462    #[cfg(feature = "defi")]
463    /// Actions to be performed when receiving a pool swap.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if handling the pool swap fails.
468    #[allow(unused_variables)]
469    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
470        Ok(())
471    }
472
473    #[cfg(feature = "defi")]
474    /// Actions to be performed when receiving a pool liquidity update.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if handling the pool liquidity update fails.
479    #[allow(unused_variables)]
480    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
481        Ok(())
482    }
483
484    #[cfg(feature = "defi")]
485    /// Actions to be performed when receiving a pool fee collect event.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if handling the pool fee collect fails.
490    #[allow(unused_variables)]
491    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
492        Ok(())
493    }
494
495    #[cfg(feature = "defi")]
496    /// Actions to be performed when receiving a pool flash event.
497    ///
498    /// # Errors
499    ///
500    /// Returns an error if handling the pool flash fails.
501    #[allow(unused_variables)]
502    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
503        Ok(())
504    }
505
506    /// Actions to be performed when receiving historical data.
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if handling the historical data fails.
511    #[allow(unused_variables)]
512    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
513        Ok(())
514    }
515
516    /// Actions to be performed when receiving historical quotes.
517    ///
518    /// # Errors
519    ///
520    /// Returns an error if handling the historical quotes fails.
521    #[allow(unused_variables)]
522    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
523        Ok(())
524    }
525
526    /// Actions to be performed when receiving historical trades.
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if handling the historical trades fails.
531    #[allow(unused_variables)]
532    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
533        Ok(())
534    }
535
536    /// Actions to be performed when receiving historical funding rates.
537    ///
538    /// # Errors
539    ///
540    /// Returns an error if handling the historical funding rates fails.
541    #[allow(unused_variables)]
542    fn on_historical_funding_rates(
543        &mut self,
544        funding_rates: &[FundingRateUpdate],
545    ) -> anyhow::Result<()> {
546        Ok(())
547    }
548
549    /// Actions to be performed when receiving historical bars.
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if handling the historical bars fails.
554    #[allow(unused_variables)]
555    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
556        Ok(())
557    }
558
559    /// Actions to be performed when receiving historical mark prices.
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if handling the historical mark prices fails.
564    #[allow(unused_variables)]
565    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
566        Ok(())
567    }
568
569    /// Actions to be performed when receiving historical index prices.
570    ///
571    /// # Errors
572    ///
573    /// Returns an error if handling the historical index prices fails.
574    #[allow(unused_variables)]
575    fn on_historical_index_prices(
576        &mut self,
577        index_prices: &[IndexPriceUpdate],
578    ) -> anyhow::Result<()> {
579        Ok(())
580    }
581
582    /// Handles a received time event.
583    fn handle_time_event(&mut self, event: &TimeEvent) {
584        log_received(&event);
585
586        if self.not_running() {
587            log_not_running(&event);
588            return;
589        }
590
591        if let Err(e) = DataActor::on_time_event(self, event) {
592            log_error(&e);
593        }
594    }
595
596    /// Handles a received custom data point.
597    fn handle_data(&mut self, data: &CustomData) {
598        log_received(&data);
599
600        if self.not_running() {
601            log_not_running(&data);
602            return;
603        }
604
605        if let Err(e) = self.on_data(data) {
606            log_error(&e);
607        }
608    }
609
610    /// Handles a received signal.
611    fn handle_signal(&mut self, signal: &Signal) {
612        log_received(&signal);
613
614        if self.not_running() {
615            log_not_running(&signal);
616            return;
617        }
618
619        if let Err(e) = self.on_signal(signal) {
620            log_error(&e);
621        }
622    }
623
624    /// Handles a received instrument.
625    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
626        log_received(&instrument);
627
628        if self.not_running() {
629            log_not_running(&instrument);
630            return;
631        }
632
633        if let Err(e) = self.on_instrument(instrument) {
634            log_error(&e);
635        }
636    }
637
638    /// Handles received order book deltas.
639    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
640        log_received(&deltas);
641
642        if self.not_running() {
643            log_not_running(&deltas);
644            return;
645        }
646
647        if let Err(e) = self.on_book_deltas(deltas) {
648            log_error(&e);
649        }
650    }
651
652    /// Handles a received order book reference.
653    fn handle_book(&mut self, book: &OrderBook) {
654        log_received(&book);
655
656        if self.not_running() {
657            log_not_running(&book);
658            return;
659        }
660
661        if let Err(e) = self.on_book(book) {
662            log_error(&e);
663        }
664    }
665
666    /// Handles a received quote.
667    fn handle_quote(&mut self, quote: &QuoteTick) {
668        log_received(&quote);
669
670        if self.not_running() {
671            log_not_running(&quote);
672            return;
673        }
674
675        if let Err(e) = self.on_quote(quote) {
676            log_error(&e);
677        }
678    }
679
680    /// Handles a received trade.
681    fn handle_trade(&mut self, trade: &TradeTick) {
682        log_received(&trade);
683
684        if self.not_running() {
685            log_not_running(&trade);
686            return;
687        }
688
689        if let Err(e) = self.on_trade(trade) {
690            log_error(&e);
691        }
692    }
693
694    /// Handles a receiving bar.
695    fn handle_bar(&mut self, bar: &Bar) {
696        log_received(&bar);
697
698        if self.not_running() {
699            log_not_running(&bar);
700            return;
701        }
702
703        if let Err(e) = self.on_bar(bar) {
704            log_error(&e);
705        }
706    }
707
708    /// Handles a received mark price update.
709    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
710        log_received(&mark_price);
711
712        if self.not_running() {
713            log_not_running(&mark_price);
714            return;
715        }
716
717        if let Err(e) = self.on_mark_price(mark_price) {
718            log_error(&e);
719        }
720    }
721
722    /// Handles a received index price update.
723    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
724        log_received(&index_price);
725
726        if self.not_running() {
727            log_not_running(&index_price);
728            return;
729        }
730
731        if let Err(e) = self.on_index_price(index_price) {
732            log_error(&e);
733        }
734    }
735
736    /// Handles a received funding rate update.
737    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
738        log_received(&funding_rate);
739
740        if self.not_running() {
741            log_not_running(&funding_rate);
742            return;
743        }
744
745        if let Err(e) = self.on_funding_rate(funding_rate) {
746            log_error(&e);
747        }
748    }
749
750    /// Handles a received option greeks update.
751    fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
752        log_received(&greeks);
753
754        if self.not_running() {
755            log_not_running(&greeks);
756            return;
757        }
758
759        if let Err(e) = self.on_option_greeks(greeks) {
760            log_error(&e);
761        }
762    }
763
764    /// Handles a received option chain slice snapshot.
765    fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
766        log_received(&slice);
767
768        if self.not_running() {
769            log_not_running(&slice);
770            return;
771        }
772
773        if let Err(e) = self.on_option_chain(slice) {
774            log_error(&e);
775        }
776    }
777
778    /// Handles a received instrument status.
779    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
780        log_received(&status);
781
782        if self.not_running() {
783            log_not_running(&status);
784            return;
785        }
786
787        if let Err(e) = self.on_instrument_status(status) {
788            log_error(&e);
789        }
790    }
791
792    /// Handles a received instrument close.
793    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
794        log_received(&close);
795
796        if self.not_running() {
797            log_not_running(&close);
798            return;
799        }
800
801        if let Err(e) = self.on_instrument_close(close) {
802            log_error(&e);
803        }
804    }
805
806    /// Handles a received order filled event.
807    fn handle_order_filled(&mut self, event: &OrderFilled) {
808        log_received(&event);
809
810        // Check for double-handling: if the event's strategy_id matches this actor's id,
811        // it means a Strategy is receiving its own fill event through both automatic
812        // subscription and manual subscribe_order_fills, so skip the manual handler.
813        if event.strategy_id.inner() == self.actor_id().inner() {
814            return;
815        }
816
817        if self.not_running() {
818            log_not_running(&event);
819            return;
820        }
821
822        if let Err(e) = self.on_order_filled(event) {
823            log_error(&e);
824        }
825    }
826
827    /// Handles a received order canceled event.
828    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
829        log_received(&event);
830
831        // Check for double-handling: if the event's strategy_id matches this actor's id,
832        // it means a Strategy is receiving its own cancel event through both automatic
833        // subscription and manual subscribe_order_cancels, so skip the manual handler.
834        if event.strategy_id.inner() == self.actor_id().inner() {
835            return;
836        }
837
838        if self.not_running() {
839            log_not_running(&event);
840            return;
841        }
842
843        if let Err(e) = self.on_order_canceled(event) {
844            log_error(&e);
845        }
846    }
847
848    #[cfg(feature = "defi")]
849    /// Handles a received block.
850    fn handle_block(&mut self, block: &Block) {
851        log_received(&block);
852
853        if self.not_running() {
854            log_not_running(&block);
855            return;
856        }
857
858        if let Err(e) = self.on_block(block) {
859            log_error(&e);
860        }
861    }
862
863    #[cfg(feature = "defi")]
864    /// Handles a received pool definition update.
865    fn handle_pool(&mut self, pool: &Pool) {
866        log_received(&pool);
867
868        if self.not_running() {
869            log_not_running(&pool);
870            return;
871        }
872
873        if let Err(e) = self.on_pool(pool) {
874            log_error(&e);
875        }
876    }
877
878    #[cfg(feature = "defi")]
879    /// Handles a received pool swap.
880    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
881        log_received(&swap);
882
883        if self.not_running() {
884            log_not_running(&swap);
885            return;
886        }
887
888        if let Err(e) = self.on_pool_swap(swap) {
889            log_error(&e);
890        }
891    }
892
893    #[cfg(feature = "defi")]
894    /// Handles a received pool liquidity update.
895    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
896        log_received(&update);
897
898        if self.not_running() {
899            log_not_running(&update);
900            return;
901        }
902
903        if let Err(e) = self.on_pool_liquidity_update(update) {
904            log_error(&e);
905        }
906    }
907
908    #[cfg(feature = "defi")]
909    /// Handles a received pool fee collect.
910    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
911        log_received(&collect);
912
913        if self.not_running() {
914            log_not_running(&collect);
915            return;
916        }
917
918        if let Err(e) = self.on_pool_fee_collect(collect) {
919            log_error(&e);
920        }
921    }
922
923    #[cfg(feature = "defi")]
924    /// Handles a received pool flash event.
925    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
926        log_received(&flash);
927
928        if self.not_running() {
929            log_not_running(&flash);
930            return;
931        }
932
933        if let Err(e) = self.on_pool_flash(flash) {
934            log_error(&e);
935        }
936    }
937
938    /// Handles received historical data.
939    fn handle_historical_data(&mut self, data: &dyn Any) {
940        log_received(&data);
941
942        if let Err(e) = self.on_historical_data(data) {
943            log_error(&e);
944        }
945    }
946
947    /// Handles a data response.
948    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
949        log_received(&resp);
950
951        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
952            log_error(&e);
953        }
954    }
955
956    /// Handles an instrument response.
957    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
958        log_received(&resp);
959
960        if let Err(e) = self.on_instrument(&resp.data) {
961            log_error(&e);
962        }
963    }
964
965    /// Handles an instruments response.
966    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
967        log_received(&resp);
968
969        for inst in &resp.data {
970            if let Err(e) = self.on_instrument(inst) {
971                log_error(&e);
972            }
973        }
974    }
975
976    /// Handles a book response.
977    fn handle_book_response(&mut self, resp: &BookResponse) {
978        log_received(&resp);
979
980        if let Err(e) = self.on_book(&resp.data) {
981            log_error(&e);
982        }
983    }
984
985    /// Handles a quotes response.
986    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
987        log_received(&resp);
988
989        if let Err(e) = self.on_historical_quotes(&resp.data) {
990            log_error(&e);
991        }
992    }
993
994    /// Handles a trades response.
995    fn handle_trades_response(&mut self, resp: &TradesResponse) {
996        log_received(&resp);
997
998        if let Err(e) = self.on_historical_trades(&resp.data) {
999            log_error(&e);
1000        }
1001    }
1002
1003    /// Handles a funding rates response.
1004    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1005        log_received(&resp);
1006
1007        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1008            log_error(&e);
1009        }
1010    }
1011
1012    /// Handles a bars response.
1013    fn handle_bars_response(&mut self, resp: &BarsResponse) {
1014        log_received(&resp);
1015
1016        if let Err(e) = self.on_historical_bars(&resp.data) {
1017            log_error(&e);
1018        }
1019    }
1020
1021    /// Subscribe to streaming `data_type` data.
1022    fn subscribe_data(
1023        &mut self,
1024        data_type: DataType,
1025        client_id: Option<ClientId>,
1026        params: Option<Params>,
1027    ) where
1028        Self: 'static + Debug + Sized,
1029    {
1030        let actor_id = self.actor_id().inner();
1031        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1032            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1033        });
1034
1035        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1036    }
1037
1038    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1039    fn subscribe_quotes(
1040        &mut self,
1041        instrument_id: InstrumentId,
1042        client_id: Option<ClientId>,
1043        params: Option<Params>,
1044    ) where
1045        Self: 'static + Debug + Sized,
1046    {
1047        let actor_id = self.actor_id().inner();
1048        let topic = get_quotes_topic(instrument_id);
1049
1050        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1051            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1052                actor.handle_quote(quote);
1053            } else {
1054                log::error!("Actor {actor_id} not found for quote handling");
1055            }
1056        });
1057
1058        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1059    }
1060
1061    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1062    fn subscribe_instruments(
1063        &mut self,
1064        venue: Venue,
1065        client_id: Option<ClientId>,
1066        params: Option<Params>,
1067    ) where
1068        Self: 'static + Debug + Sized,
1069    {
1070        let actor_id = self.actor_id().inner();
1071        let pattern = get_instruments_pattern(venue);
1072
1073        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1074            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1075                actor.handle_instrument(instrument);
1076            } else {
1077                log::error!("Actor {actor_id} not found for instruments handling");
1078            }
1079        });
1080
1081        DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1082    }
1083
1084    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1085    fn subscribe_instrument(
1086        &mut self,
1087        instrument_id: InstrumentId,
1088        client_id: Option<ClientId>,
1089        params: Option<Params>,
1090    ) where
1091        Self: 'static + Debug + Sized,
1092    {
1093        let actor_id = self.actor_id().inner();
1094        let topic = get_instrument_topic(instrument_id);
1095
1096        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1097            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1098                actor.handle_instrument(instrument);
1099            } else {
1100                log::error!("Actor {actor_id} not found for instrument handling");
1101            }
1102        });
1103
1104        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1105    }
1106
1107    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1108    fn subscribe_book_deltas(
1109        &mut self,
1110        instrument_id: InstrumentId,
1111        book_type: BookType,
1112        depth: Option<NonZeroUsize>,
1113        client_id: Option<ClientId>,
1114        managed: bool,
1115        params: Option<Params>,
1116    ) where
1117        Self: 'static + Debug + Sized,
1118    {
1119        let actor_id = self.actor_id().inner();
1120        let topic = get_book_deltas_topic(instrument_id);
1121
1122        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1123            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1124        });
1125
1126        DataActorCore::subscribe_book_deltas(
1127            self,
1128            topic,
1129            handler,
1130            instrument_id,
1131            book_type,
1132            depth,
1133            client_id,
1134            managed,
1135            params,
1136        );
1137    }
1138
1139    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1140    fn subscribe_book_at_interval(
1141        &mut self,
1142        instrument_id: InstrumentId,
1143        book_type: BookType,
1144        depth: Option<NonZeroUsize>,
1145        interval_ms: NonZeroUsize,
1146        client_id: Option<ClientId>,
1147        params: Option<Params>,
1148    ) where
1149        Self: 'static + Debug + Sized,
1150    {
1151        let actor_id = self.actor_id().inner();
1152        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1153
1154        let handler = TypedHandler::from(move |book: &OrderBook| {
1155            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1156        });
1157
1158        DataActorCore::subscribe_book_at_interval(
1159            self,
1160            topic,
1161            handler,
1162            instrument_id,
1163            book_type,
1164            depth,
1165            interval_ms,
1166            client_id,
1167            params,
1168        );
1169    }
1170
1171    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1172    fn subscribe_trades(
1173        &mut self,
1174        instrument_id: InstrumentId,
1175        client_id: Option<ClientId>,
1176        params: Option<Params>,
1177    ) where
1178        Self: 'static + Debug + Sized,
1179    {
1180        let actor_id = self.actor_id().inner();
1181        let topic = get_trades_topic(instrument_id);
1182
1183        let handler = TypedHandler::from(move |trade: &TradeTick| {
1184            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1185        });
1186
1187        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1188    }
1189
1190    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1191    fn subscribe_bars(
1192        &mut self,
1193        bar_type: BarType,
1194        client_id: Option<ClientId>,
1195        params: Option<Params>,
1196    ) where
1197        Self: 'static + Debug + Sized,
1198    {
1199        let actor_id = self.actor_id().inner();
1200        let topic = get_bars_topic(bar_type);
1201
1202        let handler = TypedHandler::from(move |bar: &Bar| {
1203            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1204        });
1205
1206        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1207    }
1208
1209    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1210    fn subscribe_mark_prices(
1211        &mut self,
1212        instrument_id: InstrumentId,
1213        client_id: Option<ClientId>,
1214        params: Option<Params>,
1215    ) where
1216        Self: 'static + Debug + Sized,
1217    {
1218        let actor_id = self.actor_id().inner();
1219        let topic = get_mark_price_topic(instrument_id);
1220
1221        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1222            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1223        });
1224
1225        DataActorCore::subscribe_mark_prices(
1226            self,
1227            topic,
1228            handler,
1229            instrument_id,
1230            client_id,
1231            params,
1232        );
1233    }
1234
1235    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1236    fn subscribe_index_prices(
1237        &mut self,
1238        instrument_id: InstrumentId,
1239        client_id: Option<ClientId>,
1240        params: Option<Params>,
1241    ) where
1242        Self: 'static + Debug + Sized,
1243    {
1244        let actor_id = self.actor_id().inner();
1245        let topic = get_index_price_topic(instrument_id);
1246
1247        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1248            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1249        });
1250
1251        DataActorCore::subscribe_index_prices(
1252            self,
1253            topic,
1254            handler,
1255            instrument_id,
1256            client_id,
1257            params,
1258        );
1259    }
1260
1261    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1262    fn subscribe_funding_rates(
1263        &mut self,
1264        instrument_id: InstrumentId,
1265        client_id: Option<ClientId>,
1266        params: Option<Params>,
1267    ) where
1268        Self: 'static + Debug + Sized,
1269    {
1270        let actor_id = self.actor_id().inner();
1271        let topic = get_funding_rate_topic(instrument_id);
1272
1273        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1274            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1275        });
1276
1277        DataActorCore::subscribe_funding_rates(
1278            self,
1279            topic,
1280            handler,
1281            instrument_id,
1282            client_id,
1283            params,
1284        );
1285    }
1286
1287    /// Subscribe to streaming [`OptionGreeks`] data for the `instrument_id`.
1288    fn subscribe_option_greeks(
1289        &mut self,
1290        instrument_id: InstrumentId,
1291        client_id: Option<ClientId>,
1292        params: Option<Params>,
1293    ) where
1294        Self: 'static + Debug + Sized,
1295    {
1296        let actor_id = self.actor_id().inner();
1297        let topic = get_option_greeks_topic(instrument_id);
1298
1299        let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1300            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1301                actor.handle_option_greeks(option_greeks);
1302            } else {
1303                log::error!("Actor {actor_id} not found for option greeks handling");
1304            }
1305        });
1306
1307        DataActorCore::subscribe_option_greeks(
1308            self,
1309            topic,
1310            handler,
1311            instrument_id,
1312            client_id,
1313            params,
1314        );
1315    }
1316
1317    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1318    fn subscribe_instrument_status(
1319        &mut self,
1320        instrument_id: InstrumentId,
1321        client_id: Option<ClientId>,
1322        params: Option<Params>,
1323    ) where
1324        Self: 'static + Debug + Sized,
1325    {
1326        let actor_id = self.actor_id().inner();
1327        let topic = get_instrument_status_topic(instrument_id);
1328
1329        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1330            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1331        });
1332
1333        DataActorCore::subscribe_instrument_status(
1334            self,
1335            topic,
1336            handler,
1337            instrument_id,
1338            client_id,
1339            params,
1340        );
1341    }
1342
1343    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1344    fn subscribe_instrument_close(
1345        &mut self,
1346        instrument_id: InstrumentId,
1347        client_id: Option<ClientId>,
1348        params: Option<Params>,
1349    ) where
1350        Self: 'static + Debug + Sized,
1351    {
1352        let actor_id = self.actor_id().inner();
1353        let topic = get_instrument_close_topic(instrument_id);
1354
1355        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1356            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1357        });
1358
1359        DataActorCore::subscribe_instrument_close(
1360            self,
1361            topic,
1362            handler,
1363            instrument_id,
1364            client_id,
1365            params,
1366        );
1367    }
1368
1369    /// Subscribe to streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1370    ///
1371    /// The ATM price is always derived from the exchange-provided forward price
1372    /// embedded in each option greeks/ticker update.
1373    fn subscribe_option_chain(
1374        &mut self,
1375        series_id: OptionSeriesId,
1376        strike_range: StrikeRange,
1377        snapshot_interval_ms: Option<u64>,
1378        client_id: Option<ClientId>,
1379    ) where
1380        Self: 'static + Debug + Sized,
1381    {
1382        let actor_id = self.actor_id().inner();
1383        let topic = get_option_chain_topic(series_id);
1384
1385        let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1386            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1387                actor.handle_option_chain(slice);
1388            } else {
1389                log::error!("Actor {actor_id} not found for option chain handling");
1390            }
1391        });
1392
1393        DataActorCore::subscribe_option_chain(
1394            self,
1395            topic,
1396            handler,
1397            series_id,
1398            strike_range,
1399            snapshot_interval_ms,
1400            client_id,
1401        );
1402    }
1403
1404    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1405    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1406    where
1407        Self: 'static + Debug + Sized,
1408    {
1409        let actor_id = self.actor_id().inner();
1410        let topic = get_order_fills_topic(instrument_id);
1411
1412        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1413            if let OrderEventAny::Filled(filled) = event {
1414                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1415            }
1416        });
1417
1418        DataActorCore::subscribe_order_fills(self, topic, handler);
1419    }
1420
1421    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1422    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1423    where
1424        Self: 'static + Debug + Sized,
1425    {
1426        let actor_id = self.actor_id().inner();
1427        let topic = get_order_cancels_topic(instrument_id);
1428
1429        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1430            if let OrderEventAny::Canceled(canceled) = event {
1431                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1432            }
1433        });
1434
1435        DataActorCore::subscribe_order_cancels(self, topic, handler);
1436    }
1437
1438    #[cfg(feature = "defi")]
1439    /// Subscribe to streaming [`Block`] data for the `chain`.
1440    fn subscribe_blocks(
1441        &mut self,
1442        chain: Blockchain,
1443        client_id: Option<ClientId>,
1444        params: Option<Params>,
1445    ) where
1446        Self: 'static + Debug + Sized,
1447    {
1448        let actor_id = self.actor_id().inner();
1449        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1450
1451        let handler = TypedHandler::from(move |block: &Block| {
1452            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1453        });
1454
1455        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1456    }
1457
1458    #[cfg(feature = "defi")]
1459    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1460    fn subscribe_pool(
1461        &mut self,
1462        instrument_id: InstrumentId,
1463        client_id: Option<ClientId>,
1464        params: Option<Params>,
1465    ) where
1466        Self: 'static + Debug + Sized,
1467    {
1468        let actor_id = self.actor_id().inner();
1469        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1470
1471        let handler = TypedHandler::from(move |pool: &Pool| {
1472            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1473        });
1474
1475        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1476    }
1477
1478    #[cfg(feature = "defi")]
1479    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1480    fn subscribe_pool_swaps(
1481        &mut self,
1482        instrument_id: InstrumentId,
1483        client_id: Option<ClientId>,
1484        params: Option<Params>,
1485    ) where
1486        Self: 'static + Debug + Sized,
1487    {
1488        let actor_id = self.actor_id().inner();
1489        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1490
1491        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1492            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1493        });
1494
1495        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1496    }
1497
1498    #[cfg(feature = "defi")]
1499    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1500    fn subscribe_pool_liquidity_updates(
1501        &mut self,
1502        instrument_id: InstrumentId,
1503        client_id: Option<ClientId>,
1504        params: Option<Params>,
1505    ) where
1506        Self: 'static + Debug + Sized,
1507    {
1508        let actor_id = self.actor_id().inner();
1509        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1510
1511        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1512            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1513        });
1514
1515        DataActorCore::subscribe_pool_liquidity_updates(
1516            self,
1517            topic,
1518            handler,
1519            instrument_id,
1520            client_id,
1521            params,
1522        );
1523    }
1524
1525    #[cfg(feature = "defi")]
1526    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1527    fn subscribe_pool_fee_collects(
1528        &mut self,
1529        instrument_id: InstrumentId,
1530        client_id: Option<ClientId>,
1531        params: Option<Params>,
1532    ) where
1533        Self: 'static + Debug + Sized,
1534    {
1535        let actor_id = self.actor_id().inner();
1536        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1537
1538        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1539            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1540        });
1541
1542        DataActorCore::subscribe_pool_fee_collects(
1543            self,
1544            topic,
1545            handler,
1546            instrument_id,
1547            client_id,
1548            params,
1549        );
1550    }
1551
1552    #[cfg(feature = "defi")]
1553    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1554    fn subscribe_pool_flash_events(
1555        &mut self,
1556        instrument_id: InstrumentId,
1557        client_id: Option<ClientId>,
1558        params: Option<Params>,
1559    ) where
1560        Self: 'static + Debug + Sized,
1561    {
1562        let actor_id = self.actor_id().inner();
1563        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1564
1565        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1566            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1567        });
1568
1569        DataActorCore::subscribe_pool_flash_events(
1570            self,
1571            topic,
1572            handler,
1573            instrument_id,
1574            client_id,
1575            params,
1576        );
1577    }
1578
1579    /// Unsubscribe from streaming `data_type` data.
1580    fn unsubscribe_data(
1581        &mut self,
1582        data_type: DataType,
1583        client_id: Option<ClientId>,
1584        params: Option<Params>,
1585    ) where
1586        Self: 'static + Debug + Sized,
1587    {
1588        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1589    }
1590
1591    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1592    fn unsubscribe_instruments(
1593        &mut self,
1594        venue: Venue,
1595        client_id: Option<ClientId>,
1596        params: Option<Params>,
1597    ) where
1598        Self: 'static + Debug + Sized,
1599    {
1600        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1601    }
1602
1603    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1604    fn unsubscribe_instrument(
1605        &mut self,
1606        instrument_id: InstrumentId,
1607        client_id: Option<ClientId>,
1608        params: Option<Params>,
1609    ) where
1610        Self: 'static + Debug + Sized,
1611    {
1612        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1613    }
1614
1615    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1616    fn unsubscribe_book_deltas(
1617        &mut self,
1618        instrument_id: InstrumentId,
1619        client_id: Option<ClientId>,
1620        params: Option<Params>,
1621    ) where
1622        Self: 'static + Debug + Sized,
1623    {
1624        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1625    }
1626
1627    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1628    fn unsubscribe_book_at_interval(
1629        &mut self,
1630        instrument_id: InstrumentId,
1631        interval_ms: NonZeroUsize,
1632        client_id: Option<ClientId>,
1633        params: Option<Params>,
1634    ) where
1635        Self: 'static + Debug + Sized,
1636    {
1637        DataActorCore::unsubscribe_book_at_interval(
1638            self,
1639            instrument_id,
1640            interval_ms,
1641            client_id,
1642            params,
1643        );
1644    }
1645
1646    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1647    fn unsubscribe_quotes(
1648        &mut self,
1649        instrument_id: InstrumentId,
1650        client_id: Option<ClientId>,
1651        params: Option<Params>,
1652    ) where
1653        Self: 'static + Debug + Sized,
1654    {
1655        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1656    }
1657
1658    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1659    fn unsubscribe_trades(
1660        &mut self,
1661        instrument_id: InstrumentId,
1662        client_id: Option<ClientId>,
1663        params: Option<Params>,
1664    ) where
1665        Self: 'static + Debug + Sized,
1666    {
1667        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1668    }
1669
1670    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1671    fn unsubscribe_bars(
1672        &mut self,
1673        bar_type: BarType,
1674        client_id: Option<ClientId>,
1675        params: Option<Params>,
1676    ) where
1677        Self: 'static + Debug + Sized,
1678    {
1679        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1680    }
1681
1682    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1683    fn unsubscribe_mark_prices(
1684        &mut self,
1685        instrument_id: InstrumentId,
1686        client_id: Option<ClientId>,
1687        params: Option<Params>,
1688    ) where
1689        Self: 'static + Debug + Sized,
1690    {
1691        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1692    }
1693
1694    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1695    fn unsubscribe_index_prices(
1696        &mut self,
1697        instrument_id: InstrumentId,
1698        client_id: Option<ClientId>,
1699        params: Option<Params>,
1700    ) where
1701        Self: 'static + Debug + Sized,
1702    {
1703        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1704    }
1705
1706    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1707    fn unsubscribe_funding_rates(
1708        &mut self,
1709        instrument_id: InstrumentId,
1710        client_id: Option<ClientId>,
1711        params: Option<Params>,
1712    ) where
1713        Self: 'static + Debug + Sized,
1714    {
1715        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1716    }
1717
1718    /// Unsubscribe from streaming [`OptionGreeks`] data for the `instrument_id`.
1719    fn unsubscribe_option_greeks(
1720        &mut self,
1721        instrument_id: InstrumentId,
1722        client_id: Option<ClientId>,
1723        params: Option<Params>,
1724    ) where
1725        Self: 'static + Debug + Sized,
1726    {
1727        DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1728    }
1729
1730    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1731    fn unsubscribe_instrument_status(
1732        &mut self,
1733        instrument_id: InstrumentId,
1734        client_id: Option<ClientId>,
1735        params: Option<Params>,
1736    ) where
1737        Self: 'static + Debug + Sized,
1738    {
1739        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1740    }
1741
1742    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1743    fn unsubscribe_instrument_close(
1744        &mut self,
1745        instrument_id: InstrumentId,
1746        client_id: Option<ClientId>,
1747        params: Option<Params>,
1748    ) where
1749        Self: 'static + Debug + Sized,
1750    {
1751        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1752    }
1753
1754    /// Unsubscribe from streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1755    fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1756    where
1757        Self: 'static + Debug + Sized,
1758    {
1759        DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1760    }
1761
1762    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1763    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1764    where
1765        Self: 'static + Debug + Sized,
1766    {
1767        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1768    }
1769
1770    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1771    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1772    where
1773        Self: 'static + Debug + Sized,
1774    {
1775        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1776    }
1777
1778    #[cfg(feature = "defi")]
1779    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1780    fn unsubscribe_blocks(
1781        &mut self,
1782        chain: Blockchain,
1783        client_id: Option<ClientId>,
1784        params: Option<Params>,
1785    ) where
1786        Self: 'static + Debug + Sized,
1787    {
1788        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1789    }
1790
1791    #[cfg(feature = "defi")]
1792    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1793    fn unsubscribe_pool(
1794        &mut self,
1795        instrument_id: InstrumentId,
1796        client_id: Option<ClientId>,
1797        params: Option<Params>,
1798    ) where
1799        Self: 'static + Debug + Sized,
1800    {
1801        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1802    }
1803
1804    #[cfg(feature = "defi")]
1805    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1806    fn unsubscribe_pool_swaps(
1807        &mut self,
1808        instrument_id: InstrumentId,
1809        client_id: Option<ClientId>,
1810        params: Option<Params>,
1811    ) where
1812        Self: 'static + Debug + Sized,
1813    {
1814        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1815    }
1816
1817    #[cfg(feature = "defi")]
1818    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1819    fn unsubscribe_pool_liquidity_updates(
1820        &mut self,
1821        instrument_id: InstrumentId,
1822        client_id: Option<ClientId>,
1823        params: Option<Params>,
1824    ) where
1825        Self: 'static + Debug + Sized,
1826    {
1827        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1828    }
1829
1830    #[cfg(feature = "defi")]
1831    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1832    fn unsubscribe_pool_fee_collects(
1833        &mut self,
1834        instrument_id: InstrumentId,
1835        client_id: Option<ClientId>,
1836        params: Option<Params>,
1837    ) where
1838        Self: 'static + Debug + Sized,
1839    {
1840        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1841    }
1842
1843    #[cfg(feature = "defi")]
1844    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1845    fn unsubscribe_pool_flash_events(
1846        &mut self,
1847        instrument_id: InstrumentId,
1848        client_id: Option<ClientId>,
1849        params: Option<Params>,
1850    ) where
1851        Self: 'static + Debug + Sized,
1852    {
1853        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1854    }
1855
1856    /// Request historical custom data of the given `data_type`.
1857    ///
1858    /// # Errors
1859    ///
1860    /// Returns an error if input parameters are invalid.
1861    fn request_data(
1862        &mut self,
1863        data_type: DataType,
1864        client_id: ClientId,
1865        start: Option<DateTime<Utc>>,
1866        end: Option<DateTime<Utc>>,
1867        limit: Option<NonZeroUsize>,
1868        params: Option<Params>,
1869    ) -> anyhow::Result<UUID4>
1870    where
1871        Self: 'static + Debug + Sized,
1872    {
1873        let actor_id = self.actor_id().inner();
1874        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1875            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1876        });
1877
1878        DataActorCore::request_data(
1879            self, data_type, client_id, start, end, limit, params, handler,
1880        )
1881    }
1882
1883    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1884    ///
1885    /// # Errors
1886    ///
1887    /// Returns an error if input parameters are invalid.
1888    fn request_instrument(
1889        &mut self,
1890        instrument_id: InstrumentId,
1891        start: Option<DateTime<Utc>>,
1892        end: Option<DateTime<Utc>>,
1893        client_id: Option<ClientId>,
1894        params: Option<Params>,
1895    ) -> anyhow::Result<UUID4>
1896    where
1897        Self: 'static + Debug + Sized,
1898    {
1899        let actor_id = self.actor_id().inner();
1900        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1901            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1902        });
1903
1904        DataActorCore::request_instrument(
1905            self,
1906            instrument_id,
1907            start,
1908            end,
1909            client_id,
1910            params,
1911            handler,
1912        )
1913    }
1914
1915    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1916    ///
1917    /// # Errors
1918    ///
1919    /// Returns an error if input parameters are invalid.
1920    fn request_instruments(
1921        &mut self,
1922        venue: Option<Venue>,
1923        start: Option<DateTime<Utc>>,
1924        end: Option<DateTime<Utc>>,
1925        client_id: Option<ClientId>,
1926        params: Option<Params>,
1927    ) -> anyhow::Result<UUID4>
1928    where
1929        Self: 'static + Debug + Sized,
1930    {
1931        let actor_id = self.actor_id().inner();
1932        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1933            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1934        });
1935
1936        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1937    }
1938
1939    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1940    ///
1941    /// # Errors
1942    ///
1943    /// Returns an error if input parameters are invalid.
1944    fn request_book_snapshot(
1945        &mut self,
1946        instrument_id: InstrumentId,
1947        depth: Option<NonZeroUsize>,
1948        client_id: Option<ClientId>,
1949        params: Option<Params>,
1950    ) -> anyhow::Result<UUID4>
1951    where
1952        Self: 'static + Debug + Sized,
1953    {
1954        let actor_id = self.actor_id().inner();
1955        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1956            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1957        });
1958
1959        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1960    }
1961
1962    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1963    ///
1964    /// # Errors
1965    ///
1966    /// Returns an error if input parameters are invalid.
1967    fn request_quotes(
1968        &mut self,
1969        instrument_id: InstrumentId,
1970        start: Option<DateTime<Utc>>,
1971        end: Option<DateTime<Utc>>,
1972        limit: Option<NonZeroUsize>,
1973        client_id: Option<ClientId>,
1974        params: Option<Params>,
1975    ) -> anyhow::Result<UUID4>
1976    where
1977        Self: 'static + Debug + Sized,
1978    {
1979        let actor_id = self.actor_id().inner();
1980        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
1981            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1982        });
1983
1984        DataActorCore::request_quotes(
1985            self,
1986            instrument_id,
1987            start,
1988            end,
1989            limit,
1990            client_id,
1991            params,
1992            handler,
1993        )
1994    }
1995
1996    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1997    ///
1998    /// # Errors
1999    ///
2000    /// Returns an error if input parameters are invalid.
2001    fn request_trades(
2002        &mut self,
2003        instrument_id: InstrumentId,
2004        start: Option<DateTime<Utc>>,
2005        end: Option<DateTime<Utc>>,
2006        limit: Option<NonZeroUsize>,
2007        client_id: Option<ClientId>,
2008        params: Option<Params>,
2009    ) -> anyhow::Result<UUID4>
2010    where
2011        Self: 'static + Debug + Sized,
2012    {
2013        let actor_id = self.actor_id().inner();
2014        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2015            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2016        });
2017
2018        DataActorCore::request_trades(
2019            self,
2020            instrument_id,
2021            start,
2022            end,
2023            limit,
2024            client_id,
2025            params,
2026            handler,
2027        )
2028    }
2029
2030    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
2031    ///
2032    /// # Errors
2033    ///
2034    /// Returns an error if input parameters are invalid.
2035    fn request_funding_rates(
2036        &mut self,
2037        instrument_id: InstrumentId,
2038        start: Option<DateTime<Utc>>,
2039        end: Option<DateTime<Utc>>,
2040        limit: Option<NonZeroUsize>,
2041        client_id: Option<ClientId>,
2042        params: Option<Params>,
2043    ) -> anyhow::Result<UUID4>
2044    where
2045        Self: 'static + Debug + Sized,
2046    {
2047        let actor_id = self.actor_id().inner();
2048        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2049            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2050        });
2051
2052        DataActorCore::request_funding_rates(
2053            self,
2054            instrument_id,
2055            start,
2056            end,
2057            limit,
2058            client_id,
2059            params,
2060            handler,
2061        )
2062    }
2063
2064    /// Request historical [`Bar`] data for the given `bar_type`.
2065    ///
2066    /// # Errors
2067    ///
2068    /// Returns an error if input parameters are invalid.
2069    fn request_bars(
2070        &mut self,
2071        bar_type: BarType,
2072        start: Option<DateTime<Utc>>,
2073        end: Option<DateTime<Utc>>,
2074        limit: Option<NonZeroUsize>,
2075        client_id: Option<ClientId>,
2076        params: Option<Params>,
2077    ) -> anyhow::Result<UUID4>
2078    where
2079        Self: 'static + Debug + Sized,
2080    {
2081        let actor_id = self.actor_id().inner();
2082        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2083            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2084        });
2085
2086        DataActorCore::request_bars(
2087            self, bar_type, start, end, limit, client_id, params, handler,
2088        )
2089    }
2090}
2091
2092// Blanket implementation: any DataActor automatically implements Actor
2093impl<T> Actor for T
2094where
2095    T: DataActor + Debug + 'static,
2096{
2097    fn id(&self) -> Ustr {
2098        self.actor_id.inner()
2099    }
2100
2101    #[allow(unused_variables)]
2102    fn handle(&mut self, msg: &dyn Any) {
2103        // Default empty implementation - concrete actors can override if needed
2104    }
2105
2106    fn as_any(&self) -> &dyn Any {
2107        self
2108    }
2109}
2110
2111// Blanket implementation: any DataActor automatically implements Component
2112impl<T> Component for T
2113where
2114    T: DataActor + Debug + 'static,
2115{
2116    fn component_id(&self) -> ComponentId {
2117        ComponentId::new(self.actor_id.inner().as_str())
2118    }
2119
2120    fn state(&self) -> ComponentState {
2121        self.state
2122    }
2123
2124    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2125        self.state = self.state.transition(&trigger)?;
2126        log::info!("{}", self.state.variant_name());
2127        Ok(())
2128    }
2129
2130    fn register(
2131        &mut self,
2132        trader_id: TraderId,
2133        clock: Rc<RefCell<dyn Clock>>,
2134        cache: Rc<RefCell<Cache>>,
2135    ) -> anyhow::Result<()> {
2136        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2137
2138        // Register default time event handler for this actor
2139        let actor_id = self.actor_id().inner();
2140        let callback = TimeEventCallback::from(move |event: TimeEvent| {
2141            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2142                actor.handle_time_event(&event);
2143            } else {
2144                log::error!("Actor {actor_id} not found for time event handling");
2145            }
2146        });
2147
2148        clock.borrow_mut().register_default_handler(callback);
2149
2150        self.initialize()
2151    }
2152
2153    fn on_start(&mut self) -> anyhow::Result<()> {
2154        DataActor::on_start(self)
2155    }
2156
2157    fn on_stop(&mut self) -> anyhow::Result<()> {
2158        DataActor::on_stop(self)
2159    }
2160
2161    fn on_resume(&mut self) -> anyhow::Result<()> {
2162        DataActor::on_resume(self)
2163    }
2164
2165    fn on_degrade(&mut self) -> anyhow::Result<()> {
2166        DataActor::on_degrade(self)
2167    }
2168
2169    fn on_fault(&mut self) -> anyhow::Result<()> {
2170        DataActor::on_fault(self)
2171    }
2172
2173    fn on_reset(&mut self) -> anyhow::Result<()> {
2174        DataActor::on_reset(self)
2175    }
2176
2177    fn on_dispose(&mut self) -> anyhow::Result<()> {
2178        DataActor::on_dispose(self)
2179    }
2180}
2181
2182/// Core functionality for all actors.
2183#[derive(Clone)]
2184#[allow(
2185    dead_code,
2186    reason = "TODO: Under development (pending_requests, signal_classes)"
2187)]
2188pub struct DataActorCore {
2189    /// The actor identifier.
2190    pub actor_id: ActorId,
2191    /// The actors configuration.
2192    pub config: DataActorConfig,
2193    trader_id: Option<TraderId>,
2194    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2195    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2196    state: ComponentState,
2197    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2198    deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
2199    depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
2200    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2201    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2202    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2203    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2204    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2205    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2206    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2207    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2208    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2209    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2210    #[cfg(feature = "defi")]
2211    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2212    #[cfg(feature = "defi")]
2213    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2214    #[cfg(feature = "defi")]
2215    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2216    #[cfg(feature = "defi")]
2217    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2218    #[cfg(feature = "defi")]
2219    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2220    #[cfg(feature = "defi")]
2221    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2222    warning_events: AHashSet<String>, // TODO: TBD
2223    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2224    signal_classes: AHashMap<String, String>,
2225    #[cfg(feature = "indicators")]
2226    indicators: Indicators,
2227}
2228
2229impl Debug for DataActorCore {
2230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2231        f.debug_struct(stringify!(DataActorCore))
2232            .field("actor_id", &self.actor_id)
2233            .field("config", &self.config)
2234            .field("state", &self.state)
2235            .field("trader_id", &self.trader_id)
2236            .finish()
2237    }
2238}
2239
2240impl DataActorCore {
2241    /// Adds a subscription handler for the `topic`.
2242    ///
2243    //// Logs a warning if the actor is already subscribed to the topic.
2244    pub(crate) fn add_subscription_any(
2245        &mut self,
2246        topic: MStr<Topic>,
2247        handler: ShareableMessageHandler,
2248    ) {
2249        let pattern: MStr<Pattern> = topic.into();
2250        if self.topic_handlers.contains_key(&pattern) {
2251            log::warn!(
2252                "Actor {} attempted duplicate subscription to topic '{topic}'",
2253                self.actor_id,
2254            );
2255            return;
2256        }
2257
2258        self.topic_handlers.insert(pattern, handler.clone());
2259        msgbus::subscribe_any(pattern, handler, None);
2260    }
2261
2262    /// Removes a subscription handler for the `topic` if present.
2263    ///
2264    /// Logs a warning if the actor is not currently subscribed to the topic.
2265    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2266        let pattern: MStr<Pattern> = topic.into();
2267        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2268            msgbus::unsubscribe_any(pattern, &handler);
2269        } else {
2270            log::warn!(
2271                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2272                self.actor_id,
2273            );
2274        }
2275    }
2276
2277    pub(crate) fn add_quote_subscription(
2278        &mut self,
2279        topic: MStr<Topic>,
2280        handler: TypedHandler<QuoteTick>,
2281    ) {
2282        if self.quote_handlers.contains_key(&topic) {
2283            log::warn!(
2284                "Actor {} attempted duplicate quote subscription to '{topic}'",
2285                self.actor_id
2286            );
2287            return;
2288        }
2289        self.quote_handlers.insert(topic, handler.clone());
2290        msgbus::subscribe_quotes(topic.into(), handler, None);
2291    }
2292
2293    #[allow(dead_code)]
2294    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2295        if let Some(handler) = self.quote_handlers.remove(&topic) {
2296            msgbus::unsubscribe_quotes(topic.into(), &handler);
2297        }
2298    }
2299
2300    pub(crate) fn add_trade_subscription(
2301        &mut self,
2302        topic: MStr<Topic>,
2303        handler: TypedHandler<TradeTick>,
2304    ) {
2305        if self.trade_handlers.contains_key(&topic) {
2306            log::warn!(
2307                "Actor {} attempted duplicate trade subscription to '{topic}'",
2308                self.actor_id
2309            );
2310            return;
2311        }
2312        self.trade_handlers.insert(topic, handler.clone());
2313        msgbus::subscribe_trades(topic.into(), handler, None);
2314    }
2315
2316    #[allow(dead_code)]
2317    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2318        if let Some(handler) = self.trade_handlers.remove(&topic) {
2319            msgbus::unsubscribe_trades(topic.into(), &handler);
2320        }
2321    }
2322
2323    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2324        if self.bar_handlers.contains_key(&topic) {
2325            log::warn!(
2326                "Actor {} attempted duplicate bar subscription to '{topic}'",
2327                self.actor_id
2328            );
2329            return;
2330        }
2331        self.bar_handlers.insert(topic, handler.clone());
2332        msgbus::subscribe_bars(topic.into(), handler, None);
2333    }
2334
2335    #[allow(dead_code)]
2336    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2337        if let Some(handler) = self.bar_handlers.remove(&topic) {
2338            msgbus::unsubscribe_bars(topic.into(), &handler);
2339        }
2340    }
2341
2342    pub(crate) fn add_order_event_subscription(
2343        &mut self,
2344        topic: MStr<Topic>,
2345        handler: TypedHandler<OrderEventAny>,
2346    ) {
2347        if self.order_event_handlers.contains_key(&topic) {
2348            log::warn!(
2349                "Actor {} attempted duplicate order event subscription to '{topic}'",
2350                self.actor_id
2351            );
2352            return;
2353        }
2354        self.order_event_handlers.insert(topic, handler.clone());
2355        msgbus::subscribe_order_events(topic.into(), handler, None);
2356    }
2357
2358    #[allow(dead_code)]
2359    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2360        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2361            msgbus::unsubscribe_order_events(topic.into(), &handler);
2362        }
2363    }
2364
2365    pub(crate) fn add_deltas_subscription(
2366        &mut self,
2367        topic: MStr<Topic>,
2368        handler: TypedHandler<OrderBookDeltas>,
2369    ) {
2370        if self.deltas_handlers.contains_key(&topic) {
2371            log::warn!(
2372                "Actor {} attempted duplicate deltas subscription to '{topic}'",
2373                self.actor_id
2374            );
2375            return;
2376        }
2377        self.deltas_handlers.insert(topic, handler.clone());
2378        msgbus::subscribe_book_deltas(topic.into(), handler, None);
2379    }
2380
2381    #[allow(dead_code)]
2382    pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2383        if let Some(handler) = self.deltas_handlers.remove(&topic) {
2384            msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2385        }
2386    }
2387
2388    #[allow(dead_code)]
2389    pub(crate) fn add_depth10_subscription(
2390        &mut self,
2391        topic: MStr<Topic>,
2392        handler: TypedHandler<OrderBookDepth10>,
2393    ) {
2394        if self.depth10_handlers.contains_key(&topic) {
2395            log::warn!(
2396                "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2397                self.actor_id
2398            );
2399            return;
2400        }
2401        self.depth10_handlers.insert(topic, handler.clone());
2402        msgbus::subscribe_book_depth10(topic.into(), handler, None);
2403    }
2404
2405    #[allow(dead_code)]
2406    pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2407        if let Some(handler) = self.depth10_handlers.remove(&topic) {
2408            msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2409        }
2410    }
2411
2412    pub(crate) fn add_instrument_subscription(
2413        &mut self,
2414        pattern: MStr<Pattern>,
2415        handler: ShareableMessageHandler,
2416    ) {
2417        if self.topic_handlers.contains_key(&pattern) {
2418            log::warn!(
2419                "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2420                self.actor_id
2421            );
2422            return;
2423        }
2424        self.topic_handlers.insert(pattern, handler.clone());
2425        msgbus::subscribe_any(pattern, handler, None);
2426    }
2427
2428    #[allow(dead_code)]
2429    pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2430        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2431            msgbus::unsubscribe_any(pattern, &handler);
2432        }
2433    }
2434
2435    pub(crate) fn add_instrument_close_subscription(
2436        &mut self,
2437        topic: MStr<Topic>,
2438        handler: ShareableMessageHandler,
2439    ) {
2440        let pattern: MStr<Pattern> = topic.into();
2441        if self.topic_handlers.contains_key(&pattern) {
2442            log::warn!(
2443                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2444                self.actor_id
2445            );
2446            return;
2447        }
2448        self.topic_handlers.insert(pattern, handler.clone());
2449        msgbus::subscribe_any(pattern, handler, None);
2450    }
2451
2452    #[allow(dead_code)]
2453    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2454        let pattern: MStr<Pattern> = topic.into();
2455        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2456            msgbus::unsubscribe_any(pattern, &handler);
2457        }
2458    }
2459
2460    pub(crate) fn add_book_snapshot_subscription(
2461        &mut self,
2462        topic: MStr<Topic>,
2463        handler: TypedHandler<OrderBook>,
2464    ) {
2465        if self.book_handlers.contains_key(&topic) {
2466            log::warn!(
2467                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2468                self.actor_id
2469            );
2470            return;
2471        }
2472        self.book_handlers.insert(topic, handler.clone());
2473        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2474    }
2475
2476    #[allow(dead_code)]
2477    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2478        if let Some(handler) = self.book_handlers.remove(&topic) {
2479            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2480        }
2481    }
2482
2483    pub(crate) fn add_mark_price_subscription(
2484        &mut self,
2485        topic: MStr<Topic>,
2486        handler: TypedHandler<MarkPriceUpdate>,
2487    ) {
2488        if self.mark_price_handlers.contains_key(&topic) {
2489            log::warn!(
2490                "Actor {} attempted duplicate mark price subscription to '{topic}'",
2491                self.actor_id
2492            );
2493            return;
2494        }
2495        self.mark_price_handlers.insert(topic, handler.clone());
2496        msgbus::subscribe_mark_prices(topic.into(), handler, None);
2497    }
2498
2499    #[allow(dead_code)]
2500    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2501        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2502            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2503        }
2504    }
2505
2506    pub(crate) fn add_index_price_subscription(
2507        &mut self,
2508        topic: MStr<Topic>,
2509        handler: TypedHandler<IndexPriceUpdate>,
2510    ) {
2511        if self.index_price_handlers.contains_key(&topic) {
2512            log::warn!(
2513                "Actor {} attempted duplicate index price subscription to '{topic}'",
2514                self.actor_id
2515            );
2516            return;
2517        }
2518        self.index_price_handlers.insert(topic, handler.clone());
2519        msgbus::subscribe_index_prices(topic.into(), handler, None);
2520    }
2521
2522    #[allow(dead_code)]
2523    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2524        if let Some(handler) = self.index_price_handlers.remove(&topic) {
2525            msgbus::unsubscribe_index_prices(topic.into(), &handler);
2526        }
2527    }
2528
2529    pub(crate) fn add_funding_rate_subscription(
2530        &mut self,
2531        topic: MStr<Topic>,
2532        handler: TypedHandler<FundingRateUpdate>,
2533    ) {
2534        if self.funding_rate_handlers.contains_key(&topic) {
2535            log::warn!(
2536                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2537                self.actor_id
2538            );
2539            return;
2540        }
2541        self.funding_rate_handlers.insert(topic, handler.clone());
2542        msgbus::subscribe_funding_rates(topic.into(), handler, None);
2543    }
2544
2545    #[allow(dead_code)]
2546    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2547        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2548            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2549        }
2550    }
2551
2552    pub(crate) fn add_option_greeks_subscription(
2553        &mut self,
2554        topic: MStr<Topic>,
2555        handler: TypedHandler<OptionGreeks>,
2556    ) {
2557        if self.option_greeks_handlers.contains_key(&topic) {
2558            log::warn!(
2559                "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2560                self.actor_id
2561            );
2562            return;
2563        }
2564        self.option_greeks_handlers.insert(topic, handler.clone());
2565        msgbus::subscribe_option_greeks(topic.into(), handler, None);
2566    }
2567
2568    #[allow(dead_code)]
2569    pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2570        if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2571            msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2572        }
2573    }
2574
2575    pub(crate) fn add_option_chain_subscription(
2576        &mut self,
2577        topic: MStr<Topic>,
2578        handler: TypedHandler<OptionChainSlice>,
2579    ) {
2580        if self.option_chain_handlers.contains_key(&topic) {
2581            log::warn!(
2582                "Actor {} attempted duplicate option chain subscription to '{topic}'",
2583                self.actor_id
2584            );
2585            return;
2586        }
2587        self.option_chain_handlers.insert(topic, handler.clone());
2588        msgbus::subscribe_option_chain(topic.into(), handler, None);
2589    }
2590
2591    pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2592        if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2593            msgbus::unsubscribe_option_chain(topic.into(), &handler);
2594        }
2595    }
2596
2597    #[cfg(feature = "defi")]
2598    pub(crate) fn add_block_subscription(
2599        &mut self,
2600        topic: MStr<Topic>,
2601        handler: TypedHandler<Block>,
2602    ) {
2603        if self.block_handlers.contains_key(&topic) {
2604            log::warn!(
2605                "Actor {} attempted duplicate block subscription to '{topic}'",
2606                self.actor_id
2607            );
2608            return;
2609        }
2610        self.block_handlers.insert(topic, handler.clone());
2611        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2612    }
2613
2614    #[cfg(feature = "defi")]
2615    #[allow(dead_code)]
2616    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2617        if let Some(handler) = self.block_handlers.remove(&topic) {
2618            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2619        }
2620    }
2621
2622    #[cfg(feature = "defi")]
2623    pub(crate) fn add_pool_subscription(
2624        &mut self,
2625        topic: MStr<Topic>,
2626        handler: TypedHandler<Pool>,
2627    ) {
2628        if self.pool_handlers.contains_key(&topic) {
2629            log::warn!(
2630                "Actor {} attempted duplicate pool subscription to '{topic}'",
2631                self.actor_id
2632            );
2633            return;
2634        }
2635        self.pool_handlers.insert(topic, handler.clone());
2636        msgbus::subscribe_defi_pools(topic.into(), handler, None);
2637    }
2638
2639    #[cfg(feature = "defi")]
2640    #[allow(dead_code)]
2641    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2642        if let Some(handler) = self.pool_handlers.remove(&topic) {
2643            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2644        }
2645    }
2646
2647    #[cfg(feature = "defi")]
2648    pub(crate) fn add_pool_swap_subscription(
2649        &mut self,
2650        topic: MStr<Topic>,
2651        handler: TypedHandler<PoolSwap>,
2652    ) {
2653        if self.pool_swap_handlers.contains_key(&topic) {
2654            log::warn!(
2655                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2656                self.actor_id
2657            );
2658            return;
2659        }
2660        self.pool_swap_handlers.insert(topic, handler.clone());
2661        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2662    }
2663
2664    #[cfg(feature = "defi")]
2665    #[allow(dead_code)]
2666    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2667        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2668            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2669        }
2670    }
2671
2672    #[cfg(feature = "defi")]
2673    pub(crate) fn add_pool_liquidity_subscription(
2674        &mut self,
2675        topic: MStr<Topic>,
2676        handler: TypedHandler<PoolLiquidityUpdate>,
2677    ) {
2678        if self.pool_liquidity_handlers.contains_key(&topic) {
2679            log::warn!(
2680                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2681                self.actor_id
2682            );
2683            return;
2684        }
2685        self.pool_liquidity_handlers.insert(topic, handler.clone());
2686        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2687    }
2688
2689    #[cfg(feature = "defi")]
2690    #[allow(dead_code)]
2691    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2692        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2693            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2694        }
2695    }
2696
2697    #[cfg(feature = "defi")]
2698    pub(crate) fn add_pool_collect_subscription(
2699        &mut self,
2700        topic: MStr<Topic>,
2701        handler: TypedHandler<PoolFeeCollect>,
2702    ) {
2703        if self.pool_collect_handlers.contains_key(&topic) {
2704            log::warn!(
2705                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2706                self.actor_id
2707            );
2708            return;
2709        }
2710        self.pool_collect_handlers.insert(topic, handler.clone());
2711        msgbus::subscribe_defi_collects(topic.into(), handler, None);
2712    }
2713
2714    #[cfg(feature = "defi")]
2715    #[allow(dead_code)]
2716    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2717        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2718            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2719        }
2720    }
2721
2722    #[cfg(feature = "defi")]
2723    pub(crate) fn add_pool_flash_subscription(
2724        &mut self,
2725        topic: MStr<Topic>,
2726        handler: TypedHandler<PoolFlash>,
2727    ) {
2728        if self.pool_flash_handlers.contains_key(&topic) {
2729            log::warn!(
2730                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2731                self.actor_id
2732            );
2733            return;
2734        }
2735        self.pool_flash_handlers.insert(topic, handler.clone());
2736        msgbus::subscribe_defi_flash(topic.into(), handler, None);
2737    }
2738
2739    #[cfg(feature = "defi")]
2740    #[allow(dead_code)]
2741    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2742        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2743            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2744        }
2745    }
2746
2747    /// Creates a new [`DataActorCore`] instance.
2748    pub fn new(config: DataActorConfig) -> Self {
2749        let actor_id = config
2750            .actor_id
2751            .unwrap_or_else(|| Self::default_actor_id(&config));
2752
2753        Self {
2754            actor_id,
2755            config,
2756            trader_id: None, // None until registered
2757            clock: None,     // None until registered
2758            cache: None,     // None until registered
2759            state: ComponentState::default(),
2760            topic_handlers: AHashMap::new(),
2761            deltas_handlers: AHashMap::new(),
2762            depth10_handlers: AHashMap::new(),
2763            book_handlers: AHashMap::new(),
2764            quote_handlers: AHashMap::new(),
2765            trade_handlers: AHashMap::new(),
2766            bar_handlers: AHashMap::new(),
2767            mark_price_handlers: AHashMap::new(),
2768            index_price_handlers: AHashMap::new(),
2769            funding_rate_handlers: AHashMap::new(),
2770            option_greeks_handlers: AHashMap::new(),
2771            option_chain_handlers: AHashMap::new(),
2772            order_event_handlers: AHashMap::new(),
2773            #[cfg(feature = "defi")]
2774            block_handlers: AHashMap::new(),
2775            #[cfg(feature = "defi")]
2776            pool_handlers: AHashMap::new(),
2777            #[cfg(feature = "defi")]
2778            pool_swap_handlers: AHashMap::new(),
2779            #[cfg(feature = "defi")]
2780            pool_liquidity_handlers: AHashMap::new(),
2781            #[cfg(feature = "defi")]
2782            pool_collect_handlers: AHashMap::new(),
2783            #[cfg(feature = "defi")]
2784            pool_flash_handlers: AHashMap::new(),
2785            warning_events: AHashSet::new(),
2786            pending_requests: AHashMap::new(),
2787            signal_classes: AHashMap::new(),
2788            #[cfg(feature = "indicators")]
2789            indicators: Indicators::default(),
2790        }
2791    }
2792
2793    /// Returns the memory address of this instance as a hexadecimal string.
2794    #[must_use]
2795    pub fn mem_address(&self) -> String {
2796        format!("{self:p}")
2797    }
2798
2799    /// Returns the actors state.
2800    pub fn state(&self) -> ComponentState {
2801        self.state
2802    }
2803
2804    /// Returns the trader ID this actor is registered to.
2805    pub fn trader_id(&self) -> Option<TraderId> {
2806        self.trader_id
2807    }
2808
2809    /// Returns the actors ID.
2810    pub fn actor_id(&self) -> ActorId {
2811        self.actor_id
2812    }
2813
2814    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2815        let memory_address = std::ptr::from_ref(config) as usize;
2816        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2817    }
2818
2819    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2820    pub fn timestamp_ns(&self) -> UnixNanos {
2821        self.clock_ref().timestamp_ns()
2822    }
2823
2824    /// Returns the clock for the actor (if registered).
2825    ///
2826    /// # Panics
2827    ///
2828    /// Panics if the actor has not been registered with a trader.
2829    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2830        self.clock
2831            .as_ref()
2832            .unwrap_or_else(|| {
2833                panic!(
2834                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2835                    self.actor_id, self.trader_id
2836                )
2837            })
2838            .borrow_mut()
2839    }
2840
2841    /// Returns a clone of the reference-counted clock.
2842    ///
2843    /// # Panics
2844    ///
2845    /// Panics if the actor has not yet been registered (clock is `None`).
2846    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2847        self.clock
2848            .as_ref()
2849            .expect("DataActor must be registered before accessing clock")
2850            .clone()
2851    }
2852
2853    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2854        self.clock
2855            .as_ref()
2856            .unwrap_or_else(|| {
2857                panic!(
2858                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2859                    self.actor_id, self.trader_id
2860                )
2861            })
2862            .borrow()
2863    }
2864
2865    /// Returns a read-only reference to the cache.
2866    ///
2867    /// # Panics
2868    ///
2869    /// Panics if the actor has not yet been registered (cache is `None`).
2870    pub fn cache(&self) -> Ref<'_, Cache> {
2871        self.cache
2872            .as_ref()
2873            .expect("DataActor must be registered before accessing cache")
2874            .borrow()
2875    }
2876
2877    /// Returns a clone of the reference-counted cache.
2878    ///
2879    /// # Panics
2880    ///
2881    /// Panics if the actor has not yet been registered (cache is `None`).
2882    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2883        self.cache
2884            .as_ref()
2885            .expect("DataActor must be registered before accessing cache")
2886            .clone()
2887    }
2888
2889    // -- REGISTRATION ----------------------------------------------------------------------------
2890
2891    /// Register the data actor with a trader.
2892    ///
2893    /// # Errors
2894    ///
2895    /// Returns an error if the actor has already been registered with a trader
2896    /// or if the provided dependencies are invalid.
2897    pub fn register(
2898        &mut self,
2899        trader_id: TraderId,
2900        clock: Rc<RefCell<dyn Clock>>,
2901        cache: Rc<RefCell<Cache>>,
2902    ) -> anyhow::Result<()> {
2903        if let Some(existing_trader_id) = self.trader_id {
2904            anyhow::bail!(
2905                "DataActor {} already registered with trader {existing_trader_id}",
2906                self.actor_id
2907            );
2908        }
2909
2910        // Validate clock by attempting to access it
2911        {
2912            let _timestamp = clock.borrow().timestamp_ns();
2913        }
2914
2915        // Validate cache by attempting to access it
2916        {
2917            let _cache_borrow = cache.borrow();
2918        }
2919
2920        self.trader_id = Some(trader_id);
2921        self.clock = Some(clock);
2922        self.cache = Some(cache);
2923
2924        // Verify complete registration
2925        if !self.is_properly_registered() {
2926            anyhow::bail!(
2927                "DataActor {} registration incomplete - validation failed",
2928                self.actor_id
2929            );
2930        }
2931
2932        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2933        Ok(())
2934    }
2935
2936    /// Register an event type for warning log levels.
2937    pub fn register_warning_event(&mut self, event_type: &str) {
2938        self.warning_events.insert(event_type.to_string());
2939        log::debug!("Registered event type '{event_type}' for warning logs");
2940    }
2941
2942    /// Deregister an event type from warning log levels.
2943    pub fn deregister_warning_event(&mut self, event_type: &str) {
2944        self.warning_events.remove(event_type);
2945        log::debug!("Deregistered event type '{event_type}' from warning logs");
2946    }
2947
2948    pub fn is_registered(&self) -> bool {
2949        self.trader_id.is_some()
2950    }
2951
2952    pub(crate) fn check_registered(&self) {
2953        assert!(
2954            self.is_registered(),
2955            "Actor has not been registered with a Trader"
2956        );
2957    }
2958
2959    /// Validates registration state without panicking.
2960    fn is_properly_registered(&self) -> bool {
2961        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2962    }
2963
2964    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2965        if self.config.log_commands {
2966            log::info!("{CMD}{SEND} {command:?}");
2967        }
2968
2969        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2970        msgbus::send_data_command(endpoint, command);
2971    }
2972
2973    #[allow(dead_code)]
2974    fn send_data_req(&self, request: &RequestCommand) {
2975        if self.config.log_commands {
2976            log::info!("{REQ}{SEND} {request:?}");
2977        }
2978
2979        // For now, simplified approach - data requests without dynamic handlers
2980        // TODO: Implement proper dynamic dispatch for response handlers
2981        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2982        msgbus::send_any(endpoint, request.as_any());
2983    }
2984
2985    /// Sends a shutdown command to the system with an optional reason.
2986    ///
2987    /// # Panics
2988    ///
2989    /// Panics if the actor is not registered or has no trader ID.
2990    pub fn shutdown_system(&self, reason: Option<String>) {
2991        self.check_registered();
2992
2993        // Checked registered before unwrapping trader ID
2994        let command = ShutdownSystem::new(
2995            self.trader_id().unwrap(),
2996            self.actor_id.inner(),
2997            reason,
2998            UUID4::new(),
2999            self.timestamp_ns(),
3000        );
3001
3002        let endpoint = "command.system.shutdown".into();
3003        msgbus::send_any(endpoint, command.as_any());
3004    }
3005
3006    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
3007
3008    /// Helper method for registering data subscriptions from the trait.
3009    ///
3010    /// # Panics
3011    ///
3012    /// Panics if the actor is not properly registered.
3013    pub fn subscribe_data(
3014        &mut self,
3015        handler: ShareableMessageHandler,
3016        data_type: DataType,
3017        client_id: Option<ClientId>,
3018        params: Option<Params>,
3019    ) {
3020        assert!(
3021            self.is_properly_registered(),
3022            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3023            self.actor_id,
3024            self.trader_id,
3025            self.clock.is_some(),
3026            self.cache.is_some()
3027        );
3028
3029        let topic = get_custom_topic(&data_type);
3030        self.add_subscription_any(topic, handler);
3031
3032        // If no client ID specified, just subscribe to the topic
3033        if client_id.is_none() {
3034            return;
3035        }
3036
3037        let command = SubscribeCommand::Data(SubscribeCustomData {
3038            data_type,
3039            client_id,
3040            venue: None,
3041            command_id: UUID4::new(),
3042            ts_init: self.timestamp_ns(),
3043            correlation_id: None,
3044            params,
3045        });
3046
3047        self.send_data_cmd(DataCommand::Subscribe(command));
3048    }
3049
3050    /// Helper method for registering quotes subscriptions from the trait.
3051    pub fn subscribe_quotes(
3052        &mut self,
3053        topic: MStr<Topic>,
3054        handler: TypedHandler<QuoteTick>,
3055        instrument_id: InstrumentId,
3056        client_id: Option<ClientId>,
3057        params: Option<Params>,
3058    ) {
3059        self.check_registered();
3060
3061        self.add_quote_subscription(topic, handler);
3062
3063        let command = SubscribeCommand::Quotes(SubscribeQuotes {
3064            instrument_id,
3065            client_id,
3066            venue: Some(instrument_id.venue),
3067            command_id: UUID4::new(),
3068            ts_init: self.timestamp_ns(),
3069            correlation_id: None,
3070            params,
3071        });
3072
3073        self.send_data_cmd(DataCommand::Subscribe(command));
3074    }
3075
3076    /// Helper method for registering instruments subscriptions from the trait.
3077    pub fn subscribe_instruments(
3078        &mut self,
3079        pattern: MStr<Pattern>,
3080        handler: ShareableMessageHandler,
3081        venue: Venue,
3082        client_id: Option<ClientId>,
3083        params: Option<Params>,
3084    ) {
3085        self.check_registered();
3086
3087        self.add_instrument_subscription(pattern, handler);
3088
3089        let command = SubscribeCommand::Instruments(SubscribeInstruments {
3090            client_id,
3091            venue,
3092            command_id: UUID4::new(),
3093            ts_init: self.timestamp_ns(),
3094            correlation_id: None,
3095            params,
3096        });
3097
3098        self.send_data_cmd(DataCommand::Subscribe(command));
3099    }
3100
3101    /// Helper method for registering instrument subscriptions from the trait.
3102    pub fn subscribe_instrument(
3103        &mut self,
3104        topic: MStr<Topic>,
3105        handler: ShareableMessageHandler,
3106        instrument_id: InstrumentId,
3107        client_id: Option<ClientId>,
3108        params: Option<Params>,
3109    ) {
3110        self.check_registered();
3111
3112        self.add_instrument_subscription(topic.into(), handler);
3113
3114        let command = SubscribeCommand::Instrument(SubscribeInstrument {
3115            instrument_id,
3116            client_id,
3117            venue: Some(instrument_id.venue),
3118            command_id: UUID4::new(),
3119            ts_init: self.timestamp_ns(),
3120            correlation_id: None,
3121            params,
3122        });
3123
3124        self.send_data_cmd(DataCommand::Subscribe(command));
3125    }
3126
3127    /// Helper method for registering book deltas subscriptions from the trait.
3128    #[allow(clippy::too_many_arguments)]
3129    pub fn subscribe_book_deltas(
3130        &mut self,
3131        topic: MStr<Topic>,
3132        handler: TypedHandler<OrderBookDeltas>,
3133        instrument_id: InstrumentId,
3134        book_type: BookType,
3135        depth: Option<NonZeroUsize>,
3136        client_id: Option<ClientId>,
3137        managed: bool,
3138        params: Option<Params>,
3139    ) {
3140        self.check_registered();
3141
3142        self.add_deltas_subscription(topic, handler);
3143
3144        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3145            instrument_id,
3146            book_type,
3147            client_id,
3148            venue: Some(instrument_id.venue),
3149            command_id: UUID4::new(),
3150            ts_init: self.timestamp_ns(),
3151            depth,
3152            managed,
3153            correlation_id: None,
3154            params,
3155        });
3156
3157        self.send_data_cmd(DataCommand::Subscribe(command));
3158    }
3159
3160    /// Helper method for registering book snapshots subscriptions from the trait.
3161    #[allow(clippy::too_many_arguments)]
3162    pub fn subscribe_book_at_interval(
3163        &mut self,
3164        topic: MStr<Topic>,
3165        handler: TypedHandler<OrderBook>,
3166        instrument_id: InstrumentId,
3167        book_type: BookType,
3168        depth: Option<NonZeroUsize>,
3169        interval_ms: NonZeroUsize,
3170        client_id: Option<ClientId>,
3171        params: Option<Params>,
3172    ) {
3173        self.check_registered();
3174
3175        self.add_book_snapshot_subscription(topic, handler);
3176
3177        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3178            instrument_id,
3179            book_type,
3180            client_id,
3181            venue: Some(instrument_id.venue),
3182            command_id: UUID4::new(),
3183            ts_init: self.timestamp_ns(),
3184            depth,
3185            interval_ms,
3186            correlation_id: None,
3187            params,
3188        });
3189
3190        self.send_data_cmd(DataCommand::Subscribe(command));
3191    }
3192
3193    /// Helper method for registering trades subscriptions from the trait.
3194    pub fn subscribe_trades(
3195        &mut self,
3196        topic: MStr<Topic>,
3197        handler: TypedHandler<TradeTick>,
3198        instrument_id: InstrumentId,
3199        client_id: Option<ClientId>,
3200        params: Option<Params>,
3201    ) {
3202        self.check_registered();
3203
3204        self.add_trade_subscription(topic, handler);
3205
3206        let command = SubscribeCommand::Trades(SubscribeTrades {
3207            instrument_id,
3208            client_id,
3209            venue: Some(instrument_id.venue),
3210            command_id: UUID4::new(),
3211            ts_init: self.timestamp_ns(),
3212            correlation_id: None,
3213            params,
3214        });
3215
3216        self.send_data_cmd(DataCommand::Subscribe(command));
3217    }
3218
3219    /// Helper method for registering bars subscriptions from the trait.
3220    pub fn subscribe_bars(
3221        &mut self,
3222        topic: MStr<Topic>,
3223        handler: TypedHandler<Bar>,
3224        bar_type: BarType,
3225        client_id: Option<ClientId>,
3226        params: Option<Params>,
3227    ) {
3228        self.check_registered();
3229
3230        self.add_bar_subscription(topic, handler);
3231
3232        let command = SubscribeCommand::Bars(SubscribeBars {
3233            bar_type,
3234            client_id,
3235            venue: Some(bar_type.instrument_id().venue),
3236            command_id: UUID4::new(),
3237            ts_init: self.timestamp_ns(),
3238            correlation_id: None,
3239            params,
3240        });
3241
3242        self.send_data_cmd(DataCommand::Subscribe(command));
3243    }
3244
3245    /// Helper method for registering mark prices subscriptions from the trait.
3246    pub fn subscribe_mark_prices(
3247        &mut self,
3248        topic: MStr<Topic>,
3249        handler: TypedHandler<MarkPriceUpdate>,
3250        instrument_id: InstrumentId,
3251        client_id: Option<ClientId>,
3252        params: Option<Params>,
3253    ) {
3254        self.check_registered();
3255
3256        self.add_mark_price_subscription(topic, handler);
3257
3258        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3259            instrument_id,
3260            client_id,
3261            venue: Some(instrument_id.venue),
3262            command_id: UUID4::new(),
3263            ts_init: self.timestamp_ns(),
3264            correlation_id: None,
3265            params,
3266        });
3267
3268        self.send_data_cmd(DataCommand::Subscribe(command));
3269    }
3270
3271    /// Helper method for registering index prices subscriptions from the trait.
3272    pub fn subscribe_index_prices(
3273        &mut self,
3274        topic: MStr<Topic>,
3275        handler: TypedHandler<IndexPriceUpdate>,
3276        instrument_id: InstrumentId,
3277        client_id: Option<ClientId>,
3278        params: Option<Params>,
3279    ) {
3280        self.check_registered();
3281
3282        self.add_index_price_subscription(topic, handler);
3283
3284        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3285            instrument_id,
3286            client_id,
3287            venue: Some(instrument_id.venue),
3288            command_id: UUID4::new(),
3289            ts_init: self.timestamp_ns(),
3290            correlation_id: None,
3291            params,
3292        });
3293
3294        self.send_data_cmd(DataCommand::Subscribe(command));
3295    }
3296
3297    /// Helper method for registering funding rates subscriptions from the trait.
3298    pub fn subscribe_funding_rates(
3299        &mut self,
3300        topic: MStr<Topic>,
3301        handler: TypedHandler<FundingRateUpdate>,
3302        instrument_id: InstrumentId,
3303        client_id: Option<ClientId>,
3304        params: Option<Params>,
3305    ) {
3306        self.check_registered();
3307
3308        self.add_funding_rate_subscription(topic, handler);
3309
3310        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3311            instrument_id,
3312            client_id,
3313            venue: Some(instrument_id.venue),
3314            command_id: UUID4::new(),
3315            ts_init: self.timestamp_ns(),
3316            correlation_id: None,
3317            params,
3318        });
3319
3320        self.send_data_cmd(DataCommand::Subscribe(command));
3321    }
3322
3323    /// Helper method for registering option greeks subscriptions from the trait.
3324    pub fn subscribe_option_greeks(
3325        &mut self,
3326        topic: MStr<Topic>,
3327        handler: TypedHandler<OptionGreeks>,
3328        instrument_id: InstrumentId,
3329        client_id: Option<ClientId>,
3330        params: Option<Params>,
3331    ) {
3332        self.check_registered();
3333
3334        self.add_option_greeks_subscription(topic, handler);
3335
3336        let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3337            instrument_id,
3338            client_id,
3339            venue: Some(instrument_id.venue),
3340            command_id: UUID4::new(),
3341            ts_init: self.timestamp_ns(),
3342            correlation_id: None,
3343            params,
3344        });
3345
3346        self.send_data_cmd(DataCommand::Subscribe(command));
3347    }
3348
3349    /// Helper method for registering instrument status subscriptions from the trait.
3350    pub fn subscribe_instrument_status(
3351        &mut self,
3352        topic: MStr<Topic>,
3353        handler: ShareableMessageHandler,
3354        instrument_id: InstrumentId,
3355        client_id: Option<ClientId>,
3356        params: Option<Params>,
3357    ) {
3358        self.check_registered();
3359
3360        self.add_subscription_any(topic, handler);
3361
3362        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3363            instrument_id,
3364            client_id,
3365            venue: Some(instrument_id.venue),
3366            command_id: UUID4::new(),
3367            ts_init: self.timestamp_ns(),
3368            correlation_id: None,
3369            params,
3370        });
3371
3372        self.send_data_cmd(DataCommand::Subscribe(command));
3373    }
3374
3375    /// Helper method for registering instrument close subscriptions from the trait.
3376    pub fn subscribe_instrument_close(
3377        &mut self,
3378        topic: MStr<Topic>,
3379        handler: ShareableMessageHandler,
3380        instrument_id: InstrumentId,
3381        client_id: Option<ClientId>,
3382        params: Option<Params>,
3383    ) {
3384        self.check_registered();
3385
3386        self.add_instrument_close_subscription(topic, handler);
3387
3388        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3389            instrument_id,
3390            client_id,
3391            venue: Some(instrument_id.venue),
3392            command_id: UUID4::new(),
3393            ts_init: self.timestamp_ns(),
3394            correlation_id: None,
3395            params,
3396        });
3397
3398        self.send_data_cmd(DataCommand::Subscribe(command));
3399    }
3400
3401    /// Helper method for subscribing to option chain snapshots from the trait.
3402    #[allow(clippy::too_many_arguments)]
3403    pub fn subscribe_option_chain(
3404        &mut self,
3405        topic: MStr<Topic>,
3406        handler: TypedHandler<OptionChainSlice>,
3407        series_id: OptionSeriesId,
3408        strike_range: StrikeRange,
3409        snapshot_interval_ms: Option<u64>,
3410        client_id: Option<ClientId>,
3411    ) {
3412        self.check_registered();
3413
3414        self.add_option_chain_subscription(topic, handler);
3415
3416        let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3417            series_id,
3418            strike_range,
3419            snapshot_interval_ms,
3420            UUID4::new(),
3421            self.timestamp_ns(),
3422            client_id,
3423            Some(series_id.venue),
3424        ));
3425
3426        self.send_data_cmd(DataCommand::Subscribe(command));
3427    }
3428
3429    /// Helper method for registering order fills subscriptions from the trait.
3430    pub fn subscribe_order_fills(
3431        &mut self,
3432        topic: MStr<Topic>,
3433        handler: TypedHandler<OrderEventAny>,
3434    ) {
3435        self.check_registered();
3436        self.add_order_event_subscription(topic, handler);
3437    }
3438
3439    /// Helper method for registering order cancels subscriptions from the trait.
3440    pub fn subscribe_order_cancels(
3441        &mut self,
3442        topic: MStr<Topic>,
3443        handler: TypedHandler<OrderEventAny>,
3444    ) {
3445        self.check_registered();
3446        self.add_order_event_subscription(topic, handler);
3447    }
3448
3449    /// Helper method for unsubscribing from data.
3450    pub fn unsubscribe_data(
3451        &mut self,
3452        data_type: DataType,
3453        client_id: Option<ClientId>,
3454        params: Option<Params>,
3455    ) {
3456        self.check_registered();
3457
3458        let topic = get_custom_topic(&data_type);
3459        self.remove_subscription_any(topic);
3460
3461        if client_id.is_none() {
3462            return;
3463        }
3464
3465        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3466            data_type,
3467            client_id,
3468            venue: None,
3469            command_id: UUID4::new(),
3470            ts_init: self.timestamp_ns(),
3471            correlation_id: None,
3472            params,
3473        });
3474
3475        self.send_data_cmd(DataCommand::Unsubscribe(command));
3476    }
3477
3478    /// Helper method for unsubscribing from instruments.
3479    pub fn unsubscribe_instruments(
3480        &mut self,
3481        venue: Venue,
3482        client_id: Option<ClientId>,
3483        params: Option<Params>,
3484    ) {
3485        self.check_registered();
3486
3487        let pattern = get_instruments_pattern(venue);
3488        self.remove_instrument_subscription(pattern);
3489
3490        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3491            client_id,
3492            venue,
3493            command_id: UUID4::new(),
3494            ts_init: self.timestamp_ns(),
3495            correlation_id: None,
3496            params,
3497        });
3498
3499        self.send_data_cmd(DataCommand::Unsubscribe(command));
3500    }
3501
3502    /// Helper method for unsubscribing from instrument.
3503    pub fn unsubscribe_instrument(
3504        &mut self,
3505        instrument_id: InstrumentId,
3506        client_id: Option<ClientId>,
3507        params: Option<Params>,
3508    ) {
3509        self.check_registered();
3510
3511        let topic = get_instrument_topic(instrument_id);
3512        self.remove_instrument_subscription(topic.into());
3513
3514        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3515            instrument_id,
3516            client_id,
3517            venue: Some(instrument_id.venue),
3518            command_id: UUID4::new(),
3519            ts_init: self.timestamp_ns(),
3520            correlation_id: None,
3521            params,
3522        });
3523
3524        self.send_data_cmd(DataCommand::Unsubscribe(command));
3525    }
3526
3527    /// Helper method for unsubscribing from book deltas.
3528    pub fn unsubscribe_book_deltas(
3529        &mut self,
3530        instrument_id: InstrumentId,
3531        client_id: Option<ClientId>,
3532        params: Option<Params>,
3533    ) {
3534        self.check_registered();
3535
3536        let topic = get_book_deltas_topic(instrument_id);
3537        self.remove_deltas_subscription(topic);
3538
3539        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3540            instrument_id,
3541            client_id,
3542            venue: Some(instrument_id.venue),
3543            command_id: UUID4::new(),
3544            ts_init: self.timestamp_ns(),
3545            correlation_id: None,
3546            params,
3547        });
3548
3549        self.send_data_cmd(DataCommand::Unsubscribe(command));
3550    }
3551
3552    /// Helper method for unsubscribing from book snapshots at interval.
3553    pub fn unsubscribe_book_at_interval(
3554        &mut self,
3555        instrument_id: InstrumentId,
3556        interval_ms: NonZeroUsize,
3557        client_id: Option<ClientId>,
3558        params: Option<Params>,
3559    ) {
3560        self.check_registered();
3561
3562        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3563        self.remove_book_snapshot_subscription(topic);
3564
3565        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3566            instrument_id,
3567            client_id,
3568            venue: Some(instrument_id.venue),
3569            command_id: UUID4::new(),
3570            ts_init: self.timestamp_ns(),
3571            correlation_id: None,
3572            params,
3573        });
3574
3575        self.send_data_cmd(DataCommand::Unsubscribe(command));
3576    }
3577
3578    /// Helper method for unsubscribing from quotes.
3579    pub fn unsubscribe_quotes(
3580        &mut self,
3581        instrument_id: InstrumentId,
3582        client_id: Option<ClientId>,
3583        params: Option<Params>,
3584    ) {
3585        self.check_registered();
3586
3587        let topic = get_quotes_topic(instrument_id);
3588        self.remove_quote_subscription(topic);
3589
3590        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3591            instrument_id,
3592            client_id,
3593            venue: Some(instrument_id.venue),
3594            command_id: UUID4::new(),
3595            ts_init: self.timestamp_ns(),
3596            correlation_id: None,
3597            params,
3598        });
3599
3600        self.send_data_cmd(DataCommand::Unsubscribe(command));
3601    }
3602
3603    /// Helper method for unsubscribing from trades.
3604    pub fn unsubscribe_trades(
3605        &mut self,
3606        instrument_id: InstrumentId,
3607        client_id: Option<ClientId>,
3608        params: Option<Params>,
3609    ) {
3610        self.check_registered();
3611
3612        let topic = get_trades_topic(instrument_id);
3613        self.remove_trade_subscription(topic);
3614
3615        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3616            instrument_id,
3617            client_id,
3618            venue: Some(instrument_id.venue),
3619            command_id: UUID4::new(),
3620            ts_init: self.timestamp_ns(),
3621            correlation_id: None,
3622            params,
3623        });
3624
3625        self.send_data_cmd(DataCommand::Unsubscribe(command));
3626    }
3627
3628    /// Helper method for unsubscribing from bars.
3629    pub fn unsubscribe_bars(
3630        &mut self,
3631        bar_type: BarType,
3632        client_id: Option<ClientId>,
3633        params: Option<Params>,
3634    ) {
3635        self.check_registered();
3636
3637        let topic = get_bars_topic(bar_type);
3638        self.remove_bar_subscription(topic);
3639
3640        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3641            bar_type,
3642            client_id,
3643            venue: Some(bar_type.instrument_id().venue),
3644            command_id: UUID4::new(),
3645            ts_init: self.timestamp_ns(),
3646            correlation_id: None,
3647            params,
3648        });
3649
3650        self.send_data_cmd(DataCommand::Unsubscribe(command));
3651    }
3652
3653    /// Helper method for unsubscribing from mark prices.
3654    pub fn unsubscribe_mark_prices(
3655        &mut self,
3656        instrument_id: InstrumentId,
3657        client_id: Option<ClientId>,
3658        params: Option<Params>,
3659    ) {
3660        self.check_registered();
3661
3662        let topic = get_mark_price_topic(instrument_id);
3663        self.remove_mark_price_subscription(topic);
3664
3665        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3666            instrument_id,
3667            client_id,
3668            venue: Some(instrument_id.venue),
3669            command_id: UUID4::new(),
3670            ts_init: self.timestamp_ns(),
3671            correlation_id: None,
3672            params,
3673        });
3674
3675        self.send_data_cmd(DataCommand::Unsubscribe(command));
3676    }
3677
3678    /// Helper method for unsubscribing from index prices.
3679    pub fn unsubscribe_index_prices(
3680        &mut self,
3681        instrument_id: InstrumentId,
3682        client_id: Option<ClientId>,
3683        params: Option<Params>,
3684    ) {
3685        self.check_registered();
3686
3687        let topic = get_index_price_topic(instrument_id);
3688        self.remove_index_price_subscription(topic);
3689
3690        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3691            instrument_id,
3692            client_id,
3693            venue: Some(instrument_id.venue),
3694            command_id: UUID4::new(),
3695            ts_init: self.timestamp_ns(),
3696            correlation_id: None,
3697            params,
3698        });
3699
3700        self.send_data_cmd(DataCommand::Unsubscribe(command));
3701    }
3702
3703    /// Helper method for unsubscribing from funding rates.
3704    pub fn unsubscribe_funding_rates(
3705        &mut self,
3706        instrument_id: InstrumentId,
3707        client_id: Option<ClientId>,
3708        params: Option<Params>,
3709    ) {
3710        self.check_registered();
3711
3712        let topic = get_funding_rate_topic(instrument_id);
3713        self.remove_funding_rate_subscription(topic);
3714
3715        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3716            instrument_id,
3717            client_id,
3718            venue: Some(instrument_id.venue),
3719            command_id: UUID4::new(),
3720            ts_init: self.timestamp_ns(),
3721            correlation_id: None,
3722            params,
3723        });
3724
3725        self.send_data_cmd(DataCommand::Unsubscribe(command));
3726    }
3727
3728    /// Helper method for unsubscribing from option greeks.
3729    pub fn unsubscribe_option_greeks(
3730        &mut self,
3731        instrument_id: InstrumentId,
3732        client_id: Option<ClientId>,
3733        params: Option<Params>,
3734    ) {
3735        self.check_registered();
3736
3737        let topic = get_option_greeks_topic(instrument_id);
3738        self.remove_option_greeks_subscription(topic);
3739
3740        let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3741            instrument_id,
3742            client_id,
3743            venue: Some(instrument_id.venue),
3744            command_id: UUID4::new(),
3745            ts_init: self.timestamp_ns(),
3746            correlation_id: None,
3747            params,
3748        });
3749
3750        self.send_data_cmd(DataCommand::Unsubscribe(command));
3751    }
3752
3753    /// Helper method for unsubscribing from instrument status.
3754    pub fn unsubscribe_instrument_status(
3755        &mut self,
3756        instrument_id: InstrumentId,
3757        client_id: Option<ClientId>,
3758        params: Option<Params>,
3759    ) {
3760        self.check_registered();
3761
3762        let topic = get_instrument_status_topic(instrument_id);
3763        self.remove_subscription_any(topic);
3764
3765        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3766            instrument_id,
3767            client_id,
3768            venue: Some(instrument_id.venue),
3769            command_id: UUID4::new(),
3770            ts_init: self.timestamp_ns(),
3771            correlation_id: None,
3772            params,
3773        });
3774
3775        self.send_data_cmd(DataCommand::Unsubscribe(command));
3776    }
3777
3778    /// Helper method for unsubscribing from instrument close.
3779    pub fn unsubscribe_instrument_close(
3780        &mut self,
3781        instrument_id: InstrumentId,
3782        client_id: Option<ClientId>,
3783        params: Option<Params>,
3784    ) {
3785        self.check_registered();
3786
3787        let topic = get_instrument_close_topic(instrument_id);
3788        self.remove_instrument_close_subscription(topic);
3789
3790        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3791            instrument_id,
3792            client_id,
3793            venue: Some(instrument_id.venue),
3794            command_id: UUID4::new(),
3795            ts_init: self.timestamp_ns(),
3796            correlation_id: None,
3797            params,
3798        });
3799
3800        self.send_data_cmd(DataCommand::Unsubscribe(command));
3801    }
3802
3803    /// Helper method for unsubscribing from option chain snapshots.
3804    pub fn unsubscribe_option_chain(
3805        &mut self,
3806        series_id: OptionSeriesId,
3807        client_id: Option<ClientId>,
3808    ) {
3809        self.check_registered();
3810
3811        let topic = get_option_chain_topic(series_id);
3812        self.remove_option_chain_subscription(topic);
3813
3814        let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
3815            series_id,
3816            UUID4::new(),
3817            self.timestamp_ns(),
3818            client_id,
3819            Some(series_id.venue),
3820        ));
3821
3822        self.send_data_cmd(DataCommand::Unsubscribe(command));
3823    }
3824
3825    /// Helper method for unsubscribing from order fills.
3826    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3827        self.check_registered();
3828
3829        let topic = get_order_fills_topic(instrument_id);
3830        self.remove_order_event_subscription(topic);
3831    }
3832
3833    /// Helper method for unsubscribing from order cancels.
3834    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3835        self.check_registered();
3836
3837        let topic = get_order_cancels_topic(instrument_id);
3838        self.remove_order_event_subscription(topic);
3839    }
3840
3841    /// Helper method for requesting data.
3842    ///
3843    /// # Errors
3844    ///
3845    /// Returns an error if input parameters are invalid.
3846    #[allow(clippy::too_many_arguments)]
3847    pub fn request_data(
3848        &self,
3849        data_type: DataType,
3850        client_id: ClientId,
3851        start: Option<DateTime<Utc>>,
3852        end: Option<DateTime<Utc>>,
3853        limit: Option<NonZeroUsize>,
3854        params: Option<Params>,
3855        handler: ShareableMessageHandler,
3856    ) -> anyhow::Result<UUID4> {
3857        self.check_registered();
3858
3859        let now = self.clock_ref().utc_now();
3860        check_timestamps(now, start, end)?;
3861
3862        let request_id = UUID4::new();
3863        let command = RequestCommand::Data(RequestCustomData {
3864            client_id,
3865            data_type,
3866            start,
3867            end,
3868            limit,
3869            request_id,
3870            ts_init: self.timestamp_ns(),
3871            params,
3872        });
3873
3874        get_message_bus()
3875            .borrow_mut()
3876            .register_response_handler(command.request_id(), handler)?;
3877
3878        self.send_data_cmd(DataCommand::Request(command));
3879
3880        Ok(request_id)
3881    }
3882
3883    /// Helper method for requesting instrument.
3884    ///
3885    /// # Errors
3886    ///
3887    /// Returns an error if input parameters are invalid.
3888    pub fn request_instrument(
3889        &self,
3890        instrument_id: InstrumentId,
3891        start: Option<DateTime<Utc>>,
3892        end: Option<DateTime<Utc>>,
3893        client_id: Option<ClientId>,
3894        params: Option<Params>,
3895        handler: ShareableMessageHandler,
3896    ) -> anyhow::Result<UUID4> {
3897        self.check_registered();
3898
3899        let now = self.clock_ref().utc_now();
3900        check_timestamps(now, start, end)?;
3901
3902        let request_id = UUID4::new();
3903        let command = RequestCommand::Instrument(RequestInstrument {
3904            instrument_id,
3905            start,
3906            end,
3907            client_id,
3908            request_id,
3909            ts_init: now.into(),
3910            params,
3911        });
3912
3913        get_message_bus()
3914            .borrow_mut()
3915            .register_response_handler(command.request_id(), handler)?;
3916
3917        self.send_data_cmd(DataCommand::Request(command));
3918
3919        Ok(request_id)
3920    }
3921
3922    /// Helper method for requesting instruments.
3923    ///
3924    /// # Errors
3925    ///
3926    /// Returns an error if input parameters are invalid.
3927    pub fn request_instruments(
3928        &self,
3929        venue: Option<Venue>,
3930        start: Option<DateTime<Utc>>,
3931        end: Option<DateTime<Utc>>,
3932        client_id: Option<ClientId>,
3933        params: Option<Params>,
3934        handler: ShareableMessageHandler,
3935    ) -> anyhow::Result<UUID4> {
3936        self.check_registered();
3937
3938        let now = self.clock_ref().utc_now();
3939        check_timestamps(now, start, end)?;
3940
3941        let request_id = UUID4::new();
3942        let command = RequestCommand::Instruments(RequestInstruments {
3943            venue,
3944            start,
3945            end,
3946            client_id,
3947            request_id,
3948            ts_init: now.into(),
3949            params,
3950        });
3951
3952        get_message_bus()
3953            .borrow_mut()
3954            .register_response_handler(command.request_id(), handler)?;
3955
3956        self.send_data_cmd(DataCommand::Request(command));
3957
3958        Ok(request_id)
3959    }
3960
3961    /// Helper method for requesting book snapshot.
3962    ///
3963    /// # Errors
3964    ///
3965    /// Returns an error if input parameters are invalid.
3966    pub fn request_book_snapshot(
3967        &self,
3968        instrument_id: InstrumentId,
3969        depth: Option<NonZeroUsize>,
3970        client_id: Option<ClientId>,
3971        params: Option<Params>,
3972        handler: ShareableMessageHandler,
3973    ) -> anyhow::Result<UUID4> {
3974        self.check_registered();
3975
3976        let request_id = UUID4::new();
3977        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3978            instrument_id,
3979            depth,
3980            client_id,
3981            request_id,
3982            ts_init: self.timestamp_ns(),
3983            params,
3984        });
3985
3986        get_message_bus()
3987            .borrow_mut()
3988            .register_response_handler(command.request_id(), handler)?;
3989
3990        self.send_data_cmd(DataCommand::Request(command));
3991
3992        Ok(request_id)
3993    }
3994
3995    /// Helper method for requesting quotes.
3996    ///
3997    /// # Errors
3998    ///
3999    /// Returns an error if input parameters are invalid.
4000    #[allow(clippy::too_many_arguments)]
4001    pub fn request_quotes(
4002        &self,
4003        instrument_id: InstrumentId,
4004        start: Option<DateTime<Utc>>,
4005        end: Option<DateTime<Utc>>,
4006        limit: Option<NonZeroUsize>,
4007        client_id: Option<ClientId>,
4008        params: Option<Params>,
4009        handler: ShareableMessageHandler,
4010    ) -> anyhow::Result<UUID4> {
4011        self.check_registered();
4012
4013        let now = self.clock_ref().utc_now();
4014        check_timestamps(now, start, end)?;
4015
4016        let request_id = UUID4::new();
4017        let command = RequestCommand::Quotes(RequestQuotes {
4018            instrument_id,
4019            start,
4020            end,
4021            limit,
4022            client_id,
4023            request_id,
4024            ts_init: now.into(),
4025            params,
4026        });
4027
4028        get_message_bus()
4029            .borrow_mut()
4030            .register_response_handler(command.request_id(), handler)?;
4031
4032        self.send_data_cmd(DataCommand::Request(command));
4033
4034        Ok(request_id)
4035    }
4036
4037    /// Helper method for requesting trades.
4038    ///
4039    /// # Errors
4040    ///
4041    /// Returns an error if input parameters are invalid.
4042    #[allow(clippy::too_many_arguments)]
4043    pub fn request_trades(
4044        &self,
4045        instrument_id: InstrumentId,
4046        start: Option<DateTime<Utc>>,
4047        end: Option<DateTime<Utc>>,
4048        limit: Option<NonZeroUsize>,
4049        client_id: Option<ClientId>,
4050        params: Option<Params>,
4051        handler: ShareableMessageHandler,
4052    ) -> anyhow::Result<UUID4> {
4053        self.check_registered();
4054
4055        let now = self.clock_ref().utc_now();
4056        check_timestamps(now, start, end)?;
4057
4058        let request_id = UUID4::new();
4059        let command = RequestCommand::Trades(RequestTrades {
4060            instrument_id,
4061            start,
4062            end,
4063            limit,
4064            client_id,
4065            request_id,
4066            ts_init: now.into(),
4067            params,
4068        });
4069
4070        get_message_bus()
4071            .borrow_mut()
4072            .register_response_handler(command.request_id(), handler)?;
4073
4074        self.send_data_cmd(DataCommand::Request(command));
4075
4076        Ok(request_id)
4077    }
4078
4079    /// Helper method for requesting funding rates.
4080    ///
4081    /// # Errors
4082    ///
4083    /// Returns an error if input parameters are invalid.
4084    #[allow(clippy::too_many_arguments)]
4085    pub fn request_funding_rates(
4086        &self,
4087        instrument_id: InstrumentId,
4088        start: Option<DateTime<Utc>>,
4089        end: Option<DateTime<Utc>>,
4090        limit: Option<NonZeroUsize>,
4091        client_id: Option<ClientId>,
4092        params: Option<Params>,
4093        handler: ShareableMessageHandler,
4094    ) -> anyhow::Result<UUID4> {
4095        self.check_registered();
4096
4097        let now = self.clock_ref().utc_now();
4098        check_timestamps(now, start, end)?;
4099
4100        let request_id = UUID4::new();
4101        let command = RequestCommand::FundingRates(RequestFundingRates {
4102            instrument_id,
4103            start,
4104            end,
4105            limit,
4106            client_id,
4107            request_id,
4108            ts_init: now.into(),
4109            params,
4110        });
4111
4112        get_message_bus()
4113            .borrow_mut()
4114            .register_response_handler(command.request_id(), handler)?;
4115
4116        self.send_data_cmd(DataCommand::Request(command));
4117
4118        Ok(request_id)
4119    }
4120
4121    /// Helper method for requesting bars.
4122    ///
4123    /// # Errors
4124    ///
4125    /// Returns an error if input parameters are invalid.
4126    #[allow(clippy::too_many_arguments)]
4127    pub fn request_bars(
4128        &self,
4129        bar_type: BarType,
4130        start: Option<DateTime<Utc>>,
4131        end: Option<DateTime<Utc>>,
4132        limit: Option<NonZeroUsize>,
4133        client_id: Option<ClientId>,
4134        params: Option<Params>,
4135        handler: ShareableMessageHandler,
4136    ) -> anyhow::Result<UUID4> {
4137        self.check_registered();
4138
4139        let now = self.clock_ref().utc_now();
4140        check_timestamps(now, start, end)?;
4141
4142        let request_id = UUID4::new();
4143        let command = RequestCommand::Bars(RequestBars {
4144            bar_type,
4145            start,
4146            end,
4147            limit,
4148            client_id,
4149            request_id,
4150            ts_init: now.into(),
4151            params,
4152        });
4153
4154        get_message_bus()
4155            .borrow_mut()
4156            .register_response_handler(command.request_id(), handler)?;
4157
4158        self.send_data_cmd(DataCommand::Request(command));
4159
4160        Ok(request_id)
4161    }
4162
/// Returns the number of entries in `quote_handlers` (test builds only).
#[cfg(test)]
pub fn quote_handler_count(&self) -> usize {
    let handlers = &self.quote_handlers;
    handlers.len()
}
4167
/// Returns the number of entries in `trade_handlers` (test builds only).
#[cfg(test)]
pub fn trade_handler_count(&self) -> usize {
    let handlers = &self.trade_handlers;
    handlers.len()
}
4172
/// Returns the number of entries in `bar_handlers` (test builds only).
#[cfg(test)]
pub fn bar_handler_count(&self) -> usize {
    let handlers = &self.bar_handlers;
    handlers.len()
}
4177
/// Returns the number of entries in `deltas_handlers` (test builds only).
#[cfg(test)]
pub fn deltas_handler_count(&self) -> usize {
    let handlers = &self.deltas_handlers;
    handlers.len()
}
4182
/// Returns `true` if `quote_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_quote_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.quote_handlers.contains_key(&key)
}
4188
/// Returns `true` if `trade_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_trade_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.trade_handlers.contains_key(&key)
}
4194
/// Returns `true` if `bar_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_bar_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.bar_handlers.contains_key(&key)
}
4199
/// Returns `true` if `deltas_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_deltas_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.deltas_handlers.contains_key(&key)
}
4205}
4206
4207fn check_timestamps(
4208    now: DateTime<Utc>,
4209    start: Option<DateTime<Utc>>,
4210    end: Option<DateTime<Utc>>,
4211) -> anyhow::Result<()> {
4212    if let Some(start) = start {
4213        check_predicate_true(start <= now, "start was > now")?;
4214    }
4215
4216    if let Some(end) = end {
4217        check_predicate_true(end <= now, "end was > now")?;
4218    }
4219
4220    if let (Some(start), Some(end)) = (start, end) {
4221        check_predicate_true(start < end, "start was >= end")?;
4222    }
4223
4224    Ok(())
4225}
4226
4227fn log_error(e: &anyhow::Error) {
4228    log::error!("{e}");
4229}
4230
4231fn log_not_running<T>(msg: &T)
4232where
4233    T: Debug,
4234{
4235    log::trace!("Received message when not running - skipping {msg:?}");
4236}
4237
4238fn log_received<T>(msg: &T)
4239where
4240    T: Debug,
4241{
4242    log::debug!("{RECV} {msg:?}");
4243}