1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41 },
42 enums::BookType,
43 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45 instruments::InstrumentAny,
46 orderbook::OrderBook,
47};
48use ustr::Ustr;
49
50#[cfg(feature = "indicators")]
51use super::indicators::Indicators;
52use super::{
53 Actor,
54 registry::{get_actor_unchecked, try_get_actor_unchecked},
55};
56#[cfg(feature = "defi")]
57use crate::defi;
58#[cfg(feature = "defi")]
59#[allow(unused_imports)]
60use crate::defi::data_actor as _; use crate::{
62 cache::Cache,
63 clock::Clock,
64 component::Component,
65 enums::{ComponentState, ComponentTrigger},
66 logging::{CMD, RECV, REQ, SEND},
67 messages::{
68 data::{
69 BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
70 InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
71 RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
72 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
73 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
74 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
75 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
76 SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
77 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
78 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
79 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
80 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
81 UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
82 UnsubscribeQuotes, UnsubscribeTrades,
83 },
84 system::ShutdownSystem,
85 },
86 msgbus::{
87 self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
88 switchboard::{
89 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
90 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
91 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
92 get_instruments_pattern, get_mark_price_topic, get_option_chain_topic,
93 get_option_greeks_topic, get_order_cancels_topic, get_order_fills_topic,
94 get_quotes_topic, get_trades_topic,
95 },
96 },
97 signal::Signal,
98 timer::{TimeEvent, TimeEventCallback},
99};
100
101#[derive(Debug, Clone)]
103#[cfg_attr(
104 feature = "python",
105 pyo3::pyclass(
106 module = "nautilus_trader.core.nautilus_pyo3.common",
107 subclass,
108 from_py_object
109 )
110)]
111#[cfg_attr(
112 feature = "python",
113 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
114)]
115pub struct DataActorConfig {
116 pub actor_id: Option<ActorId>,
118 pub log_events: bool,
120 pub log_commands: bool,
122}
123
124impl Default for DataActorConfig {
125 fn default() -> Self {
126 Self {
127 actor_id: None,
128 log_events: true,
129 log_commands: true,
130 }
131 }
132}
133
134#[derive(Debug, Clone)]
136#[cfg_attr(
137 feature = "python",
138 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
139)]
140#[cfg_attr(
141 feature = "python",
142 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
143)]
144pub struct ImportableActorConfig {
145 pub actor_path: String,
147 pub config_path: String,
149 pub config: HashMap<String, serde_json::Value>,
151}
152
153type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
154
155pub trait DataActor:
156 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
157{
158 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
164 Ok(IndexMap::new())
165 }
166
167 #[allow(unused_variables)]
173 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
174 Ok(())
175 }
176
177 fn on_start(&mut self) -> anyhow::Result<()> {
183 log::warn!(
184 "The `on_start` handler was called when not overridden, \
185 it's expected that any actions required when starting the actor \
186 occur here, such as subscribing/requesting data"
187 );
188 Ok(())
189 }
190
191 fn on_stop(&mut self) -> anyhow::Result<()> {
197 log::warn!(
198 "The `on_stop` handler was called when not overridden, \
199 it's expected that any actions required when stopping the actor \
200 occur here, such as unsubscribing from data",
201 );
202 Ok(())
203 }
204
205 fn on_resume(&mut self) -> anyhow::Result<()> {
211 log::warn!(
212 "The `on_resume` handler was called when not overridden, \
213 it's expected that any actions required when resuming the actor \
214 following a stop occur here"
215 );
216 Ok(())
217 }
218
219 fn on_reset(&mut self) -> anyhow::Result<()> {
225 log::warn!(
226 "The `on_reset` handler was called when not overridden, \
227 it's expected that any actions required when resetting the actor \
228 occur here, such as resetting indicators and other state"
229 );
230 Ok(())
231 }
232
233 fn on_dispose(&mut self) -> anyhow::Result<()> {
239 Ok(())
240 }
241
242 fn on_degrade(&mut self) -> anyhow::Result<()> {
248 Ok(())
249 }
250
251 fn on_fault(&mut self) -> anyhow::Result<()> {
257 Ok(())
258 }
259
260 #[allow(unused_variables)]
266 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
267 Ok(())
268 }
269
270 #[allow(unused_variables)]
276 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
277 Ok(())
278 }
279
280 #[allow(unused_variables)]
286 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
287 Ok(())
288 }
289
290 #[allow(unused_variables)]
296 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
297 Ok(())
298 }
299
300 #[allow(unused_variables)]
306 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
307 Ok(())
308 }
309
310 #[allow(unused_variables)]
316 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
317 Ok(())
318 }
319
320 #[allow(unused_variables)]
326 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
327 Ok(())
328 }
329
330 #[allow(unused_variables)]
336 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
337 Ok(())
338 }
339
340 #[allow(unused_variables)]
346 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
347 Ok(())
348 }
349
350 #[allow(unused_variables)]
356 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
357 Ok(())
358 }
359
360 #[allow(unused_variables)]
366 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
367 Ok(())
368 }
369
370 #[allow(unused_variables)]
376 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
377 Ok(())
378 }
379
380 #[allow(unused_variables)]
386 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
387 Ok(())
388 }
389
390 #[allow(unused_variables)]
396 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
397 Ok(())
398 }
399
400 #[allow(unused_variables)]
406 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
407 Ok(())
408 }
409
410 #[allow(unused_variables)]
416 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
417 Ok(())
418 }
419
420 #[allow(unused_variables)]
426 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
427 Ok(())
428 }
429
430 #[allow(unused_variables)]
436 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
437 Ok(())
438 }
439
440 #[cfg(feature = "defi")]
441 #[allow(unused_variables)]
447 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
448 Ok(())
449 }
450
451 #[cfg(feature = "defi")]
452 #[allow(unused_variables)]
458 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
459 Ok(())
460 }
461
462 #[cfg(feature = "defi")]
463 #[allow(unused_variables)]
469 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
470 Ok(())
471 }
472
473 #[cfg(feature = "defi")]
474 #[allow(unused_variables)]
480 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
481 Ok(())
482 }
483
484 #[cfg(feature = "defi")]
485 #[allow(unused_variables)]
491 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
492 Ok(())
493 }
494
495 #[cfg(feature = "defi")]
496 #[allow(unused_variables)]
502 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
503 Ok(())
504 }
505
506 #[allow(unused_variables)]
512 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
513 Ok(())
514 }
515
516 #[allow(unused_variables)]
522 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
523 Ok(())
524 }
525
526 #[allow(unused_variables)]
532 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
533 Ok(())
534 }
535
536 #[allow(unused_variables)]
542 fn on_historical_funding_rates(
543 &mut self,
544 funding_rates: &[FundingRateUpdate],
545 ) -> anyhow::Result<()> {
546 Ok(())
547 }
548
549 #[allow(unused_variables)]
555 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
556 Ok(())
557 }
558
559 #[allow(unused_variables)]
565 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
566 Ok(())
567 }
568
569 #[allow(unused_variables)]
575 fn on_historical_index_prices(
576 &mut self,
577 index_prices: &[IndexPriceUpdate],
578 ) -> anyhow::Result<()> {
579 Ok(())
580 }
581
582 fn handle_time_event(&mut self, event: &TimeEvent) {
584 log_received(&event);
585
586 if self.not_running() {
587 log_not_running(&event);
588 return;
589 }
590
591 if let Err(e) = DataActor::on_time_event(self, event) {
592 log_error(&e);
593 }
594 }
595
596 fn handle_data(&mut self, data: &CustomData) {
598 log_received(&data);
599
600 if self.not_running() {
601 log_not_running(&data);
602 return;
603 }
604
605 if let Err(e) = self.on_data(data) {
606 log_error(&e);
607 }
608 }
609
610 fn handle_signal(&mut self, signal: &Signal) {
612 log_received(&signal);
613
614 if self.not_running() {
615 log_not_running(&signal);
616 return;
617 }
618
619 if let Err(e) = self.on_signal(signal) {
620 log_error(&e);
621 }
622 }
623
624 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
626 log_received(&instrument);
627
628 if self.not_running() {
629 log_not_running(&instrument);
630 return;
631 }
632
633 if let Err(e) = self.on_instrument(instrument) {
634 log_error(&e);
635 }
636 }
637
638 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
640 log_received(&deltas);
641
642 if self.not_running() {
643 log_not_running(&deltas);
644 return;
645 }
646
647 if let Err(e) = self.on_book_deltas(deltas) {
648 log_error(&e);
649 }
650 }
651
652 fn handle_book(&mut self, book: &OrderBook) {
654 log_received(&book);
655
656 if self.not_running() {
657 log_not_running(&book);
658 return;
659 }
660
661 if let Err(e) = self.on_book(book) {
662 log_error(&e);
663 }
664 }
665
666 fn handle_quote(&mut self, quote: &QuoteTick) {
668 log_received("e);
669
670 if self.not_running() {
671 log_not_running("e);
672 return;
673 }
674
675 if let Err(e) = self.on_quote(quote) {
676 log_error(&e);
677 }
678 }
679
680 fn handle_trade(&mut self, trade: &TradeTick) {
682 log_received(&trade);
683
684 if self.not_running() {
685 log_not_running(&trade);
686 return;
687 }
688
689 if let Err(e) = self.on_trade(trade) {
690 log_error(&e);
691 }
692 }
693
694 fn handle_bar(&mut self, bar: &Bar) {
696 log_received(&bar);
697
698 if self.not_running() {
699 log_not_running(&bar);
700 return;
701 }
702
703 if let Err(e) = self.on_bar(bar) {
704 log_error(&e);
705 }
706 }
707
708 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
710 log_received(&mark_price);
711
712 if self.not_running() {
713 log_not_running(&mark_price);
714 return;
715 }
716
717 if let Err(e) = self.on_mark_price(mark_price) {
718 log_error(&e);
719 }
720 }
721
722 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
724 log_received(&index_price);
725
726 if self.not_running() {
727 log_not_running(&index_price);
728 return;
729 }
730
731 if let Err(e) = self.on_index_price(index_price) {
732 log_error(&e);
733 }
734 }
735
736 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
738 log_received(&funding_rate);
739
740 if self.not_running() {
741 log_not_running(&funding_rate);
742 return;
743 }
744
745 if let Err(e) = self.on_funding_rate(funding_rate) {
746 log_error(&e);
747 }
748 }
749
750 fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
752 log_received(&greeks);
753
754 if self.not_running() {
755 log_not_running(&greeks);
756 return;
757 }
758
759 if let Err(e) = self.on_option_greeks(greeks) {
760 log_error(&e);
761 }
762 }
763
764 fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
766 log_received(&slice);
767
768 if self.not_running() {
769 log_not_running(&slice);
770 return;
771 }
772
773 if let Err(e) = self.on_option_chain(slice) {
774 log_error(&e);
775 }
776 }
777
778 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
780 log_received(&status);
781
782 if self.not_running() {
783 log_not_running(&status);
784 return;
785 }
786
787 if let Err(e) = self.on_instrument_status(status) {
788 log_error(&e);
789 }
790 }
791
792 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
794 log_received(&close);
795
796 if self.not_running() {
797 log_not_running(&close);
798 return;
799 }
800
801 if let Err(e) = self.on_instrument_close(close) {
802 log_error(&e);
803 }
804 }
805
806 fn handle_order_filled(&mut self, event: &OrderFilled) {
808 log_received(&event);
809
810 if event.strategy_id.inner() == self.actor_id().inner() {
814 return;
815 }
816
817 if self.not_running() {
818 log_not_running(&event);
819 return;
820 }
821
822 if let Err(e) = self.on_order_filled(event) {
823 log_error(&e);
824 }
825 }
826
827 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
829 log_received(&event);
830
831 if event.strategy_id.inner() == self.actor_id().inner() {
835 return;
836 }
837
838 if self.not_running() {
839 log_not_running(&event);
840 return;
841 }
842
843 if let Err(e) = self.on_order_canceled(event) {
844 log_error(&e);
845 }
846 }
847
848 #[cfg(feature = "defi")]
849 fn handle_block(&mut self, block: &Block) {
851 log_received(&block);
852
853 if self.not_running() {
854 log_not_running(&block);
855 return;
856 }
857
858 if let Err(e) = self.on_block(block) {
859 log_error(&e);
860 }
861 }
862
863 #[cfg(feature = "defi")]
864 fn handle_pool(&mut self, pool: &Pool) {
866 log_received(&pool);
867
868 if self.not_running() {
869 log_not_running(&pool);
870 return;
871 }
872
873 if let Err(e) = self.on_pool(pool) {
874 log_error(&e);
875 }
876 }
877
878 #[cfg(feature = "defi")]
879 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
881 log_received(&swap);
882
883 if self.not_running() {
884 log_not_running(&swap);
885 return;
886 }
887
888 if let Err(e) = self.on_pool_swap(swap) {
889 log_error(&e);
890 }
891 }
892
893 #[cfg(feature = "defi")]
894 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
896 log_received(&update);
897
898 if self.not_running() {
899 log_not_running(&update);
900 return;
901 }
902
903 if let Err(e) = self.on_pool_liquidity_update(update) {
904 log_error(&e);
905 }
906 }
907
908 #[cfg(feature = "defi")]
909 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
911 log_received(&collect);
912
913 if self.not_running() {
914 log_not_running(&collect);
915 return;
916 }
917
918 if let Err(e) = self.on_pool_fee_collect(collect) {
919 log_error(&e);
920 }
921 }
922
923 #[cfg(feature = "defi")]
924 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
926 log_received(&flash);
927
928 if self.not_running() {
929 log_not_running(&flash);
930 return;
931 }
932
933 if let Err(e) = self.on_pool_flash(flash) {
934 log_error(&e);
935 }
936 }
937
938 fn handle_historical_data(&mut self, data: &dyn Any) {
940 log_received(&data);
941
942 if let Err(e) = self.on_historical_data(data) {
943 log_error(&e);
944 }
945 }
946
947 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
949 log_received(&resp);
950
951 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
952 log_error(&e);
953 }
954 }
955
956 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
958 log_received(&resp);
959
960 if let Err(e) = self.on_instrument(&resp.data) {
961 log_error(&e);
962 }
963 }
964
965 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
967 log_received(&resp);
968
969 for inst in &resp.data {
970 if let Err(e) = self.on_instrument(inst) {
971 log_error(&e);
972 }
973 }
974 }
975
976 fn handle_book_response(&mut self, resp: &BookResponse) {
978 log_received(&resp);
979
980 if let Err(e) = self.on_book(&resp.data) {
981 log_error(&e);
982 }
983 }
984
985 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
987 log_received(&resp);
988
989 if let Err(e) = self.on_historical_quotes(&resp.data) {
990 log_error(&e);
991 }
992 }
993
994 fn handle_trades_response(&mut self, resp: &TradesResponse) {
996 log_received(&resp);
997
998 if let Err(e) = self.on_historical_trades(&resp.data) {
999 log_error(&e);
1000 }
1001 }
1002
1003 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1005 log_received(&resp);
1006
1007 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1008 log_error(&e);
1009 }
1010 }
1011
1012 fn handle_bars_response(&mut self, resp: &BarsResponse) {
1014 log_received(&resp);
1015
1016 if let Err(e) = self.on_historical_bars(&resp.data) {
1017 log_error(&e);
1018 }
1019 }
1020
1021 fn subscribe_data(
1023 &mut self,
1024 data_type: DataType,
1025 client_id: Option<ClientId>,
1026 params: Option<Params>,
1027 ) where
1028 Self: 'static + Debug + Sized,
1029 {
1030 let actor_id = self.actor_id().inner();
1031 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1032 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1033 });
1034
1035 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1036 }
1037
1038 fn subscribe_quotes(
1040 &mut self,
1041 instrument_id: InstrumentId,
1042 client_id: Option<ClientId>,
1043 params: Option<Params>,
1044 ) where
1045 Self: 'static + Debug + Sized,
1046 {
1047 let actor_id = self.actor_id().inner();
1048 let topic = get_quotes_topic(instrument_id);
1049
1050 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1051 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1052 actor.handle_quote(quote);
1053 } else {
1054 log::error!("Actor {actor_id} not found for quote handling");
1055 }
1056 });
1057
1058 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1059 }
1060
1061 fn subscribe_instruments(
1063 &mut self,
1064 venue: Venue,
1065 client_id: Option<ClientId>,
1066 params: Option<Params>,
1067 ) where
1068 Self: 'static + Debug + Sized,
1069 {
1070 let actor_id = self.actor_id().inner();
1071 let pattern = get_instruments_pattern(venue);
1072
1073 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1074 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1075 actor.handle_instrument(instrument);
1076 } else {
1077 log::error!("Actor {actor_id} not found for instruments handling");
1078 }
1079 });
1080
1081 DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1082 }
1083
1084 fn subscribe_instrument(
1086 &mut self,
1087 instrument_id: InstrumentId,
1088 client_id: Option<ClientId>,
1089 params: Option<Params>,
1090 ) where
1091 Self: 'static + Debug + Sized,
1092 {
1093 let actor_id = self.actor_id().inner();
1094 let topic = get_instrument_topic(instrument_id);
1095
1096 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1097 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1098 actor.handle_instrument(instrument);
1099 } else {
1100 log::error!("Actor {actor_id} not found for instrument handling");
1101 }
1102 });
1103
1104 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1105 }
1106
1107 fn subscribe_book_deltas(
1109 &mut self,
1110 instrument_id: InstrumentId,
1111 book_type: BookType,
1112 depth: Option<NonZeroUsize>,
1113 client_id: Option<ClientId>,
1114 managed: bool,
1115 params: Option<Params>,
1116 ) where
1117 Self: 'static + Debug + Sized,
1118 {
1119 let actor_id = self.actor_id().inner();
1120 let topic = get_book_deltas_topic(instrument_id);
1121
1122 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1123 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1124 });
1125
1126 DataActorCore::subscribe_book_deltas(
1127 self,
1128 topic,
1129 handler,
1130 instrument_id,
1131 book_type,
1132 depth,
1133 client_id,
1134 managed,
1135 params,
1136 );
1137 }
1138
1139 fn subscribe_book_at_interval(
1141 &mut self,
1142 instrument_id: InstrumentId,
1143 book_type: BookType,
1144 depth: Option<NonZeroUsize>,
1145 interval_ms: NonZeroUsize,
1146 client_id: Option<ClientId>,
1147 params: Option<Params>,
1148 ) where
1149 Self: 'static + Debug + Sized,
1150 {
1151 let actor_id = self.actor_id().inner();
1152 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1153
1154 let handler = TypedHandler::from(move |book: &OrderBook| {
1155 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1156 });
1157
1158 DataActorCore::subscribe_book_at_interval(
1159 self,
1160 topic,
1161 handler,
1162 instrument_id,
1163 book_type,
1164 depth,
1165 interval_ms,
1166 client_id,
1167 params,
1168 );
1169 }
1170
1171 fn subscribe_trades(
1173 &mut self,
1174 instrument_id: InstrumentId,
1175 client_id: Option<ClientId>,
1176 params: Option<Params>,
1177 ) where
1178 Self: 'static + Debug + Sized,
1179 {
1180 let actor_id = self.actor_id().inner();
1181 let topic = get_trades_topic(instrument_id);
1182
1183 let handler = TypedHandler::from(move |trade: &TradeTick| {
1184 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1185 });
1186
1187 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1188 }
1189
1190 fn subscribe_bars(
1192 &mut self,
1193 bar_type: BarType,
1194 client_id: Option<ClientId>,
1195 params: Option<Params>,
1196 ) where
1197 Self: 'static + Debug + Sized,
1198 {
1199 let actor_id = self.actor_id().inner();
1200 let topic = get_bars_topic(bar_type);
1201
1202 let handler = TypedHandler::from(move |bar: &Bar| {
1203 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1204 });
1205
1206 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1207 }
1208
1209 fn subscribe_mark_prices(
1211 &mut self,
1212 instrument_id: InstrumentId,
1213 client_id: Option<ClientId>,
1214 params: Option<Params>,
1215 ) where
1216 Self: 'static + Debug + Sized,
1217 {
1218 let actor_id = self.actor_id().inner();
1219 let topic = get_mark_price_topic(instrument_id);
1220
1221 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1222 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1223 });
1224
1225 DataActorCore::subscribe_mark_prices(
1226 self,
1227 topic,
1228 handler,
1229 instrument_id,
1230 client_id,
1231 params,
1232 );
1233 }
1234
1235 fn subscribe_index_prices(
1237 &mut self,
1238 instrument_id: InstrumentId,
1239 client_id: Option<ClientId>,
1240 params: Option<Params>,
1241 ) where
1242 Self: 'static + Debug + Sized,
1243 {
1244 let actor_id = self.actor_id().inner();
1245 let topic = get_index_price_topic(instrument_id);
1246
1247 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1248 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1249 });
1250
1251 DataActorCore::subscribe_index_prices(
1252 self,
1253 topic,
1254 handler,
1255 instrument_id,
1256 client_id,
1257 params,
1258 );
1259 }
1260
1261 fn subscribe_funding_rates(
1263 &mut self,
1264 instrument_id: InstrumentId,
1265 client_id: Option<ClientId>,
1266 params: Option<Params>,
1267 ) where
1268 Self: 'static + Debug + Sized,
1269 {
1270 let actor_id = self.actor_id().inner();
1271 let topic = get_funding_rate_topic(instrument_id);
1272
1273 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1274 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1275 });
1276
1277 DataActorCore::subscribe_funding_rates(
1278 self,
1279 topic,
1280 handler,
1281 instrument_id,
1282 client_id,
1283 params,
1284 );
1285 }
1286
1287 fn subscribe_option_greeks(
1289 &mut self,
1290 instrument_id: InstrumentId,
1291 client_id: Option<ClientId>,
1292 params: Option<Params>,
1293 ) where
1294 Self: 'static + Debug + Sized,
1295 {
1296 let actor_id = self.actor_id().inner();
1297 let topic = get_option_greeks_topic(instrument_id);
1298
1299 let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1300 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1301 actor.handle_option_greeks(option_greeks);
1302 } else {
1303 log::error!("Actor {actor_id} not found for option greeks handling");
1304 }
1305 });
1306
1307 DataActorCore::subscribe_option_greeks(
1308 self,
1309 topic,
1310 handler,
1311 instrument_id,
1312 client_id,
1313 params,
1314 );
1315 }
1316
1317 fn subscribe_instrument_status(
1319 &mut self,
1320 instrument_id: InstrumentId,
1321 client_id: Option<ClientId>,
1322 params: Option<Params>,
1323 ) where
1324 Self: 'static + Debug + Sized,
1325 {
1326 let actor_id = self.actor_id().inner();
1327 let topic = get_instrument_status_topic(instrument_id);
1328
1329 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1330 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1331 });
1332
1333 DataActorCore::subscribe_instrument_status(
1334 self,
1335 topic,
1336 handler,
1337 instrument_id,
1338 client_id,
1339 params,
1340 );
1341 }
1342
1343 fn subscribe_instrument_close(
1345 &mut self,
1346 instrument_id: InstrumentId,
1347 client_id: Option<ClientId>,
1348 params: Option<Params>,
1349 ) where
1350 Self: 'static + Debug + Sized,
1351 {
1352 let actor_id = self.actor_id().inner();
1353 let topic = get_instrument_close_topic(instrument_id);
1354
1355 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1356 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1357 });
1358
1359 DataActorCore::subscribe_instrument_close(
1360 self,
1361 topic,
1362 handler,
1363 instrument_id,
1364 client_id,
1365 params,
1366 );
1367 }
1368
1369 fn subscribe_option_chain(
1374 &mut self,
1375 series_id: OptionSeriesId,
1376 strike_range: StrikeRange,
1377 snapshot_interval_ms: Option<u64>,
1378 client_id: Option<ClientId>,
1379 ) where
1380 Self: 'static + Debug + Sized,
1381 {
1382 let actor_id = self.actor_id().inner();
1383 let topic = get_option_chain_topic(series_id);
1384
1385 let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1386 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1387 actor.handle_option_chain(slice);
1388 } else {
1389 log::error!("Actor {actor_id} not found for option chain handling");
1390 }
1391 });
1392
1393 DataActorCore::subscribe_option_chain(
1394 self,
1395 topic,
1396 handler,
1397 series_id,
1398 strike_range,
1399 snapshot_interval_ms,
1400 client_id,
1401 );
1402 }
1403
1404 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1406 where
1407 Self: 'static + Debug + Sized,
1408 {
1409 let actor_id = self.actor_id().inner();
1410 let topic = get_order_fills_topic(instrument_id);
1411
1412 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1413 if let OrderEventAny::Filled(filled) = event {
1414 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1415 }
1416 });
1417
1418 DataActorCore::subscribe_order_fills(self, topic, handler);
1419 }
1420
1421 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1423 where
1424 Self: 'static + Debug + Sized,
1425 {
1426 let actor_id = self.actor_id().inner();
1427 let topic = get_order_cancels_topic(instrument_id);
1428
1429 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1430 if let OrderEventAny::Canceled(canceled) = event {
1431 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1432 }
1433 });
1434
1435 DataActorCore::subscribe_order_cancels(self, topic, handler);
1436 }
1437
1438 #[cfg(feature = "defi")]
1439 fn subscribe_blocks(
1441 &mut self,
1442 chain: Blockchain,
1443 client_id: Option<ClientId>,
1444 params: Option<Params>,
1445 ) where
1446 Self: 'static + Debug + Sized,
1447 {
1448 let actor_id = self.actor_id().inner();
1449 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1450
1451 let handler = TypedHandler::from(move |block: &Block| {
1452 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1453 });
1454
1455 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1456 }
1457
1458 #[cfg(feature = "defi")]
1459 fn subscribe_pool(
1461 &mut self,
1462 instrument_id: InstrumentId,
1463 client_id: Option<ClientId>,
1464 params: Option<Params>,
1465 ) where
1466 Self: 'static + Debug + Sized,
1467 {
1468 let actor_id = self.actor_id().inner();
1469 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1470
1471 let handler = TypedHandler::from(move |pool: &Pool| {
1472 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1473 });
1474
1475 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1476 }
1477
1478 #[cfg(feature = "defi")]
1479 fn subscribe_pool_swaps(
1481 &mut self,
1482 instrument_id: InstrumentId,
1483 client_id: Option<ClientId>,
1484 params: Option<Params>,
1485 ) where
1486 Self: 'static + Debug + Sized,
1487 {
1488 let actor_id = self.actor_id().inner();
1489 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1490
1491 let handler = TypedHandler::from(move |swap: &PoolSwap| {
1492 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1493 });
1494
1495 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1496 }
1497
1498 #[cfg(feature = "defi")]
1499 fn subscribe_pool_liquidity_updates(
1501 &mut self,
1502 instrument_id: InstrumentId,
1503 client_id: Option<ClientId>,
1504 params: Option<Params>,
1505 ) where
1506 Self: 'static + Debug + Sized,
1507 {
1508 let actor_id = self.actor_id().inner();
1509 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1510
1511 let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1512 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1513 });
1514
1515 DataActorCore::subscribe_pool_liquidity_updates(
1516 self,
1517 topic,
1518 handler,
1519 instrument_id,
1520 client_id,
1521 params,
1522 );
1523 }
1524
1525 #[cfg(feature = "defi")]
1526 fn subscribe_pool_fee_collects(
1528 &mut self,
1529 instrument_id: InstrumentId,
1530 client_id: Option<ClientId>,
1531 params: Option<Params>,
1532 ) where
1533 Self: 'static + Debug + Sized,
1534 {
1535 let actor_id = self.actor_id().inner();
1536 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1537
1538 let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1539 get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1540 });
1541
1542 DataActorCore::subscribe_pool_fee_collects(
1543 self,
1544 topic,
1545 handler,
1546 instrument_id,
1547 client_id,
1548 params,
1549 );
1550 }
1551
1552 #[cfg(feature = "defi")]
1553 fn subscribe_pool_flash_events(
1555 &mut self,
1556 instrument_id: InstrumentId,
1557 client_id: Option<ClientId>,
1558 params: Option<Params>,
1559 ) where
1560 Self: 'static + Debug + Sized,
1561 {
1562 let actor_id = self.actor_id().inner();
1563 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1564
1565 let handler = TypedHandler::from(move |flash: &PoolFlash| {
1566 get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1567 });
1568
1569 DataActorCore::subscribe_pool_flash_events(
1570 self,
1571 topic,
1572 handler,
1573 instrument_id,
1574 client_id,
1575 params,
1576 );
1577 }
1578
1579 fn unsubscribe_data(
1581 &mut self,
1582 data_type: DataType,
1583 client_id: Option<ClientId>,
1584 params: Option<Params>,
1585 ) where
1586 Self: 'static + Debug + Sized,
1587 {
1588 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1589 }
1590
1591 fn unsubscribe_instruments(
1593 &mut self,
1594 venue: Venue,
1595 client_id: Option<ClientId>,
1596 params: Option<Params>,
1597 ) where
1598 Self: 'static + Debug + Sized,
1599 {
1600 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1601 }
1602
1603 fn unsubscribe_instrument(
1605 &mut self,
1606 instrument_id: InstrumentId,
1607 client_id: Option<ClientId>,
1608 params: Option<Params>,
1609 ) where
1610 Self: 'static + Debug + Sized,
1611 {
1612 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1613 }
1614
1615 fn unsubscribe_book_deltas(
1617 &mut self,
1618 instrument_id: InstrumentId,
1619 client_id: Option<ClientId>,
1620 params: Option<Params>,
1621 ) where
1622 Self: 'static + Debug + Sized,
1623 {
1624 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1625 }
1626
1627 fn unsubscribe_book_at_interval(
1629 &mut self,
1630 instrument_id: InstrumentId,
1631 interval_ms: NonZeroUsize,
1632 client_id: Option<ClientId>,
1633 params: Option<Params>,
1634 ) where
1635 Self: 'static + Debug + Sized,
1636 {
1637 DataActorCore::unsubscribe_book_at_interval(
1638 self,
1639 instrument_id,
1640 interval_ms,
1641 client_id,
1642 params,
1643 );
1644 }
1645
1646 fn unsubscribe_quotes(
1648 &mut self,
1649 instrument_id: InstrumentId,
1650 client_id: Option<ClientId>,
1651 params: Option<Params>,
1652 ) where
1653 Self: 'static + Debug + Sized,
1654 {
1655 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1656 }
1657
1658 fn unsubscribe_trades(
1660 &mut self,
1661 instrument_id: InstrumentId,
1662 client_id: Option<ClientId>,
1663 params: Option<Params>,
1664 ) where
1665 Self: 'static + Debug + Sized,
1666 {
1667 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1668 }
1669
1670 fn unsubscribe_bars(
1672 &mut self,
1673 bar_type: BarType,
1674 client_id: Option<ClientId>,
1675 params: Option<Params>,
1676 ) where
1677 Self: 'static + Debug + Sized,
1678 {
1679 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1680 }
1681
1682 fn unsubscribe_mark_prices(
1684 &mut self,
1685 instrument_id: InstrumentId,
1686 client_id: Option<ClientId>,
1687 params: Option<Params>,
1688 ) where
1689 Self: 'static + Debug + Sized,
1690 {
1691 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1692 }
1693
1694 fn unsubscribe_index_prices(
1696 &mut self,
1697 instrument_id: InstrumentId,
1698 client_id: Option<ClientId>,
1699 params: Option<Params>,
1700 ) where
1701 Self: 'static + Debug + Sized,
1702 {
1703 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1704 }
1705
1706 fn unsubscribe_funding_rates(
1708 &mut self,
1709 instrument_id: InstrumentId,
1710 client_id: Option<ClientId>,
1711 params: Option<Params>,
1712 ) where
1713 Self: 'static + Debug + Sized,
1714 {
1715 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1716 }
1717
1718 fn unsubscribe_option_greeks(
1720 &mut self,
1721 instrument_id: InstrumentId,
1722 client_id: Option<ClientId>,
1723 params: Option<Params>,
1724 ) where
1725 Self: 'static + Debug + Sized,
1726 {
1727 DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1728 }
1729
1730 fn unsubscribe_instrument_status(
1732 &mut self,
1733 instrument_id: InstrumentId,
1734 client_id: Option<ClientId>,
1735 params: Option<Params>,
1736 ) where
1737 Self: 'static + Debug + Sized,
1738 {
1739 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1740 }
1741
1742 fn unsubscribe_instrument_close(
1744 &mut self,
1745 instrument_id: InstrumentId,
1746 client_id: Option<ClientId>,
1747 params: Option<Params>,
1748 ) where
1749 Self: 'static + Debug + Sized,
1750 {
1751 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1752 }
1753
1754 fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1756 where
1757 Self: 'static + Debug + Sized,
1758 {
1759 DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1760 }
1761
1762 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1764 where
1765 Self: 'static + Debug + Sized,
1766 {
1767 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1768 }
1769
1770 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1772 where
1773 Self: 'static + Debug + Sized,
1774 {
1775 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1776 }
1777
1778 #[cfg(feature = "defi")]
1779 fn unsubscribe_blocks(
1781 &mut self,
1782 chain: Blockchain,
1783 client_id: Option<ClientId>,
1784 params: Option<Params>,
1785 ) where
1786 Self: 'static + Debug + Sized,
1787 {
1788 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1789 }
1790
1791 #[cfg(feature = "defi")]
1792 fn unsubscribe_pool(
1794 &mut self,
1795 instrument_id: InstrumentId,
1796 client_id: Option<ClientId>,
1797 params: Option<Params>,
1798 ) where
1799 Self: 'static + Debug + Sized,
1800 {
1801 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1802 }
1803
1804 #[cfg(feature = "defi")]
1805 fn unsubscribe_pool_swaps(
1807 &mut self,
1808 instrument_id: InstrumentId,
1809 client_id: Option<ClientId>,
1810 params: Option<Params>,
1811 ) where
1812 Self: 'static + Debug + Sized,
1813 {
1814 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1815 }
1816
1817 #[cfg(feature = "defi")]
1818 fn unsubscribe_pool_liquidity_updates(
1820 &mut self,
1821 instrument_id: InstrumentId,
1822 client_id: Option<ClientId>,
1823 params: Option<Params>,
1824 ) where
1825 Self: 'static + Debug + Sized,
1826 {
1827 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1828 }
1829
1830 #[cfg(feature = "defi")]
1831 fn unsubscribe_pool_fee_collects(
1833 &mut self,
1834 instrument_id: InstrumentId,
1835 client_id: Option<ClientId>,
1836 params: Option<Params>,
1837 ) where
1838 Self: 'static + Debug + Sized,
1839 {
1840 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1841 }
1842
1843 #[cfg(feature = "defi")]
1844 fn unsubscribe_pool_flash_events(
1846 &mut self,
1847 instrument_id: InstrumentId,
1848 client_id: Option<ClientId>,
1849 params: Option<Params>,
1850 ) where
1851 Self: 'static + Debug + Sized,
1852 {
1853 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1854 }
1855
1856 fn request_data(
1862 &mut self,
1863 data_type: DataType,
1864 client_id: ClientId,
1865 start: Option<DateTime<Utc>>,
1866 end: Option<DateTime<Utc>>,
1867 limit: Option<NonZeroUsize>,
1868 params: Option<Params>,
1869 ) -> anyhow::Result<UUID4>
1870 where
1871 Self: 'static + Debug + Sized,
1872 {
1873 let actor_id = self.actor_id().inner();
1874 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1875 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1876 });
1877
1878 DataActorCore::request_data(
1879 self, data_type, client_id, start, end, limit, params, handler,
1880 )
1881 }
1882
1883 fn request_instrument(
1889 &mut self,
1890 instrument_id: InstrumentId,
1891 start: Option<DateTime<Utc>>,
1892 end: Option<DateTime<Utc>>,
1893 client_id: Option<ClientId>,
1894 params: Option<Params>,
1895 ) -> anyhow::Result<UUID4>
1896 where
1897 Self: 'static + Debug + Sized,
1898 {
1899 let actor_id = self.actor_id().inner();
1900 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1901 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1902 });
1903
1904 DataActorCore::request_instrument(
1905 self,
1906 instrument_id,
1907 start,
1908 end,
1909 client_id,
1910 params,
1911 handler,
1912 )
1913 }
1914
1915 fn request_instruments(
1921 &mut self,
1922 venue: Option<Venue>,
1923 start: Option<DateTime<Utc>>,
1924 end: Option<DateTime<Utc>>,
1925 client_id: Option<ClientId>,
1926 params: Option<Params>,
1927 ) -> anyhow::Result<UUID4>
1928 where
1929 Self: 'static + Debug + Sized,
1930 {
1931 let actor_id = self.actor_id().inner();
1932 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1933 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1934 });
1935
1936 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1937 }
1938
1939 fn request_book_snapshot(
1945 &mut self,
1946 instrument_id: InstrumentId,
1947 depth: Option<NonZeroUsize>,
1948 client_id: Option<ClientId>,
1949 params: Option<Params>,
1950 ) -> anyhow::Result<UUID4>
1951 where
1952 Self: 'static + Debug + Sized,
1953 {
1954 let actor_id = self.actor_id().inner();
1955 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1956 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1957 });
1958
1959 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1960 }
1961
1962 fn request_quotes(
1968 &mut self,
1969 instrument_id: InstrumentId,
1970 start: Option<DateTime<Utc>>,
1971 end: Option<DateTime<Utc>>,
1972 limit: Option<NonZeroUsize>,
1973 client_id: Option<ClientId>,
1974 params: Option<Params>,
1975 ) -> anyhow::Result<UUID4>
1976 where
1977 Self: 'static + Debug + Sized,
1978 {
1979 let actor_id = self.actor_id().inner();
1980 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
1981 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1982 });
1983
1984 DataActorCore::request_quotes(
1985 self,
1986 instrument_id,
1987 start,
1988 end,
1989 limit,
1990 client_id,
1991 params,
1992 handler,
1993 )
1994 }
1995
1996 fn request_trades(
2002 &mut self,
2003 instrument_id: InstrumentId,
2004 start: Option<DateTime<Utc>>,
2005 end: Option<DateTime<Utc>>,
2006 limit: Option<NonZeroUsize>,
2007 client_id: Option<ClientId>,
2008 params: Option<Params>,
2009 ) -> anyhow::Result<UUID4>
2010 where
2011 Self: 'static + Debug + Sized,
2012 {
2013 let actor_id = self.actor_id().inner();
2014 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2015 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2016 });
2017
2018 DataActorCore::request_trades(
2019 self,
2020 instrument_id,
2021 start,
2022 end,
2023 limit,
2024 client_id,
2025 params,
2026 handler,
2027 )
2028 }
2029
2030 fn request_funding_rates(
2036 &mut self,
2037 instrument_id: InstrumentId,
2038 start: Option<DateTime<Utc>>,
2039 end: Option<DateTime<Utc>>,
2040 limit: Option<NonZeroUsize>,
2041 client_id: Option<ClientId>,
2042 params: Option<Params>,
2043 ) -> anyhow::Result<UUID4>
2044 where
2045 Self: 'static + Debug + Sized,
2046 {
2047 let actor_id = self.actor_id().inner();
2048 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2049 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2050 });
2051
2052 DataActorCore::request_funding_rates(
2053 self,
2054 instrument_id,
2055 start,
2056 end,
2057 limit,
2058 client_id,
2059 params,
2060 handler,
2061 )
2062 }
2063
2064 fn request_bars(
2070 &mut self,
2071 bar_type: BarType,
2072 start: Option<DateTime<Utc>>,
2073 end: Option<DateTime<Utc>>,
2074 limit: Option<NonZeroUsize>,
2075 client_id: Option<ClientId>,
2076 params: Option<Params>,
2077 ) -> anyhow::Result<UUID4>
2078 where
2079 Self: 'static + Debug + Sized,
2080 {
2081 let actor_id = self.actor_id().inner();
2082 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2083 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2084 });
2085
2086 DataActorCore::request_bars(
2087 self, bar_type, start, end, limit, client_id, params, handler,
2088 )
2089 }
2090}
2091
2092impl<T> Actor for T
2094where
2095 T: DataActor + Debug + 'static,
2096{
2097 fn id(&self) -> Ustr {
2098 self.actor_id.inner()
2099 }
2100
2101 #[allow(unused_variables)]
2102 fn handle(&mut self, msg: &dyn Any) {
2103 }
2105
2106 fn as_any(&self) -> &dyn Any {
2107 self
2108 }
2109}
2110
2111impl<T> Component for T
2113where
2114 T: DataActor + Debug + 'static,
2115{
2116 fn component_id(&self) -> ComponentId {
2117 ComponentId::new(self.actor_id.inner().as_str())
2118 }
2119
2120 fn state(&self) -> ComponentState {
2121 self.state
2122 }
2123
2124 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2125 self.state = self.state.transition(&trigger)?;
2126 log::info!("{}", self.state.variant_name());
2127 Ok(())
2128 }
2129
2130 fn register(
2131 &mut self,
2132 trader_id: TraderId,
2133 clock: Rc<RefCell<dyn Clock>>,
2134 cache: Rc<RefCell<Cache>>,
2135 ) -> anyhow::Result<()> {
2136 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2137
2138 let actor_id = self.actor_id().inner();
2140 let callback = TimeEventCallback::from(move |event: TimeEvent| {
2141 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2142 actor.handle_time_event(&event);
2143 } else {
2144 log::error!("Actor {actor_id} not found for time event handling");
2145 }
2146 });
2147
2148 clock.borrow_mut().register_default_handler(callback);
2149
2150 self.initialize()
2151 }
2152
2153 fn on_start(&mut self) -> anyhow::Result<()> {
2154 DataActor::on_start(self)
2155 }
2156
2157 fn on_stop(&mut self) -> anyhow::Result<()> {
2158 DataActor::on_stop(self)
2159 }
2160
2161 fn on_resume(&mut self) -> anyhow::Result<()> {
2162 DataActor::on_resume(self)
2163 }
2164
2165 fn on_degrade(&mut self) -> anyhow::Result<()> {
2166 DataActor::on_degrade(self)
2167 }
2168
2169 fn on_fault(&mut self) -> anyhow::Result<()> {
2170 DataActor::on_fault(self)
2171 }
2172
2173 fn on_reset(&mut self) -> anyhow::Result<()> {
2174 DataActor::on_reset(self)
2175 }
2176
2177 fn on_dispose(&mut self) -> anyhow::Result<()> {
2178 DataActor::on_dispose(self)
2179 }
2180}
2181
2182#[derive(Clone)]
2184#[allow(
2185 dead_code,
2186 reason = "TODO: Under development (pending_requests, signal_classes)"
2187)]
2188pub struct DataActorCore {
2189 pub actor_id: ActorId,
2191 pub config: DataActorConfig,
2193 trader_id: Option<TraderId>,
2194 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
2197 topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2198 deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
2199 depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
2200 book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2201 quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2202 trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2203 bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2204 mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2205 index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2206 funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2207 option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2208 option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2209 order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2210 #[cfg(feature = "defi")]
2211 block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2212 #[cfg(feature = "defi")]
2213 pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2214 #[cfg(feature = "defi")]
2215 pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2216 #[cfg(feature = "defi")]
2217 pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2218 #[cfg(feature = "defi")]
2219 pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2220 #[cfg(feature = "defi")]
2221 pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2222 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2224 signal_classes: AHashMap<String, String>,
2225 #[cfg(feature = "indicators")]
2226 indicators: Indicators,
2227}
2228
2229impl Debug for DataActorCore {
2230 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2231 f.debug_struct(stringify!(DataActorCore))
2232 .field("actor_id", &self.actor_id)
2233 .field("config", &self.config)
2234 .field("state", &self.state)
2235 .field("trader_id", &self.trader_id)
2236 .finish()
2237 }
2238}
2239
2240impl DataActorCore {
2241 pub(crate) fn add_subscription_any(
2245 &mut self,
2246 topic: MStr<Topic>,
2247 handler: ShareableMessageHandler,
2248 ) {
2249 let pattern: MStr<Pattern> = topic.into();
2250 if self.topic_handlers.contains_key(&pattern) {
2251 log::warn!(
2252 "Actor {} attempted duplicate subscription to topic '{topic}'",
2253 self.actor_id,
2254 );
2255 return;
2256 }
2257
2258 self.topic_handlers.insert(pattern, handler.clone());
2259 msgbus::subscribe_any(pattern, handler, None);
2260 }
2261
2262 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2266 let pattern: MStr<Pattern> = topic.into();
2267 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2268 msgbus::unsubscribe_any(pattern, &handler);
2269 } else {
2270 log::warn!(
2271 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2272 self.actor_id,
2273 );
2274 }
2275 }
2276
2277 pub(crate) fn add_quote_subscription(
2278 &mut self,
2279 topic: MStr<Topic>,
2280 handler: TypedHandler<QuoteTick>,
2281 ) {
2282 if self.quote_handlers.contains_key(&topic) {
2283 log::warn!(
2284 "Actor {} attempted duplicate quote subscription to '{topic}'",
2285 self.actor_id
2286 );
2287 return;
2288 }
2289 self.quote_handlers.insert(topic, handler.clone());
2290 msgbus::subscribe_quotes(topic.into(), handler, None);
2291 }
2292
2293 #[allow(dead_code)]
2294 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2295 if let Some(handler) = self.quote_handlers.remove(&topic) {
2296 msgbus::unsubscribe_quotes(topic.into(), &handler);
2297 }
2298 }
2299
2300 pub(crate) fn add_trade_subscription(
2301 &mut self,
2302 topic: MStr<Topic>,
2303 handler: TypedHandler<TradeTick>,
2304 ) {
2305 if self.trade_handlers.contains_key(&topic) {
2306 log::warn!(
2307 "Actor {} attempted duplicate trade subscription to '{topic}'",
2308 self.actor_id
2309 );
2310 return;
2311 }
2312 self.trade_handlers.insert(topic, handler.clone());
2313 msgbus::subscribe_trades(topic.into(), handler, None);
2314 }
2315
2316 #[allow(dead_code)]
2317 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2318 if let Some(handler) = self.trade_handlers.remove(&topic) {
2319 msgbus::unsubscribe_trades(topic.into(), &handler);
2320 }
2321 }
2322
2323 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2324 if self.bar_handlers.contains_key(&topic) {
2325 log::warn!(
2326 "Actor {} attempted duplicate bar subscription to '{topic}'",
2327 self.actor_id
2328 );
2329 return;
2330 }
2331 self.bar_handlers.insert(topic, handler.clone());
2332 msgbus::subscribe_bars(topic.into(), handler, None);
2333 }
2334
2335 #[allow(dead_code)]
2336 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2337 if let Some(handler) = self.bar_handlers.remove(&topic) {
2338 msgbus::unsubscribe_bars(topic.into(), &handler);
2339 }
2340 }
2341
2342 pub(crate) fn add_order_event_subscription(
2343 &mut self,
2344 topic: MStr<Topic>,
2345 handler: TypedHandler<OrderEventAny>,
2346 ) {
2347 if self.order_event_handlers.contains_key(&topic) {
2348 log::warn!(
2349 "Actor {} attempted duplicate order event subscription to '{topic}'",
2350 self.actor_id
2351 );
2352 return;
2353 }
2354 self.order_event_handlers.insert(topic, handler.clone());
2355 msgbus::subscribe_order_events(topic.into(), handler, None);
2356 }
2357
2358 #[allow(dead_code)]
2359 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2360 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2361 msgbus::unsubscribe_order_events(topic.into(), &handler);
2362 }
2363 }
2364
2365 pub(crate) fn add_deltas_subscription(
2366 &mut self,
2367 topic: MStr<Topic>,
2368 handler: TypedHandler<OrderBookDeltas>,
2369 ) {
2370 if self.deltas_handlers.contains_key(&topic) {
2371 log::warn!(
2372 "Actor {} attempted duplicate deltas subscription to '{topic}'",
2373 self.actor_id
2374 );
2375 return;
2376 }
2377 self.deltas_handlers.insert(topic, handler.clone());
2378 msgbus::subscribe_book_deltas(topic.into(), handler, None);
2379 }
2380
2381 #[allow(dead_code)]
2382 pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2383 if let Some(handler) = self.deltas_handlers.remove(&topic) {
2384 msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2385 }
2386 }
2387
2388 #[allow(dead_code)]
2389 pub(crate) fn add_depth10_subscription(
2390 &mut self,
2391 topic: MStr<Topic>,
2392 handler: TypedHandler<OrderBookDepth10>,
2393 ) {
2394 if self.depth10_handlers.contains_key(&topic) {
2395 log::warn!(
2396 "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2397 self.actor_id
2398 );
2399 return;
2400 }
2401 self.depth10_handlers.insert(topic, handler.clone());
2402 msgbus::subscribe_book_depth10(topic.into(), handler, None);
2403 }
2404
2405 #[allow(dead_code)]
2406 pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2407 if let Some(handler) = self.depth10_handlers.remove(&topic) {
2408 msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2409 }
2410 }
2411
2412 pub(crate) fn add_instrument_subscription(
2413 &mut self,
2414 pattern: MStr<Pattern>,
2415 handler: ShareableMessageHandler,
2416 ) {
2417 if self.topic_handlers.contains_key(&pattern) {
2418 log::warn!(
2419 "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2420 self.actor_id
2421 );
2422 return;
2423 }
2424 self.topic_handlers.insert(pattern, handler.clone());
2425 msgbus::subscribe_any(pattern, handler, None);
2426 }
2427
2428 #[allow(dead_code)]
2429 pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2430 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2431 msgbus::unsubscribe_any(pattern, &handler);
2432 }
2433 }
2434
2435 pub(crate) fn add_instrument_close_subscription(
2436 &mut self,
2437 topic: MStr<Topic>,
2438 handler: ShareableMessageHandler,
2439 ) {
2440 let pattern: MStr<Pattern> = topic.into();
2441 if self.topic_handlers.contains_key(&pattern) {
2442 log::warn!(
2443 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2444 self.actor_id
2445 );
2446 return;
2447 }
2448 self.topic_handlers.insert(pattern, handler.clone());
2449 msgbus::subscribe_any(pattern, handler, None);
2450 }
2451
2452 #[allow(dead_code)]
2453 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2454 let pattern: MStr<Pattern> = topic.into();
2455 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2456 msgbus::unsubscribe_any(pattern, &handler);
2457 }
2458 }
2459
2460 pub(crate) fn add_book_snapshot_subscription(
2461 &mut self,
2462 topic: MStr<Topic>,
2463 handler: TypedHandler<OrderBook>,
2464 ) {
2465 if self.book_handlers.contains_key(&topic) {
2466 log::warn!(
2467 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2468 self.actor_id
2469 );
2470 return;
2471 }
2472 self.book_handlers.insert(topic, handler.clone());
2473 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2474 }
2475
2476 #[allow(dead_code)]
2477 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2478 if let Some(handler) = self.book_handlers.remove(&topic) {
2479 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2480 }
2481 }
2482
2483 pub(crate) fn add_mark_price_subscription(
2484 &mut self,
2485 topic: MStr<Topic>,
2486 handler: TypedHandler<MarkPriceUpdate>,
2487 ) {
2488 if self.mark_price_handlers.contains_key(&topic) {
2489 log::warn!(
2490 "Actor {} attempted duplicate mark price subscription to '{topic}'",
2491 self.actor_id
2492 );
2493 return;
2494 }
2495 self.mark_price_handlers.insert(topic, handler.clone());
2496 msgbus::subscribe_mark_prices(topic.into(), handler, None);
2497 }
2498
2499 #[allow(dead_code)]
2500 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2501 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2502 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2503 }
2504 }
2505
2506 pub(crate) fn add_index_price_subscription(
2507 &mut self,
2508 topic: MStr<Topic>,
2509 handler: TypedHandler<IndexPriceUpdate>,
2510 ) {
2511 if self.index_price_handlers.contains_key(&topic) {
2512 log::warn!(
2513 "Actor {} attempted duplicate index price subscription to '{topic}'",
2514 self.actor_id
2515 );
2516 return;
2517 }
2518 self.index_price_handlers.insert(topic, handler.clone());
2519 msgbus::subscribe_index_prices(topic.into(), handler, None);
2520 }
2521
2522 #[allow(dead_code)]
2523 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2524 if let Some(handler) = self.index_price_handlers.remove(&topic) {
2525 msgbus::unsubscribe_index_prices(topic.into(), &handler);
2526 }
2527 }
2528
2529 pub(crate) fn add_funding_rate_subscription(
2530 &mut self,
2531 topic: MStr<Topic>,
2532 handler: TypedHandler<FundingRateUpdate>,
2533 ) {
2534 if self.funding_rate_handlers.contains_key(&topic) {
2535 log::warn!(
2536 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2537 self.actor_id
2538 );
2539 return;
2540 }
2541 self.funding_rate_handlers.insert(topic, handler.clone());
2542 msgbus::subscribe_funding_rates(topic.into(), handler, None);
2543 }
2544
2545 #[allow(dead_code)]
2546 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2547 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2548 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2549 }
2550 }
2551
2552 pub(crate) fn add_option_greeks_subscription(
2553 &mut self,
2554 topic: MStr<Topic>,
2555 handler: TypedHandler<OptionGreeks>,
2556 ) {
2557 if self.option_greeks_handlers.contains_key(&topic) {
2558 log::warn!(
2559 "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2560 self.actor_id
2561 );
2562 return;
2563 }
2564 self.option_greeks_handlers.insert(topic, handler.clone());
2565 msgbus::subscribe_option_greeks(topic.into(), handler, None);
2566 }
2567
2568 #[allow(dead_code)]
2569 pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2570 if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2571 msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2572 }
2573 }
2574
2575 pub(crate) fn add_option_chain_subscription(
2576 &mut self,
2577 topic: MStr<Topic>,
2578 handler: TypedHandler<OptionChainSlice>,
2579 ) {
2580 if self.option_chain_handlers.contains_key(&topic) {
2581 log::warn!(
2582 "Actor {} attempted duplicate option chain subscription to '{topic}'",
2583 self.actor_id
2584 );
2585 return;
2586 }
2587 self.option_chain_handlers.insert(topic, handler.clone());
2588 msgbus::subscribe_option_chain(topic.into(), handler, None);
2589 }
2590
2591 pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2592 if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2593 msgbus::unsubscribe_option_chain(topic.into(), &handler);
2594 }
2595 }
2596
2597 #[cfg(feature = "defi")]
2598 pub(crate) fn add_block_subscription(
2599 &mut self,
2600 topic: MStr<Topic>,
2601 handler: TypedHandler<Block>,
2602 ) {
2603 if self.block_handlers.contains_key(&topic) {
2604 log::warn!(
2605 "Actor {} attempted duplicate block subscription to '{topic}'",
2606 self.actor_id
2607 );
2608 return;
2609 }
2610 self.block_handlers.insert(topic, handler.clone());
2611 msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2612 }
2613
2614 #[cfg(feature = "defi")]
2615 #[allow(dead_code)]
2616 pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2617 if let Some(handler) = self.block_handlers.remove(&topic) {
2618 msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2619 }
2620 }
2621
2622 #[cfg(feature = "defi")]
2623 pub(crate) fn add_pool_subscription(
2624 &mut self,
2625 topic: MStr<Topic>,
2626 handler: TypedHandler<Pool>,
2627 ) {
2628 if self.pool_handlers.contains_key(&topic) {
2629 log::warn!(
2630 "Actor {} attempted duplicate pool subscription to '{topic}'",
2631 self.actor_id
2632 );
2633 return;
2634 }
2635 self.pool_handlers.insert(topic, handler.clone());
2636 msgbus::subscribe_defi_pools(topic.into(), handler, None);
2637 }
2638
2639 #[cfg(feature = "defi")]
2640 #[allow(dead_code)]
2641 pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2642 if let Some(handler) = self.pool_handlers.remove(&topic) {
2643 msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2644 }
2645 }
2646
2647 #[cfg(feature = "defi")]
2648 pub(crate) fn add_pool_swap_subscription(
2649 &mut self,
2650 topic: MStr<Topic>,
2651 handler: TypedHandler<PoolSwap>,
2652 ) {
2653 if self.pool_swap_handlers.contains_key(&topic) {
2654 log::warn!(
2655 "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2656 self.actor_id
2657 );
2658 return;
2659 }
2660 self.pool_swap_handlers.insert(topic, handler.clone());
2661 msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2662 }
2663
2664 #[cfg(feature = "defi")]
2665 #[allow(dead_code)]
2666 pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2667 if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2668 msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2669 }
2670 }
2671
2672 #[cfg(feature = "defi")]
2673 pub(crate) fn add_pool_liquidity_subscription(
2674 &mut self,
2675 topic: MStr<Topic>,
2676 handler: TypedHandler<PoolLiquidityUpdate>,
2677 ) {
2678 if self.pool_liquidity_handlers.contains_key(&topic) {
2679 log::warn!(
2680 "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2681 self.actor_id
2682 );
2683 return;
2684 }
2685 self.pool_liquidity_handlers.insert(topic, handler.clone());
2686 msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2687 }
2688
2689 #[cfg(feature = "defi")]
2690 #[allow(dead_code)]
2691 pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2692 if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2693 msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2694 }
2695 }
2696
2697 #[cfg(feature = "defi")]
2698 pub(crate) fn add_pool_collect_subscription(
2699 &mut self,
2700 topic: MStr<Topic>,
2701 handler: TypedHandler<PoolFeeCollect>,
2702 ) {
2703 if self.pool_collect_handlers.contains_key(&topic) {
2704 log::warn!(
2705 "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2706 self.actor_id
2707 );
2708 return;
2709 }
2710 self.pool_collect_handlers.insert(topic, handler.clone());
2711 msgbus::subscribe_defi_collects(topic.into(), handler, None);
2712 }
2713
2714 #[cfg(feature = "defi")]
2715 #[allow(dead_code)]
2716 pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2717 if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2718 msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2719 }
2720 }
2721
2722 #[cfg(feature = "defi")]
2723 pub(crate) fn add_pool_flash_subscription(
2724 &mut self,
2725 topic: MStr<Topic>,
2726 handler: TypedHandler<PoolFlash>,
2727 ) {
2728 if self.pool_flash_handlers.contains_key(&topic) {
2729 log::warn!(
2730 "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2731 self.actor_id
2732 );
2733 return;
2734 }
2735 self.pool_flash_handlers.insert(topic, handler.clone());
2736 msgbus::subscribe_defi_flash(topic.into(), handler, None);
2737 }
2738
2739 #[cfg(feature = "defi")]
2740 #[allow(dead_code)]
2741 pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2742 if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2743 msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2744 }
2745 }
2746
2747 pub fn new(config: DataActorConfig) -> Self {
2749 let actor_id = config
2750 .actor_id
2751 .unwrap_or_else(|| Self::default_actor_id(&config));
2752
2753 Self {
2754 actor_id,
2755 config,
2756 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2760 topic_handlers: AHashMap::new(),
2761 deltas_handlers: AHashMap::new(),
2762 depth10_handlers: AHashMap::new(),
2763 book_handlers: AHashMap::new(),
2764 quote_handlers: AHashMap::new(),
2765 trade_handlers: AHashMap::new(),
2766 bar_handlers: AHashMap::new(),
2767 mark_price_handlers: AHashMap::new(),
2768 index_price_handlers: AHashMap::new(),
2769 funding_rate_handlers: AHashMap::new(),
2770 option_greeks_handlers: AHashMap::new(),
2771 option_chain_handlers: AHashMap::new(),
2772 order_event_handlers: AHashMap::new(),
2773 #[cfg(feature = "defi")]
2774 block_handlers: AHashMap::new(),
2775 #[cfg(feature = "defi")]
2776 pool_handlers: AHashMap::new(),
2777 #[cfg(feature = "defi")]
2778 pool_swap_handlers: AHashMap::new(),
2779 #[cfg(feature = "defi")]
2780 pool_liquidity_handlers: AHashMap::new(),
2781 #[cfg(feature = "defi")]
2782 pool_collect_handlers: AHashMap::new(),
2783 #[cfg(feature = "defi")]
2784 pool_flash_handlers: AHashMap::new(),
2785 warning_events: AHashSet::new(),
2786 pending_requests: AHashMap::new(),
2787 signal_classes: AHashMap::new(),
2788 #[cfg(feature = "indicators")]
2789 indicators: Indicators::default(),
2790 }
2791 }
2792
2793 #[must_use]
2795 pub fn mem_address(&self) -> String {
2796 format!("{self:p}")
2797 }
2798
2799 pub fn state(&self) -> ComponentState {
2801 self.state
2802 }
2803
2804 pub fn trader_id(&self) -> Option<TraderId> {
2806 self.trader_id
2807 }
2808
2809 pub fn actor_id(&self) -> ActorId {
2811 self.actor_id
2812 }
2813
2814 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2815 let memory_address = std::ptr::from_ref(config) as usize;
2816 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2817 }
2818
2819 pub fn timestamp_ns(&self) -> UnixNanos {
2821 self.clock_ref().timestamp_ns()
2822 }
2823
2824 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2830 self.clock
2831 .as_ref()
2832 .unwrap_or_else(|| {
2833 panic!(
2834 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2835 self.actor_id, self.trader_id
2836 )
2837 })
2838 .borrow_mut()
2839 }
2840
2841 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2847 self.clock
2848 .as_ref()
2849 .expect("DataActor must be registered before accessing clock")
2850 .clone()
2851 }
2852
2853 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2854 self.clock
2855 .as_ref()
2856 .unwrap_or_else(|| {
2857 panic!(
2858 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2859 self.actor_id, self.trader_id
2860 )
2861 })
2862 .borrow()
2863 }
2864
2865 pub fn cache(&self) -> Ref<'_, Cache> {
2871 self.cache
2872 .as_ref()
2873 .expect("DataActor must be registered before accessing cache")
2874 .borrow()
2875 }
2876
2877 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2883 self.cache
2884 .as_ref()
2885 .expect("DataActor must be registered before accessing cache")
2886 .clone()
2887 }
2888
2889 pub fn register(
2898 &mut self,
2899 trader_id: TraderId,
2900 clock: Rc<RefCell<dyn Clock>>,
2901 cache: Rc<RefCell<Cache>>,
2902 ) -> anyhow::Result<()> {
2903 if let Some(existing_trader_id) = self.trader_id {
2904 anyhow::bail!(
2905 "DataActor {} already registered with trader {existing_trader_id}",
2906 self.actor_id
2907 );
2908 }
2909
2910 {
2912 let _timestamp = clock.borrow().timestamp_ns();
2913 }
2914
2915 {
2917 let _cache_borrow = cache.borrow();
2918 }
2919
2920 self.trader_id = Some(trader_id);
2921 self.clock = Some(clock);
2922 self.cache = Some(cache);
2923
2924 if !self.is_properly_registered() {
2926 anyhow::bail!(
2927 "DataActor {} registration incomplete - validation failed",
2928 self.actor_id
2929 );
2930 }
2931
2932 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2933 Ok(())
2934 }
2935
2936 pub fn register_warning_event(&mut self, event_type: &str) {
2938 self.warning_events.insert(event_type.to_string());
2939 log::debug!("Registered event type '{event_type}' for warning logs");
2940 }
2941
2942 pub fn deregister_warning_event(&mut self, event_type: &str) {
2944 self.warning_events.remove(event_type);
2945 log::debug!("Deregistered event type '{event_type}' from warning logs");
2946 }
2947
2948 pub fn is_registered(&self) -> bool {
2949 self.trader_id.is_some()
2950 }
2951
2952 pub(crate) fn check_registered(&self) {
2953 assert!(
2954 self.is_registered(),
2955 "Actor has not been registered with a Trader"
2956 );
2957 }
2958
2959 fn is_properly_registered(&self) -> bool {
2961 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2962 }
2963
2964 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2965 if self.config.log_commands {
2966 log::info!("{CMD}{SEND} {command:?}");
2967 }
2968
2969 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2970 msgbus::send_data_command(endpoint, command);
2971 }
2972
2973 #[allow(dead_code)]
2974 fn send_data_req(&self, request: &RequestCommand) {
2975 if self.config.log_commands {
2976 log::info!("{REQ}{SEND} {request:?}");
2977 }
2978
2979 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2982 msgbus::send_any(endpoint, request.as_any());
2983 }
2984
2985 pub fn shutdown_system(&self, reason: Option<String>) {
2991 self.check_registered();
2992
2993 let command = ShutdownSystem::new(
2995 self.trader_id().unwrap(),
2996 self.actor_id.inner(),
2997 reason,
2998 UUID4::new(),
2999 self.timestamp_ns(),
3000 );
3001
3002 let endpoint = "command.system.shutdown".into();
3003 msgbus::send_any(endpoint, command.as_any());
3004 }
3005
3006 pub fn subscribe_data(
3014 &mut self,
3015 handler: ShareableMessageHandler,
3016 data_type: DataType,
3017 client_id: Option<ClientId>,
3018 params: Option<Params>,
3019 ) {
3020 assert!(
3021 self.is_properly_registered(),
3022 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3023 self.actor_id,
3024 self.trader_id,
3025 self.clock.is_some(),
3026 self.cache.is_some()
3027 );
3028
3029 let topic = get_custom_topic(&data_type);
3030 self.add_subscription_any(topic, handler);
3031
3032 if client_id.is_none() {
3034 return;
3035 }
3036
3037 let command = SubscribeCommand::Data(SubscribeCustomData {
3038 data_type,
3039 client_id,
3040 venue: None,
3041 command_id: UUID4::new(),
3042 ts_init: self.timestamp_ns(),
3043 correlation_id: None,
3044 params,
3045 });
3046
3047 self.send_data_cmd(DataCommand::Subscribe(command));
3048 }
3049
3050 pub fn subscribe_quotes(
3052 &mut self,
3053 topic: MStr<Topic>,
3054 handler: TypedHandler<QuoteTick>,
3055 instrument_id: InstrumentId,
3056 client_id: Option<ClientId>,
3057 params: Option<Params>,
3058 ) {
3059 self.check_registered();
3060
3061 self.add_quote_subscription(topic, handler);
3062
3063 let command = SubscribeCommand::Quotes(SubscribeQuotes {
3064 instrument_id,
3065 client_id,
3066 venue: Some(instrument_id.venue),
3067 command_id: UUID4::new(),
3068 ts_init: self.timestamp_ns(),
3069 correlation_id: None,
3070 params,
3071 });
3072
3073 self.send_data_cmd(DataCommand::Subscribe(command));
3074 }
3075
3076 pub fn subscribe_instruments(
3078 &mut self,
3079 pattern: MStr<Pattern>,
3080 handler: ShareableMessageHandler,
3081 venue: Venue,
3082 client_id: Option<ClientId>,
3083 params: Option<Params>,
3084 ) {
3085 self.check_registered();
3086
3087 self.add_instrument_subscription(pattern, handler);
3088
3089 let command = SubscribeCommand::Instruments(SubscribeInstruments {
3090 client_id,
3091 venue,
3092 command_id: UUID4::new(),
3093 ts_init: self.timestamp_ns(),
3094 correlation_id: None,
3095 params,
3096 });
3097
3098 self.send_data_cmd(DataCommand::Subscribe(command));
3099 }
3100
3101 pub fn subscribe_instrument(
3103 &mut self,
3104 topic: MStr<Topic>,
3105 handler: ShareableMessageHandler,
3106 instrument_id: InstrumentId,
3107 client_id: Option<ClientId>,
3108 params: Option<Params>,
3109 ) {
3110 self.check_registered();
3111
3112 self.add_instrument_subscription(topic.into(), handler);
3113
3114 let command = SubscribeCommand::Instrument(SubscribeInstrument {
3115 instrument_id,
3116 client_id,
3117 venue: Some(instrument_id.venue),
3118 command_id: UUID4::new(),
3119 ts_init: self.timestamp_ns(),
3120 correlation_id: None,
3121 params,
3122 });
3123
3124 self.send_data_cmd(DataCommand::Subscribe(command));
3125 }
3126
3127 #[allow(clippy::too_many_arguments)]
3129 pub fn subscribe_book_deltas(
3130 &mut self,
3131 topic: MStr<Topic>,
3132 handler: TypedHandler<OrderBookDeltas>,
3133 instrument_id: InstrumentId,
3134 book_type: BookType,
3135 depth: Option<NonZeroUsize>,
3136 client_id: Option<ClientId>,
3137 managed: bool,
3138 params: Option<Params>,
3139 ) {
3140 self.check_registered();
3141
3142 self.add_deltas_subscription(topic, handler);
3143
3144 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3145 instrument_id,
3146 book_type,
3147 client_id,
3148 venue: Some(instrument_id.venue),
3149 command_id: UUID4::new(),
3150 ts_init: self.timestamp_ns(),
3151 depth,
3152 managed,
3153 correlation_id: None,
3154 params,
3155 });
3156
3157 self.send_data_cmd(DataCommand::Subscribe(command));
3158 }
3159
3160 #[allow(clippy::too_many_arguments)]
3162 pub fn subscribe_book_at_interval(
3163 &mut self,
3164 topic: MStr<Topic>,
3165 handler: TypedHandler<OrderBook>,
3166 instrument_id: InstrumentId,
3167 book_type: BookType,
3168 depth: Option<NonZeroUsize>,
3169 interval_ms: NonZeroUsize,
3170 client_id: Option<ClientId>,
3171 params: Option<Params>,
3172 ) {
3173 self.check_registered();
3174
3175 self.add_book_snapshot_subscription(topic, handler);
3176
3177 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3178 instrument_id,
3179 book_type,
3180 client_id,
3181 venue: Some(instrument_id.venue),
3182 command_id: UUID4::new(),
3183 ts_init: self.timestamp_ns(),
3184 depth,
3185 interval_ms,
3186 correlation_id: None,
3187 params,
3188 });
3189
3190 self.send_data_cmd(DataCommand::Subscribe(command));
3191 }
3192
3193 pub fn subscribe_trades(
3195 &mut self,
3196 topic: MStr<Topic>,
3197 handler: TypedHandler<TradeTick>,
3198 instrument_id: InstrumentId,
3199 client_id: Option<ClientId>,
3200 params: Option<Params>,
3201 ) {
3202 self.check_registered();
3203
3204 self.add_trade_subscription(topic, handler);
3205
3206 let command = SubscribeCommand::Trades(SubscribeTrades {
3207 instrument_id,
3208 client_id,
3209 venue: Some(instrument_id.venue),
3210 command_id: UUID4::new(),
3211 ts_init: self.timestamp_ns(),
3212 correlation_id: None,
3213 params,
3214 });
3215
3216 self.send_data_cmd(DataCommand::Subscribe(command));
3217 }
3218
3219 pub fn subscribe_bars(
3221 &mut self,
3222 topic: MStr<Topic>,
3223 handler: TypedHandler<Bar>,
3224 bar_type: BarType,
3225 client_id: Option<ClientId>,
3226 params: Option<Params>,
3227 ) {
3228 self.check_registered();
3229
3230 self.add_bar_subscription(topic, handler);
3231
3232 let command = SubscribeCommand::Bars(SubscribeBars {
3233 bar_type,
3234 client_id,
3235 venue: Some(bar_type.instrument_id().venue),
3236 command_id: UUID4::new(),
3237 ts_init: self.timestamp_ns(),
3238 correlation_id: None,
3239 params,
3240 });
3241
3242 self.send_data_cmd(DataCommand::Subscribe(command));
3243 }
3244
3245 pub fn subscribe_mark_prices(
3247 &mut self,
3248 topic: MStr<Topic>,
3249 handler: TypedHandler<MarkPriceUpdate>,
3250 instrument_id: InstrumentId,
3251 client_id: Option<ClientId>,
3252 params: Option<Params>,
3253 ) {
3254 self.check_registered();
3255
3256 self.add_mark_price_subscription(topic, handler);
3257
3258 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3259 instrument_id,
3260 client_id,
3261 venue: Some(instrument_id.venue),
3262 command_id: UUID4::new(),
3263 ts_init: self.timestamp_ns(),
3264 correlation_id: None,
3265 params,
3266 });
3267
3268 self.send_data_cmd(DataCommand::Subscribe(command));
3269 }
3270
3271 pub fn subscribe_index_prices(
3273 &mut self,
3274 topic: MStr<Topic>,
3275 handler: TypedHandler<IndexPriceUpdate>,
3276 instrument_id: InstrumentId,
3277 client_id: Option<ClientId>,
3278 params: Option<Params>,
3279 ) {
3280 self.check_registered();
3281
3282 self.add_index_price_subscription(topic, handler);
3283
3284 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3285 instrument_id,
3286 client_id,
3287 venue: Some(instrument_id.venue),
3288 command_id: UUID4::new(),
3289 ts_init: self.timestamp_ns(),
3290 correlation_id: None,
3291 params,
3292 });
3293
3294 self.send_data_cmd(DataCommand::Subscribe(command));
3295 }
3296
3297 pub fn subscribe_funding_rates(
3299 &mut self,
3300 topic: MStr<Topic>,
3301 handler: TypedHandler<FundingRateUpdate>,
3302 instrument_id: InstrumentId,
3303 client_id: Option<ClientId>,
3304 params: Option<Params>,
3305 ) {
3306 self.check_registered();
3307
3308 self.add_funding_rate_subscription(topic, handler);
3309
3310 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3311 instrument_id,
3312 client_id,
3313 venue: Some(instrument_id.venue),
3314 command_id: UUID4::new(),
3315 ts_init: self.timestamp_ns(),
3316 correlation_id: None,
3317 params,
3318 });
3319
3320 self.send_data_cmd(DataCommand::Subscribe(command));
3321 }
3322
3323 pub fn subscribe_option_greeks(
3325 &mut self,
3326 topic: MStr<Topic>,
3327 handler: TypedHandler<OptionGreeks>,
3328 instrument_id: InstrumentId,
3329 client_id: Option<ClientId>,
3330 params: Option<Params>,
3331 ) {
3332 self.check_registered();
3333
3334 self.add_option_greeks_subscription(topic, handler);
3335
3336 let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3337 instrument_id,
3338 client_id,
3339 venue: Some(instrument_id.venue),
3340 command_id: UUID4::new(),
3341 ts_init: self.timestamp_ns(),
3342 correlation_id: None,
3343 params,
3344 });
3345
3346 self.send_data_cmd(DataCommand::Subscribe(command));
3347 }
3348
3349 pub fn subscribe_instrument_status(
3351 &mut self,
3352 topic: MStr<Topic>,
3353 handler: ShareableMessageHandler,
3354 instrument_id: InstrumentId,
3355 client_id: Option<ClientId>,
3356 params: Option<Params>,
3357 ) {
3358 self.check_registered();
3359
3360 self.add_subscription_any(topic, handler);
3361
3362 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3363 instrument_id,
3364 client_id,
3365 venue: Some(instrument_id.venue),
3366 command_id: UUID4::new(),
3367 ts_init: self.timestamp_ns(),
3368 correlation_id: None,
3369 params,
3370 });
3371
3372 self.send_data_cmd(DataCommand::Subscribe(command));
3373 }
3374
3375 pub fn subscribe_instrument_close(
3377 &mut self,
3378 topic: MStr<Topic>,
3379 handler: ShareableMessageHandler,
3380 instrument_id: InstrumentId,
3381 client_id: Option<ClientId>,
3382 params: Option<Params>,
3383 ) {
3384 self.check_registered();
3385
3386 self.add_instrument_close_subscription(topic, handler);
3387
3388 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3389 instrument_id,
3390 client_id,
3391 venue: Some(instrument_id.venue),
3392 command_id: UUID4::new(),
3393 ts_init: self.timestamp_ns(),
3394 correlation_id: None,
3395 params,
3396 });
3397
3398 self.send_data_cmd(DataCommand::Subscribe(command));
3399 }
3400
3401 #[allow(clippy::too_many_arguments)]
3403 pub fn subscribe_option_chain(
3404 &mut self,
3405 topic: MStr<Topic>,
3406 handler: TypedHandler<OptionChainSlice>,
3407 series_id: OptionSeriesId,
3408 strike_range: StrikeRange,
3409 snapshot_interval_ms: Option<u64>,
3410 client_id: Option<ClientId>,
3411 ) {
3412 self.check_registered();
3413
3414 self.add_option_chain_subscription(topic, handler);
3415
3416 let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3417 series_id,
3418 strike_range,
3419 snapshot_interval_ms,
3420 UUID4::new(),
3421 self.timestamp_ns(),
3422 client_id,
3423 Some(series_id.venue),
3424 ));
3425
3426 self.send_data_cmd(DataCommand::Subscribe(command));
3427 }
3428
3429 pub fn subscribe_order_fills(
3431 &mut self,
3432 topic: MStr<Topic>,
3433 handler: TypedHandler<OrderEventAny>,
3434 ) {
3435 self.check_registered();
3436 self.add_order_event_subscription(topic, handler);
3437 }
3438
3439 pub fn subscribe_order_cancels(
3441 &mut self,
3442 topic: MStr<Topic>,
3443 handler: TypedHandler<OrderEventAny>,
3444 ) {
3445 self.check_registered();
3446 self.add_order_event_subscription(topic, handler);
3447 }
3448
3449 pub fn unsubscribe_data(
3451 &mut self,
3452 data_type: DataType,
3453 client_id: Option<ClientId>,
3454 params: Option<Params>,
3455 ) {
3456 self.check_registered();
3457
3458 let topic = get_custom_topic(&data_type);
3459 self.remove_subscription_any(topic);
3460
3461 if client_id.is_none() {
3462 return;
3463 }
3464
3465 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3466 data_type,
3467 client_id,
3468 venue: None,
3469 command_id: UUID4::new(),
3470 ts_init: self.timestamp_ns(),
3471 correlation_id: None,
3472 params,
3473 });
3474
3475 self.send_data_cmd(DataCommand::Unsubscribe(command));
3476 }
3477
3478 pub fn unsubscribe_instruments(
3480 &mut self,
3481 venue: Venue,
3482 client_id: Option<ClientId>,
3483 params: Option<Params>,
3484 ) {
3485 self.check_registered();
3486
3487 let pattern = get_instruments_pattern(venue);
3488 self.remove_instrument_subscription(pattern);
3489
3490 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3491 client_id,
3492 venue,
3493 command_id: UUID4::new(),
3494 ts_init: self.timestamp_ns(),
3495 correlation_id: None,
3496 params,
3497 });
3498
3499 self.send_data_cmd(DataCommand::Unsubscribe(command));
3500 }
3501
3502 pub fn unsubscribe_instrument(
3504 &mut self,
3505 instrument_id: InstrumentId,
3506 client_id: Option<ClientId>,
3507 params: Option<Params>,
3508 ) {
3509 self.check_registered();
3510
3511 let topic = get_instrument_topic(instrument_id);
3512 self.remove_instrument_subscription(topic.into());
3513
3514 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3515 instrument_id,
3516 client_id,
3517 venue: Some(instrument_id.venue),
3518 command_id: UUID4::new(),
3519 ts_init: self.timestamp_ns(),
3520 correlation_id: None,
3521 params,
3522 });
3523
3524 self.send_data_cmd(DataCommand::Unsubscribe(command));
3525 }
3526
3527 pub fn unsubscribe_book_deltas(
3529 &mut self,
3530 instrument_id: InstrumentId,
3531 client_id: Option<ClientId>,
3532 params: Option<Params>,
3533 ) {
3534 self.check_registered();
3535
3536 let topic = get_book_deltas_topic(instrument_id);
3537 self.remove_deltas_subscription(topic);
3538
3539 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3540 instrument_id,
3541 client_id,
3542 venue: Some(instrument_id.venue),
3543 command_id: UUID4::new(),
3544 ts_init: self.timestamp_ns(),
3545 correlation_id: None,
3546 params,
3547 });
3548
3549 self.send_data_cmd(DataCommand::Unsubscribe(command));
3550 }
3551
3552 pub fn unsubscribe_book_at_interval(
3554 &mut self,
3555 instrument_id: InstrumentId,
3556 interval_ms: NonZeroUsize,
3557 client_id: Option<ClientId>,
3558 params: Option<Params>,
3559 ) {
3560 self.check_registered();
3561
3562 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3563 self.remove_book_snapshot_subscription(topic);
3564
3565 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3566 instrument_id,
3567 client_id,
3568 venue: Some(instrument_id.venue),
3569 command_id: UUID4::new(),
3570 ts_init: self.timestamp_ns(),
3571 correlation_id: None,
3572 params,
3573 });
3574
3575 self.send_data_cmd(DataCommand::Unsubscribe(command));
3576 }
3577
3578 pub fn unsubscribe_quotes(
3580 &mut self,
3581 instrument_id: InstrumentId,
3582 client_id: Option<ClientId>,
3583 params: Option<Params>,
3584 ) {
3585 self.check_registered();
3586
3587 let topic = get_quotes_topic(instrument_id);
3588 self.remove_quote_subscription(topic);
3589
3590 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3591 instrument_id,
3592 client_id,
3593 venue: Some(instrument_id.venue),
3594 command_id: UUID4::new(),
3595 ts_init: self.timestamp_ns(),
3596 correlation_id: None,
3597 params,
3598 });
3599
3600 self.send_data_cmd(DataCommand::Unsubscribe(command));
3601 }
3602
3603 pub fn unsubscribe_trades(
3605 &mut self,
3606 instrument_id: InstrumentId,
3607 client_id: Option<ClientId>,
3608 params: Option<Params>,
3609 ) {
3610 self.check_registered();
3611
3612 let topic = get_trades_topic(instrument_id);
3613 self.remove_trade_subscription(topic);
3614
3615 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3616 instrument_id,
3617 client_id,
3618 venue: Some(instrument_id.venue),
3619 command_id: UUID4::new(),
3620 ts_init: self.timestamp_ns(),
3621 correlation_id: None,
3622 params,
3623 });
3624
3625 self.send_data_cmd(DataCommand::Unsubscribe(command));
3626 }
3627
3628 pub fn unsubscribe_bars(
3630 &mut self,
3631 bar_type: BarType,
3632 client_id: Option<ClientId>,
3633 params: Option<Params>,
3634 ) {
3635 self.check_registered();
3636
3637 let topic = get_bars_topic(bar_type);
3638 self.remove_bar_subscription(topic);
3639
3640 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3641 bar_type,
3642 client_id,
3643 venue: Some(bar_type.instrument_id().venue),
3644 command_id: UUID4::new(),
3645 ts_init: self.timestamp_ns(),
3646 correlation_id: None,
3647 params,
3648 });
3649
3650 self.send_data_cmd(DataCommand::Unsubscribe(command));
3651 }
3652
3653 pub fn unsubscribe_mark_prices(
3655 &mut self,
3656 instrument_id: InstrumentId,
3657 client_id: Option<ClientId>,
3658 params: Option<Params>,
3659 ) {
3660 self.check_registered();
3661
3662 let topic = get_mark_price_topic(instrument_id);
3663 self.remove_mark_price_subscription(topic);
3664
3665 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3666 instrument_id,
3667 client_id,
3668 venue: Some(instrument_id.venue),
3669 command_id: UUID4::new(),
3670 ts_init: self.timestamp_ns(),
3671 correlation_id: None,
3672 params,
3673 });
3674
3675 self.send_data_cmd(DataCommand::Unsubscribe(command));
3676 }
3677
3678 pub fn unsubscribe_index_prices(
3680 &mut self,
3681 instrument_id: InstrumentId,
3682 client_id: Option<ClientId>,
3683 params: Option<Params>,
3684 ) {
3685 self.check_registered();
3686
3687 let topic = get_index_price_topic(instrument_id);
3688 self.remove_index_price_subscription(topic);
3689
3690 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3691 instrument_id,
3692 client_id,
3693 venue: Some(instrument_id.venue),
3694 command_id: UUID4::new(),
3695 ts_init: self.timestamp_ns(),
3696 correlation_id: None,
3697 params,
3698 });
3699
3700 self.send_data_cmd(DataCommand::Unsubscribe(command));
3701 }
3702
3703 pub fn unsubscribe_funding_rates(
3705 &mut self,
3706 instrument_id: InstrumentId,
3707 client_id: Option<ClientId>,
3708 params: Option<Params>,
3709 ) {
3710 self.check_registered();
3711
3712 let topic = get_funding_rate_topic(instrument_id);
3713 self.remove_funding_rate_subscription(topic);
3714
3715 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3716 instrument_id,
3717 client_id,
3718 venue: Some(instrument_id.venue),
3719 command_id: UUID4::new(),
3720 ts_init: self.timestamp_ns(),
3721 correlation_id: None,
3722 params,
3723 });
3724
3725 self.send_data_cmd(DataCommand::Unsubscribe(command));
3726 }
3727
3728 pub fn unsubscribe_option_greeks(
3730 &mut self,
3731 instrument_id: InstrumentId,
3732 client_id: Option<ClientId>,
3733 params: Option<Params>,
3734 ) {
3735 self.check_registered();
3736
3737 let topic = get_option_greeks_topic(instrument_id);
3738 self.remove_option_greeks_subscription(topic);
3739
3740 let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3741 instrument_id,
3742 client_id,
3743 venue: Some(instrument_id.venue),
3744 command_id: UUID4::new(),
3745 ts_init: self.timestamp_ns(),
3746 correlation_id: None,
3747 params,
3748 });
3749
3750 self.send_data_cmd(DataCommand::Unsubscribe(command));
3751 }
3752
3753 pub fn unsubscribe_instrument_status(
3755 &mut self,
3756 instrument_id: InstrumentId,
3757 client_id: Option<ClientId>,
3758 params: Option<Params>,
3759 ) {
3760 self.check_registered();
3761
3762 let topic = get_instrument_status_topic(instrument_id);
3763 self.remove_subscription_any(topic);
3764
3765 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3766 instrument_id,
3767 client_id,
3768 venue: Some(instrument_id.venue),
3769 command_id: UUID4::new(),
3770 ts_init: self.timestamp_ns(),
3771 correlation_id: None,
3772 params,
3773 });
3774
3775 self.send_data_cmd(DataCommand::Unsubscribe(command));
3776 }
3777
3778 pub fn unsubscribe_instrument_close(
3780 &mut self,
3781 instrument_id: InstrumentId,
3782 client_id: Option<ClientId>,
3783 params: Option<Params>,
3784 ) {
3785 self.check_registered();
3786
3787 let topic = get_instrument_close_topic(instrument_id);
3788 self.remove_instrument_close_subscription(topic);
3789
3790 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3791 instrument_id,
3792 client_id,
3793 venue: Some(instrument_id.venue),
3794 command_id: UUID4::new(),
3795 ts_init: self.timestamp_ns(),
3796 correlation_id: None,
3797 params,
3798 });
3799
3800 self.send_data_cmd(DataCommand::Unsubscribe(command));
3801 }
3802
3803 pub fn unsubscribe_option_chain(
3805 &mut self,
3806 series_id: OptionSeriesId,
3807 client_id: Option<ClientId>,
3808 ) {
3809 self.check_registered();
3810
3811 let topic = get_option_chain_topic(series_id);
3812 self.remove_option_chain_subscription(topic);
3813
3814 let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
3815 series_id,
3816 UUID4::new(),
3817 self.timestamp_ns(),
3818 client_id,
3819 Some(series_id.venue),
3820 ));
3821
3822 self.send_data_cmd(DataCommand::Unsubscribe(command));
3823 }
3824
3825 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3827 self.check_registered();
3828
3829 let topic = get_order_fills_topic(instrument_id);
3830 self.remove_order_event_subscription(topic);
3831 }
3832
3833 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3835 self.check_registered();
3836
3837 let topic = get_order_cancels_topic(instrument_id);
3838 self.remove_order_event_subscription(topic);
3839 }
3840
3841 #[allow(clippy::too_many_arguments)]
3847 pub fn request_data(
3848 &self,
3849 data_type: DataType,
3850 client_id: ClientId,
3851 start: Option<DateTime<Utc>>,
3852 end: Option<DateTime<Utc>>,
3853 limit: Option<NonZeroUsize>,
3854 params: Option<Params>,
3855 handler: ShareableMessageHandler,
3856 ) -> anyhow::Result<UUID4> {
3857 self.check_registered();
3858
3859 let now = self.clock_ref().utc_now();
3860 check_timestamps(now, start, end)?;
3861
3862 let request_id = UUID4::new();
3863 let command = RequestCommand::Data(RequestCustomData {
3864 client_id,
3865 data_type,
3866 start,
3867 end,
3868 limit,
3869 request_id,
3870 ts_init: self.timestamp_ns(),
3871 params,
3872 });
3873
3874 get_message_bus()
3875 .borrow_mut()
3876 .register_response_handler(command.request_id(), handler)?;
3877
3878 self.send_data_cmd(DataCommand::Request(command));
3879
3880 Ok(request_id)
3881 }
3882
3883 pub fn request_instrument(
3889 &self,
3890 instrument_id: InstrumentId,
3891 start: Option<DateTime<Utc>>,
3892 end: Option<DateTime<Utc>>,
3893 client_id: Option<ClientId>,
3894 params: Option<Params>,
3895 handler: ShareableMessageHandler,
3896 ) -> anyhow::Result<UUID4> {
3897 self.check_registered();
3898
3899 let now = self.clock_ref().utc_now();
3900 check_timestamps(now, start, end)?;
3901
3902 let request_id = UUID4::new();
3903 let command = RequestCommand::Instrument(RequestInstrument {
3904 instrument_id,
3905 start,
3906 end,
3907 client_id,
3908 request_id,
3909 ts_init: now.into(),
3910 params,
3911 });
3912
3913 get_message_bus()
3914 .borrow_mut()
3915 .register_response_handler(command.request_id(), handler)?;
3916
3917 self.send_data_cmd(DataCommand::Request(command));
3918
3919 Ok(request_id)
3920 }
3921
3922 pub fn request_instruments(
3928 &self,
3929 venue: Option<Venue>,
3930 start: Option<DateTime<Utc>>,
3931 end: Option<DateTime<Utc>>,
3932 client_id: Option<ClientId>,
3933 params: Option<Params>,
3934 handler: ShareableMessageHandler,
3935 ) -> anyhow::Result<UUID4> {
3936 self.check_registered();
3937
3938 let now = self.clock_ref().utc_now();
3939 check_timestamps(now, start, end)?;
3940
3941 let request_id = UUID4::new();
3942 let command = RequestCommand::Instruments(RequestInstruments {
3943 venue,
3944 start,
3945 end,
3946 client_id,
3947 request_id,
3948 ts_init: now.into(),
3949 params,
3950 });
3951
3952 get_message_bus()
3953 .borrow_mut()
3954 .register_response_handler(command.request_id(), handler)?;
3955
3956 self.send_data_cmd(DataCommand::Request(command));
3957
3958 Ok(request_id)
3959 }
3960
3961 pub fn request_book_snapshot(
3967 &self,
3968 instrument_id: InstrumentId,
3969 depth: Option<NonZeroUsize>,
3970 client_id: Option<ClientId>,
3971 params: Option<Params>,
3972 handler: ShareableMessageHandler,
3973 ) -> anyhow::Result<UUID4> {
3974 self.check_registered();
3975
3976 let request_id = UUID4::new();
3977 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3978 instrument_id,
3979 depth,
3980 client_id,
3981 request_id,
3982 ts_init: self.timestamp_ns(),
3983 params,
3984 });
3985
3986 get_message_bus()
3987 .borrow_mut()
3988 .register_response_handler(command.request_id(), handler)?;
3989
3990 self.send_data_cmd(DataCommand::Request(command));
3991
3992 Ok(request_id)
3993 }
3994
3995 #[allow(clippy::too_many_arguments)]
4001 pub fn request_quotes(
4002 &self,
4003 instrument_id: InstrumentId,
4004 start: Option<DateTime<Utc>>,
4005 end: Option<DateTime<Utc>>,
4006 limit: Option<NonZeroUsize>,
4007 client_id: Option<ClientId>,
4008 params: Option<Params>,
4009 handler: ShareableMessageHandler,
4010 ) -> anyhow::Result<UUID4> {
4011 self.check_registered();
4012
4013 let now = self.clock_ref().utc_now();
4014 check_timestamps(now, start, end)?;
4015
4016 let request_id = UUID4::new();
4017 let command = RequestCommand::Quotes(RequestQuotes {
4018 instrument_id,
4019 start,
4020 end,
4021 limit,
4022 client_id,
4023 request_id,
4024 ts_init: now.into(),
4025 params,
4026 });
4027
4028 get_message_bus()
4029 .borrow_mut()
4030 .register_response_handler(command.request_id(), handler)?;
4031
4032 self.send_data_cmd(DataCommand::Request(command));
4033
4034 Ok(request_id)
4035 }
4036
4037 #[allow(clippy::too_many_arguments)]
4043 pub fn request_trades(
4044 &self,
4045 instrument_id: InstrumentId,
4046 start: Option<DateTime<Utc>>,
4047 end: Option<DateTime<Utc>>,
4048 limit: Option<NonZeroUsize>,
4049 client_id: Option<ClientId>,
4050 params: Option<Params>,
4051 handler: ShareableMessageHandler,
4052 ) -> anyhow::Result<UUID4> {
4053 self.check_registered();
4054
4055 let now = self.clock_ref().utc_now();
4056 check_timestamps(now, start, end)?;
4057
4058 let request_id = UUID4::new();
4059 let command = RequestCommand::Trades(RequestTrades {
4060 instrument_id,
4061 start,
4062 end,
4063 limit,
4064 client_id,
4065 request_id,
4066 ts_init: now.into(),
4067 params,
4068 });
4069
4070 get_message_bus()
4071 .borrow_mut()
4072 .register_response_handler(command.request_id(), handler)?;
4073
4074 self.send_data_cmd(DataCommand::Request(command));
4075
4076 Ok(request_id)
4077 }
4078
4079 #[allow(clippy::too_many_arguments)]
4085 pub fn request_funding_rates(
4086 &self,
4087 instrument_id: InstrumentId,
4088 start: Option<DateTime<Utc>>,
4089 end: Option<DateTime<Utc>>,
4090 limit: Option<NonZeroUsize>,
4091 client_id: Option<ClientId>,
4092 params: Option<Params>,
4093 handler: ShareableMessageHandler,
4094 ) -> anyhow::Result<UUID4> {
4095 self.check_registered();
4096
4097 let now = self.clock_ref().utc_now();
4098 check_timestamps(now, start, end)?;
4099
4100 let request_id = UUID4::new();
4101 let command = RequestCommand::FundingRates(RequestFundingRates {
4102 instrument_id,
4103 start,
4104 end,
4105 limit,
4106 client_id,
4107 request_id,
4108 ts_init: now.into(),
4109 params,
4110 });
4111
4112 get_message_bus()
4113 .borrow_mut()
4114 .register_response_handler(command.request_id(), handler)?;
4115
4116 self.send_data_cmd(DataCommand::Request(command));
4117
4118 Ok(request_id)
4119 }
4120
4121 #[allow(clippy::too_many_arguments)]
4127 pub fn request_bars(
4128 &self,
4129 bar_type: BarType,
4130 start: Option<DateTime<Utc>>,
4131 end: Option<DateTime<Utc>>,
4132 limit: Option<NonZeroUsize>,
4133 client_id: Option<ClientId>,
4134 params: Option<Params>,
4135 handler: ShareableMessageHandler,
4136 ) -> anyhow::Result<UUID4> {
4137 self.check_registered();
4138
4139 let now = self.clock_ref().utc_now();
4140 check_timestamps(now, start, end)?;
4141
4142 let request_id = UUID4::new();
4143 let command = RequestCommand::Bars(RequestBars {
4144 bar_type,
4145 start,
4146 end,
4147 limit,
4148 client_id,
4149 request_id,
4150 ts_init: now.into(),
4151 params,
4152 });
4153
4154 get_message_bus()
4155 .borrow_mut()
4156 .register_response_handler(command.request_id(), handler)?;
4157
4158 self.send_data_cmd(DataCommand::Request(command));
4159
4160 Ok(request_id)
4161 }
4162
4163 #[cfg(test)]
4164 pub fn quote_handler_count(&self) -> usize {
4165 self.quote_handlers.len()
4166 }
4167
4168 #[cfg(test)]
4169 pub fn trade_handler_count(&self) -> usize {
4170 self.trade_handlers.len()
4171 }
4172
4173 #[cfg(test)]
4174 pub fn bar_handler_count(&self) -> usize {
4175 self.bar_handlers.len()
4176 }
4177
4178 #[cfg(test)]
4179 pub fn deltas_handler_count(&self) -> usize {
4180 self.deltas_handlers.len()
4181 }
4182
4183 #[cfg(test)]
4184 pub fn has_quote_handler(&self, topic: &str) -> bool {
4185 self.quote_handlers
4186 .contains_key(&MStr::<Topic>::from(topic))
4187 }
4188
4189 #[cfg(test)]
4190 pub fn has_trade_handler(&self, topic: &str) -> bool {
4191 self.trade_handlers
4192 .contains_key(&MStr::<Topic>::from(topic))
4193 }
4194
4195 #[cfg(test)]
4196 pub fn has_bar_handler(&self, topic: &str) -> bool {
4197 self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
4198 }
4199
4200 #[cfg(test)]
4201 pub fn has_deltas_handler(&self, topic: &str) -> bool {
4202 self.deltas_handlers
4203 .contains_key(&MStr::<Topic>::from(topic))
4204 }
4205}
4206
4207fn check_timestamps(
4208 now: DateTime<Utc>,
4209 start: Option<DateTime<Utc>>,
4210 end: Option<DateTime<Utc>>,
4211) -> anyhow::Result<()> {
4212 if let Some(start) = start {
4213 check_predicate_true(start <= now, "start was > now")?;
4214 }
4215
4216 if let Some(end) = end {
4217 check_predicate_true(end <= now, "end was > now")?;
4218 }
4219
4220 if let (Some(start), Some(end)) = (start, end) {
4221 check_predicate_true(start < end, "start was >= end")?;
4222 }
4223
4224 Ok(())
4225}
4226
4227fn log_error(e: &anyhow::Error) {
4228 log::error!("{e}");
4229}
4230
4231fn log_not_running<T>(msg: &T)
4232where
4233 T: Debug,
4234{
4235 log::trace!("Received message when not running - skipping {msg:?}");
4236}
4237
4238fn log_received<T>(msg: &T)
4239where
4240 T: Debug,
4241{
4242 log::debug!("{RECV} {msg:?}");
4243}