1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39 },
40 enums::BookType,
41 events::order::{canceled::OrderCanceled, filled::OrderFilled},
42 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
43 instruments::InstrumentAny,
44 orderbook::OrderBook,
45};
46use ustr::Ustr;
47
48#[cfg(feature = "indicators")]
49use super::indicators::Indicators;
50use super::{
51 Actor,
52 registry::{get_actor_unchecked, try_get_actor_unchecked},
53};
54#[cfg(feature = "defi")]
55use crate::defi;
56#[cfg(feature = "defi")]
57#[allow(unused_imports)]
58use crate::defi::data_actor as _; use crate::{
60 cache::Cache,
61 clock::Clock,
62 component::Component,
63 enums::{ComponentState, ComponentTrigger},
64 logging::{CMD, RECV, REQ, SEND},
65 messages::{
66 data::{
67 BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
68 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
69 RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
70 SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
71 SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
72 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
73 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
74 UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
75 UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
76 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
77 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
78 },
79 system::ShutdownSystem,
80 },
81 msgbus::{
82 self, MStr, Topic, get_message_bus,
83 handler::{ShareableMessageHandler, TypedMessageHandler},
84 switchboard::{
85 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
86 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
87 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
88 get_instruments_topic, get_mark_price_topic, get_order_cancels_topic,
89 get_order_fills_topic, get_quotes_topic, get_trades_topic,
90 },
91 },
92 signal::Signal,
93 timer::{TimeEvent, TimeEventCallback},
94};
95
96#[derive(Debug, Clone)]
98#[cfg_attr(
99 feature = "python",
100 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
101)]
102pub struct DataActorConfig {
103 pub actor_id: Option<ActorId>,
105 pub log_events: bool,
107 pub log_commands: bool,
109}
110
111impl Default for DataActorConfig {
112 fn default() -> Self {
113 Self {
114 actor_id: None,
115 log_events: true,
116 log_commands: true,
117 }
118 }
119}
120
121#[derive(Debug, Clone)]
123#[cfg_attr(
124 feature = "python",
125 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
126)]
127pub struct ImportableActorConfig {
128 pub actor_path: String,
130 pub config_path: String,
132 pub config: HashMap<String, serde_json::Value>,
134}
135
136type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
137
138pub trait DataActor:
139 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
140{
141 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
147 Ok(IndexMap::new())
148 }
149
150 #[allow(unused_variables)]
156 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
157 Ok(())
158 }
159
160 fn on_start(&mut self) -> anyhow::Result<()> {
166 log::warn!(
167 "The `on_start` handler was called when not overridden, \
168 it's expected that any actions required when starting the actor \
169 occur here, such as subscribing/requesting data"
170 );
171 Ok(())
172 }
173
174 fn on_stop(&mut self) -> anyhow::Result<()> {
180 log::warn!(
181 "The `on_stop` handler was called when not overridden, \
182 it's expected that any actions required when stopping the actor \
183 occur here, such as unsubscribing from data",
184 );
185 Ok(())
186 }
187
188 fn on_resume(&mut self) -> anyhow::Result<()> {
194 log::warn!(
195 "The `on_resume` handler was called when not overridden, \
196 it's expected that any actions required when resuming the actor \
197 following a stop occur here"
198 );
199 Ok(())
200 }
201
202 fn on_reset(&mut self) -> anyhow::Result<()> {
208 log::warn!(
209 "The `on_reset` handler was called when not overridden, \
210 it's expected that any actions required when resetting the actor \
211 occur here, such as resetting indicators and other state"
212 );
213 Ok(())
214 }
215
216 fn on_dispose(&mut self) -> anyhow::Result<()> {
222 Ok(())
223 }
224
225 fn on_degrade(&mut self) -> anyhow::Result<()> {
231 Ok(())
232 }
233
234 fn on_fault(&mut self) -> anyhow::Result<()> {
240 Ok(())
241 }
242
243 #[allow(unused_variables)]
249 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
250 Ok(())
251 }
252
253 #[allow(unused_variables)]
259 fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
260 Ok(())
261 }
262
263 #[allow(unused_variables)]
269 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
270 Ok(())
271 }
272
273 #[allow(unused_variables)]
279 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
280 Ok(())
281 }
282
283 #[allow(unused_variables)]
289 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
290 Ok(())
291 }
292
293 #[allow(unused_variables)]
299 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 #[allow(unused_variables)]
309 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
310 Ok(())
311 }
312
313 #[allow(unused_variables)]
319 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
320 Ok(())
321 }
322
323 #[allow(unused_variables)]
329 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
330 Ok(())
331 }
332
333 #[allow(unused_variables)]
339 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
340 Ok(())
341 }
342
343 #[allow(unused_variables)]
349 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
350 Ok(())
351 }
352
353 #[allow(unused_variables)]
359 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
360 Ok(())
361 }
362
363 #[allow(unused_variables)]
369 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
370 Ok(())
371 }
372
373 #[allow(unused_variables)]
379 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
380 Ok(())
381 }
382
383 #[allow(unused_variables)]
389 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 #[allow(unused_variables)]
399 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
400 Ok(())
401 }
402
403 #[cfg(feature = "defi")]
404 #[allow(unused_variables)]
410 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
411 Ok(())
412 }
413
414 #[cfg(feature = "defi")]
415 #[allow(unused_variables)]
421 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
422 Ok(())
423 }
424
425 #[cfg(feature = "defi")]
426 #[allow(unused_variables)]
432 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
433 Ok(())
434 }
435
436 #[cfg(feature = "defi")]
437 #[allow(unused_variables)]
443 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
444 Ok(())
445 }
446
447 #[cfg(feature = "defi")]
448 #[allow(unused_variables)]
454 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
455 Ok(())
456 }
457
458 #[cfg(feature = "defi")]
459 #[allow(unused_variables)]
465 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
466 Ok(())
467 }
468
469 #[allow(unused_variables)]
475 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
476 Ok(())
477 }
478
479 #[allow(unused_variables)]
485 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
486 Ok(())
487 }
488
489 #[allow(unused_variables)]
495 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
496 Ok(())
497 }
498
499 #[allow(unused_variables)]
505 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
506 Ok(())
507 }
508
509 #[allow(unused_variables)]
515 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
516 Ok(())
517 }
518
519 #[allow(unused_variables)]
525 fn on_historical_index_prices(
526 &mut self,
527 index_prices: &[IndexPriceUpdate],
528 ) -> anyhow::Result<()> {
529 Ok(())
530 }
531
532 fn handle_time_event(&mut self, event: &TimeEvent) {
534 log_received(&event);
535
536 if let Err(e) = DataActor::on_time_event(self, event) {
537 log_error(&e);
538 }
539 }
540
541 fn handle_data(&mut self, data: &dyn Any) {
543 log_received(&data);
544
545 if self.not_running() {
546 log_not_running(&data);
547 return;
548 }
549
550 if let Err(e) = self.on_data(data) {
551 log_error(&e);
552 }
553 }
554
555 fn handle_signal(&mut self, signal: &Signal) {
557 log_received(&signal);
558
559 if self.not_running() {
560 log_not_running(&signal);
561 return;
562 }
563
564 if let Err(e) = self.on_signal(signal) {
565 log_error(&e);
566 }
567 }
568
569 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
571 log_received(&instrument);
572
573 if self.not_running() {
574 log_not_running(&instrument);
575 return;
576 }
577
578 if let Err(e) = self.on_instrument(instrument) {
579 log_error(&e);
580 }
581 }
582
583 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
585 log_received(&deltas);
586
587 if self.not_running() {
588 log_not_running(&deltas);
589 return;
590 }
591
592 if let Err(e) = self.on_book_deltas(deltas) {
593 log_error(&e);
594 }
595 }
596
597 fn handle_book(&mut self, book: &OrderBook) {
599 log_received(&book);
600
601 if self.not_running() {
602 log_not_running(&book);
603 return;
604 }
605
606 if let Err(e) = self.on_book(book) {
607 log_error(&e);
608 };
609 }
610
611 fn handle_quote(&mut self, quote: &QuoteTick) {
613 log_received("e);
614
615 if self.not_running() {
616 log_not_running("e);
617 return;
618 }
619
620 if let Err(e) = self.on_quote(quote) {
621 log_error(&e);
622 }
623 }
624
625 fn handle_trade(&mut self, trade: &TradeTick) {
627 log_received(&trade);
628
629 if self.not_running() {
630 log_not_running(&trade);
631 return;
632 }
633
634 if let Err(e) = self.on_trade(trade) {
635 log_error(&e);
636 }
637 }
638
639 fn handle_bar(&mut self, bar: &Bar) {
641 log_received(&bar);
642
643 if self.not_running() {
644 log_not_running(&bar);
645 return;
646 }
647
648 if let Err(e) = self.on_bar(bar) {
649 log_error(&e);
650 }
651 }
652
653 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
655 log_received(&mark_price);
656
657 if self.not_running() {
658 log_not_running(&mark_price);
659 return;
660 }
661
662 if let Err(e) = self.on_mark_price(mark_price) {
663 log_error(&e);
664 }
665 }
666
667 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
669 log_received(&index_price);
670
671 if self.not_running() {
672 log_not_running(&index_price);
673 return;
674 }
675
676 if let Err(e) = self.on_index_price(index_price) {
677 log_error(&e);
678 }
679 }
680
681 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
683 log_received(&funding_rate);
684
685 if self.not_running() {
686 log_not_running(&funding_rate);
687 return;
688 }
689
690 if let Err(e) = self.on_funding_rate(funding_rate) {
691 log_error(&e);
692 }
693 }
694
695 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
697 log_received(&status);
698
699 if self.not_running() {
700 log_not_running(&status);
701 return;
702 }
703
704 if let Err(e) = self.on_instrument_status(status) {
705 log_error(&e);
706 }
707 }
708
709 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
711 log_received(&close);
712
713 if self.not_running() {
714 log_not_running(&close);
715 return;
716 }
717
718 if let Err(e) = self.on_instrument_close(close) {
719 log_error(&e);
720 }
721 }
722
723 fn handle_order_filled(&mut self, event: &OrderFilled) {
725 log_received(&event);
726
727 if event.strategy_id.inner() == self.actor_id().inner() {
731 return;
732 }
733
734 if self.not_running() {
735 log_not_running(&event);
736 return;
737 }
738
739 if let Err(e) = self.on_order_filled(event) {
740 log_error(&e);
741 }
742 }
743
744 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
746 log_received(&event);
747
748 if event.strategy_id.inner() == self.actor_id().inner() {
752 return;
753 }
754
755 if self.not_running() {
756 log_not_running(&event);
757 return;
758 }
759
760 if let Err(e) = self.on_order_canceled(event) {
761 log_error(&e);
762 }
763 }
764
765 #[cfg(feature = "defi")]
766 fn handle_block(&mut self, block: &Block) {
768 log_received(&block);
769
770 if self.not_running() {
771 log_not_running(&block);
772 return;
773 }
774
775 if let Err(e) = self.on_block(block) {
776 log_error(&e);
777 }
778 }
779
780 #[cfg(feature = "defi")]
781 fn handle_pool(&mut self, pool: &Pool) {
783 log_received(&pool);
784
785 if self.not_running() {
786 log_not_running(&pool);
787 return;
788 }
789
790 if let Err(e) = self.on_pool(pool) {
791 log_error(&e);
792 }
793 }
794
795 #[cfg(feature = "defi")]
796 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
798 log_received(&swap);
799
800 if self.not_running() {
801 log_not_running(&swap);
802 return;
803 }
804
805 if let Err(e) = self.on_pool_swap(swap) {
806 log_error(&e);
807 }
808 }
809
810 #[cfg(feature = "defi")]
811 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
813 log_received(&update);
814
815 if self.not_running() {
816 log_not_running(&update);
817 return;
818 }
819
820 if let Err(e) = self.on_pool_liquidity_update(update) {
821 log_error(&e);
822 }
823 }
824
825 #[cfg(feature = "defi")]
826 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
828 log_received(&collect);
829
830 if self.not_running() {
831 log_not_running(&collect);
832 return;
833 }
834
835 if let Err(e) = self.on_pool_fee_collect(collect) {
836 log_error(&e);
837 }
838 }
839
840 #[cfg(feature = "defi")]
841 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
843 log_received(&flash);
844
845 if self.not_running() {
846 log_not_running(&flash);
847 return;
848 }
849
850 if let Err(e) = self.on_pool_flash(flash) {
851 log_error(&e);
852 }
853 }
854
855 fn handle_historical_data(&mut self, data: &dyn Any) {
857 log_received(&data);
858
859 if let Err(e) = self.on_historical_data(data) {
860 log_error(&e);
861 }
862 }
863
864 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
866 log_received(&resp);
867
868 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
869 log_error(&e);
870 }
871 }
872
873 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
875 log_received(&resp);
876
877 if let Err(e) = self.on_instrument(&resp.data) {
878 log_error(&e);
879 }
880 }
881
882 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
884 log_received(&resp);
885
886 for inst in &resp.data {
887 if let Err(e) = self.on_instrument(inst) {
888 log_error(&e);
889 }
890 }
891 }
892
893 fn handle_book_response(&mut self, resp: &BookResponse) {
895 log_received(&resp);
896
897 if let Err(e) = self.on_book(&resp.data) {
898 log_error(&e);
899 }
900 }
901
902 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
904 log_received(&resp);
905
906 if let Err(e) = self.on_historical_quotes(&resp.data) {
907 log_error(&e);
908 }
909 }
910
911 fn handle_trades_response(&mut self, resp: &TradesResponse) {
913 log_received(&resp);
914
915 if let Err(e) = self.on_historical_trades(&resp.data) {
916 log_error(&e);
917 }
918 }
919
920 fn handle_bars_response(&mut self, resp: &BarsResponse) {
922 log_received(&resp);
923
924 if let Err(e) = self.on_historical_bars(&resp.data) {
925 log_error(&e);
926 }
927 }
928
929 fn subscribe_data(
931 &mut self,
932 data_type: DataType,
933 client_id: Option<ClientId>,
934 params: Option<IndexMap<String, String>>,
935 ) where
936 Self: 'static + Debug + Sized,
937 {
938 let actor_id = self.actor_id().inner();
939 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
940 move |data: &dyn Any| {
941 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
942 },
943 )));
944
945 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
946 }
947
948 fn subscribe_quotes(
950 &mut self,
951 instrument_id: InstrumentId,
952 client_id: Option<ClientId>,
953 params: Option<IndexMap<String, String>>,
954 ) where
955 Self: 'static + Debug + Sized,
956 {
957 let actor_id = self.actor_id().inner();
958 let topic = get_quotes_topic(instrument_id);
959
960 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
961 move |quote: &QuoteTick| {
962 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
963 actor.handle_quote(quote);
964 } else {
965 log::error!("Actor {actor_id} not found for quote handling");
966 }
967 },
968 )));
969
970 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
971 }
972
973 fn subscribe_instruments(
975 &mut self,
976 venue: Venue,
977 client_id: Option<ClientId>,
978 params: Option<IndexMap<String, String>>,
979 ) where
980 Self: 'static + Debug + Sized,
981 {
982 let actor_id = self.actor_id().inner();
983 let topic = get_instruments_topic(venue);
984
985 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
986 move |instrument: &InstrumentAny| {
987 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
988 actor.handle_instrument(instrument);
989 } else {
990 log::error!("Actor {actor_id} not found for instruments handling");
991 }
992 },
993 )));
994
995 DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
996 }
997
998 fn subscribe_instrument(
1000 &mut self,
1001 instrument_id: InstrumentId,
1002 client_id: Option<ClientId>,
1003 params: Option<IndexMap<String, String>>,
1004 ) where
1005 Self: 'static + Debug + Sized,
1006 {
1007 let actor_id = self.actor_id().inner();
1008 let topic = get_instrument_topic(instrument_id);
1009
1010 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1011 move |instrument: &InstrumentAny| {
1012 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1013 actor.handle_instrument(instrument);
1014 } else {
1015 log::error!("Actor {actor_id} not found for instrument handling");
1016 }
1017 },
1018 )));
1019
1020 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1021 }
1022
1023 fn subscribe_book_deltas(
1025 &mut self,
1026 instrument_id: InstrumentId,
1027 book_type: BookType,
1028 depth: Option<NonZeroUsize>,
1029 client_id: Option<ClientId>,
1030 managed: bool,
1031 params: Option<IndexMap<String, String>>,
1032 ) where
1033 Self: 'static + Debug + Sized,
1034 {
1035 let actor_id = self.actor_id().inner();
1036 let topic = get_book_deltas_topic(instrument_id);
1037
1038 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1039 move |deltas: &OrderBookDeltas| {
1040 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1041 },
1042 )));
1043
1044 DataActorCore::subscribe_book_deltas(
1045 self,
1046 topic,
1047 handler,
1048 instrument_id,
1049 book_type,
1050 depth,
1051 client_id,
1052 managed,
1053 params,
1054 );
1055 }
1056
1057 fn subscribe_book_at_interval(
1059 &mut self,
1060 instrument_id: InstrumentId,
1061 book_type: BookType,
1062 depth: Option<NonZeroUsize>,
1063 interval_ms: NonZeroUsize,
1064 client_id: Option<ClientId>,
1065 params: Option<IndexMap<String, String>>,
1066 ) where
1067 Self: 'static + Debug + Sized,
1068 {
1069 let actor_id = self.actor_id().inner();
1070 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1071
1072 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1073 move |book: &OrderBook| {
1074 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1075 },
1076 )));
1077
1078 DataActorCore::subscribe_book_at_interval(
1079 self,
1080 topic,
1081 handler,
1082 instrument_id,
1083 book_type,
1084 depth,
1085 interval_ms,
1086 client_id,
1087 params,
1088 );
1089 }
1090
1091 fn subscribe_trades(
1093 &mut self,
1094 instrument_id: InstrumentId,
1095 client_id: Option<ClientId>,
1096 params: Option<IndexMap<String, String>>,
1097 ) where
1098 Self: 'static + Debug + Sized,
1099 {
1100 let actor_id = self.actor_id().inner();
1101 let topic = get_trades_topic(instrument_id);
1102
1103 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1104 move |trade: &TradeTick| {
1105 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1106 },
1107 )));
1108
1109 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1110 }
1111
1112 fn subscribe_bars(
1114 &mut self,
1115 bar_type: BarType,
1116 client_id: Option<ClientId>,
1117 params: Option<IndexMap<String, String>>,
1118 ) where
1119 Self: 'static + Debug + Sized,
1120 {
1121 let actor_id = self.actor_id().inner();
1122 let topic = get_bars_topic(bar_type);
1123
1124 let handler =
1125 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1126 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1127 })));
1128
1129 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1130 }
1131
1132 fn subscribe_mark_prices(
1134 &mut self,
1135 instrument_id: InstrumentId,
1136 client_id: Option<ClientId>,
1137 params: Option<IndexMap<String, String>>,
1138 ) where
1139 Self: 'static + Debug + Sized,
1140 {
1141 let actor_id = self.actor_id().inner();
1142 let topic = get_mark_price_topic(instrument_id);
1143
1144 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1145 move |mark_price: &MarkPriceUpdate| {
1146 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1147 },
1148 )));
1149
1150 DataActorCore::subscribe_mark_prices(
1151 self,
1152 topic,
1153 handler,
1154 instrument_id,
1155 client_id,
1156 params,
1157 );
1158 }
1159
1160 fn subscribe_index_prices(
1162 &mut self,
1163 instrument_id: InstrumentId,
1164 client_id: Option<ClientId>,
1165 params: Option<IndexMap<String, String>>,
1166 ) where
1167 Self: 'static + Debug + Sized,
1168 {
1169 let actor_id = self.actor_id().inner();
1170 let topic = get_index_price_topic(instrument_id);
1171
1172 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1173 move |index_price: &IndexPriceUpdate| {
1174 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1175 },
1176 )));
1177
1178 DataActorCore::subscribe_index_prices(
1179 self,
1180 topic,
1181 handler,
1182 instrument_id,
1183 client_id,
1184 params,
1185 );
1186 }
1187
1188 fn subscribe_funding_rates(
1190 &mut self,
1191 instrument_id: InstrumentId,
1192 client_id: Option<ClientId>,
1193 params: Option<IndexMap<String, String>>,
1194 ) where
1195 Self: 'static + Debug + Sized,
1196 {
1197 let actor_id = self.actor_id().inner();
1198 let topic = get_funding_rate_topic(instrument_id);
1199
1200 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1201 move |funding_rate: &FundingRateUpdate| {
1202 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1203 },
1204 )));
1205
1206 DataActorCore::subscribe_funding_rates(
1207 self,
1208 topic,
1209 handler,
1210 instrument_id,
1211 client_id,
1212 params,
1213 );
1214 }
1215
1216 fn subscribe_instrument_status(
1218 &mut self,
1219 instrument_id: InstrumentId,
1220 client_id: Option<ClientId>,
1221 params: Option<IndexMap<String, String>>,
1222 ) where
1223 Self: 'static + Debug + Sized,
1224 {
1225 let actor_id = self.actor_id().inner();
1226 let topic = get_instrument_status_topic(instrument_id);
1227
1228 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1229 move |status: &InstrumentStatus| {
1230 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1231 },
1232 )));
1233
1234 DataActorCore::subscribe_instrument_status(
1235 self,
1236 topic,
1237 handler,
1238 instrument_id,
1239 client_id,
1240 params,
1241 );
1242 }
1243
1244 fn subscribe_instrument_close(
1246 &mut self,
1247 instrument_id: InstrumentId,
1248 client_id: Option<ClientId>,
1249 params: Option<IndexMap<String, String>>,
1250 ) where
1251 Self: 'static + Debug + Sized,
1252 {
1253 let actor_id = self.actor_id().inner();
1254 let topic = get_instrument_close_topic(instrument_id);
1255
1256 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1257 move |close: &InstrumentClose| {
1258 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1259 },
1260 )));
1261
1262 DataActorCore::subscribe_instrument_close(
1263 self,
1264 topic,
1265 handler,
1266 instrument_id,
1267 client_id,
1268 params,
1269 );
1270 }
1271
1272 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1274 where
1275 Self: 'static + Debug + Sized,
1276 {
1277 let actor_id = self.actor_id().inner();
1278 let topic = get_order_fills_topic(instrument_id);
1279
1280 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1281 move |event: &OrderFilled| {
1282 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(event);
1283 },
1284 )));
1285
1286 DataActorCore::subscribe_order_fills(self, topic, handler);
1287 }
1288
1289 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1291 where
1292 Self: 'static + Debug + Sized,
1293 {
1294 let actor_id = self.actor_id().inner();
1295 let topic = get_order_cancels_topic(instrument_id);
1296
1297 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1298 move |event: &OrderCanceled| {
1299 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(event);
1300 },
1301 )));
1302
1303 DataActorCore::subscribe_order_cancels(self, topic, handler);
1304 }
1305
1306 #[cfg(feature = "defi")]
1307 fn subscribe_blocks(
1309 &mut self,
1310 chain: Blockchain,
1311 client_id: Option<ClientId>,
1312 params: Option<IndexMap<String, String>>,
1313 ) where
1314 Self: 'static + Debug + Sized,
1315 {
1316 let actor_id = self.actor_id().inner();
1317 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1318
1319 let handler =
1320 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1321 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1322 })));
1323
1324 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1325 }
1326
1327 #[cfg(feature = "defi")]
1328 fn subscribe_pool(
1330 &mut self,
1331 instrument_id: InstrumentId,
1332 client_id: Option<ClientId>,
1333 params: Option<IndexMap<String, String>>,
1334 ) where
1335 Self: 'static + Debug + Sized,
1336 {
1337 let actor_id = self.actor_id().inner();
1338 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1339
1340 let handler =
1341 ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1342 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1343 })));
1344
1345 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1346 }
1347
1348 #[cfg(feature = "defi")]
1349 fn subscribe_pool_swaps(
1351 &mut self,
1352 instrument_id: InstrumentId,
1353 client_id: Option<ClientId>,
1354 params: Option<IndexMap<String, String>>,
1355 ) where
1356 Self: 'static + Debug + Sized,
1357 {
1358 let actor_id = self.actor_id().inner();
1359 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1360
1361 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1362 move |swap: &PoolSwap| {
1363 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1364 },
1365 )));
1366
1367 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1368 }
1369
1370 #[cfg(feature = "defi")]
1371 fn subscribe_pool_liquidity_updates(
1373 &mut self,
1374 instrument_id: InstrumentId,
1375 client_id: Option<ClientId>,
1376 params: Option<IndexMap<String, String>>,
1377 ) where
1378 Self: 'static + Debug + Sized,
1379 {
1380 let actor_id = self.actor_id().inner();
1381 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1382
1383 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1384 move |update: &PoolLiquidityUpdate| {
1385 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1386 },
1387 )));
1388
1389 DataActorCore::subscribe_pool_liquidity_updates(
1390 self,
1391 topic,
1392 handler,
1393 instrument_id,
1394 client_id,
1395 params,
1396 );
1397 }
1398
1399 #[cfg(feature = "defi")]
1400 fn subscribe_pool_fee_collects(
1402 &mut self,
1403 instrument_id: InstrumentId,
1404 client_id: Option<ClientId>,
1405 params: Option<IndexMap<String, String>>,
1406 ) where
1407 Self: 'static + Debug + Sized,
1408 {
1409 let actor_id = self.actor_id().inner();
1410 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1411
1412 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1413 move |collect: &PoolFeeCollect| {
1414 get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1415 },
1416 )));
1417
1418 DataActorCore::subscribe_pool_fee_collects(
1419 self,
1420 topic,
1421 handler,
1422 instrument_id,
1423 client_id,
1424 params,
1425 );
1426 }
1427
1428 #[cfg(feature = "defi")]
1429 fn subscribe_pool_flash_events(
1431 &mut self,
1432 instrument_id: InstrumentId,
1433 client_id: Option<ClientId>,
1434 params: Option<IndexMap<String, String>>,
1435 ) where
1436 Self: 'static + Debug + Sized,
1437 {
1438 let actor_id = self.actor_id().inner();
1439 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1440
1441 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1442 move |flash: &PoolFlash| {
1443 get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1444 },
1445 )));
1446
1447 DataActorCore::subscribe_pool_flash_events(
1448 self,
1449 topic,
1450 handler,
1451 instrument_id,
1452 client_id,
1453 params,
1454 );
1455 }
1456
1457 fn unsubscribe_data(
1459 &mut self,
1460 data_type: DataType,
1461 client_id: Option<ClientId>,
1462 params: Option<IndexMap<String, String>>,
1463 ) where
1464 Self: 'static + Debug + Sized,
1465 {
1466 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1467 }
1468
1469 fn unsubscribe_instruments(
1471 &mut self,
1472 venue: Venue,
1473 client_id: Option<ClientId>,
1474 params: Option<IndexMap<String, String>>,
1475 ) where
1476 Self: 'static + Debug + Sized,
1477 {
1478 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1479 }
1480
1481 fn unsubscribe_instrument(
1483 &mut self,
1484 instrument_id: InstrumentId,
1485 client_id: Option<ClientId>,
1486 params: Option<IndexMap<String, String>>,
1487 ) where
1488 Self: 'static + Debug + Sized,
1489 {
1490 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1491 }
1492
1493 fn unsubscribe_book_deltas(
1495 &mut self,
1496 instrument_id: InstrumentId,
1497 client_id: Option<ClientId>,
1498 params: Option<IndexMap<String, String>>,
1499 ) where
1500 Self: 'static + Debug + Sized,
1501 {
1502 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1503 }
1504
1505 fn unsubscribe_book_at_interval(
1507 &mut self,
1508 instrument_id: InstrumentId,
1509 interval_ms: NonZeroUsize,
1510 client_id: Option<ClientId>,
1511 params: Option<IndexMap<String, String>>,
1512 ) where
1513 Self: 'static + Debug + Sized,
1514 {
1515 DataActorCore::unsubscribe_book_at_interval(
1516 self,
1517 instrument_id,
1518 interval_ms,
1519 client_id,
1520 params,
1521 );
1522 }
1523
1524 fn unsubscribe_quotes(
1526 &mut self,
1527 instrument_id: InstrumentId,
1528 client_id: Option<ClientId>,
1529 params: Option<IndexMap<String, String>>,
1530 ) where
1531 Self: 'static + Debug + Sized,
1532 {
1533 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1534 }
1535
1536 fn unsubscribe_trades(
1538 &mut self,
1539 instrument_id: InstrumentId,
1540 client_id: Option<ClientId>,
1541 params: Option<IndexMap<String, String>>,
1542 ) where
1543 Self: 'static + Debug + Sized,
1544 {
1545 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1546 }
1547
1548 fn unsubscribe_bars(
1550 &mut self,
1551 bar_type: BarType,
1552 client_id: Option<ClientId>,
1553 params: Option<IndexMap<String, String>>,
1554 ) where
1555 Self: 'static + Debug + Sized,
1556 {
1557 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1558 }
1559
1560 fn unsubscribe_mark_prices(
1562 &mut self,
1563 instrument_id: InstrumentId,
1564 client_id: Option<ClientId>,
1565 params: Option<IndexMap<String, String>>,
1566 ) where
1567 Self: 'static + Debug + Sized,
1568 {
1569 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1570 }
1571
1572 fn unsubscribe_index_prices(
1574 &mut self,
1575 instrument_id: InstrumentId,
1576 client_id: Option<ClientId>,
1577 params: Option<IndexMap<String, String>>,
1578 ) where
1579 Self: 'static + Debug + Sized,
1580 {
1581 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1582 }
1583
1584 fn unsubscribe_funding_rates(
1586 &mut self,
1587 instrument_id: InstrumentId,
1588 client_id: Option<ClientId>,
1589 params: Option<IndexMap<String, String>>,
1590 ) where
1591 Self: 'static + Debug + Sized,
1592 {
1593 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1594 }
1595
1596 fn unsubscribe_instrument_status(
1598 &mut self,
1599 instrument_id: InstrumentId,
1600 client_id: Option<ClientId>,
1601 params: Option<IndexMap<String, String>>,
1602 ) where
1603 Self: 'static + Debug + Sized,
1604 {
1605 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1606 }
1607
1608 fn unsubscribe_instrument_close(
1610 &mut self,
1611 instrument_id: InstrumentId,
1612 client_id: Option<ClientId>,
1613 params: Option<IndexMap<String, String>>,
1614 ) where
1615 Self: 'static + Debug + Sized,
1616 {
1617 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1618 }
1619
1620 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1622 where
1623 Self: 'static + Debug + Sized,
1624 {
1625 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1626 }
1627
1628 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1630 where
1631 Self: 'static + Debug + Sized,
1632 {
1633 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1634 }
1635
1636 #[cfg(feature = "defi")]
1637 fn unsubscribe_blocks(
1639 &mut self,
1640 chain: Blockchain,
1641 client_id: Option<ClientId>,
1642 params: Option<IndexMap<String, String>>,
1643 ) where
1644 Self: 'static + Debug + Sized,
1645 {
1646 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1647 }
1648
1649 #[cfg(feature = "defi")]
1650 fn unsubscribe_pool(
1652 &mut self,
1653 instrument_id: InstrumentId,
1654 client_id: Option<ClientId>,
1655 params: Option<IndexMap<String, String>>,
1656 ) where
1657 Self: 'static + Debug + Sized,
1658 {
1659 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1660 }
1661
1662 #[cfg(feature = "defi")]
1663 fn unsubscribe_pool_swaps(
1665 &mut self,
1666 instrument_id: InstrumentId,
1667 client_id: Option<ClientId>,
1668 params: Option<IndexMap<String, String>>,
1669 ) where
1670 Self: 'static + Debug + Sized,
1671 {
1672 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1673 }
1674
1675 #[cfg(feature = "defi")]
1676 fn unsubscribe_pool_liquidity_updates(
1678 &mut self,
1679 instrument_id: InstrumentId,
1680 client_id: Option<ClientId>,
1681 params: Option<IndexMap<String, String>>,
1682 ) where
1683 Self: 'static + Debug + Sized,
1684 {
1685 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1686 }
1687
1688 #[cfg(feature = "defi")]
1689 fn unsubscribe_pool_fee_collects(
1691 &mut self,
1692 instrument_id: InstrumentId,
1693 client_id: Option<ClientId>,
1694 params: Option<IndexMap<String, String>>,
1695 ) where
1696 Self: 'static + Debug + Sized,
1697 {
1698 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1699 }
1700
1701 #[cfg(feature = "defi")]
1702 fn unsubscribe_pool_flash_events(
1704 &mut self,
1705 instrument_id: InstrumentId,
1706 client_id: Option<ClientId>,
1707 params: Option<IndexMap<String, String>>,
1708 ) where
1709 Self: 'static + Debug + Sized,
1710 {
1711 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1712 }
1713
1714 fn request_data(
1720 &mut self,
1721 data_type: DataType,
1722 client_id: ClientId,
1723 start: Option<DateTime<Utc>>,
1724 end: Option<DateTime<Utc>>,
1725 limit: Option<NonZeroUsize>,
1726 params: Option<IndexMap<String, String>>,
1727 ) -> anyhow::Result<UUID4>
1728 where
1729 Self: 'static + Debug + Sized,
1730 {
1731 let actor_id = self.actor_id().inner();
1732 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1733 move |resp: &CustomDataResponse| {
1734 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1735 },
1736 )));
1737
1738 DataActorCore::request_data(
1739 self, data_type, client_id, start, end, limit, params, handler,
1740 )
1741 }
1742
1743 fn request_instrument(
1749 &mut self,
1750 instrument_id: InstrumentId,
1751 start: Option<DateTime<Utc>>,
1752 end: Option<DateTime<Utc>>,
1753 client_id: Option<ClientId>,
1754 params: Option<IndexMap<String, String>>,
1755 ) -> anyhow::Result<UUID4>
1756 where
1757 Self: 'static + Debug + Sized,
1758 {
1759 let actor_id = self.actor_id().inner();
1760 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1761 move |resp: &InstrumentResponse| {
1762 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1763 },
1764 )));
1765
1766 DataActorCore::request_instrument(
1767 self,
1768 instrument_id,
1769 start,
1770 end,
1771 client_id,
1772 params,
1773 handler,
1774 )
1775 }
1776
1777 fn request_instruments(
1783 &mut self,
1784 venue: Option<Venue>,
1785 start: Option<DateTime<Utc>>,
1786 end: Option<DateTime<Utc>>,
1787 client_id: Option<ClientId>,
1788 params: Option<IndexMap<String, String>>,
1789 ) -> anyhow::Result<UUID4>
1790 where
1791 Self: 'static + Debug + Sized,
1792 {
1793 let actor_id = self.actor_id().inner();
1794 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1795 move |resp: &InstrumentsResponse| {
1796 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1797 },
1798 )));
1799
1800 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1801 }
1802
1803 fn request_book_snapshot(
1809 &mut self,
1810 instrument_id: InstrumentId,
1811 depth: Option<NonZeroUsize>,
1812 client_id: Option<ClientId>,
1813 params: Option<IndexMap<String, String>>,
1814 ) -> anyhow::Result<UUID4>
1815 where
1816 Self: 'static + Debug + Sized,
1817 {
1818 let actor_id = self.actor_id().inner();
1819 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1820 move |resp: &BookResponse| {
1821 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1822 },
1823 )));
1824
1825 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1826 }
1827
1828 fn request_quotes(
1834 &mut self,
1835 instrument_id: InstrumentId,
1836 start: Option<DateTime<Utc>>,
1837 end: Option<DateTime<Utc>>,
1838 limit: Option<NonZeroUsize>,
1839 client_id: Option<ClientId>,
1840 params: Option<IndexMap<String, String>>,
1841 ) -> anyhow::Result<UUID4>
1842 where
1843 Self: 'static + Debug + Sized,
1844 {
1845 let actor_id = self.actor_id().inner();
1846 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1847 move |resp: &QuotesResponse| {
1848 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1849 },
1850 )));
1851
1852 DataActorCore::request_quotes(
1853 self,
1854 instrument_id,
1855 start,
1856 end,
1857 limit,
1858 client_id,
1859 params,
1860 handler,
1861 )
1862 }
1863
1864 fn request_trades(
1870 &mut self,
1871 instrument_id: InstrumentId,
1872 start: Option<DateTime<Utc>>,
1873 end: Option<DateTime<Utc>>,
1874 limit: Option<NonZeroUsize>,
1875 client_id: Option<ClientId>,
1876 params: Option<IndexMap<String, String>>,
1877 ) -> anyhow::Result<UUID4>
1878 where
1879 Self: 'static + Debug + Sized,
1880 {
1881 let actor_id = self.actor_id().inner();
1882 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1883 move |resp: &TradesResponse| {
1884 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1885 },
1886 )));
1887
1888 DataActorCore::request_trades(
1889 self,
1890 instrument_id,
1891 start,
1892 end,
1893 limit,
1894 client_id,
1895 params,
1896 handler,
1897 )
1898 }
1899
1900 fn request_bars(
1906 &mut self,
1907 bar_type: BarType,
1908 start: Option<DateTime<Utc>>,
1909 end: Option<DateTime<Utc>>,
1910 limit: Option<NonZeroUsize>,
1911 client_id: Option<ClientId>,
1912 params: Option<IndexMap<String, String>>,
1913 ) -> anyhow::Result<UUID4>
1914 where
1915 Self: 'static + Debug + Sized,
1916 {
1917 let actor_id = self.actor_id().inner();
1918 let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1919 move |resp: &BarsResponse| {
1920 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1921 },
1922 )));
1923
1924 DataActorCore::request_bars(
1925 self, bar_type, start, end, limit, client_id, params, handler,
1926 )
1927 }
1928}
1929
1930impl<T> Actor for T
1932where
1933 T: DataActor + Debug + 'static,
1934{
1935 fn id(&self) -> Ustr {
1936 self.actor_id.inner()
1937 }
1938
1939 #[allow(unused_variables)]
1940 fn handle(&mut self, msg: &dyn Any) {
1941 }
1943
1944 fn as_any(&self) -> &dyn Any {
1945 self
1946 }
1947}
1948
1949impl<T> Component for T
1951where
1952 T: DataActor + Debug + 'static,
1953{
1954 fn component_id(&self) -> ComponentId {
1955 ComponentId::new(self.actor_id.inner().as_str())
1956 }
1957
1958 fn state(&self) -> ComponentState {
1959 self.state
1960 }
1961
1962 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1963 self.state = self.state.transition(&trigger)?;
1964 log::info!("{}", self.state.variant_name());
1965 Ok(())
1966 }
1967
1968 fn register(
1969 &mut self,
1970 trader_id: TraderId,
1971 clock: Rc<RefCell<dyn Clock>>,
1972 cache: Rc<RefCell<Cache>>,
1973 ) -> anyhow::Result<()> {
1974 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1975
1976 let actor_id = self.actor_id().inner();
1978 let callback = TimeEventCallback::from(move |event: TimeEvent| {
1979 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1980 actor.handle_time_event(&event);
1981 } else {
1982 log::error!("Actor {actor_id} not found for time event handling");
1983 }
1984 });
1985
1986 clock.borrow_mut().register_default_handler(callback);
1987
1988 self.initialize()
1989 }
1990
1991 fn on_start(&mut self) -> anyhow::Result<()> {
1992 DataActor::on_start(self)
1993 }
1994
1995 fn on_stop(&mut self) -> anyhow::Result<()> {
1996 DataActor::on_stop(self)
1997 }
1998
1999 fn on_resume(&mut self) -> anyhow::Result<()> {
2000 DataActor::on_resume(self)
2001 }
2002
2003 fn on_degrade(&mut self) -> anyhow::Result<()> {
2004 DataActor::on_degrade(self)
2005 }
2006
2007 fn on_fault(&mut self) -> anyhow::Result<()> {
2008 DataActor::on_fault(self)
2009 }
2010
2011 fn on_reset(&mut self) -> anyhow::Result<()> {
2012 DataActor::on_reset(self)
2013 }
2014
2015 fn on_dispose(&mut self) -> anyhow::Result<()> {
2016 DataActor::on_dispose(self)
2017 }
2018}
2019
2020#[derive(Clone)]
2022#[allow(
2023 dead_code,
2024 reason = "TODO: Under development (pending_requests, signal_classes)"
2025)]
2026pub struct DataActorCore {
2027 pub actor_id: ActorId,
2029 pub config: DataActorConfig,
2031 trader_id: Option<TraderId>,
2032 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
2035 topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
2036 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2038 signal_classes: AHashMap<String, String>,
2039 #[cfg(feature = "indicators")]
2040 indicators: Indicators,
2041}
2042
2043impl Debug for DataActorCore {
2044 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2045 f.debug_struct(stringify!(DataActorCore))
2046 .field("actor_id", &self.actor_id)
2047 .field("config", &self.config)
2048 .field("state", &self.state)
2049 .field("trader_id", &self.trader_id)
2050 .finish()
2051 }
2052}
2053
2054impl DataActorCore {
2055 pub(crate) fn add_subscription(
2059 &mut self,
2060 topic: MStr<Topic>,
2061 handler: ShareableMessageHandler,
2062 ) {
2063 if self.topic_handlers.contains_key(&topic) {
2064 log::warn!(
2065 "Actor {} attempted duplicate subscription to topic '{topic}'",
2066 self.actor_id,
2067 );
2068 return;
2069 }
2070
2071 self.topic_handlers.insert(topic, handler.clone());
2072 msgbus::subscribe_topic(topic, handler, None);
2073 }
2074
2075 pub(crate) fn remove_subscription(&mut self, topic: MStr<Topic>) {
2079 if let Some(handler) = self.topic_handlers.remove(&topic) {
2080 msgbus::unsubscribe_topic(topic, handler);
2081 } else {
2082 log::warn!(
2083 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2084 self.actor_id,
2085 );
2086 }
2087 }
2088
2089 pub fn new(config: DataActorConfig) -> Self {
2091 let actor_id = config
2092 .actor_id
2093 .unwrap_or_else(|| Self::default_actor_id(&config));
2094
2095 Self {
2096 actor_id,
2097 config,
2098 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2102 topic_handlers: AHashMap::new(),
2103 warning_events: AHashSet::new(),
2104 pending_requests: AHashMap::new(),
2105 signal_classes: AHashMap::new(),
2106 #[cfg(feature = "indicators")]
2107 indicators: Indicators::default(),
2108 }
2109 }
2110
2111 #[must_use]
2113 pub fn mem_address(&self) -> String {
2114 format!("{self:p}")
2115 }
2116
2117 pub fn state(&self) -> ComponentState {
2119 self.state
2120 }
2121
2122 pub fn trader_id(&self) -> Option<TraderId> {
2124 self.trader_id
2125 }
2126
2127 pub fn actor_id(&self) -> ActorId {
2129 self.actor_id
2130 }
2131
2132 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2133 let memory_address = std::ptr::from_ref(config) as *const _ as usize;
2134 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2135 }
2136
2137 pub fn timestamp_ns(&self) -> UnixNanos {
2139 self.clock_ref().timestamp_ns()
2140 }
2141
2142 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2148 self.clock
2149 .as_ref()
2150 .unwrap_or_else(|| {
2151 panic!(
2152 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2153 self.actor_id, self.trader_id
2154 )
2155 })
2156 .borrow_mut()
2157 }
2158
2159 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2165 self.clock
2166 .as_ref()
2167 .expect("DataActor must be registered before accessing clock")
2168 .clone()
2169 }
2170
2171 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2172 self.clock
2173 .as_ref()
2174 .unwrap_or_else(|| {
2175 panic!(
2176 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2177 self.actor_id, self.trader_id
2178 )
2179 })
2180 .borrow()
2181 }
2182
2183 pub fn cache(&self) -> Ref<'_, Cache> {
2189 self.cache
2190 .as_ref()
2191 .expect("DataActor must be registered before accessing cache")
2192 .borrow()
2193 }
2194
2195 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2201 self.cache
2202 .as_ref()
2203 .expect("DataActor must be registered before accessing cache")
2204 .clone()
2205 }
2206
2207 pub fn register(
2216 &mut self,
2217 trader_id: TraderId,
2218 clock: Rc<RefCell<dyn Clock>>,
2219 cache: Rc<RefCell<Cache>>,
2220 ) -> anyhow::Result<()> {
2221 if let Some(existing_trader_id) = self.trader_id {
2222 anyhow::bail!(
2223 "DataActor {} already registered with trader {existing_trader_id}",
2224 self.actor_id
2225 );
2226 }
2227
2228 {
2230 let _timestamp = clock.borrow().timestamp_ns();
2231 }
2232
2233 {
2235 let _cache_borrow = cache.borrow();
2236 }
2237
2238 self.trader_id = Some(trader_id);
2239 self.clock = Some(clock);
2240 self.cache = Some(cache);
2241
2242 if !self.is_properly_registered() {
2244 anyhow::bail!(
2245 "DataActor {} registration incomplete - validation failed",
2246 self.actor_id
2247 );
2248 }
2249
2250 log::debug!("Registered '{}' with trader {trader_id}", self.actor_id);
2251 Ok(())
2252 }
2253
2254 pub fn register_warning_event(&mut self, event_type: &str) {
2256 self.warning_events.insert(event_type.to_string());
2257 log::debug!("Registered event type '{event_type}' for warning logs");
2258 }
2259
2260 pub fn deregister_warning_event(&mut self, event_type: &str) {
2262 self.warning_events.remove(event_type);
2263 log::debug!("Deregistered event type '{event_type}' from warning logs");
2264 }
2265
2266 pub fn is_registered(&self) -> bool {
2267 self.trader_id.is_some()
2268 }
2269
2270 pub(crate) fn check_registered(&self) {
2271 assert!(
2272 self.is_registered(),
2273 "Actor has not been registered with a Trader"
2274 );
2275 }
2276
2277 fn is_properly_registered(&self) -> bool {
2279 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2280 }
2281
2282 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2283 if self.config.log_commands {
2284 log::info!("{CMD}{SEND} {command:?}");
2285 }
2286
2287 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2288 msgbus::send_any(endpoint, command.as_any());
2289 }
2290
2291 #[allow(dead_code)]
2292 fn send_data_req(&self, request: RequestCommand) {
2293 if self.config.log_commands {
2294 log::info!("{REQ}{SEND} {request:?}");
2295 }
2296
2297 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2300 msgbus::send_any(endpoint, request.as_any());
2301 }
2302
2303 pub fn shutdown_system(&self, reason: Option<String>) {
2309 self.check_registered();
2310
2311 let command = ShutdownSystem::new(
2313 self.trader_id().unwrap(),
2314 self.actor_id.inner(),
2315 reason,
2316 UUID4::new(),
2317 self.timestamp_ns(),
2318 );
2319
2320 let endpoint = "command.system.shutdown".into();
2321 msgbus::send_any(endpoint, command.as_any());
2322 }
2323
2324 pub fn subscribe_data(
2332 &mut self,
2333 handler: ShareableMessageHandler,
2334 data_type: DataType,
2335 client_id: Option<ClientId>,
2336 params: Option<IndexMap<String, String>>,
2337 ) {
2338 if !self.is_properly_registered() {
2339 panic!(
2340 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2341 self.actor_id,
2342 self.trader_id,
2343 self.clock.is_some(),
2344 self.cache.is_some()
2345 );
2346 }
2347
2348 let topic = get_custom_topic(&data_type);
2349 self.add_subscription(topic, handler);
2350
2351 if client_id.is_none() {
2353 return;
2354 }
2355
2356 let command = SubscribeCommand::Data(SubscribeCustomData {
2357 data_type,
2358 client_id,
2359 venue: None,
2360 command_id: UUID4::new(),
2361 ts_init: self.timestamp_ns(),
2362 params,
2363 });
2364
2365 self.send_data_cmd(DataCommand::Subscribe(command));
2366 }
2367
2368 pub fn subscribe_quotes(
2370 &mut self,
2371 topic: MStr<Topic>,
2372 handler: ShareableMessageHandler,
2373 instrument_id: InstrumentId,
2374 client_id: Option<ClientId>,
2375 params: Option<IndexMap<String, String>>,
2376 ) {
2377 self.check_registered();
2378
2379 self.add_subscription(topic, handler);
2380
2381 let command = SubscribeCommand::Quotes(SubscribeQuotes {
2382 instrument_id,
2383 client_id,
2384 venue: Some(instrument_id.venue),
2385 command_id: UUID4::new(),
2386 ts_init: self.timestamp_ns(),
2387 params,
2388 });
2389
2390 self.send_data_cmd(DataCommand::Subscribe(command));
2391 }
2392
2393 pub fn subscribe_instruments(
2395 &mut self,
2396 topic: MStr<Topic>,
2397 handler: ShareableMessageHandler,
2398 venue: Venue,
2399 client_id: Option<ClientId>,
2400 params: Option<IndexMap<String, String>>,
2401 ) {
2402 self.check_registered();
2403
2404 self.add_subscription(topic, handler);
2405
2406 let command = SubscribeCommand::Instruments(SubscribeInstruments {
2407 client_id,
2408 venue,
2409 command_id: UUID4::new(),
2410 ts_init: self.timestamp_ns(),
2411 params,
2412 });
2413
2414 self.send_data_cmd(DataCommand::Subscribe(command));
2415 }
2416
2417 pub fn subscribe_instrument(
2419 &mut self,
2420 topic: MStr<Topic>,
2421 handler: ShareableMessageHandler,
2422 instrument_id: InstrumentId,
2423 client_id: Option<ClientId>,
2424 params: Option<IndexMap<String, String>>,
2425 ) {
2426 self.check_registered();
2427
2428 self.add_subscription(topic, handler);
2429
2430 let command = SubscribeCommand::Instrument(SubscribeInstrument {
2431 instrument_id,
2432 client_id,
2433 venue: Some(instrument_id.venue),
2434 command_id: UUID4::new(),
2435 ts_init: self.timestamp_ns(),
2436 params,
2437 });
2438
2439 self.send_data_cmd(DataCommand::Subscribe(command));
2440 }
2441
2442 #[allow(clippy::too_many_arguments)]
2444 pub fn subscribe_book_deltas(
2445 &mut self,
2446 topic: MStr<Topic>,
2447 handler: ShareableMessageHandler,
2448 instrument_id: InstrumentId,
2449 book_type: BookType,
2450 depth: Option<NonZeroUsize>,
2451 client_id: Option<ClientId>,
2452 managed: bool,
2453 params: Option<IndexMap<String, String>>,
2454 ) {
2455 self.check_registered();
2456
2457 self.add_subscription(topic, handler);
2458
2459 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2460 instrument_id,
2461 book_type,
2462 client_id,
2463 venue: Some(instrument_id.venue),
2464 command_id: UUID4::new(),
2465 ts_init: self.timestamp_ns(),
2466 depth,
2467 managed,
2468 params,
2469 });
2470
2471 self.send_data_cmd(DataCommand::Subscribe(command));
2472 }
2473
2474 #[allow(clippy::too_many_arguments)]
2476 pub fn subscribe_book_at_interval(
2477 &mut self,
2478 topic: MStr<Topic>,
2479 handler: ShareableMessageHandler,
2480 instrument_id: InstrumentId,
2481 book_type: BookType,
2482 depth: Option<NonZeroUsize>,
2483 interval_ms: NonZeroUsize,
2484 client_id: Option<ClientId>,
2485 params: Option<IndexMap<String, String>>,
2486 ) {
2487 self.check_registered();
2488
2489 self.add_subscription(topic, handler);
2490
2491 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2492 instrument_id,
2493 book_type,
2494 client_id,
2495 venue: Some(instrument_id.venue),
2496 command_id: UUID4::new(),
2497 ts_init: self.timestamp_ns(),
2498 depth,
2499 interval_ms,
2500 params,
2501 });
2502
2503 self.send_data_cmd(DataCommand::Subscribe(command));
2504 }
2505
2506 pub fn subscribe_trades(
2508 &mut self,
2509 topic: MStr<Topic>,
2510 handler: ShareableMessageHandler,
2511 instrument_id: InstrumentId,
2512 client_id: Option<ClientId>,
2513 params: Option<IndexMap<String, String>>,
2514 ) {
2515 self.check_registered();
2516
2517 self.add_subscription(topic, handler);
2518
2519 let command = SubscribeCommand::Trades(SubscribeTrades {
2520 instrument_id,
2521 client_id,
2522 venue: Some(instrument_id.venue),
2523 command_id: UUID4::new(),
2524 ts_init: self.timestamp_ns(),
2525 params,
2526 });
2527
2528 self.send_data_cmd(DataCommand::Subscribe(command));
2529 }
2530
2531 pub fn subscribe_bars(
2533 &mut self,
2534 topic: MStr<Topic>,
2535 handler: ShareableMessageHandler,
2536 bar_type: BarType,
2537 client_id: Option<ClientId>,
2538 params: Option<IndexMap<String, String>>,
2539 ) {
2540 self.check_registered();
2541
2542 self.add_subscription(topic, handler);
2543
2544 let command = SubscribeCommand::Bars(SubscribeBars {
2545 bar_type,
2546 client_id,
2547 venue: Some(bar_type.instrument_id().venue),
2548 command_id: UUID4::new(),
2549 ts_init: self.timestamp_ns(),
2550 params,
2551 });
2552
2553 self.send_data_cmd(DataCommand::Subscribe(command));
2554 }
2555
2556 pub fn subscribe_mark_prices(
2558 &mut self,
2559 topic: MStr<Topic>,
2560 handler: ShareableMessageHandler,
2561 instrument_id: InstrumentId,
2562 client_id: Option<ClientId>,
2563 params: Option<IndexMap<String, String>>,
2564 ) {
2565 self.check_registered();
2566
2567 self.add_subscription(topic, handler);
2568
2569 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2570 instrument_id,
2571 client_id,
2572 venue: Some(instrument_id.venue),
2573 command_id: UUID4::new(),
2574 ts_init: self.timestamp_ns(),
2575 params,
2576 });
2577
2578 self.send_data_cmd(DataCommand::Subscribe(command));
2579 }
2580
2581 pub fn subscribe_index_prices(
2583 &mut self,
2584 topic: MStr<Topic>,
2585 handler: ShareableMessageHandler,
2586 instrument_id: InstrumentId,
2587 client_id: Option<ClientId>,
2588 params: Option<IndexMap<String, String>>,
2589 ) {
2590 self.check_registered();
2591
2592 self.add_subscription(topic, handler);
2593
2594 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2595 instrument_id,
2596 client_id,
2597 venue: Some(instrument_id.venue),
2598 command_id: UUID4::new(),
2599 ts_init: self.timestamp_ns(),
2600 params,
2601 });
2602
2603 self.send_data_cmd(DataCommand::Subscribe(command));
2604 }
2605
2606 pub fn subscribe_funding_rates(
2608 &mut self,
2609 topic: MStr<Topic>,
2610 handler: ShareableMessageHandler,
2611 instrument_id: InstrumentId,
2612 client_id: Option<ClientId>,
2613 params: Option<IndexMap<String, String>>,
2614 ) {
2615 self.check_registered();
2616
2617 self.add_subscription(topic, handler);
2618
2619 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2620 instrument_id,
2621 client_id,
2622 venue: Some(instrument_id.venue),
2623 command_id: UUID4::new(),
2624 ts_init: self.timestamp_ns(),
2625 params,
2626 });
2627
2628 self.send_data_cmd(DataCommand::Subscribe(command));
2629 }
2630
2631 pub fn subscribe_instrument_status(
2633 &mut self,
2634 topic: MStr<Topic>,
2635 handler: ShareableMessageHandler,
2636 instrument_id: InstrumentId,
2637 client_id: Option<ClientId>,
2638 params: Option<IndexMap<String, String>>,
2639 ) {
2640 self.check_registered();
2641
2642 self.add_subscription(topic, handler);
2643
2644 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2645 instrument_id,
2646 client_id,
2647 venue: Some(instrument_id.venue),
2648 command_id: UUID4::new(),
2649 ts_init: self.timestamp_ns(),
2650 params,
2651 });
2652
2653 self.send_data_cmd(DataCommand::Subscribe(command));
2654 }
2655
2656 pub fn subscribe_instrument_close(
2658 &mut self,
2659 topic: MStr<Topic>,
2660 handler: ShareableMessageHandler,
2661 instrument_id: InstrumentId,
2662 client_id: Option<ClientId>,
2663 params: Option<IndexMap<String, String>>,
2664 ) {
2665 self.check_registered();
2666
2667 self.add_subscription(topic, handler);
2668
2669 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2670 instrument_id,
2671 client_id,
2672 venue: Some(instrument_id.venue),
2673 command_id: UUID4::new(),
2674 ts_init: self.timestamp_ns(),
2675 params,
2676 });
2677
2678 self.send_data_cmd(DataCommand::Subscribe(command));
2679 }
2680
2681 pub fn subscribe_order_fills(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
2683 self.check_registered();
2684 self.add_subscription(topic, handler);
2685 }
2686
2687 pub fn subscribe_order_cancels(
2689 &mut self,
2690 topic: MStr<Topic>,
2691 handler: ShareableMessageHandler,
2692 ) {
2693 self.check_registered();
2694 self.add_subscription(topic, handler);
2695 }
2696
2697 pub fn unsubscribe_data(
2699 &mut self,
2700 data_type: DataType,
2701 client_id: Option<ClientId>,
2702 params: Option<IndexMap<String, String>>,
2703 ) {
2704 self.check_registered();
2705
2706 let topic = get_custom_topic(&data_type);
2707 self.remove_subscription(topic);
2708
2709 if client_id.is_none() {
2710 return;
2711 }
2712
2713 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2714 data_type,
2715 client_id,
2716 venue: None,
2717 command_id: UUID4::new(),
2718 ts_init: self.timestamp_ns(),
2719 params,
2720 });
2721
2722 self.send_data_cmd(DataCommand::Unsubscribe(command));
2723 }
2724
2725 pub fn unsubscribe_instruments(
2727 &mut self,
2728 venue: Venue,
2729 client_id: Option<ClientId>,
2730 params: Option<IndexMap<String, String>>,
2731 ) {
2732 self.check_registered();
2733
2734 let topic = get_instruments_topic(venue);
2735 self.remove_subscription(topic);
2736
2737 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2738 client_id,
2739 venue,
2740 command_id: UUID4::new(),
2741 ts_init: self.timestamp_ns(),
2742 params,
2743 });
2744
2745 self.send_data_cmd(DataCommand::Unsubscribe(command));
2746 }
2747
2748 pub fn unsubscribe_instrument(
2750 &mut self,
2751 instrument_id: InstrumentId,
2752 client_id: Option<ClientId>,
2753 params: Option<IndexMap<String, String>>,
2754 ) {
2755 self.check_registered();
2756
2757 let topic = get_instrument_topic(instrument_id);
2758 self.remove_subscription(topic);
2759
2760 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2761 instrument_id,
2762 client_id,
2763 venue: Some(instrument_id.venue),
2764 command_id: UUID4::new(),
2765 ts_init: self.timestamp_ns(),
2766 params,
2767 });
2768
2769 self.send_data_cmd(DataCommand::Unsubscribe(command));
2770 }
2771
2772 pub fn unsubscribe_book_deltas(
2774 &mut self,
2775 instrument_id: InstrumentId,
2776 client_id: Option<ClientId>,
2777 params: Option<IndexMap<String, String>>,
2778 ) {
2779 self.check_registered();
2780
2781 let topic = get_book_deltas_topic(instrument_id);
2782 self.remove_subscription(topic);
2783
2784 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2785 instrument_id,
2786 client_id,
2787 venue: Some(instrument_id.venue),
2788 command_id: UUID4::new(),
2789 ts_init: self.timestamp_ns(),
2790 params,
2791 });
2792
2793 self.send_data_cmd(DataCommand::Unsubscribe(command));
2794 }
2795
2796 pub fn unsubscribe_book_at_interval(
2798 &mut self,
2799 instrument_id: InstrumentId,
2800 interval_ms: NonZeroUsize,
2801 client_id: Option<ClientId>,
2802 params: Option<IndexMap<String, String>>,
2803 ) {
2804 self.check_registered();
2805
2806 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2807 self.remove_subscription(topic);
2808
2809 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2810 instrument_id,
2811 client_id,
2812 venue: Some(instrument_id.venue),
2813 command_id: UUID4::new(),
2814 ts_init: self.timestamp_ns(),
2815 params,
2816 });
2817
2818 self.send_data_cmd(DataCommand::Unsubscribe(command));
2819 }
2820
2821 pub fn unsubscribe_quotes(
2823 &mut self,
2824 instrument_id: InstrumentId,
2825 client_id: Option<ClientId>,
2826 params: Option<IndexMap<String, String>>,
2827 ) {
2828 self.check_registered();
2829
2830 let topic = get_quotes_topic(instrument_id);
2831 self.remove_subscription(topic);
2832
2833 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2834 instrument_id,
2835 client_id,
2836 venue: Some(instrument_id.venue),
2837 command_id: UUID4::new(),
2838 ts_init: self.timestamp_ns(),
2839 params,
2840 });
2841
2842 self.send_data_cmd(DataCommand::Unsubscribe(command));
2843 }
2844
2845 pub fn unsubscribe_trades(
2847 &mut self,
2848 instrument_id: InstrumentId,
2849 client_id: Option<ClientId>,
2850 params: Option<IndexMap<String, String>>,
2851 ) {
2852 self.check_registered();
2853
2854 let topic = get_trades_topic(instrument_id);
2855 self.remove_subscription(topic);
2856
2857 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2858 instrument_id,
2859 client_id,
2860 venue: Some(instrument_id.venue),
2861 command_id: UUID4::new(),
2862 ts_init: self.timestamp_ns(),
2863 params,
2864 });
2865
2866 self.send_data_cmd(DataCommand::Unsubscribe(command));
2867 }
2868
2869 pub fn unsubscribe_bars(
2871 &mut self,
2872 bar_type: BarType,
2873 client_id: Option<ClientId>,
2874 params: Option<IndexMap<String, String>>,
2875 ) {
2876 self.check_registered();
2877
2878 let topic = get_bars_topic(bar_type);
2879 self.remove_subscription(topic);
2880
2881 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2882 bar_type,
2883 client_id,
2884 venue: Some(bar_type.instrument_id().venue),
2885 command_id: UUID4::new(),
2886 ts_init: self.timestamp_ns(),
2887 params,
2888 });
2889
2890 self.send_data_cmd(DataCommand::Unsubscribe(command));
2891 }
2892
2893 pub fn unsubscribe_mark_prices(
2895 &mut self,
2896 instrument_id: InstrumentId,
2897 client_id: Option<ClientId>,
2898 params: Option<IndexMap<String, String>>,
2899 ) {
2900 self.check_registered();
2901
2902 let topic = get_mark_price_topic(instrument_id);
2903 self.remove_subscription(topic);
2904
2905 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2906 instrument_id,
2907 client_id,
2908 venue: Some(instrument_id.venue),
2909 command_id: UUID4::new(),
2910 ts_init: self.timestamp_ns(),
2911 params,
2912 });
2913
2914 self.send_data_cmd(DataCommand::Unsubscribe(command));
2915 }
2916
2917 pub fn unsubscribe_index_prices(
2919 &mut self,
2920 instrument_id: InstrumentId,
2921 client_id: Option<ClientId>,
2922 params: Option<IndexMap<String, String>>,
2923 ) {
2924 self.check_registered();
2925
2926 let topic = get_index_price_topic(instrument_id);
2927 self.remove_subscription(topic);
2928
2929 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2930 instrument_id,
2931 client_id,
2932 venue: Some(instrument_id.venue),
2933 command_id: UUID4::new(),
2934 ts_init: self.timestamp_ns(),
2935 params,
2936 });
2937
2938 self.send_data_cmd(DataCommand::Unsubscribe(command));
2939 }
2940
2941 pub fn unsubscribe_funding_rates(
2943 &mut self,
2944 instrument_id: InstrumentId,
2945 client_id: Option<ClientId>,
2946 params: Option<IndexMap<String, String>>,
2947 ) {
2948 self.check_registered();
2949
2950 let topic = get_funding_rate_topic(instrument_id);
2951 self.remove_subscription(topic);
2952
2953 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2954 instrument_id,
2955 client_id,
2956 venue: Some(instrument_id.venue),
2957 command_id: UUID4::new(),
2958 ts_init: self.timestamp_ns(),
2959 params,
2960 });
2961
2962 self.send_data_cmd(DataCommand::Unsubscribe(command));
2963 }
2964
2965 pub fn unsubscribe_instrument_status(
2967 &mut self,
2968 instrument_id: InstrumentId,
2969 client_id: Option<ClientId>,
2970 params: Option<IndexMap<String, String>>,
2971 ) {
2972 self.check_registered();
2973
2974 let topic = get_instrument_status_topic(instrument_id);
2975 self.remove_subscription(topic);
2976
2977 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2978 instrument_id,
2979 client_id,
2980 venue: Some(instrument_id.venue),
2981 command_id: UUID4::new(),
2982 ts_init: self.timestamp_ns(),
2983 params,
2984 });
2985
2986 self.send_data_cmd(DataCommand::Unsubscribe(command));
2987 }
2988
2989 pub fn unsubscribe_instrument_close(
2991 &mut self,
2992 instrument_id: InstrumentId,
2993 client_id: Option<ClientId>,
2994 params: Option<IndexMap<String, String>>,
2995 ) {
2996 self.check_registered();
2997
2998 let topic = get_instrument_close_topic(instrument_id);
2999 self.remove_subscription(topic);
3000
3001 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3002 instrument_id,
3003 client_id,
3004 venue: Some(instrument_id.venue),
3005 command_id: UUID4::new(),
3006 ts_init: self.timestamp_ns(),
3007 params,
3008 });
3009
3010 self.send_data_cmd(DataCommand::Unsubscribe(command));
3011 }
3012
3013 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3015 self.check_registered();
3016
3017 let topic = get_order_fills_topic(instrument_id);
3018 self.remove_subscription(topic);
3019 }
3020
3021 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3023 self.check_registered();
3024
3025 let topic = get_order_cancels_topic(instrument_id);
3026 self.remove_subscription(topic);
3027 }
3028
3029 #[allow(clippy::too_many_arguments)]
3035 pub fn request_data(
3036 &self,
3037 data_type: DataType,
3038 client_id: ClientId,
3039 start: Option<DateTime<Utc>>,
3040 end: Option<DateTime<Utc>>,
3041 limit: Option<NonZeroUsize>,
3042 params: Option<IndexMap<String, String>>,
3043 handler: ShareableMessageHandler,
3044 ) -> anyhow::Result<UUID4> {
3045 self.check_registered();
3046
3047 let now = self.clock_ref().utc_now();
3048 check_timestamps(now, start, end)?;
3049
3050 let request_id = UUID4::new();
3051 let command = RequestCommand::Data(RequestCustomData {
3052 client_id,
3053 data_type,
3054 start,
3055 end,
3056 limit,
3057 request_id,
3058 ts_init: self.timestamp_ns(),
3059 params,
3060 });
3061
3062 get_message_bus()
3063 .borrow_mut()
3064 .register_response_handler(command.request_id(), handler)?;
3065
3066 self.send_data_cmd(DataCommand::Request(command));
3067
3068 Ok(request_id)
3069 }
3070
3071 pub fn request_instrument(
3077 &self,
3078 instrument_id: InstrumentId,
3079 start: Option<DateTime<Utc>>,
3080 end: Option<DateTime<Utc>>,
3081 client_id: Option<ClientId>,
3082 params: Option<IndexMap<String, String>>,
3083 handler: ShareableMessageHandler,
3084 ) -> anyhow::Result<UUID4> {
3085 self.check_registered();
3086
3087 let now = self.clock_ref().utc_now();
3088 check_timestamps(now, start, end)?;
3089
3090 let request_id = UUID4::new();
3091 let command = RequestCommand::Instrument(RequestInstrument {
3092 instrument_id,
3093 start,
3094 end,
3095 client_id,
3096 request_id,
3097 ts_init: now.into(),
3098 params,
3099 });
3100
3101 get_message_bus()
3102 .borrow_mut()
3103 .register_response_handler(command.request_id(), handler)?;
3104
3105 self.send_data_cmd(DataCommand::Request(command));
3106
3107 Ok(request_id)
3108 }
3109
3110 pub fn request_instruments(
3116 &self,
3117 venue: Option<Venue>,
3118 start: Option<DateTime<Utc>>,
3119 end: Option<DateTime<Utc>>,
3120 client_id: Option<ClientId>,
3121 params: Option<IndexMap<String, String>>,
3122 handler: ShareableMessageHandler,
3123 ) -> anyhow::Result<UUID4> {
3124 self.check_registered();
3125
3126 let now = self.clock_ref().utc_now();
3127 check_timestamps(now, start, end)?;
3128
3129 let request_id = UUID4::new();
3130 let command = RequestCommand::Instruments(RequestInstruments {
3131 venue,
3132 start,
3133 end,
3134 client_id,
3135 request_id,
3136 ts_init: now.into(),
3137 params,
3138 });
3139
3140 get_message_bus()
3141 .borrow_mut()
3142 .register_response_handler(command.request_id(), handler)?;
3143
3144 self.send_data_cmd(DataCommand::Request(command));
3145
3146 Ok(request_id)
3147 }
3148
3149 pub fn request_book_snapshot(
3155 &self,
3156 instrument_id: InstrumentId,
3157 depth: Option<NonZeroUsize>,
3158 client_id: Option<ClientId>,
3159 params: Option<IndexMap<String, String>>,
3160 handler: ShareableMessageHandler,
3161 ) -> anyhow::Result<UUID4> {
3162 self.check_registered();
3163
3164 let request_id = UUID4::new();
3165 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3166 instrument_id,
3167 depth,
3168 client_id,
3169 request_id,
3170 ts_init: self.timestamp_ns(),
3171 params,
3172 });
3173
3174 get_message_bus()
3175 .borrow_mut()
3176 .register_response_handler(command.request_id(), handler)?;
3177
3178 self.send_data_cmd(DataCommand::Request(command));
3179
3180 Ok(request_id)
3181 }
3182
3183 #[allow(clippy::too_many_arguments)]
3189 pub fn request_quotes(
3190 &self,
3191 instrument_id: InstrumentId,
3192 start: Option<DateTime<Utc>>,
3193 end: Option<DateTime<Utc>>,
3194 limit: Option<NonZeroUsize>,
3195 client_id: Option<ClientId>,
3196 params: Option<IndexMap<String, String>>,
3197 handler: ShareableMessageHandler,
3198 ) -> anyhow::Result<UUID4> {
3199 self.check_registered();
3200
3201 let now = self.clock_ref().utc_now();
3202 check_timestamps(now, start, end)?;
3203
3204 let request_id = UUID4::new();
3205 let command = RequestCommand::Quotes(RequestQuotes {
3206 instrument_id,
3207 start,
3208 end,
3209 limit,
3210 client_id,
3211 request_id,
3212 ts_init: now.into(),
3213 params,
3214 });
3215
3216 get_message_bus()
3217 .borrow_mut()
3218 .register_response_handler(command.request_id(), handler)?;
3219
3220 self.send_data_cmd(DataCommand::Request(command));
3221
3222 Ok(request_id)
3223 }
3224
3225 #[allow(clippy::too_many_arguments)]
3231 pub fn request_trades(
3232 &self,
3233 instrument_id: InstrumentId,
3234 start: Option<DateTime<Utc>>,
3235 end: Option<DateTime<Utc>>,
3236 limit: Option<NonZeroUsize>,
3237 client_id: Option<ClientId>,
3238 params: Option<IndexMap<String, String>>,
3239 handler: ShareableMessageHandler,
3240 ) -> anyhow::Result<UUID4> {
3241 self.check_registered();
3242
3243 let now = self.clock_ref().utc_now();
3244 check_timestamps(now, start, end)?;
3245
3246 let request_id = UUID4::new();
3247 let command = RequestCommand::Trades(RequestTrades {
3248 instrument_id,
3249 start,
3250 end,
3251 limit,
3252 client_id,
3253 request_id,
3254 ts_init: now.into(),
3255 params,
3256 });
3257
3258 get_message_bus()
3259 .borrow_mut()
3260 .register_response_handler(command.request_id(), handler)?;
3261
3262 self.send_data_cmd(DataCommand::Request(command));
3263
3264 Ok(request_id)
3265 }
3266
3267 #[allow(clippy::too_many_arguments)]
3273 pub fn request_bars(
3274 &self,
3275 bar_type: BarType,
3276 start: Option<DateTime<Utc>>,
3277 end: Option<DateTime<Utc>>,
3278 limit: Option<NonZeroUsize>,
3279 client_id: Option<ClientId>,
3280 params: Option<IndexMap<String, String>>,
3281 handler: ShareableMessageHandler,
3282 ) -> anyhow::Result<UUID4> {
3283 self.check_registered();
3284
3285 let now = self.clock_ref().utc_now();
3286 check_timestamps(now, start, end)?;
3287
3288 let request_id = UUID4::new();
3289 let command = RequestCommand::Bars(RequestBars {
3290 bar_type,
3291 start,
3292 end,
3293 limit,
3294 client_id,
3295 request_id,
3296 ts_init: now.into(),
3297 params,
3298 });
3299
3300 get_message_bus()
3301 .borrow_mut()
3302 .register_response_handler(command.request_id(), handler)?;
3303
3304 self.send_data_cmd(DataCommand::Request(command));
3305
3306 Ok(request_id)
3307 }
3308}
3309
3310fn check_timestamps(
3311 now: DateTime<Utc>,
3312 start: Option<DateTime<Utc>>,
3313 end: Option<DateTime<Utc>>,
3314) -> anyhow::Result<()> {
3315 if let Some(start) = start {
3316 check_predicate_true(start <= now, "start was > now")?;
3317 }
3318 if let Some(end) = end {
3319 check_predicate_true(end <= now, "end was > now")?;
3320 }
3321
3322 if let (Some(start), Some(end)) = (start, end) {
3323 check_predicate_true(start < end, "start was >= end")?;
3324 }
3325
3326 Ok(())
3327}
3328
3329fn log_error(e: &anyhow::Error) {
3330 log::error!("{e}");
3331}
3332
3333fn log_not_running<T>(msg: &T)
3334where
3335 T: Debug,
3336{
3337 log::warn!("Received message when not running - skipping {msg:?}");
3339}
3340
3341fn log_received<T>(msg: &T)
3342where
3343 T: Debug,
3344{
3345 log::debug!("{RECV} {msg:?}");
3346}