1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41 },
42 enums::BookType,
43 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45 instruments::{InstrumentAny, SyntheticInstrument},
46 orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54 Actor,
55 registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; use crate::{
63 cache::Cache,
64 clock::Clock,
65 component::Component,
66 enums::{ComponentState, ComponentTrigger},
67 logging::{CMD, RECV, REQ, SEND},
68 messages::{
69 data::{
70 BarsResponse, BookDeltasResponse, BookDepthResponse, BookResponse, CustomDataResponse,
71 DataCommand, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
72 QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth, RequestBookSnapshot,
73 RequestCommand, RequestCustomData, RequestFundingRates, RequestInstrument,
74 RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas,
75 SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeFundingRates,
76 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentClose,
77 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices,
78 SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
79 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookSnapshots,
80 UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates,
81 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
82 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
83 UnsubscribeOptionChain, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
84 is_parent_subscription,
85 },
86 system::ShutdownSystem,
87 },
88 msgbus::{
89 self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
90 switchboard::{
91 MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
92 get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
93 get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
94 get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
95 get_option_chain_topic, get_option_greeks_topic, get_order_cancels_topic,
96 get_order_fills_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
97 },
98 },
99 signal::Signal,
100 timer::{TimeEvent, TimeEventCallback},
101};
102
103#[derive(Debug, Clone, Deserialize, Serialize)]
105#[serde(default, deny_unknown_fields)]
106#[cfg_attr(
107 feature = "python",
108 pyo3::pyclass(
109 module = "nautilus_trader.core.nautilus_pyo3.common",
110 subclass,
111 from_py_object
112 )
113)]
114#[cfg_attr(
115 feature = "python",
116 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
117)]
118pub struct DataActorConfig {
119 pub actor_id: Option<ActorId>,
121 pub log_events: bool,
123 pub log_commands: bool,
125}
126
127impl Default for DataActorConfig {
128 fn default() -> Self {
129 Self {
130 actor_id: None,
131 log_events: true,
132 log_commands: true,
133 }
134 }
135}
136
137#[derive(Debug, Clone, Deserialize, Serialize)]
139#[serde(deny_unknown_fields)]
140#[cfg_attr(
141 feature = "python",
142 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
143)]
144#[cfg_attr(
145 feature = "python",
146 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
147)]
148pub struct ImportableActorConfig {
149 pub actor_path: String,
151 pub config_path: String,
153 pub config: HashMap<String, serde_json::Value>,
155}
156
157type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
158
159pub trait DataActor:
160 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
161{
162 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
168 Ok(IndexMap::new())
169 }
170
171 #[allow(unused_variables)]
177 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
178 Ok(())
179 }
180
181 fn on_start(&mut self) -> anyhow::Result<()> {
187 log::warn!(
188 "The `on_start` handler was called when not overridden, \
189 it's expected that any actions required when starting the actor \
190 occur here, such as subscribing/requesting data"
191 );
192 Ok(())
193 }
194
195 fn on_stop(&mut self) -> anyhow::Result<()> {
201 log::warn!(
202 "The `on_stop` handler was called when not overridden, \
203 it's expected that any actions required when stopping the actor \
204 occur here, such as unsubscribing from data",
205 );
206 Ok(())
207 }
208
209 fn on_resume(&mut self) -> anyhow::Result<()> {
215 log::warn!(
216 "The `on_resume` handler was called when not overridden, \
217 it's expected that any actions required when resuming the actor \
218 following a stop occur here"
219 );
220 Ok(())
221 }
222
223 fn on_reset(&mut self) -> anyhow::Result<()> {
229 log::warn!(
230 "The `on_reset` handler was called when not overridden, \
231 it's expected that any actions required when resetting the actor \
232 occur here, such as resetting indicators and other state"
233 );
234 Ok(())
235 }
236
237 fn on_dispose(&mut self) -> anyhow::Result<()> {
243 Ok(())
244 }
245
246 fn on_degrade(&mut self) -> anyhow::Result<()> {
252 Ok(())
253 }
254
255 fn on_fault(&mut self) -> anyhow::Result<()> {
261 Ok(())
262 }
263
264 #[allow(unused_variables)]
270 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
271 Ok(())
272 }
273
274 #[allow(unused_variables)]
280 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
281 Ok(())
282 }
283
284 #[allow(unused_variables)]
290 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
291 Ok(())
292 }
293
294 #[allow(unused_variables)]
300 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
301 Ok(())
302 }
303
304 #[allow(unused_variables)]
310 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
311 Ok(())
312 }
313
314 #[allow(unused_variables)]
320 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
321 Ok(())
322 }
323
324 #[allow(unused_variables)]
330 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
331 Ok(())
332 }
333
334 #[allow(unused_variables)]
340 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
341 Ok(())
342 }
343
344 #[allow(unused_variables)]
350 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
351 Ok(())
352 }
353
354 #[allow(unused_variables)]
360 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
361 Ok(())
362 }
363
364 #[allow(unused_variables)]
370 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
371 Ok(())
372 }
373
374 #[allow(unused_variables)]
380 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
381 Ok(())
382 }
383
384 #[allow(unused_variables)]
390 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
391 Ok(())
392 }
393
394 #[allow(unused_variables)]
400 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
401 Ok(())
402 }
403
404 #[allow(unused_variables)]
410 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
411 Ok(())
412 }
413
414 #[allow(unused_variables)]
420 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
421 Ok(())
422 }
423
424 #[allow(unused_variables)]
430 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
431 Ok(())
432 }
433
434 #[allow(unused_variables)]
440 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
441 Ok(())
442 }
443
444 #[cfg(feature = "defi")]
445 #[allow(unused_variables)]
451 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
452 Ok(())
453 }
454
455 #[cfg(feature = "defi")]
456 #[allow(unused_variables)]
462 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
463 Ok(())
464 }
465
466 #[cfg(feature = "defi")]
467 #[allow(unused_variables)]
473 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
474 Ok(())
475 }
476
477 #[cfg(feature = "defi")]
478 #[allow(unused_variables)]
484 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
485 Ok(())
486 }
487
488 #[cfg(feature = "defi")]
489 #[allow(unused_variables)]
495 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
496 Ok(())
497 }
498
499 #[cfg(feature = "defi")]
500 #[allow(unused_variables)]
506 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
507 Ok(())
508 }
509
510 #[allow(unused_variables)]
516 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
517 Ok(())
518 }
519
520 #[allow(unused_variables)]
526 fn on_historical_book_deltas(&mut self, deltas: &[OrderBookDelta]) -> anyhow::Result<()> {
527 Ok(())
528 }
529
530 #[allow(unused_variables)]
536 fn on_historical_book_depth(&mut self, depths: &[OrderBookDepth10]) -> anyhow::Result<()> {
537 Ok(())
538 }
539
540 #[allow(unused_variables)]
546 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
547 Ok(())
548 }
549
550 #[allow(unused_variables)]
556 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
557 Ok(())
558 }
559
560 #[allow(unused_variables)]
566 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
567 Ok(())
568 }
569
570 #[allow(unused_variables)]
576 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
577 Ok(())
578 }
579
580 #[allow(unused_variables)]
586 fn on_historical_index_prices(
587 &mut self,
588 index_prices: &[IndexPriceUpdate],
589 ) -> anyhow::Result<()> {
590 Ok(())
591 }
592
593 #[allow(unused_variables)]
599 fn on_historical_funding_rates(
600 &mut self,
601 funding_rates: &[FundingRateUpdate],
602 ) -> anyhow::Result<()> {
603 Ok(())
604 }
605
606 fn handle_time_event(&mut self, event: &TimeEvent) {
608 log_received(&event);
609
610 if self.not_running() {
611 log_not_running(&event);
612 return;
613 }
614
615 if let Err(e) = DataActor::on_time_event(self, event) {
616 log_error(&e);
617 }
618 }
619
620 fn handle_data(&mut self, data: &CustomData) {
622 log_received(&data);
623
624 if self.not_running() {
625 log_not_running(&data);
626 return;
627 }
628
629 if let Err(e) = self.on_data(data) {
630 log_error(&e);
631 }
632 }
633
634 fn handle_signal(&mut self, signal: &Signal) {
636 log_received(&signal);
637
638 if self.not_running() {
639 log_not_running(&signal);
640 return;
641 }
642
643 if let Err(e) = self.on_signal(signal) {
644 log_error(&e);
645 }
646 }
647
648 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
650 log_received(&instrument);
651
652 if self.not_running() {
653 log_not_running(&instrument);
654 return;
655 }
656
657 if let Err(e) = self.on_instrument(instrument) {
658 log_error(&e);
659 }
660 }
661
662 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
664 log_received(&deltas);
665
666 if self.not_running() {
667 log_not_running(&deltas);
668 return;
669 }
670
671 if let Err(e) = self.on_book_deltas(deltas) {
672 log_error(&e);
673 }
674 }
675
676 fn handle_book(&mut self, book: &OrderBook) {
678 log_received(&book);
679
680 if self.not_running() {
681 log_not_running(&book);
682 return;
683 }
684
685 if let Err(e) = self.on_book(book) {
686 log_error(&e);
687 }
688 }
689
690 fn handle_quote(&mut self, quote: &QuoteTick) {
692 log_received("e);
693
694 if self.not_running() {
695 log_not_running("e);
696 return;
697 }
698
699 if let Err(e) = self.on_quote(quote) {
700 log_error(&e);
701 }
702 }
703
704 fn handle_trade(&mut self, trade: &TradeTick) {
706 log_received(&trade);
707
708 if self.not_running() {
709 log_not_running(&trade);
710 return;
711 }
712
713 if let Err(e) = self.on_trade(trade) {
714 log_error(&e);
715 }
716 }
717
718 fn handle_bar(&mut self, bar: &Bar) {
720 log_received(&bar);
721
722 if self.not_running() {
723 log_not_running(&bar);
724 return;
725 }
726
727 if let Err(e) = self.on_bar(bar) {
728 log_error(&e);
729 }
730 }
731
732 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
734 log_received(&mark_price);
735
736 if self.not_running() {
737 log_not_running(&mark_price);
738 return;
739 }
740
741 if let Err(e) = self.on_mark_price(mark_price) {
742 log_error(&e);
743 }
744 }
745
746 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
748 log_received(&index_price);
749
750 if self.not_running() {
751 log_not_running(&index_price);
752 return;
753 }
754
755 if let Err(e) = self.on_index_price(index_price) {
756 log_error(&e);
757 }
758 }
759
760 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
762 log_received(&funding_rate);
763
764 if self.not_running() {
765 log_not_running(&funding_rate);
766 return;
767 }
768
769 if let Err(e) = self.on_funding_rate(funding_rate) {
770 log_error(&e);
771 }
772 }
773
774 fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
776 log_received(&greeks);
777
778 if self.not_running() {
779 log_not_running(&greeks);
780 return;
781 }
782
783 if let Err(e) = self.on_option_greeks(greeks) {
784 log_error(&e);
785 }
786 }
787
788 fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
790 log_received(&slice);
791
792 if self.not_running() {
793 log_not_running(&slice);
794 return;
795 }
796
797 if let Err(e) = self.on_option_chain(slice) {
798 log_error(&e);
799 }
800 }
801
802 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
804 log_received(&status);
805
806 if self.not_running() {
807 log_not_running(&status);
808 return;
809 }
810
811 if let Err(e) = self.on_instrument_status(status) {
812 log_error(&e);
813 }
814 }
815
816 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
818 log_received(&close);
819
820 if self.not_running() {
821 log_not_running(&close);
822 return;
823 }
824
825 if let Err(e) = self.on_instrument_close(close) {
826 log_error(&e);
827 }
828 }
829
830 fn handle_order_filled(&mut self, event: &OrderFilled) {
832 log_received(&event);
833
834 if event.strategy_id.inner() == self.actor_id().inner() {
838 return;
839 }
840
841 if self.not_running() {
842 log_not_running(&event);
843 return;
844 }
845
846 if let Err(e) = self.on_order_filled(event) {
847 log_error(&e);
848 }
849 }
850
851 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
853 log_received(&event);
854
855 if event.strategy_id.inner() == self.actor_id().inner() {
859 return;
860 }
861
862 if self.not_running() {
863 log_not_running(&event);
864 return;
865 }
866
867 if let Err(e) = self.on_order_canceled(event) {
868 log_error(&e);
869 }
870 }
871
872 #[cfg(feature = "defi")]
873 fn handle_block(&mut self, block: &Block) {
875 log_received(&block);
876
877 if self.not_running() {
878 log_not_running(&block);
879 return;
880 }
881
882 if let Err(e) = self.on_block(block) {
883 log_error(&e);
884 }
885 }
886
887 #[cfg(feature = "defi")]
888 fn handle_pool(&mut self, pool: &Pool) {
890 log_received(&pool);
891
892 if self.not_running() {
893 log_not_running(&pool);
894 return;
895 }
896
897 if let Err(e) = self.on_pool(pool) {
898 log_error(&e);
899 }
900 }
901
902 #[cfg(feature = "defi")]
903 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
905 log_received(&swap);
906
907 if self.not_running() {
908 log_not_running(&swap);
909 return;
910 }
911
912 if let Err(e) = self.on_pool_swap(swap) {
913 log_error(&e);
914 }
915 }
916
917 #[cfg(feature = "defi")]
918 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
920 log_received(&update);
921
922 if self.not_running() {
923 log_not_running(&update);
924 return;
925 }
926
927 if let Err(e) = self.on_pool_liquidity_update(update) {
928 log_error(&e);
929 }
930 }
931
932 #[cfg(feature = "defi")]
933 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
935 log_received(&collect);
936
937 if self.not_running() {
938 log_not_running(&collect);
939 return;
940 }
941
942 if let Err(e) = self.on_pool_fee_collect(collect) {
943 log_error(&e);
944 }
945 }
946
947 #[cfg(feature = "defi")]
948 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
950 log_received(&flash);
951
952 if self.not_running() {
953 log_not_running(&flash);
954 return;
955 }
956
957 if let Err(e) = self.on_pool_flash(flash) {
958 log_error(&e);
959 }
960 }
961
962 fn handle_historical_data(&mut self, data: &dyn Any) {
964 log_received(&data);
965
966 if let Err(e) = self.on_historical_data(data) {
967 log_error(&e);
968 }
969 }
970
971 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
973 log_received(&resp);
974
975 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
976 log_error(&e);
977 }
978 }
979
980 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
982 log_received(&resp);
983
984 if let Err(e) = self.on_instrument(&resp.data) {
985 log_error(&e);
986 }
987 }
988
989 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
991 log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
992 log::trace!("{RECV} {resp:?}");
993
994 for inst in &resp.data {
995 if let Err(e) = self.on_instrument(inst) {
996 log_error(&e);
997 }
998 }
999 }
1000
1001 fn handle_book_response(&mut self, resp: &BookResponse) {
1003 log_received(&resp);
1004
1005 if let Err(e) = self.on_book(&resp.data) {
1006 log_error(&e);
1007 }
1008 }
1009
1010 fn handle_book_deltas_response(&mut self, resp: &BookDeltasResponse) {
1012 log_received_bulk("BookDeltasResponse", &resp.correlation_id, resp.data.len());
1013 log::trace!("{RECV} {resp:?}");
1014
1015 if let Err(e) = self.on_historical_book_deltas(&resp.data) {
1016 log_error(&e);
1017 }
1018 }
1019
1020 fn handle_book_depth_response(&mut self, resp: &BookDepthResponse) {
1022 log_received_bulk("BookDepthResponse", &resp.correlation_id, resp.data.len());
1023 log::trace!("{RECV} {resp:?}");
1024
1025 if let Err(e) = self.on_historical_book_depth(&resp.data) {
1026 log_error(&e);
1027 }
1028 }
1029
1030 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
1032 log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
1033 log::trace!("{RECV} {resp:?}");
1034
1035 if let Err(e) = self.on_historical_quotes(&resp.data) {
1036 log_error(&e);
1037 }
1038 }
1039
1040 fn handle_trades_response(&mut self, resp: &TradesResponse) {
1042 log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1043 log::trace!("{RECV} {resp:?}");
1044
1045 if let Err(e) = self.on_historical_trades(&resp.data) {
1046 log_error(&e);
1047 }
1048 }
1049
1050 fn handle_bars_response(&mut self, resp: &BarsResponse) {
1052 log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1053 log::trace!("{RECV} {resp:?}");
1054
1055 if let Err(e) = self.on_historical_bars(&resp.data) {
1056 log_error(&e);
1057 }
1058 }
1059
1060 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1062 log_received_bulk(
1063 "FundingRatesResponse",
1064 &resp.correlation_id,
1065 resp.data.len(),
1066 );
1067 log::trace!("{RECV} {resp:?}");
1068
1069 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1070 log_error(&e);
1071 }
1072 }
1073
1074 fn subscribe_data(
1076 &mut self,
1077 data_type: DataType,
1078 client_id: Option<ClientId>,
1079 params: Option<Params>,
1080 ) where
1081 Self: 'static + Debug + Sized,
1082 {
1083 let actor_id = self.actor_id().inner();
1084 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1085 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1086 });
1087
1088 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1089 }
1090
1091 fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1106 where
1107 Self: 'static + Debug + Sized,
1108 {
1109 let actor_id = self.actor_id().inner();
1110 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1113 if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1114 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1115 actor.handle_signal(signal);
1116 } else {
1117 log::error!("Actor {actor_id} not found for signal handling");
1118 }
1119 }
1120 });
1121
1122 DataActorCore::subscribe_signal(self, handler, name, priority);
1123 }
1124
1125 fn subscribe_quotes(
1127 &mut self,
1128 instrument_id: InstrumentId,
1129 client_id: Option<ClientId>,
1130 params: Option<Params>,
1131 ) where
1132 Self: 'static + Debug + Sized,
1133 {
1134 let actor_id = self.actor_id().inner();
1135 let topic = get_quotes_topic(instrument_id);
1136
1137 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1138 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1139 actor.handle_quote(quote);
1140 } else {
1141 log::error!("Actor {actor_id} not found for quote handling");
1142 }
1143 });
1144
1145 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1146 }
1147
1148 fn subscribe_instruments(
1150 &mut self,
1151 venue: Venue,
1152 client_id: Option<ClientId>,
1153 params: Option<Params>,
1154 ) where
1155 Self: 'static + Debug + Sized,
1156 {
1157 let actor_id = self.actor_id().inner();
1158 let pattern = get_instruments_pattern(venue);
1159
1160 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1161 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1162 actor.handle_instrument(instrument);
1163 } else {
1164 log::error!("Actor {actor_id} not found for instruments handling");
1165 }
1166 });
1167
1168 DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1169 }
1170
1171 fn subscribe_instrument(
1173 &mut self,
1174 instrument_id: InstrumentId,
1175 client_id: Option<ClientId>,
1176 params: Option<Params>,
1177 ) where
1178 Self: 'static + Debug + Sized,
1179 {
1180 let actor_id = self.actor_id().inner();
1181 let topic = get_instrument_topic(instrument_id);
1182
1183 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1184 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1185 actor.handle_instrument(instrument);
1186 } else {
1187 log::error!("Actor {actor_id} not found for instrument handling");
1188 }
1189 });
1190
1191 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1192 }
1193
1194 fn subscribe_book_deltas(
1196 &mut self,
1197 instrument_id: InstrumentId,
1198 book_type: BookType,
1199 depth: Option<NonZeroUsize>,
1200 client_id: Option<ClientId>,
1201 managed: bool,
1202 params: Option<Params>,
1203 ) where
1204 Self: 'static + Debug + Sized,
1205 {
1206 let actor_id = self.actor_id().inner();
1207 let is_parent = is_parent_subscription(params.as_ref());
1208 let pattern = if is_parent {
1209 get_book_deltas_pattern(instrument_id)
1210 } else {
1211 get_book_deltas_topic(instrument_id).into()
1212 };
1213
1214 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1215 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1216 });
1217
1218 DataActorCore::subscribe_book_deltas(
1219 self,
1220 pattern,
1221 handler,
1222 instrument_id,
1223 book_type,
1224 depth,
1225 client_id,
1226 managed,
1227 params,
1228 );
1229 }
1230
1231 fn subscribe_book_at_interval(
1233 &mut self,
1234 instrument_id: InstrumentId,
1235 book_type: BookType,
1236 depth: Option<NonZeroUsize>,
1237 interval_ms: NonZeroUsize,
1238 client_id: Option<ClientId>,
1239 params: Option<Params>,
1240 ) where
1241 Self: 'static + Debug + Sized,
1242 {
1243 let actor_id = self.actor_id().inner();
1244 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1245
1246 let handler = TypedHandler::from(move |book: &OrderBook| {
1247 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1248 });
1249
1250 DataActorCore::subscribe_book_at_interval(
1251 self,
1252 topic,
1253 handler,
1254 instrument_id,
1255 book_type,
1256 depth,
1257 interval_ms,
1258 client_id,
1259 params,
1260 );
1261 }
1262
1263 fn subscribe_trades(
1265 &mut self,
1266 instrument_id: InstrumentId,
1267 client_id: Option<ClientId>,
1268 params: Option<Params>,
1269 ) where
1270 Self: 'static + Debug + Sized,
1271 {
1272 let actor_id = self.actor_id().inner();
1273 let topic = get_trades_topic(instrument_id);
1274
1275 let handler = TypedHandler::from(move |trade: &TradeTick| {
1276 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1277 });
1278
1279 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1280 }
1281
1282 fn subscribe_bars(
1284 &mut self,
1285 bar_type: BarType,
1286 client_id: Option<ClientId>,
1287 params: Option<Params>,
1288 ) where
1289 Self: 'static + Debug + Sized,
1290 {
1291 let actor_id = self.actor_id().inner();
1292 let topic = get_bars_topic(bar_type);
1293
1294 let handler = TypedHandler::from(move |bar: &Bar| {
1295 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1296 });
1297
1298 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1299 }
1300
1301 fn subscribe_mark_prices(
1303 &mut self,
1304 instrument_id: InstrumentId,
1305 client_id: Option<ClientId>,
1306 params: Option<Params>,
1307 ) where
1308 Self: 'static + Debug + Sized,
1309 {
1310 let actor_id = self.actor_id().inner();
1311 let topic = get_mark_price_topic(instrument_id);
1312
1313 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1314 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1315 });
1316
1317 DataActorCore::subscribe_mark_prices(
1318 self,
1319 topic,
1320 handler,
1321 instrument_id,
1322 client_id,
1323 params,
1324 );
1325 }
1326
1327 fn subscribe_index_prices(
1329 &mut self,
1330 instrument_id: InstrumentId,
1331 client_id: Option<ClientId>,
1332 params: Option<Params>,
1333 ) where
1334 Self: 'static + Debug + Sized,
1335 {
1336 let actor_id = self.actor_id().inner();
1337 let topic = get_index_price_topic(instrument_id);
1338
1339 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1340 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1341 });
1342
1343 DataActorCore::subscribe_index_prices(
1344 self,
1345 topic,
1346 handler,
1347 instrument_id,
1348 client_id,
1349 params,
1350 );
1351 }
1352
1353 fn subscribe_funding_rates(
1355 &mut self,
1356 instrument_id: InstrumentId,
1357 client_id: Option<ClientId>,
1358 params: Option<Params>,
1359 ) where
1360 Self: 'static + Debug + Sized,
1361 {
1362 let actor_id = self.actor_id().inner();
1363 let topic = get_funding_rate_topic(instrument_id);
1364
1365 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1366 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1367 });
1368
1369 DataActorCore::subscribe_funding_rates(
1370 self,
1371 topic,
1372 handler,
1373 instrument_id,
1374 client_id,
1375 params,
1376 );
1377 }
1378
1379 fn subscribe_option_greeks(
1381 &mut self,
1382 instrument_id: InstrumentId,
1383 client_id: Option<ClientId>,
1384 params: Option<Params>,
1385 ) where
1386 Self: 'static + Debug + Sized,
1387 {
1388 let actor_id = self.actor_id().inner();
1389 let topic = get_option_greeks_topic(instrument_id);
1390
1391 let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1392 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1393 actor.handle_option_greeks(option_greeks);
1394 } else {
1395 log::error!("Actor {actor_id} not found for option greeks handling");
1396 }
1397 });
1398
1399 DataActorCore::subscribe_option_greeks(
1400 self,
1401 topic,
1402 handler,
1403 instrument_id,
1404 client_id,
1405 params,
1406 );
1407 }
1408
1409 fn subscribe_instrument_status(
1411 &mut self,
1412 instrument_id: InstrumentId,
1413 client_id: Option<ClientId>,
1414 params: Option<Params>,
1415 ) where
1416 Self: 'static + Debug + Sized,
1417 {
1418 let actor_id = self.actor_id().inner();
1419 let topic = get_instrument_status_topic(instrument_id);
1420
1421 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1422 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1423 });
1424
1425 DataActorCore::subscribe_instrument_status(
1426 self,
1427 topic,
1428 handler,
1429 instrument_id,
1430 client_id,
1431 params,
1432 );
1433 }
1434
1435 fn subscribe_instrument_close(
1437 &mut self,
1438 instrument_id: InstrumentId,
1439 client_id: Option<ClientId>,
1440 params: Option<Params>,
1441 ) where
1442 Self: 'static + Debug + Sized,
1443 {
1444 let actor_id = self.actor_id().inner();
1445 let topic = get_instrument_close_topic(instrument_id);
1446
1447 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1448 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1449 });
1450
1451 DataActorCore::subscribe_instrument_close(
1452 self,
1453 topic,
1454 handler,
1455 instrument_id,
1456 client_id,
1457 params,
1458 );
1459 }
1460
1461 fn subscribe_option_chain(
1466 &mut self,
1467 series_id: OptionSeriesId,
1468 strike_range: StrikeRange,
1469 snapshot_interval_ms: Option<u64>,
1470 client_id: Option<ClientId>,
1471 params: Option<Params>,
1472 ) where
1473 Self: 'static + Debug + Sized,
1474 {
1475 let actor_id = self.actor_id().inner();
1476 let topic = get_option_chain_topic(series_id);
1477
1478 let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1479 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1480 actor.handle_option_chain(slice);
1481 } else {
1482 log::error!("Actor {actor_id} not found for option chain handling");
1483 }
1484 });
1485
1486 DataActorCore::subscribe_option_chain(
1487 self,
1488 topic,
1489 handler,
1490 series_id,
1491 strike_range,
1492 snapshot_interval_ms,
1493 client_id,
1494 params,
1495 );
1496 }
1497
1498 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1500 where
1501 Self: 'static + Debug + Sized,
1502 {
1503 let actor_id = self.actor_id().inner();
1504 let topic = get_order_fills_topic(instrument_id);
1505
1506 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1507 if let OrderEventAny::Filled(filled) = event {
1508 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1509 }
1510 });
1511
1512 DataActorCore::subscribe_order_fills(self, topic, handler);
1513 }
1514
1515 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1517 where
1518 Self: 'static + Debug + Sized,
1519 {
1520 let actor_id = self.actor_id().inner();
1521 let topic = get_order_cancels_topic(instrument_id);
1522
1523 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1524 if let OrderEventAny::Canceled(canceled) = event {
1525 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1526 }
1527 });
1528
1529 DataActorCore::subscribe_order_cancels(self, topic, handler);
1530 }
1531
/// Subscribes to blocks of `chain` (only compiled with the `defi` feature).
///
/// Each received `Block` is passed to `handle_block` on the actor looked up by ID;
/// registration is delegated to `DataActorCore::subscribe_blocks`.
#[cfg(feature = "defi")]
fn subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_blocks_topic(chain);
    let id = self.actor_id().inner();

    let on_block = TypedHandler::from(move |blk: &Block| {
        get_actor_unchecked::<Self>(&id).handle_block(blk);
    });

    DataActorCore::subscribe_blocks(self, topic, on_block, chain, client_id, params);
}
1551
/// Subscribes to pool updates for `instrument_id` (only compiled with the `defi` feature).
///
/// Each received `Pool` is passed to `handle_pool` on the actor looked up by ID;
/// registration is delegated to `DataActorCore::subscribe_pool`.
#[cfg(feature = "defi")]
fn subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_pool = TypedHandler::from(move |p: &Pool| {
        get_actor_unchecked::<Self>(&id).handle_pool(p);
    });

    DataActorCore::subscribe_pool(self, topic, on_pool, instrument_id, client_id, params);
}
1571
/// Subscribes to pool swaps for `instrument_id` (only compiled with the `defi` feature).
///
/// Each received `PoolSwap` is passed to `handle_pool_swap` on the actor looked up by ID;
/// registration is delegated to `DataActorCore::subscribe_pool_swaps`.
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_swap = TypedHandler::from(move |s: &PoolSwap| {
        get_actor_unchecked::<Self>(&id).handle_pool_swap(s);
    });

    DataActorCore::subscribe_pool_swaps(self, topic, on_swap, instrument_id, client_id, params);
}
1591
/// Subscribes to pool liquidity updates for `instrument_id` (only compiled with the
/// `defi` feature).
///
/// Each received `PoolLiquidityUpdate` is passed to `handle_pool_liquidity_update` on the
/// actor looked up by ID; registration is delegated to
/// `DataActorCore::subscribe_pool_liquidity_updates`.
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_update = TypedHandler::from(move |liquidity: &PoolLiquidityUpdate| {
        get_actor_unchecked::<Self>(&id).handle_pool_liquidity_update(liquidity);
    });

    DataActorCore::subscribe_pool_liquidity_updates(
        self,
        topic,
        on_update,
        instrument_id,
        client_id,
        params,
    );
}
1618
/// Subscribes to pool fee collect events for `instrument_id` (only compiled with the
/// `defi` feature).
///
/// Each received `PoolFeeCollect` is passed to `handle_pool_fee_collect` on the actor
/// looked up by ID; registration is delegated to
/// `DataActorCore::subscribe_pool_fee_collects`.
#[cfg(feature = "defi")]
fn subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_collect = TypedHandler::from(move |fee: &PoolFeeCollect| {
        get_actor_unchecked::<Self>(&id).handle_pool_fee_collect(fee);
    });

    DataActorCore::subscribe_pool_fee_collects(
        self,
        topic,
        on_collect,
        instrument_id,
        client_id,
        params,
    );
}
1645
/// Subscribes to pool flash events for `instrument_id` (only compiled with the `defi`
/// feature).
///
/// Each received `PoolFlash` is passed to `handle_pool_flash` on the actor looked up by
/// ID; registration is delegated to `DataActorCore::subscribe_pool_flash_events`.
#[cfg(feature = "defi")]
fn subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_flash = TypedHandler::from(move |event: &PoolFlash| {
        get_actor_unchecked::<Self>(&id).handle_pool_flash(event);
    });

    DataActorCore::subscribe_pool_flash_events(
        self,
        topic,
        on_flash,
        instrument_id,
        client_id,
        params,
    );
}
1672
/// Unsubscribe entry point for custom data of `data_type`.
///
/// Forwards `data_type`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_data`.
fn unsubscribe_data(
    &mut self,
    data_type: DataType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_data(self, data_type, client_id, params);
}
1684
/// Unsubscribe entry point for the signal called `name`.
///
/// Forwards `name` unchanged to `DataActorCore::unsubscribe_signal`.
fn unsubscribe_signal(&mut self, name: &str)
where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_signal(self, name);
}
1692
/// Unsubscribe entry point for instrument definitions of `venue`.
///
/// Forwards `venue`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_instruments`.
fn unsubscribe_instruments(
    &mut self,
    venue: Venue,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
}
1704
/// Unsubscribe entry point for the instrument definition of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_instrument`.
fn unsubscribe_instrument(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
}
1716
/// Unsubscribe entry point for order book deltas of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_book_deltas`.
fn unsubscribe_book_deltas(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
}
1728
/// Unsubscribe entry point for interval-based order book updates of `instrument_id`.
///
/// Forwards `instrument_id`, `interval_ms`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_book_at_interval`.
fn unsubscribe_book_at_interval(
    &mut self,
    instrument_id: InstrumentId,
    interval_ms: NonZeroUsize,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_book_at_interval(
        self,
        instrument_id,
        interval_ms,
        client_id,
        params,
    );
}
1747
/// Unsubscribe entry point for quotes of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_quotes`.
fn unsubscribe_quotes(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
}
1759
/// Unsubscribe entry point for trades of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_trades`.
fn unsubscribe_trades(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
}
1771
/// Unsubscribe entry point for bars of `bar_type`.
///
/// Forwards `bar_type`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_bars`.
fn unsubscribe_bars(
    &mut self,
    bar_type: BarType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
}
1783
/// Unsubscribe entry point for mark prices of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_mark_prices`.
fn unsubscribe_mark_prices(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
}
1795
/// Unsubscribe entry point for index prices of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_index_prices`.
fn unsubscribe_index_prices(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
}
1807
/// Unsubscribe entry point for funding rates of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_funding_rates`.
fn unsubscribe_funding_rates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
}
1819
/// Unsubscribe entry point for option greeks of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_option_greeks`.
fn unsubscribe_option_greeks(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
}
1831
/// Unsubscribe entry point for instrument status updates of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_instrument_status`.
fn unsubscribe_instrument_status(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
}
1843
/// Unsubscribe entry point for instrument close events of `instrument_id`.
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_instrument_close`.
fn unsubscribe_instrument_close(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
}
1855
/// Unsubscribe entry point for the option chain of `series_id`.
///
/// Forwards `series_id` and `client_id` unchanged to
/// `DataActorCore::unsubscribe_option_chain`. Unlike most sibling methods this one takes
/// no `params` argument.
fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
}
1863
/// Unsubscribe entry point for order fill events of `instrument_id`.
///
/// Forwards `instrument_id` unchanged to `DataActorCore::unsubscribe_order_fills`.
fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_order_fills(self, instrument_id);
}
1871
/// Unsubscribe entry point for order cancel events of `instrument_id`.
///
/// Forwards `instrument_id` unchanged to `DataActorCore::unsubscribe_order_cancels`.
fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_order_cancels(self, instrument_id);
}
1879
/// Unsubscribe entry point for blocks of `chain` (only compiled with the `defi` feature).
///
/// Forwards `chain`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_blocks`.
#[cfg(feature = "defi")]
fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
}
1892
/// Unsubscribe entry point for pool updates of `instrument_id` (only compiled with the
/// `defi` feature).
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_pool`.
#[cfg(feature = "defi")]
fn unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
}
1905
/// Unsubscribe entry point for pool swaps of `instrument_id` (only compiled with the
/// `defi` feature).
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_pool_swaps`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
}
1918
/// Unsubscribe entry point for pool liquidity updates of `instrument_id` (only compiled
/// with the `defi` feature).
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_pool_liquidity_updates`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
}
1931
/// Unsubscribe entry point for pool fee collect events of `instrument_id` (only compiled
/// with the `defi` feature).
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_pool_fee_collects`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
}
1944
/// Unsubscribe entry point for pool flash events of `instrument_id` (only compiled with
/// the `defi` feature).
///
/// Forwards `instrument_id`, `client_id` and `params` unchanged to
/// `DataActorCore::unsubscribe_pool_flash_events`.
#[cfg(feature = "defi")]
fn unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
}
1957
1958 fn request_data(
1964 &mut self,
1965 data_type: DataType,
1966 client_id: ClientId,
1967 start: Option<DateTime<Utc>>,
1968 end: Option<DateTime<Utc>>,
1969 limit: Option<NonZeroUsize>,
1970 params: Option<Params>,
1971 ) -> anyhow::Result<UUID4>
1972 where
1973 Self: 'static + Debug + Sized,
1974 {
1975 let actor_id = self.actor_id().inner();
1976 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1977 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1978 });
1979
1980 DataActorCore::request_data(
1981 self, data_type, client_id, start, end, limit, params, handler,
1982 )
1983 }
1984
1985 fn request_instrument(
1991 &mut self,
1992 instrument_id: InstrumentId,
1993 start: Option<DateTime<Utc>>,
1994 end: Option<DateTime<Utc>>,
1995 client_id: Option<ClientId>,
1996 params: Option<Params>,
1997 ) -> anyhow::Result<UUID4>
1998 where
1999 Self: 'static + Debug + Sized,
2000 {
2001 let actor_id = self.actor_id().inner();
2002 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
2003 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
2004 });
2005
2006 DataActorCore::request_instrument(
2007 self,
2008 instrument_id,
2009 start,
2010 end,
2011 client_id,
2012 params,
2013 handler,
2014 )
2015 }
2016
2017 fn request_instruments(
2023 &mut self,
2024 venue: Option<Venue>,
2025 start: Option<DateTime<Utc>>,
2026 end: Option<DateTime<Utc>>,
2027 client_id: Option<ClientId>,
2028 params: Option<Params>,
2029 ) -> anyhow::Result<UUID4>
2030 where
2031 Self: 'static + Debug + Sized,
2032 {
2033 let actor_id = self.actor_id().inner();
2034 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
2035 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
2036 });
2037
2038 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
2039 }
2040
2041 fn request_book_snapshot(
2047 &mut self,
2048 instrument_id: InstrumentId,
2049 depth: Option<NonZeroUsize>,
2050 client_id: Option<ClientId>,
2051 params: Option<Params>,
2052 ) -> anyhow::Result<UUID4>
2053 where
2054 Self: 'static + Debug + Sized,
2055 {
2056 let actor_id = self.actor_id().inner();
2057 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2058 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2059 });
2060
2061 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
2062 }
2063
2064 fn request_book_deltas(
2070 &mut self,
2071 instrument_id: InstrumentId,
2072 start: Option<DateTime<Utc>>,
2073 end: Option<DateTime<Utc>>,
2074 limit: Option<NonZeroUsize>,
2075 client_id: Option<ClientId>,
2076 params: Option<Params>,
2077 ) -> anyhow::Result<UUID4>
2078 where
2079 Self: 'static + Debug + Sized,
2080 {
2081 let actor_id = self.actor_id().inner();
2082 let handler = ShareableMessageHandler::from_typed(move |resp: &BookDeltasResponse| {
2083 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas_response(resp);
2084 });
2085
2086 DataActorCore::request_book_deltas(
2087 self,
2088 instrument_id,
2089 start,
2090 end,
2091 limit,
2092 client_id,
2093 params,
2094 handler,
2095 )
2096 }
2097
2098 #[expect(clippy::too_many_arguments)]
2104 fn request_book_depth(
2105 &mut self,
2106 instrument_id: InstrumentId,
2107 start: Option<DateTime<Utc>>,
2108 end: Option<DateTime<Utc>>,
2109 limit: Option<NonZeroUsize>,
2110 depth: Option<NonZeroUsize>,
2111 client_id: Option<ClientId>,
2112 params: Option<Params>,
2113 ) -> anyhow::Result<UUID4>
2114 where
2115 Self: 'static + Debug + Sized,
2116 {
2117 let actor_id = self.actor_id().inner();
2118 let handler = ShareableMessageHandler::from_typed(move |resp: &BookDepthResponse| {
2119 get_actor_unchecked::<Self>(&actor_id).handle_book_depth_response(resp);
2120 });
2121
2122 DataActorCore::request_book_depth(
2123 self,
2124 instrument_id,
2125 start,
2126 end,
2127 limit,
2128 depth,
2129 client_id,
2130 params,
2131 handler,
2132 )
2133 }
2134
2135 fn request_quotes(
2141 &mut self,
2142 instrument_id: InstrumentId,
2143 start: Option<DateTime<Utc>>,
2144 end: Option<DateTime<Utc>>,
2145 limit: Option<NonZeroUsize>,
2146 client_id: Option<ClientId>,
2147 params: Option<Params>,
2148 ) -> anyhow::Result<UUID4>
2149 where
2150 Self: 'static + Debug + Sized,
2151 {
2152 let actor_id = self.actor_id().inner();
2153 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2154 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2155 });
2156
2157 DataActorCore::request_quotes(
2158 self,
2159 instrument_id,
2160 start,
2161 end,
2162 limit,
2163 client_id,
2164 params,
2165 handler,
2166 )
2167 }
2168
2169 fn request_trades(
2175 &mut self,
2176 instrument_id: InstrumentId,
2177 start: Option<DateTime<Utc>>,
2178 end: Option<DateTime<Utc>>,
2179 limit: Option<NonZeroUsize>,
2180 client_id: Option<ClientId>,
2181 params: Option<Params>,
2182 ) -> anyhow::Result<UUID4>
2183 where
2184 Self: 'static + Debug + Sized,
2185 {
2186 let actor_id = self.actor_id().inner();
2187 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2188 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2189 });
2190
2191 DataActorCore::request_trades(
2192 self,
2193 instrument_id,
2194 start,
2195 end,
2196 limit,
2197 client_id,
2198 params,
2199 handler,
2200 )
2201 }
2202
2203 fn request_bars(
2209 &mut self,
2210 bar_type: BarType,
2211 start: Option<DateTime<Utc>>,
2212 end: Option<DateTime<Utc>>,
2213 limit: Option<NonZeroUsize>,
2214 client_id: Option<ClientId>,
2215 params: Option<Params>,
2216 ) -> anyhow::Result<UUID4>
2217 where
2218 Self: 'static + Debug + Sized,
2219 {
2220 let actor_id = self.actor_id().inner();
2221 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2222 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2223 });
2224
2225 DataActorCore::request_bars(
2226 self, bar_type, start, end, limit, client_id, params, handler,
2227 )
2228 }
2229
2230 fn request_funding_rates(
2236 &mut self,
2237 instrument_id: InstrumentId,
2238 start: Option<DateTime<Utc>>,
2239 end: Option<DateTime<Utc>>,
2240 limit: Option<NonZeroUsize>,
2241 client_id: Option<ClientId>,
2242 params: Option<Params>,
2243 ) -> anyhow::Result<UUID4>
2244 where
2245 Self: 'static + Debug + Sized,
2246 {
2247 let actor_id = self.actor_id().inner();
2248 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2249 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2250 });
2251
2252 DataActorCore::request_funding_rates(
2253 self,
2254 instrument_id,
2255 start,
2256 end,
2257 limit,
2258 client_id,
2259 params,
2260 handler,
2261 )
2262 }
2263}
2264
2265impl<T> Actor for T
2267where
2268 T: DataActor + Debug + 'static,
2269{
2270 fn id(&self) -> Ustr {
2271 self.actor_id.inner()
2272 }
2273
2274 #[allow(unused_variables)]
2275 fn handle(&mut self, msg: &dyn Any) {
2276 }
2278
2279 fn as_any(&self) -> &dyn Any {
2280 self
2281 }
2282}
2283
2284impl<T> Component for T
2286where
2287 T: DataActor + Debug + 'static,
2288{
2289 fn component_id(&self) -> ComponentId {
2290 ComponentId::new(self.actor_id.inner().as_str())
2291 }
2292
2293 fn state(&self) -> ComponentState {
2294 self.state
2295 }
2296
2297 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2298 self.state = self.state.transition(&trigger)?;
2299 log::info!(component = self.component_id().as_str(); "{}", self.state.variant_name());
2300 Ok(())
2301 }
2302
2303 fn register(
2304 &mut self,
2305 trader_id: TraderId,
2306 clock: Rc<RefCell<dyn Clock>>,
2307 cache: Rc<RefCell<Cache>>,
2308 ) -> anyhow::Result<()> {
2309 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2310
2311 let actor_id = self.actor_id().inner();
2313 let callback = TimeEventCallback::from(move |event: TimeEvent| {
2314 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2315 actor.handle_time_event(&event);
2316 } else {
2317 log::error!("Actor {actor_id} not found for time event handling");
2318 }
2319 });
2320
2321 clock.borrow_mut().register_default_handler(callback);
2322
2323 self.initialize()
2324 }
2325
2326 fn on_start(&mut self) -> anyhow::Result<()> {
2327 DataActor::on_start(self)
2328 }
2329
2330 fn on_stop(&mut self) -> anyhow::Result<()> {
2331 DataActor::on_stop(self)
2332 }
2333
2334 fn on_resume(&mut self) -> anyhow::Result<()> {
2335 DataActor::on_resume(self)
2336 }
2337
2338 fn on_degrade(&mut self) -> anyhow::Result<()> {
2339 DataActor::on_degrade(self)
2340 }
2341
2342 fn on_fault(&mut self) -> anyhow::Result<()> {
2343 DataActor::on_fault(self)
2344 }
2345
2346 fn on_reset(&mut self) -> anyhow::Result<()> {
2347 DataActor::on_reset(self)
2348 }
2349
2350 fn on_dispose(&mut self) -> anyhow::Result<()> {
2351 DataActor::on_dispose(self)
2352 }
2353}
2354
/// Core state of a data actor: identity, configuration, lifecycle state, and the handler
/// maps that mirror this actor's message bus subscriptions.
#[derive(Clone)]
#[allow(
    dead_code,
    reason = "TODO: Under development (pending_requests, signal_classes)"
)]
pub struct DataActorCore {
    /// The actor ID.
    pub actor_id: ActorId,
    /// The actor configuration.
    pub config: DataActorConfig,
    // NOTE(review): `trader_id`, `clock` and `cache` are optional; presumably they are
    // populated by `register` — confirm against that method.
    trader_id: Option<TraderId>,
    clock: Option<Rc<RefCell<dyn Clock>>>,
    cache: Option<Rc<RefCell<Cache>>>,
    /// Current lifecycle state of the component.
    state: ComponentState,
    // Handlers registered through the `add_*_subscription` methods. Each map keeps the
    // handler under its topic or pattern so the matching `remove_*_subscription` can pass
    // the same handler back to the message bus when unsubscribing.
    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
    instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
    deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
    depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
    // DeFi handler maps, only present with the `defi` feature.
    #[cfg(feature = "defi")]
    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
    #[cfg(feature = "defi")]
    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
    #[cfg(feature = "defi")]
    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
    #[cfg(feature = "defi")]
    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
    #[cfg(feature = "defi")]
    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
    #[cfg(feature = "defi")]
    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
    // NOTE(review): usage of `warning_events` is not visible in this part of the file.
    warning_events: AHashSet<String>,
    // Named in the `dead_code` allowance above as still under development.
    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
    // Named in the `dead_code` allowance above as still under development.
    signal_classes: AHashMap<String, String>,
    /// Indicator state, only present with the `indicators` feature.
    #[cfg(feature = "indicators")]
    indicators: Indicators,
}
2402
2403impl Debug for DataActorCore {
2404 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2405 f.debug_struct(stringify!(DataActorCore))
2406 .field("actor_id", &self.actor_id)
2407 .field("config", &self.config)
2408 .field("state", &self.state)
2409 .field("trader_id", &self.trader_id)
2410 .finish()
2411 }
2412}
2413
2414impl DataActorCore {
2415 pub(crate) fn add_subscription_any(
2419 &mut self,
2420 topic: MStr<Topic>,
2421 handler: ShareableMessageHandler,
2422 ) {
2423 let pattern: MStr<Pattern> = topic.into();
2424 if self.topic_handlers.contains_key(&pattern) {
2425 log::warn!(
2426 "Actor {} attempted duplicate subscription to topic '{topic}'",
2427 self.actor_id,
2428 );
2429 return;
2430 }
2431
2432 self.topic_handlers.insert(pattern, handler.clone());
2433 msgbus::subscribe_any(pattern, handler, None);
2434 }
2435
2436 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2440 let pattern: MStr<Pattern> = topic.into();
2441 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2442 msgbus::unsubscribe_any(pattern, &handler);
2443 } else {
2444 log::warn!(
2445 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2446 self.actor_id,
2447 );
2448 }
2449 }
2450
2451 pub(crate) fn add_quote_subscription(
2452 &mut self,
2453 topic: MStr<Topic>,
2454 handler: TypedHandler<QuoteTick>,
2455 ) {
2456 if self.quote_handlers.contains_key(&topic) {
2457 log::warn!(
2458 "Actor {} attempted duplicate quote subscription to '{topic}'",
2459 self.actor_id
2460 );
2461 return;
2462 }
2463 self.quote_handlers.insert(topic, handler.clone());
2464 msgbus::subscribe_quotes(topic.into(), handler, None);
2465 }
2466
2467 #[allow(dead_code)]
2468 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2469 if let Some(handler) = self.quote_handlers.remove(&topic) {
2470 msgbus::unsubscribe_quotes(topic.into(), &handler);
2471 }
2472 }
2473
2474 pub(crate) fn add_trade_subscription(
2475 &mut self,
2476 topic: MStr<Topic>,
2477 handler: TypedHandler<TradeTick>,
2478 ) {
2479 if self.trade_handlers.contains_key(&topic) {
2480 log::warn!(
2481 "Actor {} attempted duplicate trade subscription to '{topic}'",
2482 self.actor_id
2483 );
2484 return;
2485 }
2486 self.trade_handlers.insert(topic, handler.clone());
2487 msgbus::subscribe_trades(topic.into(), handler, None);
2488 }
2489
2490 #[allow(dead_code)]
2491 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2492 if let Some(handler) = self.trade_handlers.remove(&topic) {
2493 msgbus::unsubscribe_trades(topic.into(), &handler);
2494 }
2495 }
2496
2497 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2498 if self.bar_handlers.contains_key(&topic) {
2499 log::warn!(
2500 "Actor {} attempted duplicate bar subscription to '{topic}'",
2501 self.actor_id
2502 );
2503 return;
2504 }
2505 self.bar_handlers.insert(topic, handler.clone());
2506 msgbus::subscribe_bars(topic.into(), handler, None);
2507 }
2508
2509 #[allow(dead_code)]
2510 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2511 if let Some(handler) = self.bar_handlers.remove(&topic) {
2512 msgbus::unsubscribe_bars(topic.into(), &handler);
2513 }
2514 }
2515
2516 pub(crate) fn add_order_event_subscription(
2517 &mut self,
2518 topic: MStr<Topic>,
2519 handler: TypedHandler<OrderEventAny>,
2520 ) {
2521 if self.order_event_handlers.contains_key(&topic) {
2522 log::warn!(
2523 "Actor {} attempted duplicate order event subscription to '{topic}'",
2524 self.actor_id
2525 );
2526 return;
2527 }
2528 self.order_event_handlers.insert(topic, handler.clone());
2529 msgbus::subscribe_order_events(topic.into(), handler, None);
2530 }
2531
2532 #[allow(dead_code)]
2533 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2534 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2535 msgbus::unsubscribe_order_events(topic.into(), &handler);
2536 }
2537 }
2538
2539 pub(crate) fn add_deltas_subscription(
2540 &mut self,
2541 pattern: MStr<Pattern>,
2542 handler: TypedHandler<OrderBookDeltas>,
2543 ) {
2544 if self.deltas_handlers.contains_key(&pattern) {
2545 log::warn!(
2546 "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2547 self.actor_id
2548 );
2549 return;
2550 }
2551 self.deltas_handlers.insert(pattern, handler.clone());
2552 msgbus::subscribe_book_deltas(pattern, handler, None);
2553 }
2554
2555 #[allow(dead_code)]
2556 pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2557 if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2558 msgbus::unsubscribe_book_deltas(pattern, &handler);
2559 }
2560 }
2561
2562 #[allow(dead_code)]
2563 pub(crate) fn add_depth10_subscription(
2564 &mut self,
2565 pattern: MStr<Pattern>,
2566 handler: TypedHandler<OrderBookDepth10>,
2567 ) {
2568 if self.depth10_handlers.contains_key(&pattern) {
2569 log::warn!(
2570 "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2571 self.actor_id
2572 );
2573 return;
2574 }
2575 self.depth10_handlers.insert(pattern, handler.clone());
2576 msgbus::subscribe_book_depth10(pattern, handler, None);
2577 }
2578
2579 #[allow(dead_code)]
2580 pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2581 if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2582 msgbus::unsubscribe_book_depth10(pattern, &handler);
2583 }
2584 }
2585
2586 pub(crate) fn add_instrument_subscription(
2587 &mut self,
2588 pattern: MStr<Pattern>,
2589 handler: TypedHandler<InstrumentAny>,
2590 ) {
2591 if self.instrument_handlers.contains_key(&pattern) {
2592 log::warn!(
2593 "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2594 self.actor_id
2595 );
2596 return;
2597 }
2598 self.instrument_handlers.insert(pattern, handler.clone());
2599 msgbus::subscribe_instruments(pattern, handler, None);
2600 }
2601
2602 #[allow(dead_code)]
2603 pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2604 if let Some(handler) = self.instrument_handlers.remove(&pattern) {
2605 msgbus::unsubscribe_instruments(pattern, &handler);
2606 }
2607 }
2608
2609 pub(crate) fn add_instrument_close_subscription(
2610 &mut self,
2611 topic: MStr<Topic>,
2612 handler: ShareableMessageHandler,
2613 ) {
2614 let pattern: MStr<Pattern> = topic.into();
2615 if self.topic_handlers.contains_key(&pattern) {
2616 log::warn!(
2617 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2618 self.actor_id
2619 );
2620 return;
2621 }
2622 self.topic_handlers.insert(pattern, handler.clone());
2623 msgbus::subscribe_any(pattern, handler, None);
2624 }
2625
2626 #[allow(dead_code)]
2627 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2628 let pattern: MStr<Pattern> = topic.into();
2629 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2630 msgbus::unsubscribe_any(pattern, &handler);
2631 }
2632 }
2633
2634 pub(crate) fn add_book_snapshot_subscription(
2635 &mut self,
2636 topic: MStr<Topic>,
2637 handler: TypedHandler<OrderBook>,
2638 ) {
2639 if self.book_handlers.contains_key(&topic) {
2640 log::warn!(
2641 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2642 self.actor_id
2643 );
2644 return;
2645 }
2646 self.book_handlers.insert(topic, handler.clone());
2647 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2648 }
2649
2650 #[allow(dead_code)]
2651 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2652 if let Some(handler) = self.book_handlers.remove(&topic) {
2653 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2654 }
2655 }
2656
2657 pub(crate) fn add_mark_price_subscription(
2658 &mut self,
2659 topic: MStr<Topic>,
2660 handler: TypedHandler<MarkPriceUpdate>,
2661 ) {
2662 if self.mark_price_handlers.contains_key(&topic) {
2663 log::warn!(
2664 "Actor {} attempted duplicate mark price subscription to '{topic}'",
2665 self.actor_id
2666 );
2667 return;
2668 }
2669 self.mark_price_handlers.insert(topic, handler.clone());
2670 msgbus::subscribe_mark_prices(topic.into(), handler, None);
2671 }
2672
2673 #[allow(dead_code)]
2674 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2675 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2676 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2677 }
2678 }
2679
2680 pub(crate) fn add_index_price_subscription(
2681 &mut self,
2682 topic: MStr<Topic>,
2683 handler: TypedHandler<IndexPriceUpdate>,
2684 ) {
2685 if self.index_price_handlers.contains_key(&topic) {
2686 log::warn!(
2687 "Actor {} attempted duplicate index price subscription to '{topic}'",
2688 self.actor_id
2689 );
2690 return;
2691 }
2692 self.index_price_handlers.insert(topic, handler.clone());
2693 msgbus::subscribe_index_prices(topic.into(), handler, None);
2694 }
2695
2696 #[allow(dead_code)]
2697 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2698 if let Some(handler) = self.index_price_handlers.remove(&topic) {
2699 msgbus::unsubscribe_index_prices(topic.into(), &handler);
2700 }
2701 }
2702
2703 pub(crate) fn add_funding_rate_subscription(
2704 &mut self,
2705 topic: MStr<Topic>,
2706 handler: TypedHandler<FundingRateUpdate>,
2707 ) {
2708 if self.funding_rate_handlers.contains_key(&topic) {
2709 log::warn!(
2710 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2711 self.actor_id
2712 );
2713 return;
2714 }
2715 self.funding_rate_handlers.insert(topic, handler.clone());
2716 msgbus::subscribe_funding_rates(topic.into(), handler, None);
2717 }
2718
2719 #[allow(dead_code)]
2720 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2721 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2722 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2723 }
2724 }
2725
2726 pub(crate) fn add_option_greeks_subscription(
2727 &mut self,
2728 topic: MStr<Topic>,
2729 handler: TypedHandler<OptionGreeks>,
2730 ) {
2731 if self.option_greeks_handlers.contains_key(&topic) {
2732 log::warn!(
2733 "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2734 self.actor_id
2735 );
2736 return;
2737 }
2738 self.option_greeks_handlers.insert(topic, handler.clone());
2739 msgbus::subscribe_option_greeks(topic.into(), handler, None);
2740 }
2741
2742 #[allow(dead_code)]
2743 pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2744 if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2745 msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2746 }
2747 }
2748
2749 pub(crate) fn add_option_chain_subscription(
2750 &mut self,
2751 topic: MStr<Topic>,
2752 handler: TypedHandler<OptionChainSlice>,
2753 ) {
2754 if self.option_chain_handlers.contains_key(&topic) {
2755 log::warn!(
2756 "Actor {} attempted duplicate option chain subscription to '{topic}'",
2757 self.actor_id
2758 );
2759 return;
2760 }
2761 self.option_chain_handlers.insert(topic, handler.clone());
2762 msgbus::subscribe_option_chain(topic.into(), handler, None);
2763 }
2764
2765 pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2766 if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2767 msgbus::unsubscribe_option_chain(topic.into(), &handler);
2768 }
2769 }
2770
/// Registers `handler` for DeFi blocks published on `topic`.
///
/// The handler is stored in `block_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_block_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Block>,
) {
    let already_subscribed = self.block_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate block subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.block_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
    }
}
2787
/// Removes the block handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.block_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
}
2795
/// Registers `handler` for DeFi pool updates published on `topic`.
///
/// The handler is stored in `pool_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Pool>,
) {
    let already_subscribed = self.pool_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate pool subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.pool_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_pools(topic.into(), handler, None);
    }
}
2812
/// Removes the pool handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.pool_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_pools(topic.into(), &handler);
}
2820
/// Registers `handler` for DeFi pool swaps published on `topic`.
///
/// The handler is stored in `pool_swap_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_swap_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolSwap>,
) {
    let already_subscribed = self.pool_swap_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate pool swap subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.pool_swap_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
    }
}
2837
/// Removes the pool swap handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.pool_swap_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
}
2845
/// Registers `handler` for DeFi pool liquidity updates published on `topic`.
///
/// The handler is stored in `pool_liquidity_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_liquidity_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolLiquidityUpdate>,
) {
    let already_subscribed = self.pool_liquidity_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.pool_liquidity_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
    }
}
2862
/// Removes the pool liquidity handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.pool_liquidity_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
}
2870
/// Registers `handler` for DeFi pool fee collect events published on `topic`.
///
/// The handler is stored in `pool_collect_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_collect_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFeeCollect>,
) {
    let already_subscribed = self.pool_collect_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate pool collect subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.pool_collect_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_collects(topic.into(), handler, None);
    }
}
2887
/// Removes the pool collect handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.pool_collect_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_collects(topic.into(), &handler);
}
2895
/// Registers `handler` for DeFi pool flash events published on `topic`.
///
/// The handler is stored in `pool_flash_handlers` and subscribed on the message bus.
/// If a handler is already recorded for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_flash_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFlash>,
) {
    let already_subscribed = self.pool_flash_handlers.contains_key(&topic);

    if already_subscribed {
        log::warn!(
            "Actor {} attempted duplicate pool flash subscription to '{topic}'",
            self.actor_id
        );
    } else {
        // Keep a copy so the same handler can be unsubscribed later.
        self.pool_flash_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_flash(topic.into(), handler, None);
    }
}
2912
/// Removes the pool flash handler recorded for `topic` and unsubscribes it from the
/// message bus.
///
/// Does nothing when no handler is recorded for `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
    let Some(handler) = self.pool_flash_handlers.remove(&topic) else {
        return;
    };

    msgbus::unsubscribe_defi_flash(topic.into(), &handler);
}
2920
2921 pub fn new(config: DataActorConfig) -> Self {
2923 let actor_id = config
2924 .actor_id
2925 .unwrap_or_else(|| Self::default_actor_id(&config));
2926
2927 Self {
2928 actor_id,
2929 config,
2930 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2934 topic_handlers: AHashMap::new(),
2935 instrument_handlers: AHashMap::new(),
2936 deltas_handlers: AHashMap::new(),
2937 depth10_handlers: AHashMap::new(),
2938 book_handlers: AHashMap::new(),
2939 quote_handlers: AHashMap::new(),
2940 trade_handlers: AHashMap::new(),
2941 bar_handlers: AHashMap::new(),
2942 mark_price_handlers: AHashMap::new(),
2943 index_price_handlers: AHashMap::new(),
2944 funding_rate_handlers: AHashMap::new(),
2945 option_greeks_handlers: AHashMap::new(),
2946 option_chain_handlers: AHashMap::new(),
2947 order_event_handlers: AHashMap::new(),
2948 #[cfg(feature = "defi")]
2949 block_handlers: AHashMap::new(),
2950 #[cfg(feature = "defi")]
2951 pool_handlers: AHashMap::new(),
2952 #[cfg(feature = "defi")]
2953 pool_swap_handlers: AHashMap::new(),
2954 #[cfg(feature = "defi")]
2955 pool_liquidity_handlers: AHashMap::new(),
2956 #[cfg(feature = "defi")]
2957 pool_collect_handlers: AHashMap::new(),
2958 #[cfg(feature = "defi")]
2959 pool_flash_handlers: AHashMap::new(),
2960 warning_events: AHashSet::new(),
2961 pending_requests: AHashMap::new(),
2962 signal_classes: AHashMap::new(),
2963 #[cfg(feature = "indicators")]
2964 indicators: Indicators::default(),
2965 }
2966 }
2967
2968 #[must_use]
2970 pub fn mem_address(&self) -> String {
2971 format!("{self:p}")
2972 }
2973
2974 pub fn state(&self) -> ComponentState {
2976 self.state
2977 }
2978
2979 pub fn trader_id(&self) -> Option<TraderId> {
2981 self.trader_id
2982 }
2983
2984 pub fn actor_id(&self) -> ActorId {
2986 self.actor_id
2987 }
2988
2989 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2990 let memory_address = std::ptr::from_ref(config) as usize;
2991 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2992 }
2993
2994 pub fn timestamp_ns(&self) -> UnixNanos {
2996 self.clock_ref().timestamp_ns()
2997 }
2998
2999 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
3005 self.clock
3006 .as_ref()
3007 .unwrap_or_else(|| {
3008 panic!(
3009 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
3010 self.actor_id, self.trader_id
3011 )
3012 })
3013 .borrow_mut()
3014 }
3015
3016 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
3022 self.clock
3023 .as_ref()
3024 .expect("DataActor must be registered before accessing clock")
3025 .clone()
3026 }
3027
3028 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
3029 self.clock
3030 .as_ref()
3031 .unwrap_or_else(|| {
3032 panic!(
3033 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
3034 self.actor_id, self.trader_id
3035 )
3036 })
3037 .borrow()
3038 }
3039
3040 pub fn cache(&self) -> Ref<'_, Cache> {
3046 self.cache
3047 .as_ref()
3048 .expect("DataActor must be registered before accessing cache")
3049 .borrow()
3050 }
3051
3052 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
3058 self.cache
3059 .as_ref()
3060 .expect("DataActor must be registered before accessing cache")
3061 .clone()
3062 }
3063
3064 pub fn register(
3071 &mut self,
3072 trader_id: TraderId,
3073 clock: Rc<RefCell<dyn Clock>>,
3074 cache: Rc<RefCell<Cache>>,
3075 ) -> anyhow::Result<()> {
3076 if let Some(existing_trader_id) = self.trader_id {
3077 anyhow::bail!(
3078 "DataActor {} already registered with trader {existing_trader_id}",
3079 self.actor_id
3080 );
3081 }
3082
3083 {
3085 let _timestamp = clock.borrow().timestamp_ns();
3086 }
3087
3088 {
3090 let _cache_borrow = cache.borrow();
3091 }
3092
3093 self.trader_id = Some(trader_id);
3094 self.clock = Some(clock);
3095 self.cache = Some(cache);
3096
3097 if !self.is_properly_registered() {
3099 anyhow::bail!(
3100 "DataActor {} registration incomplete - validation failed",
3101 self.actor_id
3102 );
3103 }
3104
3105 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
3106 Ok(())
3107 }
3108
3109 pub fn register_warning_event(&mut self, event_type: &str) {
3111 self.warning_events.insert(event_type.to_string());
3112 log::debug!("Registered event type '{event_type}' for warning logs");
3113 }
3114
3115 pub fn deregister_warning_event(&mut self, event_type: &str) {
3117 self.warning_events.remove(event_type);
3118 log::debug!("Deregistered event type '{event_type}' from warning logs");
3119 }
3120
3121 pub fn is_registered(&self) -> bool {
3122 self.trader_id.is_some()
3123 }
3124
3125 pub(crate) fn check_registered(&self) {
3126 assert!(
3127 self.is_registered(),
3128 "Actor has not been registered with a Trader"
3129 );
3130 }
3131
3132 fn is_properly_registered(&self) -> bool {
3134 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3135 }
3136
3137 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3138 if self.config.log_commands {
3139 log::info!("{CMD}{SEND} {command:?}");
3140 }
3141
3142 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3143 msgbus::send_data_command(endpoint, command);
3144 }
3145
3146 #[allow(dead_code)]
3147 fn send_data_req(&self, request: &RequestCommand) {
3148 if self.config.log_commands {
3149 log::info!("{REQ}{SEND} {request:?}");
3150 }
3151
3152 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3155 msgbus::send_any(endpoint, request.as_any());
3156 }
3157
3158 pub fn shutdown_system(&self, reason: Option<String>) {
3164 self.check_registered();
3165
3166 let command = ShutdownSystem::new(
3168 self.trader_id().unwrap(),
3169 self.actor_id.inner(),
3170 reason,
3171 UUID4::new(),
3172 self.timestamp_ns(),
3173 None, );
3175
3176 let topic = MessagingSwitchboard::shutdown_system_topic();
3177 msgbus::publish_any(topic, command.as_any());
3178 }
3179
3180 pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3190 self.check_registered();
3191
3192 let topic = get_custom_topic(data_type);
3193 msgbus::publish_any(topic, data);
3194 }
3195
3196 pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3208 self.check_registered();
3209
3210 let now = self.timestamp_ns();
3211 let ts_event = if ts_event.as_u64() == 0 {
3212 now
3213 } else {
3214 ts_event
3215 };
3216 let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3217
3218 let data_type = DataType::new(
3219 &format!(
3220 "Signal{}",
3221 nautilus_core::string::conversions::title_case(name)
3222 ),
3223 None,
3224 None,
3225 );
3226 let data = CustomData::new(Arc::new(signal), data_type);
3227 let topic = get_custom_topic(&data.data_type);
3228 msgbus::publish_any(topic, &data);
3229 }
3230
3231 pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3239 self.check_registered();
3240
3241 let cache = self.cache_rc();
3242 if cache.borrow().synthetic(&synthetic.id).is_some() {
3243 anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3244 }
3245 cache.borrow_mut().add_synthetic(synthetic)
3246 }
3247
3248 pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3256 self.check_registered();
3257
3258 let cache = self.cache_rc();
3259 if cache.borrow().synthetic(&synthetic.id).is_none() {
3260 anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3261 }
3262 cache.borrow_mut().add_synthetic(synthetic)
3263 }
3264
3265 pub fn subscribe_data(
3271 &mut self,
3272 handler: ShareableMessageHandler,
3273 data_type: DataType,
3274 client_id: Option<ClientId>,
3275 params: Option<Params>,
3276 ) {
3277 assert!(
3278 self.is_properly_registered(),
3279 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3280 self.actor_id,
3281 self.trader_id,
3282 self.clock.is_some(),
3283 self.cache.is_some()
3284 );
3285
3286 let topic = get_custom_topic(&data_type);
3287 self.add_subscription_any(topic, handler);
3288
3289 if client_id.is_none() {
3291 return;
3292 }
3293
3294 let command = SubscribeCommand::Data(SubscribeCustomData {
3295 data_type,
3296 client_id,
3297 venue: None,
3298 command_id: UUID4::new(),
3299 ts_init: self.timestamp_ns(),
3300 correlation_id: None,
3301 params,
3302 });
3303
3304 self.send_data_cmd(DataCommand::Subscribe(command));
3305 }
3306
3307 pub fn subscribe_signal(
3315 &mut self,
3316 handler: ShareableMessageHandler,
3317 name: &str,
3318 priority: Option<u32>,
3319 ) {
3320 self.check_registered();
3321
3322 let pattern = get_signal_pattern(name);
3323 if self.topic_handlers.contains_key(&pattern) {
3324 log::warn!(
3325 "Actor {} attempted duplicate signal subscription to '{pattern}'",
3326 self.actor_id,
3327 );
3328 return;
3329 }
3330 self.topic_handlers.insert(pattern, handler.clone());
3331 msgbus::subscribe_any(pattern, handler, priority);
3332 }
3333
3334 pub fn subscribe_quotes(
3336 &mut self,
3337 topic: MStr<Topic>,
3338 handler: TypedHandler<QuoteTick>,
3339 instrument_id: InstrumentId,
3340 client_id: Option<ClientId>,
3341 params: Option<Params>,
3342 ) {
3343 self.check_registered();
3344
3345 self.add_quote_subscription(topic, handler);
3346
3347 let command = SubscribeCommand::Quotes(SubscribeQuotes {
3348 instrument_id,
3349 client_id,
3350 venue: Some(instrument_id.venue),
3351 command_id: UUID4::new(),
3352 ts_init: self.timestamp_ns(),
3353 correlation_id: None,
3354 params,
3355 });
3356
3357 self.send_data_cmd(DataCommand::Subscribe(command));
3358 }
3359
3360 pub fn subscribe_instruments(
3362 &mut self,
3363 pattern: MStr<Pattern>,
3364 handler: TypedHandler<InstrumentAny>,
3365 venue: Venue,
3366 client_id: Option<ClientId>,
3367 params: Option<Params>,
3368 ) {
3369 self.check_registered();
3370
3371 self.add_instrument_subscription(pattern, handler);
3372
3373 let command = SubscribeCommand::Instruments(SubscribeInstruments {
3374 client_id,
3375 venue,
3376 command_id: UUID4::new(),
3377 ts_init: self.timestamp_ns(),
3378 correlation_id: None,
3379 params,
3380 });
3381
3382 self.send_data_cmd(DataCommand::Subscribe(command));
3383 }
3384
3385 pub fn subscribe_instrument(
3387 &mut self,
3388 topic: MStr<Topic>,
3389 handler: TypedHandler<InstrumentAny>,
3390 instrument_id: InstrumentId,
3391 client_id: Option<ClientId>,
3392 params: Option<Params>,
3393 ) {
3394 self.check_registered();
3395
3396 self.add_instrument_subscription(topic.into(), handler);
3397
3398 let command = SubscribeCommand::Instrument(SubscribeInstrument {
3399 instrument_id,
3400 client_id,
3401 venue: Some(instrument_id.venue),
3402 command_id: UUID4::new(),
3403 ts_init: self.timestamp_ns(),
3404 correlation_id: None,
3405 params,
3406 });
3407
3408 self.send_data_cmd(DataCommand::Subscribe(command));
3409 }
3410
3411 #[expect(clippy::too_many_arguments)]
3413 pub fn subscribe_book_deltas(
3414 &mut self,
3415 pattern: MStr<Pattern>,
3416 handler: TypedHandler<OrderBookDeltas>,
3417 instrument_id: InstrumentId,
3418 book_type: BookType,
3419 depth: Option<NonZeroUsize>,
3420 client_id: Option<ClientId>,
3421 managed: bool,
3422 params: Option<Params>,
3423 ) {
3424 self.check_registered();
3425
3426 self.add_deltas_subscription(pattern, handler);
3427
3428 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3429 instrument_id,
3430 book_type,
3431 client_id,
3432 venue: Some(instrument_id.venue),
3433 command_id: UUID4::new(),
3434 ts_init: self.timestamp_ns(),
3435 depth,
3436 managed,
3437 correlation_id: None,
3438 params,
3439 });
3440
3441 self.send_data_cmd(DataCommand::Subscribe(command));
3442 }
3443
3444 #[expect(clippy::too_many_arguments)]
3446 pub fn subscribe_book_at_interval(
3447 &mut self,
3448 topic: MStr<Topic>,
3449 handler: TypedHandler<OrderBook>,
3450 instrument_id: InstrumentId,
3451 book_type: BookType,
3452 depth: Option<NonZeroUsize>,
3453 interval_ms: NonZeroUsize,
3454 client_id: Option<ClientId>,
3455 params: Option<Params>,
3456 ) {
3457 self.check_registered();
3458
3459 self.add_book_snapshot_subscription(topic, handler);
3460
3461 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3462 instrument_id,
3463 book_type,
3464 client_id,
3465 venue: Some(instrument_id.venue),
3466 command_id: UUID4::new(),
3467 ts_init: self.timestamp_ns(),
3468 depth,
3469 interval_ms,
3470 correlation_id: None,
3471 params,
3472 });
3473
3474 self.send_data_cmd(DataCommand::Subscribe(command));
3475 }
3476
3477 pub fn subscribe_trades(
3479 &mut self,
3480 topic: MStr<Topic>,
3481 handler: TypedHandler<TradeTick>,
3482 instrument_id: InstrumentId,
3483 client_id: Option<ClientId>,
3484 params: Option<Params>,
3485 ) {
3486 self.check_registered();
3487
3488 self.add_trade_subscription(topic, handler);
3489
3490 let command = SubscribeCommand::Trades(SubscribeTrades {
3491 instrument_id,
3492 client_id,
3493 venue: Some(instrument_id.venue),
3494 command_id: UUID4::new(),
3495 ts_init: self.timestamp_ns(),
3496 correlation_id: None,
3497 params,
3498 });
3499
3500 self.send_data_cmd(DataCommand::Subscribe(command));
3501 }
3502
3503 pub fn subscribe_bars(
3505 &mut self,
3506 topic: MStr<Topic>,
3507 handler: TypedHandler<Bar>,
3508 bar_type: BarType,
3509 client_id: Option<ClientId>,
3510 params: Option<Params>,
3511 ) {
3512 self.check_registered();
3513
3514 self.add_bar_subscription(topic, handler);
3515
3516 let command = SubscribeCommand::Bars(SubscribeBars {
3517 bar_type,
3518 client_id,
3519 venue: Some(bar_type.instrument_id().venue),
3520 command_id: UUID4::new(),
3521 ts_init: self.timestamp_ns(),
3522 correlation_id: None,
3523 params,
3524 });
3525
3526 self.send_data_cmd(DataCommand::Subscribe(command));
3527 }
3528
3529 pub fn subscribe_mark_prices(
3531 &mut self,
3532 topic: MStr<Topic>,
3533 handler: TypedHandler<MarkPriceUpdate>,
3534 instrument_id: InstrumentId,
3535 client_id: Option<ClientId>,
3536 params: Option<Params>,
3537 ) {
3538 self.check_registered();
3539
3540 self.add_mark_price_subscription(topic, handler);
3541
3542 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3543 instrument_id,
3544 client_id,
3545 venue: Some(instrument_id.venue),
3546 command_id: UUID4::new(),
3547 ts_init: self.timestamp_ns(),
3548 correlation_id: None,
3549 params,
3550 });
3551
3552 self.send_data_cmd(DataCommand::Subscribe(command));
3553 }
3554
3555 pub fn subscribe_index_prices(
3557 &mut self,
3558 topic: MStr<Topic>,
3559 handler: TypedHandler<IndexPriceUpdate>,
3560 instrument_id: InstrumentId,
3561 client_id: Option<ClientId>,
3562 params: Option<Params>,
3563 ) {
3564 self.check_registered();
3565
3566 self.add_index_price_subscription(topic, handler);
3567
3568 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3569 instrument_id,
3570 client_id,
3571 venue: Some(instrument_id.venue),
3572 command_id: UUID4::new(),
3573 ts_init: self.timestamp_ns(),
3574 correlation_id: None,
3575 params,
3576 });
3577
3578 self.send_data_cmd(DataCommand::Subscribe(command));
3579 }
3580
3581 pub fn subscribe_funding_rates(
3583 &mut self,
3584 topic: MStr<Topic>,
3585 handler: TypedHandler<FundingRateUpdate>,
3586 instrument_id: InstrumentId,
3587 client_id: Option<ClientId>,
3588 params: Option<Params>,
3589 ) {
3590 self.check_registered();
3591
3592 self.add_funding_rate_subscription(topic, handler);
3593
3594 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3595 instrument_id,
3596 client_id,
3597 venue: Some(instrument_id.venue),
3598 command_id: UUID4::new(),
3599 ts_init: self.timestamp_ns(),
3600 correlation_id: None,
3601 params,
3602 });
3603
3604 self.send_data_cmd(DataCommand::Subscribe(command));
3605 }
3606
3607 pub fn subscribe_option_greeks(
3609 &mut self,
3610 topic: MStr<Topic>,
3611 handler: TypedHandler<OptionGreeks>,
3612 instrument_id: InstrumentId,
3613 client_id: Option<ClientId>,
3614 params: Option<Params>,
3615 ) {
3616 self.check_registered();
3617
3618 self.add_option_greeks_subscription(topic, handler);
3619
3620 let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3621 instrument_id,
3622 client_id,
3623 venue: Some(instrument_id.venue),
3624 command_id: UUID4::new(),
3625 ts_init: self.timestamp_ns(),
3626 correlation_id: None,
3627 params,
3628 });
3629
3630 self.send_data_cmd(DataCommand::Subscribe(command));
3631 }
3632
3633 pub fn subscribe_instrument_status(
3635 &mut self,
3636 topic: MStr<Topic>,
3637 handler: ShareableMessageHandler,
3638 instrument_id: InstrumentId,
3639 client_id: Option<ClientId>,
3640 params: Option<Params>,
3641 ) {
3642 self.check_registered();
3643
3644 self.add_subscription_any(topic, handler);
3645
3646 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3647 instrument_id,
3648 client_id,
3649 venue: Some(instrument_id.venue),
3650 command_id: UUID4::new(),
3651 ts_init: self.timestamp_ns(),
3652 correlation_id: None,
3653 params,
3654 });
3655
3656 self.send_data_cmd(DataCommand::Subscribe(command));
3657 }
3658
3659 pub fn subscribe_instrument_close(
3661 &mut self,
3662 topic: MStr<Topic>,
3663 handler: ShareableMessageHandler,
3664 instrument_id: InstrumentId,
3665 client_id: Option<ClientId>,
3666 params: Option<Params>,
3667 ) {
3668 self.check_registered();
3669
3670 self.add_instrument_close_subscription(topic, handler);
3671
3672 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3673 instrument_id,
3674 client_id,
3675 venue: Some(instrument_id.venue),
3676 command_id: UUID4::new(),
3677 ts_init: self.timestamp_ns(),
3678 correlation_id: None,
3679 params,
3680 });
3681
3682 self.send_data_cmd(DataCommand::Subscribe(command));
3683 }
3684
3685 #[allow(clippy::too_many_arguments)]
3687 pub fn subscribe_option_chain(
3688 &mut self,
3689 topic: MStr<Topic>,
3690 handler: TypedHandler<OptionChainSlice>,
3691 series_id: OptionSeriesId,
3692 strike_range: StrikeRange,
3693 snapshot_interval_ms: Option<u64>,
3694 client_id: Option<ClientId>,
3695 params: Option<Params>,
3696 ) {
3697 self.check_registered();
3698
3699 self.add_option_chain_subscription(topic, handler);
3700
3701 let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3702 series_id,
3703 strike_range,
3704 snapshot_interval_ms,
3705 UUID4::new(),
3706 self.timestamp_ns(),
3707 client_id,
3708 Some(series_id.venue),
3709 params,
3710 ));
3711
3712 self.send_data_cmd(DataCommand::Subscribe(command));
3713 }
3714
3715 pub fn subscribe_order_fills(
3717 &mut self,
3718 topic: MStr<Topic>,
3719 handler: TypedHandler<OrderEventAny>,
3720 ) {
3721 self.check_registered();
3722 self.add_order_event_subscription(topic, handler);
3723 }
3724
3725 pub fn subscribe_order_cancels(
3727 &mut self,
3728 topic: MStr<Topic>,
3729 handler: TypedHandler<OrderEventAny>,
3730 ) {
3731 self.check_registered();
3732 self.add_order_event_subscription(topic, handler);
3733 }
3734
3735 pub fn unsubscribe_data(
3737 &mut self,
3738 data_type: DataType,
3739 client_id: Option<ClientId>,
3740 params: Option<Params>,
3741 ) {
3742 self.check_registered();
3743
3744 let topic = get_custom_topic(&data_type);
3745 self.remove_subscription_any(topic);
3746
3747 if client_id.is_none() {
3748 return;
3749 }
3750
3751 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3752 data_type,
3753 client_id,
3754 venue: None,
3755 command_id: UUID4::new(),
3756 ts_init: self.timestamp_ns(),
3757 correlation_id: None,
3758 params,
3759 });
3760
3761 self.send_data_cmd(DataCommand::Unsubscribe(command));
3762 }
3763
3764 pub fn unsubscribe_signal(&mut self, name: &str) {
3770 self.check_registered();
3771
3772 let pattern = get_signal_pattern(name);
3773 if let Some(handler) = self.topic_handlers.remove(&pattern) {
3774 msgbus::unsubscribe_any(pattern, &handler);
3775 } else {
3776 log::warn!(
3777 "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3778 self.actor_id,
3779 );
3780 }
3781 }
3782
3783 pub fn unsubscribe_instruments(
3785 &mut self,
3786 venue: Venue,
3787 client_id: Option<ClientId>,
3788 params: Option<Params>,
3789 ) {
3790 self.check_registered();
3791
3792 let pattern = get_instruments_pattern(venue);
3793 self.remove_instrument_subscription(pattern);
3794
3795 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3796 client_id,
3797 venue,
3798 command_id: UUID4::new(),
3799 ts_init: self.timestamp_ns(),
3800 correlation_id: None,
3801 params,
3802 });
3803
3804 self.send_data_cmd(DataCommand::Unsubscribe(command));
3805 }
3806
3807 pub fn unsubscribe_instrument(
3809 &mut self,
3810 instrument_id: InstrumentId,
3811 client_id: Option<ClientId>,
3812 params: Option<Params>,
3813 ) {
3814 self.check_registered();
3815
3816 let topic = get_instrument_topic(instrument_id);
3817 self.remove_instrument_subscription(topic.into());
3818
3819 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3820 instrument_id,
3821 client_id,
3822 venue: Some(instrument_id.venue),
3823 command_id: UUID4::new(),
3824 ts_init: self.timestamp_ns(),
3825 correlation_id: None,
3826 params,
3827 });
3828
3829 self.send_data_cmd(DataCommand::Unsubscribe(command));
3830 }
3831
3832 pub fn unsubscribe_book_deltas(
3834 &mut self,
3835 instrument_id: InstrumentId,
3836 client_id: Option<ClientId>,
3837 params: Option<Params>,
3838 ) {
3839 self.check_registered();
3840
3841 let pattern = if is_parent_subscription(params.as_ref()) {
3842 get_book_deltas_pattern(instrument_id)
3843 } else {
3844 get_book_deltas_topic(instrument_id).into()
3845 };
3846 self.remove_deltas_subscription(pattern);
3847
3848 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3849 instrument_id,
3850 client_id,
3851 venue: Some(instrument_id.venue),
3852 command_id: UUID4::new(),
3853 ts_init: self.timestamp_ns(),
3854 correlation_id: None,
3855 params,
3856 });
3857
3858 self.send_data_cmd(DataCommand::Unsubscribe(command));
3859 }
3860
3861 pub fn unsubscribe_book_at_interval(
3863 &mut self,
3864 instrument_id: InstrumentId,
3865 interval_ms: NonZeroUsize,
3866 client_id: Option<ClientId>,
3867 params: Option<Params>,
3868 ) {
3869 self.check_registered();
3870
3871 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3872 self.remove_book_snapshot_subscription(topic);
3873
3874 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3875 instrument_id,
3876 interval_ms,
3877 client_id,
3878 venue: Some(instrument_id.venue),
3879 command_id: UUID4::new(),
3880 ts_init: self.timestamp_ns(),
3881 correlation_id: None,
3882 params,
3883 });
3884
3885 self.send_data_cmd(DataCommand::Unsubscribe(command));
3886 }
3887
3888 pub fn unsubscribe_quotes(
3890 &mut self,
3891 instrument_id: InstrumentId,
3892 client_id: Option<ClientId>,
3893 params: Option<Params>,
3894 ) {
3895 self.check_registered();
3896
3897 let topic = get_quotes_topic(instrument_id);
3898 self.remove_quote_subscription(topic);
3899
3900 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3901 instrument_id,
3902 client_id,
3903 venue: Some(instrument_id.venue),
3904 command_id: UUID4::new(),
3905 ts_init: self.timestamp_ns(),
3906 correlation_id: None,
3907 params,
3908 });
3909
3910 self.send_data_cmd(DataCommand::Unsubscribe(command));
3911 }
3912
3913 pub fn unsubscribe_trades(
3915 &mut self,
3916 instrument_id: InstrumentId,
3917 client_id: Option<ClientId>,
3918 params: Option<Params>,
3919 ) {
3920 self.check_registered();
3921
3922 let topic = get_trades_topic(instrument_id);
3923 self.remove_trade_subscription(topic);
3924
3925 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3926 instrument_id,
3927 client_id,
3928 venue: Some(instrument_id.venue),
3929 command_id: UUID4::new(),
3930 ts_init: self.timestamp_ns(),
3931 correlation_id: None,
3932 params,
3933 });
3934
3935 self.send_data_cmd(DataCommand::Unsubscribe(command));
3936 }
3937
3938 pub fn unsubscribe_bars(
3940 &mut self,
3941 bar_type: BarType,
3942 client_id: Option<ClientId>,
3943 params: Option<Params>,
3944 ) {
3945 self.check_registered();
3946
3947 let topic = get_bars_topic(bar_type);
3948 self.remove_bar_subscription(topic);
3949
3950 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3951 bar_type,
3952 client_id,
3953 venue: Some(bar_type.instrument_id().venue),
3954 command_id: UUID4::new(),
3955 ts_init: self.timestamp_ns(),
3956 correlation_id: None,
3957 params,
3958 });
3959
3960 self.send_data_cmd(DataCommand::Unsubscribe(command));
3961 }
3962
3963 pub fn unsubscribe_mark_prices(
3965 &mut self,
3966 instrument_id: InstrumentId,
3967 client_id: Option<ClientId>,
3968 params: Option<Params>,
3969 ) {
3970 self.check_registered();
3971
3972 let topic = get_mark_price_topic(instrument_id);
3973 self.remove_mark_price_subscription(topic);
3974
3975 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3976 instrument_id,
3977 client_id,
3978 venue: Some(instrument_id.venue),
3979 command_id: UUID4::new(),
3980 ts_init: self.timestamp_ns(),
3981 correlation_id: None,
3982 params,
3983 });
3984
3985 self.send_data_cmd(DataCommand::Unsubscribe(command));
3986 }
3987
3988 pub fn unsubscribe_index_prices(
3990 &mut self,
3991 instrument_id: InstrumentId,
3992 client_id: Option<ClientId>,
3993 params: Option<Params>,
3994 ) {
3995 self.check_registered();
3996
3997 let topic = get_index_price_topic(instrument_id);
3998 self.remove_index_price_subscription(topic);
3999
4000 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
4001 instrument_id,
4002 client_id,
4003 venue: Some(instrument_id.venue),
4004 command_id: UUID4::new(),
4005 ts_init: self.timestamp_ns(),
4006 correlation_id: None,
4007 params,
4008 });
4009
4010 self.send_data_cmd(DataCommand::Unsubscribe(command));
4011 }
4012
4013 pub fn unsubscribe_funding_rates(
4015 &mut self,
4016 instrument_id: InstrumentId,
4017 client_id: Option<ClientId>,
4018 params: Option<Params>,
4019 ) {
4020 self.check_registered();
4021
4022 let topic = get_funding_rate_topic(instrument_id);
4023 self.remove_funding_rate_subscription(topic);
4024
4025 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
4026 instrument_id,
4027 client_id,
4028 venue: Some(instrument_id.venue),
4029 command_id: UUID4::new(),
4030 ts_init: self.timestamp_ns(),
4031 correlation_id: None,
4032 params,
4033 });
4034
4035 self.send_data_cmd(DataCommand::Unsubscribe(command));
4036 }
4037
4038 pub fn unsubscribe_option_greeks(
4040 &mut self,
4041 instrument_id: InstrumentId,
4042 client_id: Option<ClientId>,
4043 params: Option<Params>,
4044 ) {
4045 self.check_registered();
4046
4047 let topic = get_option_greeks_topic(instrument_id);
4048 self.remove_option_greeks_subscription(topic);
4049
4050 let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
4051 instrument_id,
4052 client_id,
4053 venue: Some(instrument_id.venue),
4054 command_id: UUID4::new(),
4055 ts_init: self.timestamp_ns(),
4056 correlation_id: None,
4057 params,
4058 });
4059
4060 self.send_data_cmd(DataCommand::Unsubscribe(command));
4061 }
4062
4063 pub fn unsubscribe_instrument_status(
4065 &mut self,
4066 instrument_id: InstrumentId,
4067 client_id: Option<ClientId>,
4068 params: Option<Params>,
4069 ) {
4070 self.check_registered();
4071
4072 let topic = get_instrument_status_topic(instrument_id);
4073 self.remove_subscription_any(topic);
4074
4075 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
4076 instrument_id,
4077 client_id,
4078 venue: Some(instrument_id.venue),
4079 command_id: UUID4::new(),
4080 ts_init: self.timestamp_ns(),
4081 correlation_id: None,
4082 params,
4083 });
4084
4085 self.send_data_cmd(DataCommand::Unsubscribe(command));
4086 }
4087
4088 pub fn unsubscribe_instrument_close(
4090 &mut self,
4091 instrument_id: InstrumentId,
4092 client_id: Option<ClientId>,
4093 params: Option<Params>,
4094 ) {
4095 self.check_registered();
4096
4097 let topic = get_instrument_close_topic(instrument_id);
4098 self.remove_instrument_close_subscription(topic);
4099
4100 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
4101 instrument_id,
4102 client_id,
4103 venue: Some(instrument_id.venue),
4104 command_id: UUID4::new(),
4105 ts_init: self.timestamp_ns(),
4106 correlation_id: None,
4107 params,
4108 });
4109
4110 self.send_data_cmd(DataCommand::Unsubscribe(command));
4111 }
4112
4113 pub fn unsubscribe_option_chain(
4115 &mut self,
4116 series_id: OptionSeriesId,
4117 client_id: Option<ClientId>,
4118 ) {
4119 self.check_registered();
4120
4121 let topic = get_option_chain_topic(series_id);
4122 self.remove_option_chain_subscription(topic);
4123
4124 let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4125 series_id,
4126 UUID4::new(),
4127 self.timestamp_ns(),
4128 client_id,
4129 Some(series_id.venue),
4130 ));
4131
4132 self.send_data_cmd(DataCommand::Unsubscribe(command));
4133 }
4134
4135 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4137 self.check_registered();
4138
4139 let topic = get_order_fills_topic(instrument_id);
4140 self.remove_order_event_subscription(topic);
4141 }
4142
4143 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4145 self.check_registered();
4146
4147 let topic = get_order_cancels_topic(instrument_id);
4148 self.remove_order_event_subscription(topic);
4149 }
4150
4151 #[expect(clippy::too_many_arguments)]
4157 pub fn request_data(
4158 &self,
4159 data_type: DataType,
4160 client_id: ClientId,
4161 start: Option<DateTime<Utc>>,
4162 end: Option<DateTime<Utc>>,
4163 limit: Option<NonZeroUsize>,
4164 params: Option<Params>,
4165 handler: ShareableMessageHandler,
4166 ) -> anyhow::Result<UUID4> {
4167 self.check_registered();
4168
4169 let now = self.clock_ref().utc_now();
4170 check_timestamps(now, start, end)?;
4171
4172 let request_id = UUID4::new();
4173 let command = RequestCommand::Data(RequestCustomData {
4174 client_id,
4175 data_type,
4176 start,
4177 end,
4178 limit,
4179 request_id,
4180 ts_init: self.timestamp_ns(),
4181 params,
4182 });
4183
4184 get_message_bus()
4185 .borrow_mut()
4186 .register_response_handler(command.request_id(), handler)?;
4187
4188 self.send_data_cmd(DataCommand::Request(command));
4189
4190 Ok(request_id)
4191 }
4192
4193 pub fn request_instrument(
4199 &self,
4200 instrument_id: InstrumentId,
4201 start: Option<DateTime<Utc>>,
4202 end: Option<DateTime<Utc>>,
4203 client_id: Option<ClientId>,
4204 params: Option<Params>,
4205 handler: ShareableMessageHandler,
4206 ) -> anyhow::Result<UUID4> {
4207 self.check_registered();
4208
4209 let now = self.clock_ref().utc_now();
4210 check_timestamps(now, start, end)?;
4211
4212 let request_id = UUID4::new();
4213 let command = RequestCommand::Instrument(RequestInstrument {
4214 instrument_id,
4215 start,
4216 end,
4217 client_id,
4218 request_id,
4219 ts_init: now.into(),
4220 params,
4221 });
4222
4223 get_message_bus()
4224 .borrow_mut()
4225 .register_response_handler(command.request_id(), handler)?;
4226
4227 self.send_data_cmd(DataCommand::Request(command));
4228
4229 Ok(request_id)
4230 }
4231
4232 pub fn request_instruments(
4238 &self,
4239 venue: Option<Venue>,
4240 start: Option<DateTime<Utc>>,
4241 end: Option<DateTime<Utc>>,
4242 client_id: Option<ClientId>,
4243 params: Option<Params>,
4244 handler: ShareableMessageHandler,
4245 ) -> anyhow::Result<UUID4> {
4246 self.check_registered();
4247
4248 let now = self.clock_ref().utc_now();
4249 check_timestamps(now, start, end)?;
4250
4251 let request_id = UUID4::new();
4252 let command = RequestCommand::Instruments(RequestInstruments {
4253 venue,
4254 start,
4255 end,
4256 client_id,
4257 request_id,
4258 ts_init: now.into(),
4259 params,
4260 });
4261
4262 get_message_bus()
4263 .borrow_mut()
4264 .register_response_handler(command.request_id(), handler)?;
4265
4266 self.send_data_cmd(DataCommand::Request(command));
4267
4268 Ok(request_id)
4269 }
4270
4271 pub fn request_book_snapshot(
4277 &self,
4278 instrument_id: InstrumentId,
4279 depth: Option<NonZeroUsize>,
4280 client_id: Option<ClientId>,
4281 params: Option<Params>,
4282 handler: ShareableMessageHandler,
4283 ) -> anyhow::Result<UUID4> {
4284 self.check_registered();
4285
4286 let request_id = UUID4::new();
4287 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4288 instrument_id,
4289 depth,
4290 client_id,
4291 request_id,
4292 ts_init: self.timestamp_ns(),
4293 params,
4294 });
4295
4296 get_message_bus()
4297 .borrow_mut()
4298 .register_response_handler(command.request_id(), handler)?;
4299
4300 self.send_data_cmd(DataCommand::Request(command));
4301
4302 Ok(request_id)
4303 }
4304
4305 #[expect(clippy::too_many_arguments)]
4311 pub fn request_book_deltas(
4312 &self,
4313 instrument_id: InstrumentId,
4314 start: Option<DateTime<Utc>>,
4315 end: Option<DateTime<Utc>>,
4316 limit: Option<NonZeroUsize>,
4317 client_id: Option<ClientId>,
4318 params: Option<Params>,
4319 handler: ShareableMessageHandler,
4320 ) -> anyhow::Result<UUID4> {
4321 self.check_registered();
4322
4323 let now = self.clock_ref().utc_now();
4324 check_timestamps(now, start, end)?;
4325
4326 let request_id = UUID4::new();
4327 let command = RequestCommand::BookDeltas(RequestBookDeltas {
4328 instrument_id,
4329 start,
4330 end,
4331 limit,
4332 client_id,
4333 request_id,
4334 ts_init: now.into(),
4335 params,
4336 });
4337
4338 get_message_bus()
4339 .borrow_mut()
4340 .register_response_handler(command.request_id(), handler)?;
4341
4342 self.send_data_cmd(DataCommand::Request(command));
4343
4344 Ok(request_id)
4345 }
4346
4347 #[expect(clippy::too_many_arguments)]
4353 pub fn request_book_depth(
4354 &self,
4355 instrument_id: InstrumentId,
4356 start: Option<DateTime<Utc>>,
4357 end: Option<DateTime<Utc>>,
4358 limit: Option<NonZeroUsize>,
4359 depth: Option<NonZeroUsize>,
4360 client_id: Option<ClientId>,
4361 params: Option<Params>,
4362 handler: ShareableMessageHandler,
4363 ) -> anyhow::Result<UUID4> {
4364 self.check_registered();
4365
4366 let now = self.clock_ref().utc_now();
4367 check_timestamps(now, start, end)?;
4368
4369 let request_id = UUID4::new();
4370 let command = RequestCommand::BookDepth(RequestBookDepth {
4371 instrument_id,
4372 start,
4373 end,
4374 limit,
4375 depth,
4376 client_id,
4377 request_id,
4378 ts_init: now.into(),
4379 params,
4380 });
4381
4382 get_message_bus()
4383 .borrow_mut()
4384 .register_response_handler(command.request_id(), handler)?;
4385
4386 self.send_data_cmd(DataCommand::Request(command));
4387
4388 Ok(request_id)
4389 }
4390
4391 #[expect(clippy::too_many_arguments)]
4397 pub fn request_quotes(
4398 &self,
4399 instrument_id: InstrumentId,
4400 start: Option<DateTime<Utc>>,
4401 end: Option<DateTime<Utc>>,
4402 limit: Option<NonZeroUsize>,
4403 client_id: Option<ClientId>,
4404 params: Option<Params>,
4405 handler: ShareableMessageHandler,
4406 ) -> anyhow::Result<UUID4> {
4407 self.check_registered();
4408
4409 let now = self.clock_ref().utc_now();
4410 check_timestamps(now, start, end)?;
4411
4412 let request_id = UUID4::new();
4413 let command = RequestCommand::Quotes(RequestQuotes {
4414 instrument_id,
4415 start,
4416 end,
4417 limit,
4418 client_id,
4419 request_id,
4420 ts_init: now.into(),
4421 params,
4422 });
4423
4424 get_message_bus()
4425 .borrow_mut()
4426 .register_response_handler(command.request_id(), handler)?;
4427
4428 self.send_data_cmd(DataCommand::Request(command));
4429
4430 Ok(request_id)
4431 }
4432
4433 #[expect(clippy::too_many_arguments)]
4439 pub fn request_trades(
4440 &self,
4441 instrument_id: InstrumentId,
4442 start: Option<DateTime<Utc>>,
4443 end: Option<DateTime<Utc>>,
4444 limit: Option<NonZeroUsize>,
4445 client_id: Option<ClientId>,
4446 params: Option<Params>,
4447 handler: ShareableMessageHandler,
4448 ) -> anyhow::Result<UUID4> {
4449 self.check_registered();
4450
4451 let now = self.clock_ref().utc_now();
4452 check_timestamps(now, start, end)?;
4453
4454 let request_id = UUID4::new();
4455 let command = RequestCommand::Trades(RequestTrades {
4456 instrument_id,
4457 start,
4458 end,
4459 limit,
4460 client_id,
4461 request_id,
4462 ts_init: now.into(),
4463 params,
4464 });
4465
4466 get_message_bus()
4467 .borrow_mut()
4468 .register_response_handler(command.request_id(), handler)?;
4469
4470 self.send_data_cmd(DataCommand::Request(command));
4471
4472 Ok(request_id)
4473 }
4474
4475 #[expect(clippy::too_many_arguments)]
4481 pub fn request_bars(
4482 &self,
4483 bar_type: BarType,
4484 start: Option<DateTime<Utc>>,
4485 end: Option<DateTime<Utc>>,
4486 limit: Option<NonZeroUsize>,
4487 client_id: Option<ClientId>,
4488 params: Option<Params>,
4489 handler: ShareableMessageHandler,
4490 ) -> anyhow::Result<UUID4> {
4491 self.check_registered();
4492
4493 let now = self.clock_ref().utc_now();
4494 check_timestamps(now, start, end)?;
4495
4496 let request_id = UUID4::new();
4497 let command = RequestCommand::Bars(RequestBars {
4498 bar_type,
4499 start,
4500 end,
4501 limit,
4502 client_id,
4503 request_id,
4504 ts_init: now.into(),
4505 params,
4506 });
4507
4508 get_message_bus()
4509 .borrow_mut()
4510 .register_response_handler(command.request_id(), handler)?;
4511
4512 self.send_data_cmd(DataCommand::Request(command));
4513
4514 Ok(request_id)
4515 }
4516
4517 #[expect(clippy::too_many_arguments)]
4523 pub fn request_funding_rates(
4524 &self,
4525 instrument_id: InstrumentId,
4526 start: Option<DateTime<Utc>>,
4527 end: Option<DateTime<Utc>>,
4528 limit: Option<NonZeroUsize>,
4529 client_id: Option<ClientId>,
4530 params: Option<Params>,
4531 handler: ShareableMessageHandler,
4532 ) -> anyhow::Result<UUID4> {
4533 self.check_registered();
4534
4535 let now = self.clock_ref().utc_now();
4536 check_timestamps(now, start, end)?;
4537
4538 let request_id = UUID4::new();
4539 let command = RequestCommand::FundingRates(RequestFundingRates {
4540 instrument_id,
4541 start,
4542 end,
4543 limit,
4544 client_id,
4545 request_id,
4546 ts_init: now.into(),
4547 params,
4548 });
4549
4550 get_message_bus()
4551 .borrow_mut()
4552 .register_response_handler(command.request_id(), handler)?;
4553
4554 self.send_data_cmd(DataCommand::Request(command));
4555
4556 Ok(request_id)
4557 }
4558
/// Returns how many entries `quote_handlers` currently holds (test builds only).
#[cfg(test)]
pub fn quote_handler_count(&self) -> usize {
    let handlers = &self.quote_handlers;
    handlers.len()
}
4563
/// Returns how many entries `trade_handlers` currently holds (test builds only).
#[cfg(test)]
pub fn trade_handler_count(&self) -> usize {
    let handlers = &self.trade_handlers;
    handlers.len()
}
4568
/// Returns how many entries `bar_handlers` currently holds (test builds only).
#[cfg(test)]
pub fn bar_handler_count(&self) -> usize {
    let handlers = &self.bar_handlers;
    handlers.len()
}
4573
/// Returns how many entries `deltas_handlers` currently holds (test builds only).
#[cfg(test)]
pub fn deltas_handler_count(&self) -> usize {
    let handlers = &self.deltas_handlers;
    handlers.len()
}
4578
/// Reports whether `quote_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_quote_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.quote_handlers.contains_key(&key)
}
4584
/// Reports whether `trade_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_trade_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.trade_handlers.contains_key(&key)
}
4590
/// Reports whether `bar_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_bar_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.bar_handlers.contains_key(&key)
}
4595
/// Reports whether `deltas_handlers` contains an entry keyed by `pattern` (test builds only).
///
/// Unlike the quote/trade/bar lookups, the key here is a `Pattern` rather than a `Topic`.
#[cfg(test)]
pub fn has_deltas_handler(&self, pattern: &str) -> bool {
    let key = MStr::<Pattern>::from(pattern);
    self.deltas_handlers.contains_key(&key)
}
4601}
4602
4603fn check_timestamps(
4604 now: DateTime<Utc>,
4605 start: Option<DateTime<Utc>>,
4606 end: Option<DateTime<Utc>>,
4607) -> anyhow::Result<()> {
4608 if let Some(start) = start {
4609 check_predicate_true(start <= now, "start was > now")?;
4610 }
4611
4612 if let Some(end) = end {
4613 check_predicate_true(end <= now, "end was > now")?;
4614 }
4615
4616 if let (Some(start), Some(end)) = (start, end) {
4617 check_predicate_true(start < end, "start was >= end")?;
4618 }
4619
4620 Ok(())
4621}
4622
/// Logs `e` at error level using its `Display` formatting.
fn log_error(e: &anyhow::Error) {
    log::error!("{e}");
}
4626
4627fn log_not_running<T>(msg: &T)
4628where
4629 T: Debug,
4630{
4631 log::trace!("Received message when not running - skipping {msg:?}");
4632}
4633
4634fn log_received<T>(msg: &T)
4635where
4636 T: Debug,
4637{
4638 log::debug!("{RECV} {msg:?}");
4639}
4640
/// Logs, at debug level, a one-line summary of a received bulk payload: the `RECV`
/// marker, the `kind` label, the `correlation_id`, and the number of `records`.
fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
    log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
}