1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 rc::Rc,
23 sync::Arc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use chrono::{DateTime, Utc};
28use indexmap::IndexMap;
29use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
30#[cfg(feature = "defi")]
31use nautilus_model::defi::{
32 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
33};
34use nautilus_model::{
35 data::{
36 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
37 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
38 close::InstrumentClose,
39 option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
40 },
41 enums::BookType,
42 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
43 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
44 instruments::{InstrumentAny, SyntheticInstrument},
45 orderbook::OrderBook,
46};
47use serde::{Deserialize, Serialize};
48use ustr::Ustr;
49
50use super::{
51 Actor,
52 indicators::{Indicators, SharedActorIndicator},
53 registry::{get_actor_unchecked, try_get_actor_unchecked},
54};
55#[cfg(feature = "defi")]
56use crate::defi;
57#[cfg(feature = "defi")]
58#[allow(unused_imports)]
59use crate::defi::data_actor as _; use crate::{
61 cache::{Cache, CacheApi},
62 clock::{Clock, ClockApi},
63 component::Component,
64 enums::{ComponentState, ComponentTrigger},
65 logging::{CMD, RECV, REQ, SEND},
66 messages::{
67 data::{
68 BarsResponse, BookDeltasResponse, BookDepthResponse, BookResponse, CustomDataResponse,
69 DataCommand, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
70 QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth, RequestBookSnapshot,
71 RequestCommand, RequestCustomData, RequestFundingRates, RequestInstrument,
72 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
73 SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
74 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
75 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
76 SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
77 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
78 UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
79 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
80 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
81 UnsubscribeOptionChain, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
82 is_parent_subscription,
83 },
84 system::ShutdownSystem,
85 },
86 msgbus::{
87 self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
88 switchboard::{
89 MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
90 get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
91 get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
92 get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
93 get_option_chain_topic, get_option_greeks_topic, get_order_canceled_topic,
94 get_order_filled_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
95 },
96 },
97 signal::Signal,
98 timer::{TimeEvent, TimeEventCallback},
99};
100
101#[derive(Debug, Clone, Deserialize, Serialize)]
103#[serde(default, deny_unknown_fields)]
104#[cfg_attr(
105 feature = "python",
106 pyo3::pyclass(
107 module = "nautilus_trader.core.nautilus_pyo3.common",
108 subclass,
109 from_py_object
110 )
111)]
112#[cfg_attr(
113 feature = "python",
114 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
115)]
116pub struct DataActorConfig {
117 pub actor_id: Option<ActorId>,
119 pub log_events: bool,
121 pub log_commands: bool,
123}
124
125impl Default for DataActorConfig {
126 fn default() -> Self {
127 Self {
128 actor_id: None,
129 log_events: true,
130 log_commands: true,
131 }
132 }
133}
134
135#[derive(Debug, Clone, Deserialize, Serialize)]
137#[serde(deny_unknown_fields)]
138#[cfg_attr(
139 feature = "python",
140 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
141)]
142#[cfg_attr(
143 feature = "python",
144 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
145)]
146pub struct ImportableActorConfig {
147 pub actor_path: String,
149 pub config_path: String,
151 pub config: HashMap<String, serde_json::Value>,
153}
154
/// Shared (`Arc`), `Send + Sync` callback that receives a [`UUID4`].
// NOTE(review): presumably invoked with the ID of a completed request — confirm against usage.
type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
156
157pub trait DataActorNative {
169 fn core(&self) -> &DataActorCore;
171
172 fn core_mut(&mut self) -> &mut DataActorCore;
174
175 fn clock_mut(&mut self) -> RefMut<'_, dyn Clock> {
181 let core = self.core_mut();
182 core.clock
183 .as_ref()
184 .unwrap_or_else(|| {
185 panic!(
186 "DataActor {} must be registered before calling `clock_mut()` - trader_id: {:?}",
187 core.actor_id, core.trader_id
188 )
189 })
190 .borrow_mut()
191 }
192
193 fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
199 self.core()
200 .clock
201 .as_ref()
202 .expect("DataActor must be registered before accessing clock")
203 .clone()
204 }
205
206 fn cache_ref(&self) -> Ref<'_, Cache> {
212 self.core()
213 .cache
214 .as_ref()
215 .expect("DataActor must be registered before accessing cache")
216 .borrow()
217 }
218
219 fn cache_rc(&self) -> Rc<RefCell<Cache>> {
225 self.core()
226 .cache
227 .as_ref()
228 .expect("DataActor must be registered before accessing cache")
229 .clone()
230 }
231}
232
233pub trait DataActor: Component {
241 fn actor_id(&self) -> ActorId
243 where
244 Self: DataActorNative,
245 {
246 self.core().actor_id()
247 }
248
249 fn trader_id(&self) -> Option<TraderId>
251 where
252 Self: DataActorNative,
253 {
254 self.core().trader_id()
255 }
256
257 fn is_registered(&self) -> bool
259 where
260 Self: DataActorNative,
261 {
262 self.core().is_registered()
263 }
264
265 fn config(&self) -> &DataActorConfig
267 where
268 Self: DataActorNative,
269 {
270 &self.core().config
271 }
272
273 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
279 Ok(IndexMap::new())
280 }
281
282 #[allow(unused_variables)]
288 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
289 Ok(())
290 }
291
292 fn on_start(&mut self) -> anyhow::Result<()> {
298 log::warn!(
299 "The `on_start` handler was called when not overridden, \
300 it's expected that any actions required when starting the actor \
301 occur here, such as subscribing/requesting data"
302 );
303 Ok(())
304 }
305
306 fn on_stop(&mut self) -> anyhow::Result<()> {
312 log::warn!(
313 "The `on_stop` handler was called when not overridden, \
314 it's expected that any actions required when stopping the actor \
315 occur here, such as unsubscribing from data",
316 );
317 Ok(())
318 }
319
320 fn on_resume(&mut self) -> anyhow::Result<()> {
326 log::warn!(
327 "The `on_resume` handler was called when not overridden, \
328 it's expected that any actions required when resuming the actor \
329 following a stop occur here"
330 );
331 Ok(())
332 }
333
334 fn on_reset(&mut self) -> anyhow::Result<()> {
340 log::warn!(
341 "The `on_reset` handler was called when not overridden, \
342 it's expected that any actions required when resetting the actor \
343 occur here, such as resetting indicators and other state"
344 );
345 Ok(())
346 }
347
348 fn on_dispose(&mut self) -> anyhow::Result<()> {
354 Ok(())
355 }
356
357 fn on_degrade(&mut self) -> anyhow::Result<()> {
363 Ok(())
364 }
365
366 fn on_fault(&mut self) -> anyhow::Result<()> {
372 Ok(())
373 }
374
375 #[allow(unused_variables)]
381 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
382 Ok(())
383 }
384
385 #[allow(unused_variables)]
391 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
392 Ok(())
393 }
394
395 #[allow(unused_variables)]
401 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
402 Ok(())
403 }
404
405 #[allow(unused_variables)]
411 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
412 Ok(())
413 }
414
415 #[allow(unused_variables)]
421 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
422 Ok(())
423 }
424
425 #[allow(unused_variables)]
431 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
432 Ok(())
433 }
434
435 #[allow(unused_variables)]
441 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
442 Ok(())
443 }
444
445 #[allow(unused_variables)]
451 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
452 Ok(())
453 }
454
455 #[allow(unused_variables)]
461 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
462 Ok(())
463 }
464
465 #[allow(unused_variables)]
471 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
472 Ok(())
473 }
474
475 #[allow(unused_variables)]
481 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
482 Ok(())
483 }
484
485 #[allow(unused_variables)]
491 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
492 Ok(())
493 }
494
495 #[allow(unused_variables)]
501 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
502 Ok(())
503 }
504
505 #[allow(unused_variables)]
511 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
512 Ok(())
513 }
514
515 #[allow(unused_variables)]
521 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
522 Ok(())
523 }
524
525 #[allow(unused_variables)]
531 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
532 Ok(())
533 }
534
535 #[allow(unused_variables)]
541 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
542 Ok(())
543 }
544
545 #[allow(unused_variables)]
551 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
552 Ok(())
553 }
554
/// Hook invoked by `handle_block` with a blockchain block (`defi` feature only).
///
/// The default implementation ignores the block and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
    anyhow::Ok(())
}
565
/// Hook invoked by `handle_pool` with a pool (`defi` feature only).
///
/// The default implementation ignores the pool and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
    anyhow::Ok(())
}
576
/// Hook invoked by `handle_pool_swap` with a pool swap (`defi` feature only).
///
/// The default implementation ignores the swap and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
    anyhow::Ok(())
}
587
/// Hook invoked by `handle_pool_liquidity_update` with a pool liquidity update
/// (`defi` feature only).
///
/// The default implementation ignores the update and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
    anyhow::Ok(())
}
598
/// Hook invoked by `handle_pool_fee_collect` with a pool fee collect event
/// (`defi` feature only).
///
/// The default implementation ignores the event and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
    anyhow::Ok(())
}
609
/// Hook invoked by `handle_pool_flash` with a pool flash event (`defi` feature only).
///
/// The default implementation ignores the event and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
    anyhow::Ok(())
}
620
621 #[allow(unused_variables)]
627 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
628 Ok(())
629 }
630
631 #[allow(unused_variables)]
637 fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
638 Ok(())
639 }
640
641 #[allow(unused_variables)]
647 fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
648 Ok(())
649 }
650
651 #[allow(unused_variables)]
657 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
658 Ok(())
659 }
660
661 #[allow(unused_variables)]
667 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
668 Ok(())
669 }
670
671 #[allow(unused_variables)]
677 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
678 Ok(())
679 }
680
681 #[allow(unused_variables)]
687 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
688 Ok(())
689 }
690
691 #[allow(unused_variables)]
697 fn on_historical_index_prices(
698 &mut self,
699 index_prices: &[IndexPriceUpdate],
700 ) -> anyhow::Result<()> {
701 Ok(())
702 }
703
704 #[allow(unused_variables)]
710 fn on_historical_funding_rates(
711 &mut self,
712 funding_rates: &[FundingRateUpdate],
713 ) -> anyhow::Result<()> {
714 Ok(())
715 }
716
717 fn clock(&self) -> ClockApi<'_>
719 where
720 Self: DataActorNative,
721 {
722 self.core().clock_api()
723 }
724
725 fn cache(&self) -> CacheApi<'_>
727 where
728 Self: DataActorNative,
729 {
730 self.core().cache_api()
731 }
732
733 fn shutdown_system(&self, reason: Option<String>)
739 where
740 Self: DataActorNative,
741 {
742 self.core().shutdown_system(reason);
743 }
744
745 fn publish_data(&self, data_type: &DataType, data: &CustomData)
754 where
755 Self: DataActorNative,
756 {
757 self.core().publish_data(data_type, data);
758 }
759
760 fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos)
766 where
767 Self: DataActorNative,
768 {
769 self.core().publish_signal(name, value, ts_event);
770 }
771
772 fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()>
784 where
785 Self: DataActorNative,
786 {
787 self.core().add_synthetic(synthetic)
788 }
789
790 fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()>
802 where
803 Self: DataActorNative,
804 {
805 self.core().update_synthetic(synthetic)
806 }
807
808 fn handle_time_event(&mut self, event: &TimeEvent) {
810 log_received(&event);
811
812 if self.not_running() {
813 log_not_running(&event);
814 return;
815 }
816
817 if let Err(e) = DataActor::on_time_event(self, event) {
818 log_error(&e);
819 }
820 }
821
822 fn handle_data(&mut self, data: &CustomData) {
824 log_received(&data);
825
826 if self.not_running() {
827 log_not_running(&data);
828 return;
829 }
830
831 if let Err(e) = self.on_data(data) {
832 log_error(&e);
833 }
834 }
835
836 fn handle_signal(&mut self, signal: &Signal) {
838 log_received(&signal);
839
840 if self.not_running() {
841 log_not_running(&signal);
842 return;
843 }
844
845 if let Err(e) = self.on_signal(signal) {
846 log_error(&e);
847 }
848 }
849
850 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
852 log_received(&instrument);
853
854 if self.not_running() {
855 log_not_running(&instrument);
856 return;
857 }
858
859 if let Err(e) = self.on_instrument(instrument) {
860 log_error(&e);
861 }
862 }
863
864 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
866 log_received(&deltas);
867
868 if self.not_running() {
869 log_not_running(&deltas);
870 return;
871 }
872
873 if let Err(e) = self.on_book_deltas(deltas) {
874 log_error(&e);
875 }
876 }
877
878 fn handle_book(&mut self, book: &OrderBook) {
880 log_received(&book);
881
882 if self.not_running() {
883 log_not_running(&book);
884 return;
885 }
886
887 if let Err(e) = self.on_book(book) {
888 log_error(&e);
889 }
890 }
891
892 fn handle_quote(&mut self, quote: &QuoteTick)
894 where
895 Self: DataActorNative,
896 {
897 log_received("e);
898
899 if let Err(e) = self.core().handle_indicators_for_quote(quote) {
900 log_error(&e);
901 return;
902 }
903
904 if self.not_running() {
905 log_not_running("e);
906 return;
907 }
908
909 if let Err(e) = self.on_quote(quote) {
910 log_error(&e);
911 }
912 }
913
914 fn handle_trade(&mut self, trade: &TradeTick)
916 where
917 Self: DataActorNative,
918 {
919 log_received(&trade);
920
921 if let Err(e) = self.core().handle_indicators_for_trade(trade) {
922 log_error(&e);
923 return;
924 }
925
926 if self.not_running() {
927 log_not_running(&trade);
928 return;
929 }
930
931 if let Err(e) = self.on_trade(trade) {
932 log_error(&e);
933 }
934 }
935
936 fn handle_bar(&mut self, bar: &Bar)
938 where
939 Self: DataActorNative,
940 {
941 log_received(&bar);
942
943 if let Err(e) = self.core().handle_indicators_for_bar(bar) {
944 log_error(&e);
945 return;
946 }
947
948 if self.not_running() {
949 log_not_running(&bar);
950 return;
951 }
952
953 if let Err(e) = self.on_bar(bar) {
954 log_error(&e);
955 }
956 }
957
958 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
960 log_received(&mark_price);
961
962 if self.not_running() {
963 log_not_running(&mark_price);
964 return;
965 }
966
967 if let Err(e) = self.on_mark_price(mark_price) {
968 log_error(&e);
969 }
970 }
971
972 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
974 log_received(&index_price);
975
976 if self.not_running() {
977 log_not_running(&index_price);
978 return;
979 }
980
981 if let Err(e) = self.on_index_price(index_price) {
982 log_error(&e);
983 }
984 }
985
986 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
988 log_received(&funding_rate);
989
990 if self.not_running() {
991 log_not_running(&funding_rate);
992 return;
993 }
994
995 if let Err(e) = self.on_funding_rate(funding_rate) {
996 log_error(&e);
997 }
998 }
999
1000 fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
1002 log_received(&greeks);
1003
1004 if self.not_running() {
1005 log_not_running(&greeks);
1006 return;
1007 }
1008
1009 if let Err(e) = self.on_option_greeks(greeks) {
1010 log_error(&e);
1011 }
1012 }
1013
1014 fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
1016 log_received(&slice);
1017
1018 if self.not_running() {
1019 log_not_running(&slice);
1020 return;
1021 }
1022
1023 if let Err(e) = self.on_option_chain(slice) {
1024 log_error(&e);
1025 }
1026 }
1027
1028 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
1030 log_received(&status);
1031
1032 if self.not_running() {
1033 log_not_running(&status);
1034 return;
1035 }
1036
1037 if let Err(e) = self.on_instrument_status(status) {
1038 log_error(&e);
1039 }
1040 }
1041
1042 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
1044 log_received(&close);
1045
1046 if self.not_running() {
1047 log_not_running(&close);
1048 return;
1049 }
1050
1051 if let Err(e) = self.on_instrument_close(close) {
1052 log_error(&e);
1053 }
1054 }
1055
1056 fn handle_order_filled(&mut self, event: &OrderFilled)
1058 where
1059 Self: DataActorNative,
1060 {
1061 log_received(&event);
1062
1063 if event.strategy_id.inner() == self.core().actor_id().inner() {
1067 return;
1068 }
1069
1070 if self.not_running() {
1071 log_not_running(&event);
1072 return;
1073 }
1074
1075 if let Err(e) = self.on_order_filled(event) {
1076 log_error(&e);
1077 }
1078 }
1079
1080 fn handle_order_canceled(&mut self, event: &OrderCanceled)
1082 where
1083 Self: DataActorNative,
1084 {
1085 log_received(&event);
1086
1087 if event.strategy_id.inner() == self.core().actor_id().inner() {
1091 return;
1092 }
1093
1094 if self.not_running() {
1095 log_not_running(&event);
1096 return;
1097 }
1098
1099 if let Err(e) = self.on_order_canceled(event) {
1100 log_error(&e);
1101 }
1102 }
1103
/// Handles a received block (`defi` feature only).
///
/// The block is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_block`, whose error (if any) is
/// handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_block(&mut self, block: &Block) {
    log_received(&block);

    if self.not_running() {
        log_not_running(&block);
    } else if let Err(err) = self.on_block(block) {
        log_error(&err);
    }
}
1118
/// Handles a received pool (`defi` feature only).
///
/// The pool is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_pool`, whose error (if any) is
/// handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool(&mut self, pool: &Pool) {
    log_received(&pool);

    if self.not_running() {
        log_not_running(&pool);
    } else if let Err(err) = self.on_pool(pool) {
        log_error(&err);
    }
}
1133
/// Handles a received pool swap (`defi` feature only).
///
/// The swap is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_pool_swap`, whose error (if any)
/// is handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_swap(&mut self, swap: &PoolSwap) {
    log_received(&swap);

    if self.not_running() {
        log_not_running(&swap);
    } else if let Err(err) = self.on_pool_swap(swap) {
        log_error(&err);
    }
}
1148
/// Handles a received pool liquidity update (`defi` feature only).
///
/// The update is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_pool_liquidity_update`, whose error
/// (if any) is handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
    log_received(&update);

    if self.not_running() {
        log_not_running(&update);
    } else if let Err(err) = self.on_pool_liquidity_update(update) {
        log_error(&err);
    }
}
1163
/// Handles a received pool fee collect event (`defi` feature only).
///
/// The event is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_pool_fee_collect`, whose error
/// (if any) is handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
    log_received(&collect);

    if self.not_running() {
        log_not_running(&collect);
    } else if let Err(err) = self.on_pool_fee_collect(collect) {
        log_error(&err);
    }
}
1178
/// Handles a received pool flash event (`defi` feature only).
///
/// The event is passed to `log_received`; a non-running actor drops it via
/// `log_not_running`, otherwise it is forwarded to `on_pool_flash`, whose error (if any)
/// is handed to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_flash(&mut self, flash: &PoolFlash) {
    log_received(&flash);

    if self.not_running() {
        log_not_running(&flash);
    } else if let Err(err) = self.on_pool_flash(flash) {
        log_error(&err);
    }
}
1193
1194 fn handle_historical_data(&mut self, data: &dyn Any) {
1196 log_received(&data);
1197
1198 if let Err(e) = self.on_historical_data(data) {
1199 log_error(&e);
1200 }
1201 }
1202
1203 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
1205 log_received(&resp);
1206
1207 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
1208 log_error(&e);
1209 }
1210 }
1211
1212 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
1214 log_received(&resp);
1215
1216 if let Err(e) = self.on_instrument(&resp.data) {
1217 log_error(&e);
1218 }
1219 }
1220
1221 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
1223 log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
1224 log::trace!("{RECV} {resp:?}");
1225
1226 for inst in &resp.data {
1227 if let Err(e) = self.on_instrument(inst) {
1228 log_error(&e);
1229 }
1230 }
1231 }
1232
1233 fn handle_book_response(&mut self, resp: &BookResponse) {
1235 log_received(&resp);
1236
1237 if let Err(e) = self.on_book(&resp.data) {
1238 log_error(&e);
1239 }
1240 }
1241
1242 fn handle_book_deltas_response(&mut self, resp: &BookDeltasResponse) {
1244 log_received_bulk("BookDeltasResponse", &resp.correlation_id, resp.data.len());
1245 log::trace!("{RECV} {resp:?}");
1246
1247 if let Err(e) = self.on_historical_book_deltas(&resp.data) {
1248 log_error(&e);
1249 }
1250 }
1251
1252 fn handle_book_depth_response(&mut self, resp: &BookDepthResponse) {
1254 log_received_bulk("BookDepthResponse", &resp.correlation_id, resp.data.len());
1255 log::trace!("{RECV} {resp:?}");
1256
1257 if let Err(e) = self.on_historical_book_depth(&resp.data) {
1258 log_error(&e);
1259 }
1260 }
1261
1262 fn handle_quotes_response(&mut self, resp: &QuotesResponse)
1264 where
1265 Self: DataActorNative,
1266 {
1267 log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
1268 log::trace!("{RECV} {resp:?}");
1269
1270 if let Err(e) = self.core().handle_indicators_for_quotes(&resp.data) {
1271 log_error(&e);
1272 return;
1273 }
1274
1275 if let Err(e) = self.on_historical_quotes(&resp.data) {
1276 log_error(&e);
1277 }
1278 }
1279
1280 fn handle_trades_response(&mut self, resp: &TradesResponse)
1282 where
1283 Self: DataActorNative,
1284 {
1285 log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1286 log::trace!("{RECV} {resp:?}");
1287
1288 if let Err(e) = self.core().handle_indicators_for_trades(&resp.data) {
1289 log_error(&e);
1290 return;
1291 }
1292
1293 if let Err(e) = self.on_historical_trades(&resp.data) {
1294 log_error(&e);
1295 }
1296 }
1297
1298 fn handle_bars_response(&mut self, resp: &BarsResponse)
1300 where
1301 Self: DataActorNative,
1302 {
1303 log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1304 log::trace!("{RECV} {resp:?}");
1305
1306 if let Err(e) = self.core().handle_indicators_for_bars(&resp.data) {
1307 log_error(&e);
1308 return;
1309 }
1310
1311 if let Err(e) = self.on_historical_bars(&resp.data) {
1312 log_error(&e);
1313 }
1314 }
1315
1316 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1318 log_received_bulk(
1319 "FundingRatesResponse",
1320 &resp.correlation_id,
1321 resp.data.len(),
1322 );
1323 log::trace!("{RECV} {resp:?}");
1324
1325 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1326 log_error(&e);
1327 }
1328 }
1329
1330 fn subscribe_data(
1332 &mut self,
1333 data_type: DataType,
1334 client_id: Option<ClientId>,
1335 params: Option<Params>,
1336 ) where
1337 Self: DataActorNative,
1338 Self: 'static + Debug + Sized,
1339 {
1340 let actor_id = self.core().actor_id().inner();
1341 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1342 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1343 });
1344
1345 DataActorCore::subscribe_data(self.core_mut(), handler, data_type, client_id, params);
1346 }
1347
1348 fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1363 where
1364 Self: DataActorNative,
1365 Self: 'static + Debug + Sized,
1366 {
1367 let actor_id = self.core().actor_id().inner();
1368 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1371 if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1372 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1373 actor.handle_signal(signal);
1374 } else {
1375 log::error!("Actor {actor_id} not found for signal handling");
1376 }
1377 }
1378 });
1379
1380 DataActorCore::subscribe_signal(self.core_mut(), handler, name, priority);
1381 }
1382
1383 fn subscribe_quotes(
1385 &mut self,
1386 instrument_id: InstrumentId,
1387 client_id: Option<ClientId>,
1388 params: Option<Params>,
1389 ) where
1390 Self: DataActorNative,
1391 Self: 'static + Debug + Sized,
1392 {
1393 let actor_id = self.core().actor_id().inner();
1394 let topic = get_quotes_topic(instrument_id);
1395
1396 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1397 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1398 actor.handle_quote(quote);
1399 } else {
1400 log::error!("Actor {actor_id} not found for quote handling");
1401 }
1402 });
1403
1404 DataActorCore::subscribe_quotes(
1405 self.core_mut(),
1406 topic,
1407 handler,
1408 instrument_id,
1409 client_id,
1410 params,
1411 );
1412 }
1413
1414 fn subscribe_instruments(
1416 &mut self,
1417 venue: Venue,
1418 client_id: Option<ClientId>,
1419 params: Option<Params>,
1420 ) where
1421 Self: DataActorNative,
1422 Self: 'static + Debug + Sized,
1423 {
1424 let actor_id = self.core().actor_id().inner();
1425 let pattern = get_instruments_pattern(venue);
1426
1427 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1428 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1429 actor.handle_instrument(instrument);
1430 } else {
1431 log::error!("Actor {actor_id} not found for instruments handling");
1432 }
1433 });
1434
1435 DataActorCore::subscribe_instruments(
1436 self.core_mut(),
1437 pattern,
1438 handler,
1439 venue,
1440 client_id,
1441 params,
1442 );
1443 }
1444
1445 fn subscribe_instrument(
1447 &mut self,
1448 instrument_id: InstrumentId,
1449 client_id: Option<ClientId>,
1450 params: Option<Params>,
1451 ) where
1452 Self: DataActorNative,
1453 Self: 'static + Debug + Sized,
1454 {
1455 let actor_id = self.core().actor_id().inner();
1456 let topic = get_instrument_topic(instrument_id);
1457
1458 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1459 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1460 actor.handle_instrument(instrument);
1461 } else {
1462 log::error!("Actor {actor_id} not found for instrument handling");
1463 }
1464 });
1465
1466 DataActorCore::subscribe_instrument(
1467 self.core_mut(),
1468 topic,
1469 handler,
1470 instrument_id,
1471 client_id,
1472 params,
1473 );
1474 }
1475
1476 fn subscribe_book_deltas(
1478 &mut self,
1479 instrument_id: InstrumentId,
1480 book_type: BookType,
1481 depth: Option<NonZeroUsize>,
1482 client_id: Option<ClientId>,
1483 managed: bool,
1484 params: Option<Params>,
1485 ) where
1486 Self: DataActorNative,
1487 Self: 'static + Debug + Sized,
1488 {
1489 let actor_id = self.core().actor_id().inner();
1490 let is_parent = is_parent_subscription(params.as_ref());
1491 let pattern = if is_parent {
1492 get_book_deltas_pattern(instrument_id)
1493 } else {
1494 get_book_deltas_topic(instrument_id).into()
1495 };
1496
1497 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1498 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1499 });
1500
1501 DataActorCore::subscribe_book_deltas(
1502 self.core_mut(),
1503 pattern,
1504 handler,
1505 instrument_id,
1506 book_type,
1507 depth,
1508 client_id,
1509 managed,
1510 params,
1511 );
1512 }
1513
1514 fn subscribe_book_at_interval(
1516 &mut self,
1517 instrument_id: InstrumentId,
1518 book_type: BookType,
1519 depth: Option<NonZeroUsize>,
1520 interval_ms: NonZeroUsize,
1521 client_id: Option<ClientId>,
1522 params: Option<Params>,
1523 ) where
1524 Self: DataActorNative,
1525 Self: 'static + Debug + Sized,
1526 {
1527 let actor_id = self.core().actor_id().inner();
1528 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1529
1530 let handler = TypedHandler::from(move |book: &OrderBook| {
1531 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1532 });
1533
1534 DataActorCore::subscribe_book_at_interval(
1535 self.core_mut(),
1536 topic,
1537 handler,
1538 instrument_id,
1539 book_type,
1540 depth,
1541 interval_ms,
1542 client_id,
1543 params,
1544 );
1545 }
1546
1547 fn subscribe_trades(
1549 &mut self,
1550 instrument_id: InstrumentId,
1551 client_id: Option<ClientId>,
1552 params: Option<Params>,
1553 ) where
1554 Self: DataActorNative,
1555 Self: 'static + Debug + Sized,
1556 {
1557 let actor_id = self.core().actor_id().inner();
1558 let topic = get_trades_topic(instrument_id);
1559
1560 let handler = TypedHandler::from(move |trade: &TradeTick| {
1561 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1562 });
1563
1564 DataActorCore::subscribe_trades(
1565 self.core_mut(),
1566 topic,
1567 handler,
1568 instrument_id,
1569 client_id,
1570 params,
1571 );
1572 }
1573
1574 fn subscribe_bars(
1576 &mut self,
1577 bar_type: BarType,
1578 client_id: Option<ClientId>,
1579 params: Option<Params>,
1580 ) where
1581 Self: DataActorNative,
1582 Self: 'static + Debug + Sized,
1583 {
1584 let actor_id = self.core().actor_id().inner();
1585 let topic = get_bars_topic(bar_type);
1586
1587 let handler = TypedHandler::from(move |bar: &Bar| {
1588 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1589 });
1590
1591 DataActorCore::subscribe_bars(self.core_mut(), topic, handler, bar_type, client_id, params);
1592 }
1593
1594 fn subscribe_mark_prices(
1596 &mut self,
1597 instrument_id: InstrumentId,
1598 client_id: Option<ClientId>,
1599 params: Option<Params>,
1600 ) where
1601 Self: DataActorNative,
1602 Self: 'static + Debug + Sized,
1603 {
1604 let actor_id = self.core().actor_id().inner();
1605 let topic = get_mark_price_topic(instrument_id);
1606
1607 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1608 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1609 });
1610
1611 DataActorCore::subscribe_mark_prices(
1612 self.core_mut(),
1613 topic,
1614 handler,
1615 instrument_id,
1616 client_id,
1617 params,
1618 );
1619 }
1620
1621 fn subscribe_index_prices(
1623 &mut self,
1624 instrument_id: InstrumentId,
1625 client_id: Option<ClientId>,
1626 params: Option<Params>,
1627 ) where
1628 Self: DataActorNative,
1629 Self: 'static + Debug + Sized,
1630 {
1631 let actor_id = self.core().actor_id().inner();
1632 let topic = get_index_price_topic(instrument_id);
1633
1634 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1635 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1636 });
1637
1638 DataActorCore::subscribe_index_prices(
1639 self.core_mut(),
1640 topic,
1641 handler,
1642 instrument_id,
1643 client_id,
1644 params,
1645 );
1646 }
1647
1648 fn subscribe_funding_rates(
1650 &mut self,
1651 instrument_id: InstrumentId,
1652 client_id: Option<ClientId>,
1653 params: Option<Params>,
1654 ) where
1655 Self: DataActorNative,
1656 Self: 'static + Debug + Sized,
1657 {
1658 let actor_id = self.core().actor_id().inner();
1659 let topic = get_funding_rate_topic(instrument_id);
1660
1661 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1662 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1663 });
1664
1665 DataActorCore::subscribe_funding_rates(
1666 self.core_mut(),
1667 topic,
1668 handler,
1669 instrument_id,
1670 client_id,
1671 params,
1672 );
1673 }
1674
1675 fn subscribe_option_greeks(
1677 &mut self,
1678 instrument_id: InstrumentId,
1679 client_id: Option<ClientId>,
1680 params: Option<Params>,
1681 ) where
1682 Self: DataActorNative,
1683 Self: 'static + Debug + Sized,
1684 {
1685 let actor_id = self.core().actor_id().inner();
1686 let topic = get_option_greeks_topic(instrument_id);
1687
1688 let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1689 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1690 actor.handle_option_greeks(option_greeks);
1691 } else {
1692 log::error!("Actor {actor_id} not found for option greeks handling");
1693 }
1694 });
1695
1696 DataActorCore::subscribe_option_greeks(
1697 self.core_mut(),
1698 topic,
1699 handler,
1700 instrument_id,
1701 client_id,
1702 params,
1703 );
1704 }
1705
1706 fn subscribe_instrument_status(
1708 &mut self,
1709 instrument_id: InstrumentId,
1710 client_id: Option<ClientId>,
1711 params: Option<Params>,
1712 ) where
1713 Self: DataActorNative,
1714 Self: 'static + Debug + Sized,
1715 {
1716 let actor_id = self.core().actor_id().inner();
1717 let topic = get_instrument_status_topic(instrument_id);
1718
1719 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1720 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1721 });
1722
1723 DataActorCore::subscribe_instrument_status(
1724 self.core_mut(),
1725 topic,
1726 handler,
1727 instrument_id,
1728 client_id,
1729 params,
1730 );
1731 }
1732
1733 fn subscribe_instrument_close(
1735 &mut self,
1736 instrument_id: InstrumentId,
1737 client_id: Option<ClientId>,
1738 params: Option<Params>,
1739 ) where
1740 Self: DataActorNative,
1741 Self: 'static + Debug + Sized,
1742 {
1743 let actor_id = self.core().actor_id().inner();
1744 let topic = get_instrument_close_topic(instrument_id);
1745
1746 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1747 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1748 });
1749
1750 DataActorCore::subscribe_instrument_close(
1751 self.core_mut(),
1752 topic,
1753 handler,
1754 instrument_id,
1755 client_id,
1756 params,
1757 );
1758 }
1759
1760 fn subscribe_option_chain(
1765 &mut self,
1766 series_id: OptionSeriesId,
1767 strike_range: StrikeRange,
1768 snapshot_interval_ms: Option<u64>,
1769 client_id: Option<ClientId>,
1770 params: Option<Params>,
1771 ) where
1772 Self: DataActorNative,
1773 Self: 'static + Debug + Sized,
1774 {
1775 let actor_id = self.core().actor_id().inner();
1776 let topic = get_option_chain_topic(series_id);
1777
1778 let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1779 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1780 actor.handle_option_chain(slice);
1781 } else {
1782 log::error!("Actor {actor_id} not found for option chain handling");
1783 }
1784 });
1785
1786 DataActorCore::subscribe_option_chain(
1787 self.core_mut(),
1788 topic,
1789 handler,
1790 series_id,
1791 strike_range,
1792 snapshot_interval_ms,
1793 client_id,
1794 params,
1795 );
1796 }
1797
1798 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1800 where
1801 Self: DataActorNative,
1802 Self: 'static + Debug + Sized,
1803 {
1804 let actor_id = self.core().actor_id().inner();
1805 let topic = get_order_filled_topic(instrument_id);
1806
1807 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1808 if let OrderEventAny::Filled(filled) = event {
1809 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1810 }
1811 });
1812
1813 DataActorCore::subscribe_order_fills(self.core_mut(), topic, handler);
1814 }
1815
1816 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1818 where
1819 Self: DataActorNative,
1820 Self: 'static + Debug + Sized,
1821 {
1822 let actor_id = self.core().actor_id().inner();
1823 let topic = get_order_canceled_topic(instrument_id);
1824
1825 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1826 if let OrderEventAny::Canceled(canceled) = event {
1827 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1828 }
1829 });
1830
1831 DataActorCore::subscribe_order_cancels(self.core_mut(), topic, handler);
1832 }
1833
/// Subscribes this actor to [`Block`] data for `chain` (only with the `defi`
/// feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each block to `handle_block`; the subscription itself is delegated
/// to `DataActorCore::subscribe_blocks`.
#[cfg(feature = "defi")]
fn subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_blocks_topic(chain);
    let id = self.core().actor_id().inner();

    let on_block = move |block: &Block| {
        get_actor_unchecked::<Self>(&id).handle_block(block);
    };
    let handler = TypedHandler::from(on_block);

    let core = self.core_mut();
    DataActorCore::subscribe_blocks(core, topic, handler, chain, client_id, params);
}
1854
/// Subscribes this actor to [`Pool`] data for `instrument_id` (only with the
/// `defi` feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each pool to `handle_pool`; the subscription itself is delegated to
/// `DataActorCore::subscribe_pool`.
#[cfg(feature = "defi")]
fn subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
    let id = self.core().actor_id().inner();

    let on_pool = move |pool: &Pool| {
        get_actor_unchecked::<Self>(&id).handle_pool(pool);
    };
    let handler = TypedHandler::from(on_pool);

    let core = self.core_mut();
    DataActorCore::subscribe_pool(core, topic, handler, instrument_id, client_id, params);
}
1882
/// Subscribes this actor to [`PoolSwap`] data for `instrument_id` (only with
/// the `defi` feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each swap to `handle_pool_swap`; the subscription itself is
/// delegated to `DataActorCore::subscribe_pool_swaps`.
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
    let id = self.core().actor_id().inner();

    let on_swap = move |swap: &PoolSwap| {
        get_actor_unchecked::<Self>(&id).handle_pool_swap(swap);
    };
    let handler = TypedHandler::from(on_swap);

    let core = self.core_mut();
    DataActorCore::subscribe_pool_swaps(
        core,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1910
/// Subscribes this actor to [`PoolLiquidityUpdate`] data for `instrument_id`
/// (only with the `defi` feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each update to `handle_pool_liquidity_update`; the subscription
/// itself is delegated to `DataActorCore::subscribe_pool_liquidity_updates`.
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
    let id = self.core().actor_id().inner();

    let on_update = move |update: &PoolLiquidityUpdate| {
        get_actor_unchecked::<Self>(&id).handle_pool_liquidity_update(update);
    };
    let handler = TypedHandler::from(on_update);

    let core = self.core_mut();
    DataActorCore::subscribe_pool_liquidity_updates(
        core,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1938
/// Subscribes this actor to [`PoolFeeCollect`] data for `instrument_id` (only
/// with the `defi` feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each event to `handle_pool_fee_collect`; the subscription itself is
/// delegated to `DataActorCore::subscribe_pool_fee_collects`.
#[cfg(feature = "defi")]
fn subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
    let id = self.core().actor_id().inner();

    let on_collect = move |collect: &PoolFeeCollect| {
        get_actor_unchecked::<Self>(&id).handle_pool_fee_collect(collect);
    };
    let handler = TypedHandler::from(on_collect);

    let core = self.core_mut();
    DataActorCore::subscribe_pool_fee_collects(
        core,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1966
/// Subscribes this actor to [`PoolFlash`] data for `instrument_id` (only with
/// the `defi` feature).
///
/// The registered handler resolves the actor from the registry by id and
/// forwards each event to `handle_pool_flash`; the subscription itself is
/// delegated to `DataActorCore::subscribe_pool_flash_events`.
#[cfg(feature = "defi")]
fn subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
    let id = self.core().actor_id().inner();

    let on_flash = move |flash: &PoolFlash| {
        get_actor_unchecked::<Self>(&id).handle_pool_flash(flash);
    };
    let handler = TypedHandler::from(on_flash);

    let core = self.core_mut();
    DataActorCore::subscribe_pool_flash_events(
        core,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
1994
1995 fn unsubscribe_data(
1997 &mut self,
1998 data_type: DataType,
1999 client_id: Option<ClientId>,
2000 params: Option<Params>,
2001 ) where
2002 Self: DataActorNative,
2003 Self: 'static + Debug + Sized,
2004 {
2005 DataActorCore::unsubscribe_data(self.core_mut(), data_type, client_id, params);
2006 }
2007
2008 fn unsubscribe_signal(&mut self, name: &str)
2010 where
2011 Self: DataActorNative,
2012 Self: 'static + Debug + Sized,
2013 {
2014 DataActorCore::unsubscribe_signal(self.core_mut(), name);
2015 }
2016
2017 fn unsubscribe_instruments(
2019 &mut self,
2020 venue: Venue,
2021 client_id: Option<ClientId>,
2022 params: Option<Params>,
2023 ) where
2024 Self: DataActorNative,
2025 Self: 'static + Debug + Sized,
2026 {
2027 DataActorCore::unsubscribe_instruments(self.core_mut(), venue, client_id, params);
2028 }
2029
2030 fn unsubscribe_instrument(
2032 &mut self,
2033 instrument_id: InstrumentId,
2034 client_id: Option<ClientId>,
2035 params: Option<Params>,
2036 ) where
2037 Self: DataActorNative,
2038 Self: 'static + Debug + Sized,
2039 {
2040 DataActorCore::unsubscribe_instrument(self.core_mut(), instrument_id, client_id, params);
2041 }
2042
2043 fn unsubscribe_book_deltas(
2045 &mut self,
2046 instrument_id: InstrumentId,
2047 client_id: Option<ClientId>,
2048 params: Option<Params>,
2049 ) where
2050 Self: DataActorNative,
2051 Self: 'static + Debug + Sized,
2052 {
2053 DataActorCore::unsubscribe_book_deltas(self.core_mut(), instrument_id, client_id, params);
2054 }
2055
2056 fn unsubscribe_book_at_interval(
2058 &mut self,
2059 instrument_id: InstrumentId,
2060 interval_ms: NonZeroUsize,
2061 client_id: Option<ClientId>,
2062 params: Option<Params>,
2063 ) where
2064 Self: DataActorNative,
2065 Self: 'static + Debug + Sized,
2066 {
2067 DataActorCore::unsubscribe_book_at_interval(
2068 self.core_mut(),
2069 instrument_id,
2070 interval_ms,
2071 client_id,
2072 params,
2073 );
2074 }
2075
2076 fn unsubscribe_quotes(
2078 &mut self,
2079 instrument_id: InstrumentId,
2080 client_id: Option<ClientId>,
2081 params: Option<Params>,
2082 ) where
2083 Self: DataActorNative,
2084 Self: 'static + Debug + Sized,
2085 {
2086 DataActorCore::unsubscribe_quotes(self.core_mut(), instrument_id, client_id, params);
2087 }
2088
2089 fn unsubscribe_trades(
2091 &mut self,
2092 instrument_id: InstrumentId,
2093 client_id: Option<ClientId>,
2094 params: Option<Params>,
2095 ) where
2096 Self: DataActorNative,
2097 Self: 'static + Debug + Sized,
2098 {
2099 DataActorCore::unsubscribe_trades(self.core_mut(), instrument_id, client_id, params);
2100 }
2101
2102 fn unsubscribe_bars(
2104 &mut self,
2105 bar_type: BarType,
2106 client_id: Option<ClientId>,
2107 params: Option<Params>,
2108 ) where
2109 Self: DataActorNative,
2110 Self: 'static + Debug + Sized,
2111 {
2112 DataActorCore::unsubscribe_bars(self.core_mut(), bar_type, client_id, params);
2113 }
2114
2115 fn unsubscribe_mark_prices(
2117 &mut self,
2118 instrument_id: InstrumentId,
2119 client_id: Option<ClientId>,
2120 params: Option<Params>,
2121 ) where
2122 Self: DataActorNative,
2123 Self: 'static + Debug + Sized,
2124 {
2125 DataActorCore::unsubscribe_mark_prices(self.core_mut(), instrument_id, client_id, params);
2126 }
2127
2128 fn unsubscribe_index_prices(
2130 &mut self,
2131 instrument_id: InstrumentId,
2132 client_id: Option<ClientId>,
2133 params: Option<Params>,
2134 ) where
2135 Self: DataActorNative,
2136 Self: 'static + Debug + Sized,
2137 {
2138 DataActorCore::unsubscribe_index_prices(self.core_mut(), instrument_id, client_id, params);
2139 }
2140
2141 fn unsubscribe_funding_rates(
2143 &mut self,
2144 instrument_id: InstrumentId,
2145 client_id: Option<ClientId>,
2146 params: Option<Params>,
2147 ) where
2148 Self: DataActorNative,
2149 Self: 'static + Debug + Sized,
2150 {
2151 DataActorCore::unsubscribe_funding_rates(self.core_mut(), instrument_id, client_id, params);
2152 }
2153
2154 fn unsubscribe_option_greeks(
2156 &mut self,
2157 instrument_id: InstrumentId,
2158 client_id: Option<ClientId>,
2159 params: Option<Params>,
2160 ) where
2161 Self: DataActorNative,
2162 Self: 'static + Debug + Sized,
2163 {
2164 DataActorCore::unsubscribe_option_greeks(self.core_mut(), instrument_id, client_id, params);
2165 }
2166
2167 fn unsubscribe_instrument_status(
2169 &mut self,
2170 instrument_id: InstrumentId,
2171 client_id: Option<ClientId>,
2172 params: Option<Params>,
2173 ) where
2174 Self: DataActorNative,
2175 Self: 'static + Debug + Sized,
2176 {
2177 DataActorCore::unsubscribe_instrument_status(
2178 self.core_mut(),
2179 instrument_id,
2180 client_id,
2181 params,
2182 );
2183 }
2184
2185 fn unsubscribe_instrument_close(
2187 &mut self,
2188 instrument_id: InstrumentId,
2189 client_id: Option<ClientId>,
2190 params: Option<Params>,
2191 ) where
2192 Self: DataActorNative,
2193 Self: 'static + Debug + Sized,
2194 {
2195 DataActorCore::unsubscribe_instrument_close(
2196 self.core_mut(),
2197 instrument_id,
2198 client_id,
2199 params,
2200 );
2201 }
2202
2203 fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
2205 where
2206 Self: DataActorNative,
2207 Self: 'static + Debug + Sized,
2208 {
2209 DataActorCore::unsubscribe_option_chain(self.core_mut(), series_id, client_id);
2210 }
2211
2212 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
2214 where
2215 Self: DataActorNative,
2216 Self: 'static + Debug + Sized,
2217 {
2218 DataActorCore::unsubscribe_order_fills(self.core_mut(), instrument_id);
2219 }
2220
2221 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
2223 where
2224 Self: DataActorNative,
2225 Self: 'static + Debug + Sized,
2226 {
2227 DataActorCore::unsubscribe_order_cancels(self.core_mut(), instrument_id);
2228 }
2229
/// Unsubscribes from block data for `chain` (only with the `defi` feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_blocks`.
#[cfg(feature = "defi")]
fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_blocks(core, chain, client_id, params);
}
2243
/// Unsubscribes from pool data for `instrument_id` (only with the `defi`
/// feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_pool`.
#[cfg(feature = "defi")]
fn unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_pool(core, instrument_id, client_id, params);
}
2257
/// Unsubscribes from pool swap data for `instrument_id` (only with the `defi`
/// feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_pool_swaps`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_pool_swaps(core, instrument_id, client_id, params);
}
2271
/// Unsubscribes from pool liquidity updates for `instrument_id` (only with the
/// `defi` feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_pool_liquidity_updates`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_pool_liquidity_updates(core, instrument_id, client_id, params);
}
2290
/// Unsubscribes from pool fee collect events for `instrument_id` (only with the
/// `defi` feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_pool_fee_collects`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_pool_fee_collects(core, instrument_id, client_id, params);
}
2309
/// Unsubscribes from pool flash events for `instrument_id` (only with the
/// `defi` feature).
///
/// Thin wrapper: forwards all arguments, together with this actor's core, to
/// `DataActorCore::unsubscribe_pool_flash_events`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: DataActorNative,
    Self: 'static + Debug + Sized,
{
    let core = self.core_mut();
    DataActorCore::unsubscribe_pool_flash_events(core, instrument_id, client_id, params);
}
2328
2329 fn request_data(
2335 &mut self,
2336 data_type: DataType,
2337 client_id: ClientId,
2338 start: Option<DateTime<Utc>>,
2339 end: Option<DateTime<Utc>>,
2340 limit: Option<NonZeroUsize>,
2341 params: Option<Params>,
2342 ) -> anyhow::Result<UUID4>
2343 where
2344 Self: DataActorNative,
2345 Self: 'static + Debug + Sized,
2346 {
2347 let actor_id = self.core().actor_id().inner();
2348 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
2349 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
2350 });
2351
2352 DataActorCore::request_data(
2353 self.core_mut(),
2354 data_type,
2355 client_id,
2356 start,
2357 end,
2358 limit,
2359 params,
2360 handler,
2361 )
2362 }
2363
2364 fn request_instrument(
2370 &mut self,
2371 instrument_id: InstrumentId,
2372 start: Option<DateTime<Utc>>,
2373 end: Option<DateTime<Utc>>,
2374 client_id: Option<ClientId>,
2375 params: Option<Params>,
2376 ) -> anyhow::Result<UUID4>
2377 where
2378 Self: DataActorNative,
2379 Self: 'static + Debug + Sized,
2380 {
2381 let actor_id = self.core().actor_id().inner();
2382 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
2383 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
2384 });
2385
2386 DataActorCore::request_instrument(
2387 self.core_mut(),
2388 instrument_id,
2389 start,
2390 end,
2391 client_id,
2392 params,
2393 handler,
2394 )
2395 }
2396
2397 fn request_instruments(
2403 &mut self,
2404 venue: Option<Venue>,
2405 start: Option<DateTime<Utc>>,
2406 end: Option<DateTime<Utc>>,
2407 client_id: Option<ClientId>,
2408 params: Option<Params>,
2409 ) -> anyhow::Result<UUID4>
2410 where
2411 Self: DataActorNative,
2412 Self: 'static + Debug + Sized,
2413 {
2414 let actor_id = self.core().actor_id().inner();
2415 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
2416 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
2417 });
2418
2419 DataActorCore::request_instruments(
2420 self.core_mut(),
2421 venue,
2422 start,
2423 end,
2424 client_id,
2425 params,
2426 handler,
2427 )
2428 }
2429
2430 fn request_book_snapshot(
2436 &mut self,
2437 instrument_id: InstrumentId,
2438 depth: Option<NonZeroUsize>,
2439 client_id: Option<ClientId>,
2440 params: Option<Params>,
2441 ) -> anyhow::Result<UUID4>
2442 where
2443 Self: DataActorNative,
2444 Self: 'static + Debug + Sized,
2445 {
2446 let actor_id = self.core().actor_id().inner();
2447 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2448 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2449 });
2450
2451 DataActorCore::request_book_snapshot(
2452 self.core_mut(),
2453 instrument_id,
2454 depth,
2455 client_id,
2456 params,
2457 handler,
2458 )
2459 }
2460
2461 fn request_book_deltas(
2467 &mut self,
2468 instrument_id: InstrumentId,
2469 start: Option<DateTime<Utc>>,
2470 end: Option<DateTime<Utc>>,
2471 limit: Option<NonZeroUsize>,
2472 client_id: Option<ClientId>,
2473 params: Option<Params>,
2474 ) -> anyhow::Result<UUID4>
2475 where
2476 Self: DataActorNative,
2477 Self: 'static + Debug + Sized,
2478 {
2479 let actor_id = self.core().actor_id().inner();
2480 let handler = ShareableMessageHandler::from_typed(move |resp: &BookDeltasResponse| {
2481 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas_response(resp);
2482 });
2483
2484 DataActorCore::request_book_deltas(
2485 self.core_mut(),
2486 instrument_id,
2487 start,
2488 end,
2489 limit,
2490 client_id,
2491 params,
2492 handler,
2493 )
2494 }
2495
2496 #[expect(clippy::too_many_arguments)]
2502 fn request_book_depth(
2503 &mut self,
2504 instrument_id: InstrumentId,
2505 start: Option<DateTime<Utc>>,
2506 end: Option<DateTime<Utc>>,
2507 limit: Option<NonZeroUsize>,
2508 depth: Option<NonZeroUsize>,
2509 client_id: Option<ClientId>,
2510 params: Option<Params>,
2511 ) -> anyhow::Result<UUID4>
2512 where
2513 Self: DataActorNative,
2514 Self: 'static + Debug + Sized,
2515 {
2516 let actor_id = self.core().actor_id().inner();
2517 let handler = ShareableMessageHandler::from_typed(move |resp: &BookDepthResponse| {
2518 get_actor_unchecked::<Self>(&actor_id).handle_book_depth_response(resp);
2519 });
2520
2521 DataActorCore::request_book_depth(
2522 self.core_mut(),
2523 instrument_id,
2524 start,
2525 end,
2526 limit,
2527 depth,
2528 client_id,
2529 params,
2530 handler,
2531 )
2532 }
2533
2534 fn request_quotes(
2540 &mut self,
2541 instrument_id: InstrumentId,
2542 start: Option<DateTime<Utc>>,
2543 end: Option<DateTime<Utc>>,
2544 limit: Option<NonZeroUsize>,
2545 client_id: Option<ClientId>,
2546 params: Option<Params>,
2547 ) -> anyhow::Result<UUID4>
2548 where
2549 Self: DataActorNative,
2550 Self: 'static + Debug + Sized,
2551 {
2552 let actor_id = self.core().actor_id().inner();
2553 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2554 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2555 });
2556
2557 DataActorCore::request_quotes(
2558 self.core_mut(),
2559 instrument_id,
2560 start,
2561 end,
2562 limit,
2563 client_id,
2564 params,
2565 handler,
2566 )
2567 }
2568
2569 fn request_trades(
2575 &mut self,
2576 instrument_id: InstrumentId,
2577 start: Option<DateTime<Utc>>,
2578 end: Option<DateTime<Utc>>,
2579 limit: Option<NonZeroUsize>,
2580 client_id: Option<ClientId>,
2581 params: Option<Params>,
2582 ) -> anyhow::Result<UUID4>
2583 where
2584 Self: DataActorNative,
2585 Self: 'static + Debug + Sized,
2586 {
2587 let actor_id = self.core().actor_id().inner();
2588 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2589 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2590 });
2591
2592 DataActorCore::request_trades(
2593 self.core_mut(),
2594 instrument_id,
2595 start,
2596 end,
2597 limit,
2598 client_id,
2599 params,
2600 handler,
2601 )
2602 }
2603
2604 fn request_bars(
2610 &mut self,
2611 bar_type: BarType,
2612 start: Option<DateTime<Utc>>,
2613 end: Option<DateTime<Utc>>,
2614 limit: Option<NonZeroUsize>,
2615 client_id: Option<ClientId>,
2616 params: Option<Params>,
2617 ) -> anyhow::Result<UUID4>
2618 where
2619 Self: DataActorNative,
2620 Self: 'static + Debug + Sized,
2621 {
2622 let actor_id = self.core().actor_id().inner();
2623 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2624 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2625 });
2626
2627 DataActorCore::request_bars(
2628 self.core_mut(),
2629 bar_type,
2630 start,
2631 end,
2632 limit,
2633 client_id,
2634 params,
2635 handler,
2636 )
2637 }
2638
2639 fn request_funding_rates(
2645 &mut self,
2646 instrument_id: InstrumentId,
2647 start: Option<DateTime<Utc>>,
2648 end: Option<DateTime<Utc>>,
2649 limit: Option<NonZeroUsize>,
2650 client_id: Option<ClientId>,
2651 params: Option<Params>,
2652 ) -> anyhow::Result<UUID4>
2653 where
2654 Self: DataActorNative,
2655 Self: 'static + Debug + Sized,
2656 {
2657 let actor_id = self.core().actor_id().inner();
2658 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2659 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2660 });
2661
2662 DataActorCore::request_funding_rates(
2663 self.core_mut(),
2664 instrument_id,
2665 start,
2666 end,
2667 limit,
2668 client_id,
2669 params,
2670 handler,
2671 )
2672 }
2673}
2674
2675impl<T> Actor for T
2677where
2678 T: DataActor + DataActorNative + Debug + 'static,
2679{
2680 fn id(&self) -> Ustr {
2681 self.core().actor_id.inner()
2682 }
2683
2684 #[allow(unused_variables)]
2685 fn handle(&mut self, msg: &dyn Any) {
2686 }
2688
2689 fn as_any(&self) -> &dyn Any {
2690 self
2691 }
2692}
2693
2694impl<T> Component for T
2696where
2697 T: DataActor + DataActorNative + Debug + 'static,
2698{
2699 fn component_id(&self) -> ComponentId {
2700 ComponentId::new(self.core().actor_id.inner().as_str())
2701 }
2702
2703 fn state(&self) -> ComponentState {
2704 self.core().state
2705 }
2706
2707 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2708 let core = self.core_mut();
2709 core.state = core.state.transition(&trigger)?;
2710 log::info!(
2711 component = core.actor_id.inner().as_str();
2712 "{}",
2713 core.state.variant_name()
2714 );
2715 Ok(())
2716 }
2717
2718 fn register(
2719 &mut self,
2720 trader_id: TraderId,
2721 clock: Rc<RefCell<dyn Clock>>,
2722 cache: Rc<RefCell<Cache>>,
2723 ) -> anyhow::Result<()> {
2724 DataActorCore::register(self.core_mut(), trader_id, clock.clone(), cache)?;
2725
2726 let actor_id = self.core().actor_id().inner();
2728 let callback = TimeEventCallback::from(move |event: TimeEvent| {
2729 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2730 actor.handle_time_event(&event);
2731 } else {
2732 log::error!("Actor {actor_id} not found for time event handling");
2733 }
2734 });
2735
2736 clock.borrow_mut().register_default_handler(callback);
2737
2738 self.initialize()
2739 }
2740
2741 fn on_start(&mut self) -> anyhow::Result<()> {
2742 DataActor::on_start(self)
2743 }
2744
2745 fn on_stop(&mut self) -> anyhow::Result<()> {
2746 DataActor::on_stop(self)
2747 }
2748
2749 fn on_resume(&mut self) -> anyhow::Result<()> {
2750 DataActor::on_resume(self)
2751 }
2752
2753 fn on_degrade(&mut self) -> anyhow::Result<()> {
2754 DataActor::on_degrade(self)
2755 }
2756
2757 fn on_fault(&mut self) -> anyhow::Result<()> {
2758 DataActor::on_fault(self)
2759 }
2760
2761 fn on_reset(&mut self) -> anyhow::Result<()> {
2762 DataActor::on_reset(self)
2763 }
2764
2765 fn on_dispose(&mut self) -> anyhow::Result<()> {
2766 DataActor::on_dispose(self)
2767 }
2768}
2769
2770#[derive(Clone)]
2772#[allow(
2773 dead_code,
2774 reason = "TODO: Under development (pending_requests, signal_classes)"
2775)]
2776pub struct DataActorCore {
2777 pub actor_id: ActorId,
2779 pub config: DataActorConfig,
2781 trader_id: Option<TraderId>,
2782 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
2785 topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2786 instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
2787 deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
2788 depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
2789 book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2790 quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2791 trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2792 bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2793 mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2794 index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2795 funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2796 option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2797 option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2798 order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2799 #[cfg(feature = "defi")]
2800 block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2801 #[cfg(feature = "defi")]
2802 pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2803 #[cfg(feature = "defi")]
2804 pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2805 #[cfg(feature = "defi")]
2806 pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2807 #[cfg(feature = "defi")]
2808 pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2809 #[cfg(feature = "defi")]
2810 pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2811 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2813 signal_classes: AHashMap<String, String>,
2814 indicators: Indicators,
2815}
2816
2817impl Debug for DataActorCore {
2818 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2819 f.debug_struct(stringify!(DataActorCore))
2820 .field("actor_id", &self.actor_id)
2821 .field("config", &self.config)
2822 .field("state", &self.state)
2823 .field("trader_id", &self.trader_id)
2824 .finish()
2825 }
2826}
2827
2828impl DataActorCore {
2829 pub(crate) fn add_subscription_any(
2833 &mut self,
2834 topic: MStr<Topic>,
2835 handler: ShareableMessageHandler,
2836 ) {
2837 let pattern: MStr<Pattern> = topic.into();
2838 if self.topic_handlers.contains_key(&pattern) {
2839 log::warn!(
2840 "Actor {} attempted duplicate subscription to topic '{topic}'",
2841 self.actor_id,
2842 );
2843 return;
2844 }
2845
2846 self.topic_handlers.insert(pattern, handler.clone());
2847 msgbus::subscribe_any(pattern, handler, None);
2848 }
2849
2850 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2854 let pattern: MStr<Pattern> = topic.into();
2855 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2856 msgbus::unsubscribe_any(pattern, &handler);
2857 } else {
2858 log::warn!(
2859 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2860 self.actor_id,
2861 );
2862 }
2863 }
2864
2865 pub(crate) fn add_quote_subscription(
2866 &mut self,
2867 topic: MStr<Topic>,
2868 handler: TypedHandler<QuoteTick>,
2869 ) {
2870 if self.quote_handlers.contains_key(&topic) {
2871 log::warn!(
2872 "Actor {} attempted duplicate quote subscription to '{topic}'",
2873 self.actor_id
2874 );
2875 return;
2876 }
2877 self.quote_handlers.insert(topic, handler.clone());
2878 msgbus::subscribe_quotes(topic.into(), handler, None);
2879 }
2880
2881 #[allow(dead_code)]
2882 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2883 if let Some(handler) = self.quote_handlers.remove(&topic) {
2884 msgbus::unsubscribe_quotes(topic.into(), &handler);
2885 }
2886 }
2887
2888 pub(crate) fn add_trade_subscription(
2889 &mut self,
2890 topic: MStr<Topic>,
2891 handler: TypedHandler<TradeTick>,
2892 ) {
2893 if self.trade_handlers.contains_key(&topic) {
2894 log::warn!(
2895 "Actor {} attempted duplicate trade subscription to '{topic}'",
2896 self.actor_id
2897 );
2898 return;
2899 }
2900 self.trade_handlers.insert(topic, handler.clone());
2901 msgbus::subscribe_trades(topic.into(), handler, None);
2902 }
2903
2904 #[allow(dead_code)]
2905 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2906 if let Some(handler) = self.trade_handlers.remove(&topic) {
2907 msgbus::unsubscribe_trades(topic.into(), &handler);
2908 }
2909 }
2910
2911 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2912 if self.bar_handlers.contains_key(&topic) {
2913 log::warn!(
2914 "Actor {} attempted duplicate bar subscription to '{topic}'",
2915 self.actor_id
2916 );
2917 return;
2918 }
2919 self.bar_handlers.insert(topic, handler.clone());
2920 msgbus::subscribe_bars(topic.into(), handler, None);
2921 }
2922
2923 #[allow(dead_code)]
2924 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2925 if let Some(handler) = self.bar_handlers.remove(&topic) {
2926 msgbus::unsubscribe_bars(topic.into(), &handler);
2927 }
2928 }
2929
2930 pub(crate) fn add_order_event_subscription(
2931 &mut self,
2932 topic: MStr<Topic>,
2933 handler: TypedHandler<OrderEventAny>,
2934 ) {
2935 if self.order_event_handlers.contains_key(&topic) {
2936 log::warn!(
2937 "Actor {} attempted duplicate order event subscription to '{topic}'",
2938 self.actor_id
2939 );
2940 return;
2941 }
2942 self.order_event_handlers.insert(topic, handler.clone());
2943 msgbus::subscribe_order_events(topic.into(), handler, None);
2944 }
2945
2946 #[allow(dead_code)]
2947 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2948 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2949 msgbus::unsubscribe_order_events(topic.into(), &handler);
2950 }
2951 }
2952
2953 pub(crate) fn add_deltas_subscription(
2954 &mut self,
2955 pattern: MStr<Pattern>,
2956 handler: TypedHandler<OrderBookDeltas>,
2957 ) {
2958 if self.deltas_handlers.contains_key(&pattern) {
2959 log::warn!(
2960 "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2961 self.actor_id
2962 );
2963 return;
2964 }
2965 self.deltas_handlers.insert(pattern, handler.clone());
2966 msgbus::subscribe_book_deltas(pattern, handler, None);
2967 }
2968
2969 #[allow(dead_code)]
2970 pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2971 if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2972 msgbus::unsubscribe_book_deltas(pattern, &handler);
2973 }
2974 }
2975
2976 #[allow(dead_code)]
2977 pub(crate) fn add_depth10_subscription(
2978 &mut self,
2979 pattern: MStr<Pattern>,
2980 handler: TypedHandler<OrderBookDepth10>,
2981 ) {
2982 if self.depth10_handlers.contains_key(&pattern) {
2983 log::warn!(
2984 "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2985 self.actor_id
2986 );
2987 return;
2988 }
2989 self.depth10_handlers.insert(pattern, handler.clone());
2990 msgbus::subscribe_book_depth10(pattern, handler, None);
2991 }
2992
2993 #[allow(dead_code)]
2994 pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2995 if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2996 msgbus::unsubscribe_book_depth10(pattern, &handler);
2997 }
2998 }
2999
3000 pub(crate) fn add_instrument_subscription(
3001 &mut self,
3002 pattern: MStr<Pattern>,
3003 handler: TypedHandler<InstrumentAny>,
3004 ) {
3005 if self.instrument_handlers.contains_key(&pattern) {
3006 log::warn!(
3007 "Actor {} attempted duplicate instrument subscription to '{pattern}'",
3008 self.actor_id
3009 );
3010 return;
3011 }
3012 self.instrument_handlers.insert(pattern, handler.clone());
3013 msgbus::subscribe_instruments(pattern, handler, None);
3014 }
3015
3016 #[allow(dead_code)]
3017 pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
3018 if let Some(handler) = self.instrument_handlers.remove(&pattern) {
3019 msgbus::unsubscribe_instruments(pattern, &handler);
3020 }
3021 }
3022
3023 pub(crate) fn add_instrument_close_subscription(
3024 &mut self,
3025 topic: MStr<Topic>,
3026 handler: ShareableMessageHandler,
3027 ) {
3028 let pattern: MStr<Pattern> = topic.into();
3029 if self.topic_handlers.contains_key(&pattern) {
3030 log::warn!(
3031 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
3032 self.actor_id
3033 );
3034 return;
3035 }
3036 self.topic_handlers.insert(pattern, handler.clone());
3037 msgbus::subscribe_any(pattern, handler, None);
3038 }
3039
3040 #[allow(dead_code)]
3041 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
3042 let pattern: MStr<Pattern> = topic.into();
3043 if let Some(handler) = self.topic_handlers.remove(&pattern) {
3044 msgbus::unsubscribe_any(pattern, &handler);
3045 }
3046 }
3047
3048 pub(crate) fn add_book_snapshot_subscription(
3049 &mut self,
3050 topic: MStr<Topic>,
3051 handler: TypedHandler<OrderBook>,
3052 ) {
3053 if self.book_handlers.contains_key(&topic) {
3054 log::warn!(
3055 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
3056 self.actor_id
3057 );
3058 return;
3059 }
3060 self.book_handlers.insert(topic, handler.clone());
3061 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
3062 }
3063
3064 #[allow(dead_code)]
3065 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
3066 if let Some(handler) = self.book_handlers.remove(&topic) {
3067 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
3068 }
3069 }
3070
3071 pub(crate) fn add_mark_price_subscription(
3072 &mut self,
3073 topic: MStr<Topic>,
3074 handler: TypedHandler<MarkPriceUpdate>,
3075 ) {
3076 if self.mark_price_handlers.contains_key(&topic) {
3077 log::warn!(
3078 "Actor {} attempted duplicate mark price subscription to '{topic}'",
3079 self.actor_id
3080 );
3081 return;
3082 }
3083 self.mark_price_handlers.insert(topic, handler.clone());
3084 msgbus::subscribe_mark_prices(topic.into(), handler, None);
3085 }
3086
3087 #[allow(dead_code)]
3088 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
3089 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
3090 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
3091 }
3092 }
3093
3094 pub(crate) fn add_index_price_subscription(
3095 &mut self,
3096 topic: MStr<Topic>,
3097 handler: TypedHandler<IndexPriceUpdate>,
3098 ) {
3099 if self.index_price_handlers.contains_key(&topic) {
3100 log::warn!(
3101 "Actor {} attempted duplicate index price subscription to '{topic}'",
3102 self.actor_id
3103 );
3104 return;
3105 }
3106 self.index_price_handlers.insert(topic, handler.clone());
3107 msgbus::subscribe_index_prices(topic.into(), handler, None);
3108 }
3109
3110 #[allow(dead_code)]
3111 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
3112 if let Some(handler) = self.index_price_handlers.remove(&topic) {
3113 msgbus::unsubscribe_index_prices(topic.into(), &handler);
3114 }
3115 }
3116
3117 pub(crate) fn add_funding_rate_subscription(
3118 &mut self,
3119 topic: MStr<Topic>,
3120 handler: TypedHandler<FundingRateUpdate>,
3121 ) {
3122 if self.funding_rate_handlers.contains_key(&topic) {
3123 log::warn!(
3124 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
3125 self.actor_id
3126 );
3127 return;
3128 }
3129 self.funding_rate_handlers.insert(topic, handler.clone());
3130 msgbus::subscribe_funding_rates(topic.into(), handler, None);
3131 }
3132
3133 #[allow(dead_code)]
3134 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
3135 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
3136 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
3137 }
3138 }
3139
3140 pub(crate) fn add_option_greeks_subscription(
3141 &mut self,
3142 topic: MStr<Topic>,
3143 handler: TypedHandler<OptionGreeks>,
3144 ) {
3145 if self.option_greeks_handlers.contains_key(&topic) {
3146 log::warn!(
3147 "Actor {} attempted duplicate option greeks subscription to '{topic}'",
3148 self.actor_id
3149 );
3150 return;
3151 }
3152 self.option_greeks_handlers.insert(topic, handler.clone());
3153 msgbus::subscribe_option_greeks(topic.into(), handler, None);
3154 }
3155
3156 #[allow(dead_code)]
3157 pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
3158 if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
3159 msgbus::unsubscribe_option_greeks(topic.into(), &handler);
3160 }
3161 }
3162
3163 pub(crate) fn add_option_chain_subscription(
3164 &mut self,
3165 topic: MStr<Topic>,
3166 handler: TypedHandler<OptionChainSlice>,
3167 ) {
3168 if self.option_chain_handlers.contains_key(&topic) {
3169 log::warn!(
3170 "Actor {} attempted duplicate option chain subscription to '{topic}'",
3171 self.actor_id
3172 );
3173 return;
3174 }
3175 self.option_chain_handlers.insert(topic, handler.clone());
3176 msgbus::subscribe_option_chain(topic.into(), handler, None);
3177 }
3178
3179 pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
3180 if let Some(handler) = self.option_chain_handlers.remove(&topic) {
3181 msgbus::unsubscribe_option_chain(topic.into(), &handler);
3182 }
3183 }
3184
3185 #[cfg(feature = "defi")]
3186 pub(crate) fn add_block_subscription(
3187 &mut self,
3188 topic: MStr<Topic>,
3189 handler: TypedHandler<Block>,
3190 ) {
3191 if self.block_handlers.contains_key(&topic) {
3192 log::warn!(
3193 "Actor {} attempted duplicate block subscription to '{topic}'",
3194 self.actor_id
3195 );
3196 return;
3197 }
3198 self.block_handlers.insert(topic, handler.clone());
3199 msgbus::subscribe_defi_blocks(topic.into(), handler, None);
3200 }
3201
3202 #[cfg(feature = "defi")]
3203 #[allow(dead_code)]
3204 pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
3205 if let Some(handler) = self.block_handlers.remove(&topic) {
3206 msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
3207 }
3208 }
3209
3210 #[cfg(feature = "defi")]
3211 pub(crate) fn add_pool_subscription(
3212 &mut self,
3213 topic: MStr<Topic>,
3214 handler: TypedHandler<Pool>,
3215 ) {
3216 if self.pool_handlers.contains_key(&topic) {
3217 log::warn!(
3218 "Actor {} attempted duplicate pool subscription to '{topic}'",
3219 self.actor_id
3220 );
3221 return;
3222 }
3223 self.pool_handlers.insert(topic, handler.clone());
3224 msgbus::subscribe_defi_pools(topic.into(), handler, None);
3225 }
3226
3227 #[cfg(feature = "defi")]
3228 #[allow(dead_code)]
3229 pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
3230 if let Some(handler) = self.pool_handlers.remove(&topic) {
3231 msgbus::unsubscribe_defi_pools(topic.into(), &handler);
3232 }
3233 }
3234
3235 #[cfg(feature = "defi")]
3236 pub(crate) fn add_pool_swap_subscription(
3237 &mut self,
3238 topic: MStr<Topic>,
3239 handler: TypedHandler<PoolSwap>,
3240 ) {
3241 if self.pool_swap_handlers.contains_key(&topic) {
3242 log::warn!(
3243 "Actor {} attempted duplicate pool swap subscription to '{topic}'",
3244 self.actor_id
3245 );
3246 return;
3247 }
3248 self.pool_swap_handlers.insert(topic, handler.clone());
3249 msgbus::subscribe_defi_swaps(topic.into(), handler, None);
3250 }
3251
3252 #[cfg(feature = "defi")]
3253 #[allow(dead_code)]
3254 pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
3255 if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
3256 msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
3257 }
3258 }
3259
3260 #[cfg(feature = "defi")]
3261 pub(crate) fn add_pool_liquidity_subscription(
3262 &mut self,
3263 topic: MStr<Topic>,
3264 handler: TypedHandler<PoolLiquidityUpdate>,
3265 ) {
3266 if self.pool_liquidity_handlers.contains_key(&topic) {
3267 log::warn!(
3268 "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
3269 self.actor_id
3270 );
3271 return;
3272 }
3273 self.pool_liquidity_handlers.insert(topic, handler.clone());
3274 msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
3275 }
3276
3277 #[cfg(feature = "defi")]
3278 #[allow(dead_code)]
3279 pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
3280 if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
3281 msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
3282 }
3283 }
3284
3285 #[cfg(feature = "defi")]
3286 pub(crate) fn add_pool_collect_subscription(
3287 &mut self,
3288 topic: MStr<Topic>,
3289 handler: TypedHandler<PoolFeeCollect>,
3290 ) {
3291 if self.pool_collect_handlers.contains_key(&topic) {
3292 log::warn!(
3293 "Actor {} attempted duplicate pool collect subscription to '{topic}'",
3294 self.actor_id
3295 );
3296 return;
3297 }
3298 self.pool_collect_handlers.insert(topic, handler.clone());
3299 msgbus::subscribe_defi_collects(topic.into(), handler, None);
3300 }
3301
3302 #[cfg(feature = "defi")]
3303 #[allow(dead_code)]
3304 pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
3305 if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
3306 msgbus::unsubscribe_defi_collects(topic.into(), &handler);
3307 }
3308 }
3309
3310 #[cfg(feature = "defi")]
3311 pub(crate) fn add_pool_flash_subscription(
3312 &mut self,
3313 topic: MStr<Topic>,
3314 handler: TypedHandler<PoolFlash>,
3315 ) {
3316 if self.pool_flash_handlers.contains_key(&topic) {
3317 log::warn!(
3318 "Actor {} attempted duplicate pool flash subscription to '{topic}'",
3319 self.actor_id
3320 );
3321 return;
3322 }
3323 self.pool_flash_handlers.insert(topic, handler.clone());
3324 msgbus::subscribe_defi_flash(topic.into(), handler, None);
3325 }
3326
3327 #[cfg(feature = "defi")]
3328 #[allow(dead_code)]
3329 pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
3330 if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
3331 msgbus::unsubscribe_defi_flash(topic.into(), &handler);
3332 }
3333 }
3334
3335 pub fn new(config: DataActorConfig) -> Self {
3337 let actor_id = config
3338 .actor_id
3339 .unwrap_or_else(|| Self::default_actor_id(&config));
3340
3341 Self {
3342 actor_id,
3343 config,
3344 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
3348 topic_handlers: AHashMap::new(),
3349 instrument_handlers: AHashMap::new(),
3350 deltas_handlers: AHashMap::new(),
3351 depth10_handlers: AHashMap::new(),
3352 book_handlers: AHashMap::new(),
3353 quote_handlers: AHashMap::new(),
3354 trade_handlers: AHashMap::new(),
3355 bar_handlers: AHashMap::new(),
3356 mark_price_handlers: AHashMap::new(),
3357 index_price_handlers: AHashMap::new(),
3358 funding_rate_handlers: AHashMap::new(),
3359 option_greeks_handlers: AHashMap::new(),
3360 option_chain_handlers: AHashMap::new(),
3361 order_event_handlers: AHashMap::new(),
3362 #[cfg(feature = "defi")]
3363 block_handlers: AHashMap::new(),
3364 #[cfg(feature = "defi")]
3365 pool_handlers: AHashMap::new(),
3366 #[cfg(feature = "defi")]
3367 pool_swap_handlers: AHashMap::new(),
3368 #[cfg(feature = "defi")]
3369 pool_liquidity_handlers: AHashMap::new(),
3370 #[cfg(feature = "defi")]
3371 pool_collect_handlers: AHashMap::new(),
3372 #[cfg(feature = "defi")]
3373 pool_flash_handlers: AHashMap::new(),
3374 warning_events: AHashSet::new(),
3375 pending_requests: AHashMap::new(),
3376 signal_classes: AHashMap::new(),
3377 indicators: Indicators::default(),
3378 }
3379 }
3380
3381 #[must_use]
3383 pub fn registered_indicators(&self) -> Vec<SharedActorIndicator> {
3384 self.indicators.registered_indicators()
3385 }
3386
3387 pub fn indicators_initialized(&self) -> anyhow::Result<bool> {
3393 self.indicators.initialized()
3394 }
3395
3396 pub fn register_indicator_for_quote_ticks(
3398 &mut self,
3399 instrument_id: InstrumentId,
3400 indicator: SharedActorIndicator,
3401 ) {
3402 self.indicators
3403 .register_indicator_for_quote_ticks(instrument_id, indicator);
3404 }
3405
3406 pub fn register_indicator_for_trade_ticks(
3408 &mut self,
3409 instrument_id: InstrumentId,
3410 indicator: SharedActorIndicator,
3411 ) {
3412 self.indicators
3413 .register_indicator_for_trade_ticks(instrument_id, indicator);
3414 }
3415
3416 pub fn register_indicator_for_bars(
3418 &mut self,
3419 bar_type: BarType,
3420 indicator: SharedActorIndicator,
3421 ) {
3422 self.indicators
3423 .register_indicator_for_bars(bar_type, indicator);
3424 }
3425
3426 pub(crate) fn handle_indicators_for_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
3427 self.indicators.handle_quote(quote)
3428 }
3429
3430 pub(crate) fn handle_indicators_for_quotes(&self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
3431 self.indicators.handle_quotes(quotes)
3432 }
3433
3434 pub(crate) fn handle_indicators_for_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
3435 self.indicators.handle_trade(trade)
3436 }
3437
3438 pub(crate) fn handle_indicators_for_trades(&self, trades: &[TradeTick]) -> anyhow::Result<()> {
3439 self.indicators.handle_trades(trades)
3440 }
3441
3442 pub(crate) fn handle_indicators_for_bar(&self, bar: &Bar) -> anyhow::Result<()> {
3443 self.indicators.handle_bar(bar)
3444 }
3445
3446 pub(crate) fn handle_indicators_for_bars(&self, bars: &[Bar]) -> anyhow::Result<()> {
3447 self.indicators.handle_bars(bars)
3448 }
3449
3450 #[must_use]
3452 pub fn mem_address(&self) -> String {
3453 format!("{self:p}")
3454 }
3455
3456 pub fn state(&self) -> ComponentState {
3458 self.state
3459 }
3460
3461 pub fn trader_id(&self) -> Option<TraderId> {
3463 self.trader_id
3464 }
3465
3466 pub fn actor_id(&self) -> ActorId {
3468 self.actor_id
3469 }
3470
3471 fn default_actor_id(config: &DataActorConfig) -> ActorId {
3472 let memory_address = std::ptr::from_ref(config) as usize;
3473 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
3474 }
3475
3476 pub fn timestamp_ns(&self) -> UnixNanos {
3478 self.clock_ref().timestamp_ns()
3479 }
3480
3481 fn clock_api(&self) -> ClockApi<'_> {
3482 let clock = self.clock.as_ref().unwrap_or_else(|| {
3483 panic!(
3484 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
3485 self.actor_id, self.trader_id
3486 )
3487 });
3488 ClockApi::new(clock.as_ref())
3489 }
3490
3491 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
3492 self.clock
3493 .as_ref()
3494 .unwrap_or_else(|| {
3495 panic!(
3496 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
3497 self.actor_id, self.trader_id
3498 )
3499 })
3500 .borrow()
3501 }
3502
3503 fn cache_api(&self) -> CacheApi<'_> {
3504 let cache = self.cache.as_ref().unwrap_or_else(|| {
3505 panic!(
3506 "DataActor {} must be registered before calling `cache()` - trader_id: {:?}",
3507 self.actor_id, self.trader_id
3508 )
3509 });
3510 CacheApi::new(cache.as_ref())
3511 }
3512
3513 pub fn register(
3520 &mut self,
3521 trader_id: TraderId,
3522 clock: Rc<RefCell<dyn Clock>>,
3523 cache: Rc<RefCell<Cache>>,
3524 ) -> anyhow::Result<()> {
3525 if let Some(existing_trader_id) = self.trader_id {
3526 anyhow::bail!(
3527 "DataActor {} already registered with trader {existing_trader_id}",
3528 self.actor_id
3529 );
3530 }
3531
3532 {
3534 let _timestamp = clock.borrow().timestamp_ns();
3535 }
3536
3537 {
3539 let _cache_borrow = cache.borrow();
3540 }
3541
3542 self.trader_id = Some(trader_id);
3543 self.clock = Some(clock);
3544 self.cache = Some(cache);
3545
3546 if !self.is_properly_registered() {
3548 anyhow::bail!(
3549 "DataActor {} registration incomplete - validation failed",
3550 self.actor_id
3551 );
3552 }
3553
3554 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
3555 Ok(())
3556 }
3557
3558 pub fn register_warning_event(&mut self, event_type: &str) {
3560 self.warning_events.insert(event_type.to_string());
3561 log::debug!("Registered event type '{event_type}' for warning logs");
3562 }
3563
3564 pub fn deregister_warning_event(&mut self, event_type: &str) {
3566 self.warning_events.remove(event_type);
3567 log::debug!("Deregistered event type '{event_type}' from warning logs");
3568 }
3569
3570 pub fn is_registered(&self) -> bool {
3571 self.trader_id.is_some()
3572 }
3573
3574 pub(crate) fn check_registered(&self) {
3575 assert!(
3576 self.is_registered(),
3577 "Actor has not been registered with a Trader"
3578 );
3579 }
3580
3581 fn is_properly_registered(&self) -> bool {
3583 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3584 }
3585
3586 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3587 if self.config.log_commands {
3588 log::info!("{CMD}{SEND} {command:?}");
3589 }
3590
3591 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3592 msgbus::send_data_command(endpoint, command);
3593 }
3594
3595 #[allow(dead_code)]
3596 fn send_data_req(&self, request: &RequestCommand) {
3597 if self.config.log_commands {
3598 log::info!("{REQ}{SEND} {request:?}");
3599 }
3600
3601 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3604 msgbus::send_any(endpoint, request.as_any());
3605 }
3606
3607 pub fn shutdown_system(&self, reason: Option<String>) {
3613 self.check_registered();
3614
3615 let command = ShutdownSystem::new(
3617 self.trader_id().unwrap(),
3618 self.actor_id.inner(),
3619 reason,
3620 UUID4::new(),
3621 self.timestamp_ns(),
3622 None, );
3624
3625 let topic = MessagingSwitchboard::shutdown_system_topic();
3626 msgbus::publish_any(topic, command.as_any());
3627 }
3628
3629 pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3639 self.check_registered();
3640
3641 let topic = get_custom_topic(data_type);
3642 msgbus::publish_any(topic, data);
3643 }
3644
3645 pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3657 self.check_registered();
3658
3659 let now = self.timestamp_ns();
3660 let ts_event = if ts_event.as_u64() == 0 {
3661 now
3662 } else {
3663 ts_event
3664 };
3665 let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3666
3667 let data_type = DataType::new(
3668 &format!(
3669 "Signal{}",
3670 nautilus_core::string::conversions::title_case(name)
3671 ),
3672 None,
3673 None,
3674 );
3675 let data = CustomData::new(Arc::new(signal), data_type);
3676 let topic = get_custom_topic(&data.data_type);
3677 msgbus::publish_any(topic, &data);
3678 }
3679
3680 pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3688 self.check_registered();
3689
3690 let cache = self.cache_rc();
3691 if cache.borrow().synthetic(&synthetic.id).is_some() {
3692 anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3693 }
3694 cache.borrow_mut().add_synthetic(synthetic)
3695 }
3696
3697 pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3705 self.check_registered();
3706
3707 let cache = self.cache_rc();
3708 if cache.borrow().synthetic(&synthetic.id).is_none() {
3709 anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3710 }
3711 cache.borrow_mut().add_synthetic(synthetic)
3712 }
3713
3714 pub fn subscribe_data(
3720 &mut self,
3721 handler: ShareableMessageHandler,
3722 data_type: DataType,
3723 client_id: Option<ClientId>,
3724 params: Option<Params>,
3725 ) {
3726 assert!(
3727 self.is_properly_registered(),
3728 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3729 self.actor_id,
3730 self.trader_id,
3731 self.clock.is_some(),
3732 self.cache.is_some()
3733 );
3734
3735 let topic = get_custom_topic(&data_type);
3736 self.add_subscription_any(topic, handler);
3737
3738 if client_id.is_none() {
3740 return;
3741 }
3742
3743 let command = SubscribeCommand::Data(SubscribeCustomData {
3744 data_type,
3745 client_id,
3746 venue: None,
3747 command_id: UUID4::new(),
3748 ts_init: self.timestamp_ns(),
3749 correlation_id: None,
3750 params,
3751 });
3752
3753 self.send_data_cmd(DataCommand::Subscribe(command));
3754 }
3755
3756 pub fn subscribe_signal(
3764 &mut self,
3765 handler: ShareableMessageHandler,
3766 name: &str,
3767 priority: Option<u32>,
3768 ) {
3769 self.check_registered();
3770
3771 let pattern = get_signal_pattern(name);
3772 if self.topic_handlers.contains_key(&pattern) {
3773 log::warn!(
3774 "Actor {} attempted duplicate signal subscription to '{pattern}'",
3775 self.actor_id,
3776 );
3777 return;
3778 }
3779 self.topic_handlers.insert(pattern, handler.clone());
3780 msgbus::subscribe_any(pattern, handler, priority);
3781 }
3782
3783 pub fn subscribe_quotes(
3785 &mut self,
3786 topic: MStr<Topic>,
3787 handler: TypedHandler<QuoteTick>,
3788 instrument_id: InstrumentId,
3789 client_id: Option<ClientId>,
3790 params: Option<Params>,
3791 ) {
3792 self.check_registered();
3793
3794 self.add_quote_subscription(topic, handler);
3795
3796 let command = SubscribeCommand::Quotes(SubscribeQuotes {
3797 instrument_id,
3798 client_id,
3799 venue: Some(instrument_id.venue),
3800 command_id: UUID4::new(),
3801 ts_init: self.timestamp_ns(),
3802 correlation_id: None,
3803 params,
3804 });
3805
3806 self.send_data_cmd(DataCommand::Subscribe(command));
3807 }
3808
3809 pub fn subscribe_instruments(
3811 &mut self,
3812 pattern: MStr<Pattern>,
3813 handler: TypedHandler<InstrumentAny>,
3814 venue: Venue,
3815 client_id: Option<ClientId>,
3816 params: Option<Params>,
3817 ) {
3818 self.check_registered();
3819
3820 self.add_instrument_subscription(pattern, handler);
3821
3822 let command = SubscribeCommand::Instruments(SubscribeInstruments {
3823 client_id,
3824 venue,
3825 command_id: UUID4::new(),
3826 ts_init: self.timestamp_ns(),
3827 correlation_id: None,
3828 params,
3829 });
3830
3831 self.send_data_cmd(DataCommand::Subscribe(command));
3832 }
3833
3834 pub fn subscribe_instrument(
3836 &mut self,
3837 topic: MStr<Topic>,
3838 handler: TypedHandler<InstrumentAny>,
3839 instrument_id: InstrumentId,
3840 client_id: Option<ClientId>,
3841 params: Option<Params>,
3842 ) {
3843 self.check_registered();
3844
3845 self.add_instrument_subscription(topic.into(), handler);
3846
3847 let command = SubscribeCommand::Instrument(SubscribeInstrument {
3848 instrument_id,
3849 client_id,
3850 venue: Some(instrument_id.venue),
3851 command_id: UUID4::new(),
3852 ts_init: self.timestamp_ns(),
3853 correlation_id: None,
3854 params,
3855 });
3856
3857 self.send_data_cmd(DataCommand::Subscribe(command));
3858 }
3859
3860 #[expect(clippy::too_many_arguments)]
3862 pub fn subscribe_book_deltas(
3863 &mut self,
3864 pattern: MStr<Pattern>,
3865 handler: TypedHandler<OrderBookDeltas>,
3866 instrument_id: InstrumentId,
3867 book_type: BookType,
3868 depth: Option<NonZeroUsize>,
3869 client_id: Option<ClientId>,
3870 managed: bool,
3871 params: Option<Params>,
3872 ) {
3873 self.check_registered();
3874
3875 self.add_deltas_subscription(pattern, handler);
3876
3877 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3878 instrument_id,
3879 book_type,
3880 client_id,
3881 venue: Some(instrument_id.venue),
3882 command_id: UUID4::new(),
3883 ts_init: self.timestamp_ns(),
3884 depth,
3885 managed,
3886 correlation_id: None,
3887 params,
3888 });
3889
3890 self.send_data_cmd(DataCommand::Subscribe(command));
3891 }
3892
3893 #[expect(clippy::too_many_arguments)]
3895 pub fn subscribe_book_at_interval(
3896 &mut self,
3897 topic: MStr<Topic>,
3898 handler: TypedHandler<OrderBook>,
3899 instrument_id: InstrumentId,
3900 book_type: BookType,
3901 depth: Option<NonZeroUsize>,
3902 interval_ms: NonZeroUsize,
3903 client_id: Option<ClientId>,
3904 params: Option<Params>,
3905 ) {
3906 self.check_registered();
3907
3908 self.add_book_snapshot_subscription(topic, handler);
3909
3910 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3911 instrument_id,
3912 book_type,
3913 client_id,
3914 venue: Some(instrument_id.venue),
3915 command_id: UUID4::new(),
3916 ts_init: self.timestamp_ns(),
3917 depth,
3918 interval_ms,
3919 correlation_id: None,
3920 params,
3921 });
3922
3923 self.send_data_cmd(DataCommand::Subscribe(command));
3924 }
3925
3926 pub fn subscribe_trades(
3928 &mut self,
3929 topic: MStr<Topic>,
3930 handler: TypedHandler<TradeTick>,
3931 instrument_id: InstrumentId,
3932 client_id: Option<ClientId>,
3933 params: Option<Params>,
3934 ) {
3935 self.check_registered();
3936
3937 self.add_trade_subscription(topic, handler);
3938
3939 let command = SubscribeCommand::Trades(SubscribeTrades {
3940 instrument_id,
3941 client_id,
3942 venue: Some(instrument_id.venue),
3943 command_id: UUID4::new(),
3944 ts_init: self.timestamp_ns(),
3945 correlation_id: None,
3946 params,
3947 });
3948
3949 self.send_data_cmd(DataCommand::Subscribe(command));
3950 }
3951
3952 pub fn subscribe_bars(
3954 &mut self,
3955 topic: MStr<Topic>,
3956 handler: TypedHandler<Bar>,
3957 bar_type: BarType,
3958 client_id: Option<ClientId>,
3959 params: Option<Params>,
3960 ) {
3961 self.check_registered();
3962
3963 self.add_bar_subscription(topic, handler);
3964
3965 let command = SubscribeCommand::Bars(SubscribeBars {
3966 bar_type,
3967 client_id,
3968 venue: Some(bar_type.instrument_id().venue),
3969 command_id: UUID4::new(),
3970 ts_init: self.timestamp_ns(),
3971 correlation_id: None,
3972 params,
3973 });
3974
3975 self.send_data_cmd(DataCommand::Subscribe(command));
3976 }
3977
3978 pub fn subscribe_mark_prices(
3980 &mut self,
3981 topic: MStr<Topic>,
3982 handler: TypedHandler<MarkPriceUpdate>,
3983 instrument_id: InstrumentId,
3984 client_id: Option<ClientId>,
3985 params: Option<Params>,
3986 ) {
3987 self.check_registered();
3988
3989 self.add_mark_price_subscription(topic, handler);
3990
3991 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3992 instrument_id,
3993 client_id,
3994 venue: Some(instrument_id.venue),
3995 command_id: UUID4::new(),
3996 ts_init: self.timestamp_ns(),
3997 correlation_id: None,
3998 params,
3999 });
4000
4001 self.send_data_cmd(DataCommand::Subscribe(command));
4002 }
4003
4004 pub fn subscribe_index_prices(
4006 &mut self,
4007 topic: MStr<Topic>,
4008 handler: TypedHandler<IndexPriceUpdate>,
4009 instrument_id: InstrumentId,
4010 client_id: Option<ClientId>,
4011 params: Option<Params>,
4012 ) {
4013 self.check_registered();
4014
4015 self.add_index_price_subscription(topic, handler);
4016
4017 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
4018 instrument_id,
4019 client_id,
4020 venue: Some(instrument_id.venue),
4021 command_id: UUID4::new(),
4022 ts_init: self.timestamp_ns(),
4023 correlation_id: None,
4024 params,
4025 });
4026
4027 self.send_data_cmd(DataCommand::Subscribe(command));
4028 }
4029
4030 pub fn subscribe_funding_rates(
4032 &mut self,
4033 topic: MStr<Topic>,
4034 handler: TypedHandler<FundingRateUpdate>,
4035 instrument_id: InstrumentId,
4036 client_id: Option<ClientId>,
4037 params: Option<Params>,
4038 ) {
4039 self.check_registered();
4040
4041 self.add_funding_rate_subscription(topic, handler);
4042
4043 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
4044 instrument_id,
4045 client_id,
4046 venue: Some(instrument_id.venue),
4047 command_id: UUID4::new(),
4048 ts_init: self.timestamp_ns(),
4049 correlation_id: None,
4050 params,
4051 });
4052
4053 self.send_data_cmd(DataCommand::Subscribe(command));
4054 }
4055
4056 pub fn subscribe_option_greeks(
4058 &mut self,
4059 topic: MStr<Topic>,
4060 handler: TypedHandler<OptionGreeks>,
4061 instrument_id: InstrumentId,
4062 client_id: Option<ClientId>,
4063 params: Option<Params>,
4064 ) {
4065 self.check_registered();
4066
4067 self.add_option_greeks_subscription(topic, handler);
4068
4069 let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
4070 instrument_id,
4071 client_id,
4072 venue: Some(instrument_id.venue),
4073 command_id: UUID4::new(),
4074 ts_init: self.timestamp_ns(),
4075 correlation_id: None,
4076 params,
4077 });
4078
4079 self.send_data_cmd(DataCommand::Subscribe(command));
4080 }
4081
4082 pub fn subscribe_instrument_status(
4084 &mut self,
4085 topic: MStr<Topic>,
4086 handler: ShareableMessageHandler,
4087 instrument_id: InstrumentId,
4088 client_id: Option<ClientId>,
4089 params: Option<Params>,
4090 ) {
4091 self.check_registered();
4092
4093 self.add_subscription_any(topic, handler);
4094
4095 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
4096 instrument_id,
4097 client_id,
4098 venue: Some(instrument_id.venue),
4099 command_id: UUID4::new(),
4100 ts_init: self.timestamp_ns(),
4101 correlation_id: None,
4102 params,
4103 });
4104
4105 self.send_data_cmd(DataCommand::Subscribe(command));
4106 }
4107
4108 pub fn subscribe_instrument_close(
4110 &mut self,
4111 topic: MStr<Topic>,
4112 handler: ShareableMessageHandler,
4113 instrument_id: InstrumentId,
4114 client_id: Option<ClientId>,
4115 params: Option<Params>,
4116 ) {
4117 self.check_registered();
4118
4119 self.add_instrument_close_subscription(topic, handler);
4120
4121 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
4122 instrument_id,
4123 client_id,
4124 venue: Some(instrument_id.venue),
4125 command_id: UUID4::new(),
4126 ts_init: self.timestamp_ns(),
4127 correlation_id: None,
4128 params,
4129 });
4130
4131 self.send_data_cmd(DataCommand::Subscribe(command));
4132 }
4133
4134 #[expect(
4136 clippy::too_many_arguments,
4137 reason = "subscription command mirrors the option chain request fields"
4138 )]
4139 pub fn subscribe_option_chain(
4140 &mut self,
4141 topic: MStr<Topic>,
4142 handler: TypedHandler<OptionChainSlice>,
4143 series_id: OptionSeriesId,
4144 strike_range: StrikeRange,
4145 snapshot_interval_ms: Option<u64>,
4146 client_id: Option<ClientId>,
4147 params: Option<Params>,
4148 ) {
4149 self.check_registered();
4150
4151 self.add_option_chain_subscription(topic, handler);
4152
4153 let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
4154 series_id,
4155 strike_range,
4156 snapshot_interval_ms,
4157 UUID4::new(),
4158 self.timestamp_ns(),
4159 client_id,
4160 Some(series_id.venue),
4161 params,
4162 ));
4163
4164 self.send_data_cmd(DataCommand::Subscribe(command));
4165 }
4166
    /// Registers `handler` for order events on `topic`.
    ///
    /// Calls `check_registered`, then `add_order_event_subscription`. No data
    /// command is sent from here.
    ///
    /// NOTE(review): the body is identical to `subscribe_order_cancels`; which
    /// events arrive depends entirely on the `topic` the caller passes in.
    pub fn subscribe_order_fills(
        &mut self,
        topic: MStr<Topic>,
        handler: TypedHandler<OrderEventAny>,
    ) {
        self.check_registered();
        self.add_order_event_subscription(topic, handler);
    }
4176
    /// Registers `handler` for order events on `topic`.
    ///
    /// Calls `check_registered`, then `add_order_event_subscription`. No data
    /// command is sent from here.
    ///
    /// NOTE(review): the body is identical to `subscribe_order_fills`; which
    /// events arrive depends entirely on the `topic` the caller passes in.
    pub fn subscribe_order_cancels(
        &mut self,
        topic: MStr<Topic>,
        handler: TypedHandler<OrderEventAny>,
    ) {
        self.check_registered();
        self.add_order_event_subscription(topic, handler);
    }
4186
4187 pub fn unsubscribe_data(
4189 &mut self,
4190 data_type: DataType,
4191 client_id: Option<ClientId>,
4192 params: Option<Params>,
4193 ) {
4194 self.check_registered();
4195
4196 let topic = get_custom_topic(&data_type);
4197 self.remove_subscription_any(topic);
4198
4199 if client_id.is_none() {
4200 return;
4201 }
4202
4203 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
4204 data_type,
4205 client_id,
4206 venue: None,
4207 command_id: UUID4::new(),
4208 ts_init: self.timestamp_ns(),
4209 correlation_id: None,
4210 params,
4211 });
4212
4213 self.send_data_cmd(DataCommand::Unsubscribe(command));
4214 }
4215
4216 pub fn unsubscribe_signal(&mut self, name: &str) {
4222 self.check_registered();
4223
4224 let pattern = get_signal_pattern(name);
4225 if let Some(handler) = self.topic_handlers.remove(&pattern) {
4226 msgbus::unsubscribe_any(pattern, &handler);
4227 } else {
4228 log::warn!(
4229 "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
4230 self.actor_id,
4231 );
4232 }
4233 }
4234
4235 pub fn unsubscribe_instruments(
4237 &mut self,
4238 venue: Venue,
4239 client_id: Option<ClientId>,
4240 params: Option<Params>,
4241 ) {
4242 self.check_registered();
4243
4244 let pattern = get_instruments_pattern(venue);
4245 self.remove_instrument_subscription(pattern);
4246
4247 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
4248 client_id,
4249 venue,
4250 command_id: UUID4::new(),
4251 ts_init: self.timestamp_ns(),
4252 correlation_id: None,
4253 params,
4254 });
4255
4256 self.send_data_cmd(DataCommand::Unsubscribe(command));
4257 }
4258
4259 pub fn unsubscribe_instrument(
4261 &mut self,
4262 instrument_id: InstrumentId,
4263 client_id: Option<ClientId>,
4264 params: Option<Params>,
4265 ) {
4266 self.check_registered();
4267
4268 let topic = get_instrument_topic(instrument_id);
4269 self.remove_instrument_subscription(topic.into());
4270
4271 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
4272 instrument_id,
4273 client_id,
4274 venue: Some(instrument_id.venue),
4275 command_id: UUID4::new(),
4276 ts_init: self.timestamp_ns(),
4277 correlation_id: None,
4278 params,
4279 });
4280
4281 self.send_data_cmd(DataCommand::Unsubscribe(command));
4282 }
4283
4284 pub fn unsubscribe_book_deltas(
4286 &mut self,
4287 instrument_id: InstrumentId,
4288 client_id: Option<ClientId>,
4289 params: Option<Params>,
4290 ) {
4291 self.check_registered();
4292
4293 let pattern = if is_parent_subscription(params.as_ref()) {
4294 get_book_deltas_pattern(instrument_id)
4295 } else {
4296 get_book_deltas_topic(instrument_id).into()
4297 };
4298 self.remove_deltas_subscription(pattern);
4299
4300 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
4301 instrument_id,
4302 client_id,
4303 venue: Some(instrument_id.venue),
4304 command_id: UUID4::new(),
4305 ts_init: self.timestamp_ns(),
4306 correlation_id: None,
4307 params,
4308 });
4309
4310 self.send_data_cmd(DataCommand::Unsubscribe(command));
4311 }
4312
4313 pub fn unsubscribe_book_at_interval(
4315 &mut self,
4316 instrument_id: InstrumentId,
4317 interval_ms: NonZeroUsize,
4318 client_id: Option<ClientId>,
4319 params: Option<Params>,
4320 ) {
4321 self.check_registered();
4322
4323 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
4324 self.remove_book_snapshot_subscription(topic);
4325
4326 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
4327 instrument_id,
4328 interval_ms,
4329 client_id,
4330 venue: Some(instrument_id.venue),
4331 command_id: UUID4::new(),
4332 ts_init: self.timestamp_ns(),
4333 correlation_id: None,
4334 params,
4335 });
4336
4337 self.send_data_cmd(DataCommand::Unsubscribe(command));
4338 }
4339
4340 pub fn unsubscribe_quotes(
4342 &mut self,
4343 instrument_id: InstrumentId,
4344 client_id: Option<ClientId>,
4345 params: Option<Params>,
4346 ) {
4347 self.check_registered();
4348
4349 let topic = get_quotes_topic(instrument_id);
4350 self.remove_quote_subscription(topic);
4351
4352 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
4353 instrument_id,
4354 client_id,
4355 venue: Some(instrument_id.venue),
4356 command_id: UUID4::new(),
4357 ts_init: self.timestamp_ns(),
4358 correlation_id: None,
4359 params,
4360 });
4361
4362 self.send_data_cmd(DataCommand::Unsubscribe(command));
4363 }
4364
4365 pub fn unsubscribe_trades(
4367 &mut self,
4368 instrument_id: InstrumentId,
4369 client_id: Option<ClientId>,
4370 params: Option<Params>,
4371 ) {
4372 self.check_registered();
4373
4374 let topic = get_trades_topic(instrument_id);
4375 self.remove_trade_subscription(topic);
4376
4377 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
4378 instrument_id,
4379 client_id,
4380 venue: Some(instrument_id.venue),
4381 command_id: UUID4::new(),
4382 ts_init: self.timestamp_ns(),
4383 correlation_id: None,
4384 params,
4385 });
4386
4387 self.send_data_cmd(DataCommand::Unsubscribe(command));
4388 }
4389
4390 pub fn unsubscribe_bars(
4392 &mut self,
4393 bar_type: BarType,
4394 client_id: Option<ClientId>,
4395 params: Option<Params>,
4396 ) {
4397 self.check_registered();
4398
4399 let topic = get_bars_topic(bar_type);
4400 self.remove_bar_subscription(topic);
4401
4402 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
4403 bar_type,
4404 client_id,
4405 venue: Some(bar_type.instrument_id().venue),
4406 command_id: UUID4::new(),
4407 ts_init: self.timestamp_ns(),
4408 correlation_id: None,
4409 params,
4410 });
4411
4412 self.send_data_cmd(DataCommand::Unsubscribe(command));
4413 }
4414
4415 pub fn unsubscribe_mark_prices(
4417 &mut self,
4418 instrument_id: InstrumentId,
4419 client_id: Option<ClientId>,
4420 params: Option<Params>,
4421 ) {
4422 self.check_registered();
4423
4424 let topic = get_mark_price_topic(instrument_id);
4425 self.remove_mark_price_subscription(topic);
4426
4427 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
4428 instrument_id,
4429 client_id,
4430 venue: Some(instrument_id.venue),
4431 command_id: UUID4::new(),
4432 ts_init: self.timestamp_ns(),
4433 correlation_id: None,
4434 params,
4435 });
4436
4437 self.send_data_cmd(DataCommand::Unsubscribe(command));
4438 }
4439
4440 pub fn unsubscribe_index_prices(
4442 &mut self,
4443 instrument_id: InstrumentId,
4444 client_id: Option<ClientId>,
4445 params: Option<Params>,
4446 ) {
4447 self.check_registered();
4448
4449 let topic = get_index_price_topic(instrument_id);
4450 self.remove_index_price_subscription(topic);
4451
4452 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
4453 instrument_id,
4454 client_id,
4455 venue: Some(instrument_id.venue),
4456 command_id: UUID4::new(),
4457 ts_init: self.timestamp_ns(),
4458 correlation_id: None,
4459 params,
4460 });
4461
4462 self.send_data_cmd(DataCommand::Unsubscribe(command));
4463 }
4464
4465 pub fn unsubscribe_funding_rates(
4467 &mut self,
4468 instrument_id: InstrumentId,
4469 client_id: Option<ClientId>,
4470 params: Option<Params>,
4471 ) {
4472 self.check_registered();
4473
4474 let topic = get_funding_rate_topic(instrument_id);
4475 self.remove_funding_rate_subscription(topic);
4476
4477 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
4478 instrument_id,
4479 client_id,
4480 venue: Some(instrument_id.venue),
4481 command_id: UUID4::new(),
4482 ts_init: self.timestamp_ns(),
4483 correlation_id: None,
4484 params,
4485 });
4486
4487 self.send_data_cmd(DataCommand::Unsubscribe(command));
4488 }
4489
4490 pub fn unsubscribe_option_greeks(
4492 &mut self,
4493 instrument_id: InstrumentId,
4494 client_id: Option<ClientId>,
4495 params: Option<Params>,
4496 ) {
4497 self.check_registered();
4498
4499 let topic = get_option_greeks_topic(instrument_id);
4500 self.remove_option_greeks_subscription(topic);
4501
4502 let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
4503 instrument_id,
4504 client_id,
4505 venue: Some(instrument_id.venue),
4506 command_id: UUID4::new(),
4507 ts_init: self.timestamp_ns(),
4508 correlation_id: None,
4509 params,
4510 });
4511
4512 self.send_data_cmd(DataCommand::Unsubscribe(command));
4513 }
4514
4515 pub fn unsubscribe_instrument_status(
4517 &mut self,
4518 instrument_id: InstrumentId,
4519 client_id: Option<ClientId>,
4520 params: Option<Params>,
4521 ) {
4522 self.check_registered();
4523
4524 let topic = get_instrument_status_topic(instrument_id);
4525 self.remove_subscription_any(topic);
4526
4527 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
4528 instrument_id,
4529 client_id,
4530 venue: Some(instrument_id.venue),
4531 command_id: UUID4::new(),
4532 ts_init: self.timestamp_ns(),
4533 correlation_id: None,
4534 params,
4535 });
4536
4537 self.send_data_cmd(DataCommand::Unsubscribe(command));
4538 }
4539
4540 pub fn unsubscribe_instrument_close(
4542 &mut self,
4543 instrument_id: InstrumentId,
4544 client_id: Option<ClientId>,
4545 params: Option<Params>,
4546 ) {
4547 self.check_registered();
4548
4549 let topic = get_instrument_close_topic(instrument_id);
4550 self.remove_instrument_close_subscription(topic);
4551
4552 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
4553 instrument_id,
4554 client_id,
4555 venue: Some(instrument_id.venue),
4556 command_id: UUID4::new(),
4557 ts_init: self.timestamp_ns(),
4558 correlation_id: None,
4559 params,
4560 });
4561
4562 self.send_data_cmd(DataCommand::Unsubscribe(command));
4563 }
4564
4565 pub fn unsubscribe_option_chain(
4567 &mut self,
4568 series_id: OptionSeriesId,
4569 client_id: Option<ClientId>,
4570 ) {
4571 self.check_registered();
4572
4573 let topic = get_option_chain_topic(series_id);
4574 self.remove_option_chain_subscription(topic);
4575
4576 let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4577 series_id,
4578 UUID4::new(),
4579 self.timestamp_ns(),
4580 client_id,
4581 Some(series_id.venue),
4582 ));
4583
4584 self.send_data_cmd(DataCommand::Unsubscribe(command));
4585 }
4586
4587 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4589 self.check_registered();
4590
4591 let topic = get_order_filled_topic(instrument_id);
4592 self.remove_order_event_subscription(topic);
4593 }
4594
4595 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4597 self.check_registered();
4598
4599 let topic = get_order_canceled_topic(instrument_id);
4600 self.remove_order_event_subscription(topic);
4601 }
4602
4603 #[expect(clippy::too_many_arguments)]
4609 pub fn request_data(
4610 &self,
4611 data_type: DataType,
4612 client_id: ClientId,
4613 start: Option<DateTime<Utc>>,
4614 end: Option<DateTime<Utc>>,
4615 limit: Option<NonZeroUsize>,
4616 params: Option<Params>,
4617 handler: ShareableMessageHandler,
4618 ) -> anyhow::Result<UUID4> {
4619 self.check_registered();
4620
4621 let now = self.clock_ref().utc_now();
4622 check_timestamps(now, start, end)?;
4623
4624 let request_id = UUID4::new();
4625 let command = RequestCommand::Data(RequestCustomData {
4626 client_id,
4627 data_type,
4628 start,
4629 end,
4630 limit,
4631 request_id,
4632 ts_init: self.timestamp_ns(),
4633 params,
4634 });
4635
4636 get_message_bus()
4637 .borrow_mut()
4638 .register_response_handler(command.request_id(), handler)?;
4639
4640 self.send_data_cmd(DataCommand::Request(command));
4641
4642 Ok(request_id)
4643 }
4644
4645 pub fn request_instrument(
4651 &self,
4652 instrument_id: InstrumentId,
4653 start: Option<DateTime<Utc>>,
4654 end: Option<DateTime<Utc>>,
4655 client_id: Option<ClientId>,
4656 params: Option<Params>,
4657 handler: ShareableMessageHandler,
4658 ) -> anyhow::Result<UUID4> {
4659 self.check_registered();
4660
4661 let now = self.clock_ref().utc_now();
4662 check_timestamps(now, start, end)?;
4663
4664 let request_id = UUID4::new();
4665 let command = RequestCommand::Instrument(RequestInstrument {
4666 instrument_id,
4667 start,
4668 end,
4669 client_id,
4670 request_id,
4671 ts_init: now.into(),
4672 params,
4673 });
4674
4675 get_message_bus()
4676 .borrow_mut()
4677 .register_response_handler(command.request_id(), handler)?;
4678
4679 self.send_data_cmd(DataCommand::Request(command));
4680
4681 Ok(request_id)
4682 }
4683
4684 pub fn request_instruments(
4690 &self,
4691 venue: Option<Venue>,
4692 start: Option<DateTime<Utc>>,
4693 end: Option<DateTime<Utc>>,
4694 client_id: Option<ClientId>,
4695 params: Option<Params>,
4696 handler: ShareableMessageHandler,
4697 ) -> anyhow::Result<UUID4> {
4698 self.check_registered();
4699
4700 let now = self.clock_ref().utc_now();
4701 check_timestamps(now, start, end)?;
4702
4703 let request_id = UUID4::new();
4704 let command = RequestCommand::Instruments(RequestInstruments {
4705 venue,
4706 start,
4707 end,
4708 client_id,
4709 request_id,
4710 ts_init: now.into(),
4711 params,
4712 });
4713
4714 get_message_bus()
4715 .borrow_mut()
4716 .register_response_handler(command.request_id(), handler)?;
4717
4718 self.send_data_cmd(DataCommand::Request(command));
4719
4720 Ok(request_id)
4721 }
4722
4723 pub fn request_book_snapshot(
4729 &self,
4730 instrument_id: InstrumentId,
4731 depth: Option<NonZeroUsize>,
4732 client_id: Option<ClientId>,
4733 params: Option<Params>,
4734 handler: ShareableMessageHandler,
4735 ) -> anyhow::Result<UUID4> {
4736 self.check_registered();
4737
4738 let request_id = UUID4::new();
4739 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4740 instrument_id,
4741 depth,
4742 client_id,
4743 request_id,
4744 ts_init: self.timestamp_ns(),
4745 params,
4746 });
4747
4748 get_message_bus()
4749 .borrow_mut()
4750 .register_response_handler(command.request_id(), handler)?;
4751
4752 self.send_data_cmd(DataCommand::Request(command));
4753
4754 Ok(request_id)
4755 }
4756
4757 #[expect(clippy::too_many_arguments)]
4763 pub fn request_book_deltas(
4764 &self,
4765 instrument_id: InstrumentId,
4766 start: Option<DateTime<Utc>>,
4767 end: Option<DateTime<Utc>>,
4768 limit: Option<NonZeroUsize>,
4769 client_id: Option<ClientId>,
4770 params: Option<Params>,
4771 handler: ShareableMessageHandler,
4772 ) -> anyhow::Result<UUID4> {
4773 self.check_registered();
4774
4775 let now = self.clock_ref().utc_now();
4776 check_timestamps(now, start, end)?;
4777
4778 let request_id = UUID4::new();
4779 let command = RequestCommand::BookDeltas(RequestBookDeltas {
4780 instrument_id,
4781 start,
4782 end,
4783 limit,
4784 client_id,
4785 request_id,
4786 ts_init: now.into(),
4787 params,
4788 });
4789
4790 get_message_bus()
4791 .borrow_mut()
4792 .register_response_handler(command.request_id(), handler)?;
4793
4794 self.send_data_cmd(DataCommand::Request(command));
4795
4796 Ok(request_id)
4797 }
4798
4799 #[expect(clippy::too_many_arguments)]
4805 pub fn request_book_depth(
4806 &self,
4807 instrument_id: InstrumentId,
4808 start: Option<DateTime<Utc>>,
4809 end: Option<DateTime<Utc>>,
4810 limit: Option<NonZeroUsize>,
4811 depth: Option<NonZeroUsize>,
4812 client_id: Option<ClientId>,
4813 params: Option<Params>,
4814 handler: ShareableMessageHandler,
4815 ) -> anyhow::Result<UUID4> {
4816 self.check_registered();
4817
4818 let now = self.clock_ref().utc_now();
4819 check_timestamps(now, start, end)?;
4820
4821 let request_id = UUID4::new();
4822 let command = RequestCommand::BookDepth(RequestBookDepth {
4823 instrument_id,
4824 start,
4825 end,
4826 limit,
4827 depth,
4828 client_id,
4829 request_id,
4830 ts_init: now.into(),
4831 params,
4832 });
4833
4834 get_message_bus()
4835 .borrow_mut()
4836 .register_response_handler(command.request_id(), handler)?;
4837
4838 self.send_data_cmd(DataCommand::Request(command));
4839
4840 Ok(request_id)
4841 }
4842
4843 #[expect(clippy::too_many_arguments)]
4849 pub fn request_quotes(
4850 &self,
4851 instrument_id: InstrumentId,
4852 start: Option<DateTime<Utc>>,
4853 end: Option<DateTime<Utc>>,
4854 limit: Option<NonZeroUsize>,
4855 client_id: Option<ClientId>,
4856 params: Option<Params>,
4857 handler: ShareableMessageHandler,
4858 ) -> anyhow::Result<UUID4> {
4859 self.check_registered();
4860
4861 let now = self.clock_ref().utc_now();
4862 check_timestamps(now, start, end)?;
4863
4864 let request_id = UUID4::new();
4865 let command = RequestCommand::Quotes(RequestQuotes {
4866 instrument_id,
4867 start,
4868 end,
4869 limit,
4870 client_id,
4871 request_id,
4872 ts_init: now.into(),
4873 params,
4874 });
4875
4876 get_message_bus()
4877 .borrow_mut()
4878 .register_response_handler(command.request_id(), handler)?;
4879
4880 self.send_data_cmd(DataCommand::Request(command));
4881
4882 Ok(request_id)
4883 }
4884
4885 #[expect(clippy::too_many_arguments)]
4891 pub fn request_trades(
4892 &self,
4893 instrument_id: InstrumentId,
4894 start: Option<DateTime<Utc>>,
4895 end: Option<DateTime<Utc>>,
4896 limit: Option<NonZeroUsize>,
4897 client_id: Option<ClientId>,
4898 params: Option<Params>,
4899 handler: ShareableMessageHandler,
4900 ) -> anyhow::Result<UUID4> {
4901 self.check_registered();
4902
4903 let now = self.clock_ref().utc_now();
4904 check_timestamps(now, start, end)?;
4905
4906 let request_id = UUID4::new();
4907 let command = RequestCommand::Trades(RequestTrades {
4908 instrument_id,
4909 start,
4910 end,
4911 limit,
4912 client_id,
4913 request_id,
4914 ts_init: now.into(),
4915 params,
4916 });
4917
4918 get_message_bus()
4919 .borrow_mut()
4920 .register_response_handler(command.request_id(), handler)?;
4921
4922 self.send_data_cmd(DataCommand::Request(command));
4923
4924 Ok(request_id)
4925 }
4926
4927 #[expect(clippy::too_many_arguments)]
4933 pub fn request_bars(
4934 &self,
4935 bar_type: BarType,
4936 start: Option<DateTime<Utc>>,
4937 end: Option<DateTime<Utc>>,
4938 limit: Option<NonZeroUsize>,
4939 client_id: Option<ClientId>,
4940 params: Option<Params>,
4941 handler: ShareableMessageHandler,
4942 ) -> anyhow::Result<UUID4> {
4943 self.check_registered();
4944
4945 let now = self.clock_ref().utc_now();
4946 check_timestamps(now, start, end)?;
4947
4948 let request_id = UUID4::new();
4949 let command = RequestCommand::Bars(RequestBars {
4950 bar_type,
4951 start,
4952 end,
4953 limit,
4954 client_id,
4955 request_id,
4956 ts_init: now.into(),
4957 params,
4958 });
4959
4960 get_message_bus()
4961 .borrow_mut()
4962 .register_response_handler(command.request_id(), handler)?;
4963
4964 self.send_data_cmd(DataCommand::Request(command));
4965
4966 Ok(request_id)
4967 }
4968
4969 #[expect(clippy::too_many_arguments)]
4975 pub fn request_funding_rates(
4976 &self,
4977 instrument_id: InstrumentId,
4978 start: Option<DateTime<Utc>>,
4979 end: Option<DateTime<Utc>>,
4980 limit: Option<NonZeroUsize>,
4981 client_id: Option<ClientId>,
4982 params: Option<Params>,
4983 handler: ShareableMessageHandler,
4984 ) -> anyhow::Result<UUID4> {
4985 self.check_registered();
4986
4987 let now = self.clock_ref().utc_now();
4988 check_timestamps(now, start, end)?;
4989
4990 let request_id = UUID4::new();
4991 let command = RequestCommand::FundingRates(RequestFundingRates {
4992 instrument_id,
4993 start,
4994 end,
4995 limit,
4996 client_id,
4997 request_id,
4998 ts_init: now.into(),
4999 params,
5000 });
5001
5002 get_message_bus()
5003 .borrow_mut()
5004 .register_response_handler(command.request_id(), handler)?;
5005
5006 self.send_data_cmd(DataCommand::Request(command));
5007
5008 Ok(request_id)
5009 }
5010
5011 #[cfg(test)]
5012 pub fn quote_handler_count(&self) -> usize {
5013 self.quote_handlers.len()
5014 }
5015
5016 #[cfg(test)]
5017 pub fn trade_handler_count(&self) -> usize {
5018 self.trade_handlers.len()
5019 }
5020
5021 #[cfg(test)]
5022 pub fn bar_handler_count(&self) -> usize {
5023 self.bar_handlers.len()
5024 }
5025
5026 #[cfg(test)]
5027 pub fn deltas_handler_count(&self) -> usize {
5028 self.deltas_handlers.len()
5029 }
5030
5031 #[cfg(test)]
5032 pub fn has_quote_handler(&self, topic: &str) -> bool {
5033 self.quote_handlers
5034 .contains_key(&MStr::<Topic>::from(topic))
5035 }
5036
5037 #[cfg(test)]
5038 pub fn has_trade_handler(&self, topic: &str) -> bool {
5039 self.trade_handlers
5040 .contains_key(&MStr::<Topic>::from(topic))
5041 }
5042
5043 #[cfg(test)]
5044 pub fn has_bar_handler(&self, topic: &str) -> bool {
5045 self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
5046 }
5047
5048 #[cfg(test)]
5049 pub fn has_deltas_handler(&self, pattern: &str) -> bool {
5050 self.deltas_handlers
5051 .contains_key(&MStr::<Pattern>::from(pattern))
5052 }
5053}
5054
/// `DataActorCore` acts as its own core: both accessors hand back `self`.
impl DataActorNative for DataActorCore {
    /// Returns a shared reference to this core.
    fn core(&self) -> &DataActorCore {
        self
    }

    /// Returns an exclusive reference to this core.
    fn core_mut(&mut self) -> &mut DataActorCore {
        self
    }
}
5064
5065fn check_timestamps(
5066 now: DateTime<Utc>,
5067 start: Option<DateTime<Utc>>,
5068 end: Option<DateTime<Utc>>,
5069) -> anyhow::Result<()> {
5070 if let Some(start) = start {
5071 check_predicate_true(start <= now, "start was > now")?;
5072 }
5073
5074 if let Some(end) = end {
5075 check_predicate_true(end <= now, "end was > now")?;
5076 }
5077
5078 if let (Some(start), Some(end)) = (start, end) {
5079 check_predicate_true(start < end, "start was >= end")?;
5080 }
5081
5082 Ok(())
5083}
5084
/// Logs `e` at error level using its `Display` output.
fn log_error(e: &anyhow::Error) {
    log::error!("{e}");
}
5088
5089fn log_not_running<T>(msg: &T)
5090where
5091 T: Debug,
5092{
5093 log::trace!("Received message when not running - skipping {msg:?}");
5094}
5095
5096fn log_received<T>(msg: &T)
5097where
5098 T: Debug,
5099{
5100 log::debug!("{RECV} {msg:?}");
5101}
5102
/// Logs a compact debug-level summary of a bulk message: the `RECV` prefix,
/// `kind`, the correlation ID and the record count, without printing the
/// records themselves.
fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
    log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
}