1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41 },
42 enums::BookType,
43 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45 instruments::{InstrumentAny, SyntheticInstrument},
46 orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54 Actor,
55 registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; use crate::{
63 cache::Cache,
64 clock::Clock,
65 component::Component,
66 enums::{ComponentState, ComponentTrigger},
67 logging::{CMD, RECV, REQ, SEND},
68 messages::{
69 data::{
70 BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
71 InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
72 RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
73 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
74 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
75 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
76 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
77 SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
78 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
79 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
80 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
81 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82 UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
83 UnsubscribeQuotes, UnsubscribeTrades, is_parent_subscription,
84 },
85 system::ShutdownSystem,
86 },
87 msgbus::{
88 self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
89 switchboard::{
90 MessagingSwitchboard, get_bars_topic, get_book_deltas_pattern, get_book_deltas_topic,
91 get_book_snapshots_topic, get_custom_topic, get_funding_rate_topic,
92 get_index_price_topic, get_instrument_close_topic, get_instrument_status_topic,
93 get_instrument_topic, get_instruments_pattern, get_mark_price_topic,
94 get_option_chain_topic, get_option_greeks_topic, get_order_cancels_topic,
95 get_order_fills_topic, get_quotes_topic, get_signal_pattern, get_trades_topic,
96 },
97 },
98 signal::Signal,
99 timer::{TimeEvent, TimeEventCallback},
100};
101
102#[derive(Debug, Clone, Deserialize, Serialize)]
104#[serde(default, deny_unknown_fields)]
105#[cfg_attr(
106 feature = "python",
107 pyo3::pyclass(
108 module = "nautilus_trader.core.nautilus_pyo3.common",
109 subclass,
110 from_py_object
111 )
112)]
113#[cfg_attr(
114 feature = "python",
115 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
116)]
117pub struct DataActorConfig {
118 pub actor_id: Option<ActorId>,
120 pub log_events: bool,
122 pub log_commands: bool,
124}
125
126impl Default for DataActorConfig {
127 fn default() -> Self {
128 Self {
129 actor_id: None,
130 log_events: true,
131 log_commands: true,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Deserialize, Serialize)]
138#[serde(deny_unknown_fields)]
139#[cfg_attr(
140 feature = "python",
141 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
142)]
143#[cfg_attr(
144 feature = "python",
145 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
146)]
147pub struct ImportableActorConfig {
148 pub actor_path: String,
150 pub config_path: String,
152 pub config: HashMap<String, serde_json::Value>,
154}
155
156type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
157
158pub trait DataActor:
159 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
160{
161 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
167 Ok(IndexMap::new())
168 }
169
170 #[allow(unused_variables)]
176 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
177 Ok(())
178 }
179
180 fn on_start(&mut self) -> anyhow::Result<()> {
186 log::warn!(
187 "The `on_start` handler was called when not overridden, \
188 it's expected that any actions required when starting the actor \
189 occur here, such as subscribing/requesting data"
190 );
191 Ok(())
192 }
193
194 fn on_stop(&mut self) -> anyhow::Result<()> {
200 log::warn!(
201 "The `on_stop` handler was called when not overridden, \
202 it's expected that any actions required when stopping the actor \
203 occur here, such as unsubscribing from data",
204 );
205 Ok(())
206 }
207
208 fn on_resume(&mut self) -> anyhow::Result<()> {
214 log::warn!(
215 "The `on_resume` handler was called when not overridden, \
216 it's expected that any actions required when resuming the actor \
217 following a stop occur here"
218 );
219 Ok(())
220 }
221
222 fn on_reset(&mut self) -> anyhow::Result<()> {
228 log::warn!(
229 "The `on_reset` handler was called when not overridden, \
230 it's expected that any actions required when resetting the actor \
231 occur here, such as resetting indicators and other state"
232 );
233 Ok(())
234 }
235
236 fn on_dispose(&mut self) -> anyhow::Result<()> {
242 Ok(())
243 }
244
245 fn on_degrade(&mut self) -> anyhow::Result<()> {
251 Ok(())
252 }
253
254 fn on_fault(&mut self) -> anyhow::Result<()> {
260 Ok(())
261 }
262
263 #[allow(unused_variables)]
269 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
270 Ok(())
271 }
272
273 #[allow(unused_variables)]
279 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
280 Ok(())
281 }
282
283 #[allow(unused_variables)]
289 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
290 Ok(())
291 }
292
293 #[allow(unused_variables)]
299 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 #[allow(unused_variables)]
309 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
310 Ok(())
311 }
312
313 #[allow(unused_variables)]
319 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
320 Ok(())
321 }
322
323 #[allow(unused_variables)]
329 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
330 Ok(())
331 }
332
333 #[allow(unused_variables)]
339 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
340 Ok(())
341 }
342
343 #[allow(unused_variables)]
349 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
350 Ok(())
351 }
352
353 #[allow(unused_variables)]
359 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
360 Ok(())
361 }
362
363 #[allow(unused_variables)]
369 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
370 Ok(())
371 }
372
373 #[allow(unused_variables)]
379 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
380 Ok(())
381 }
382
383 #[allow(unused_variables)]
389 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 #[allow(unused_variables)]
399 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
400 Ok(())
401 }
402
403 #[allow(unused_variables)]
409 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
410 Ok(())
411 }
412
413 #[allow(unused_variables)]
419 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
420 Ok(())
421 }
422
423 #[allow(unused_variables)]
429 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
430 Ok(())
431 }
432
433 #[allow(unused_variables)]
439 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
440 Ok(())
441 }
442
443 #[cfg(feature = "defi")]
444 #[allow(unused_variables)]
450 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
451 Ok(())
452 }
453
454 #[cfg(feature = "defi")]
455 #[allow(unused_variables)]
461 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
462 Ok(())
463 }
464
465 #[cfg(feature = "defi")]
466 #[allow(unused_variables)]
472 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
473 Ok(())
474 }
475
476 #[cfg(feature = "defi")]
477 #[allow(unused_variables)]
483 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
484 Ok(())
485 }
486
487 #[cfg(feature = "defi")]
488 #[allow(unused_variables)]
494 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
495 Ok(())
496 }
497
498 #[cfg(feature = "defi")]
499 #[allow(unused_variables)]
505 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
506 Ok(())
507 }
508
509 #[allow(unused_variables)]
515 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
516 Ok(())
517 }
518
519 #[allow(unused_variables)]
525 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
526 Ok(())
527 }
528
529 #[allow(unused_variables)]
535 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
536 Ok(())
537 }
538
539 #[allow(unused_variables)]
545 fn on_historical_funding_rates(
546 &mut self,
547 funding_rates: &[FundingRateUpdate],
548 ) -> anyhow::Result<()> {
549 Ok(())
550 }
551
552 #[allow(unused_variables)]
558 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
559 Ok(())
560 }
561
562 #[allow(unused_variables)]
568 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
569 Ok(())
570 }
571
572 #[allow(unused_variables)]
578 fn on_historical_index_prices(
579 &mut self,
580 index_prices: &[IndexPriceUpdate],
581 ) -> anyhow::Result<()> {
582 Ok(())
583 }
584
585 fn handle_time_event(&mut self, event: &TimeEvent) {
587 log_received(&event);
588
589 if self.not_running() {
590 log_not_running(&event);
591 return;
592 }
593
594 if let Err(e) = DataActor::on_time_event(self, event) {
595 log_error(&e);
596 }
597 }
598
599 fn handle_data(&mut self, data: &CustomData) {
601 log_received(&data);
602
603 if self.not_running() {
604 log_not_running(&data);
605 return;
606 }
607
608 if let Err(e) = self.on_data(data) {
609 log_error(&e);
610 }
611 }
612
613 fn handle_signal(&mut self, signal: &Signal) {
615 log_received(&signal);
616
617 if self.not_running() {
618 log_not_running(&signal);
619 return;
620 }
621
622 if let Err(e) = self.on_signal(signal) {
623 log_error(&e);
624 }
625 }
626
627 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
629 log_received(&instrument);
630
631 if self.not_running() {
632 log_not_running(&instrument);
633 return;
634 }
635
636 if let Err(e) = self.on_instrument(instrument) {
637 log_error(&e);
638 }
639 }
640
641 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
643 log_received(&deltas);
644
645 if self.not_running() {
646 log_not_running(&deltas);
647 return;
648 }
649
650 if let Err(e) = self.on_book_deltas(deltas) {
651 log_error(&e);
652 }
653 }
654
655 fn handle_book(&mut self, book: &OrderBook) {
657 log_received(&book);
658
659 if self.not_running() {
660 log_not_running(&book);
661 return;
662 }
663
664 if let Err(e) = self.on_book(book) {
665 log_error(&e);
666 }
667 }
668
669 fn handle_quote(&mut self, quote: &QuoteTick) {
671 log_received("e);
672
673 if self.not_running() {
674 log_not_running("e);
675 return;
676 }
677
678 if let Err(e) = self.on_quote(quote) {
679 log_error(&e);
680 }
681 }
682
683 fn handle_trade(&mut self, trade: &TradeTick) {
685 log_received(&trade);
686
687 if self.not_running() {
688 log_not_running(&trade);
689 return;
690 }
691
692 if let Err(e) = self.on_trade(trade) {
693 log_error(&e);
694 }
695 }
696
697 fn handle_bar(&mut self, bar: &Bar) {
699 log_received(&bar);
700
701 if self.not_running() {
702 log_not_running(&bar);
703 return;
704 }
705
706 if let Err(e) = self.on_bar(bar) {
707 log_error(&e);
708 }
709 }
710
711 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
713 log_received(&mark_price);
714
715 if self.not_running() {
716 log_not_running(&mark_price);
717 return;
718 }
719
720 if let Err(e) = self.on_mark_price(mark_price) {
721 log_error(&e);
722 }
723 }
724
725 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
727 log_received(&index_price);
728
729 if self.not_running() {
730 log_not_running(&index_price);
731 return;
732 }
733
734 if let Err(e) = self.on_index_price(index_price) {
735 log_error(&e);
736 }
737 }
738
739 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
741 log_received(&funding_rate);
742
743 if self.not_running() {
744 log_not_running(&funding_rate);
745 return;
746 }
747
748 if let Err(e) = self.on_funding_rate(funding_rate) {
749 log_error(&e);
750 }
751 }
752
753 fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
755 log_received(&greeks);
756
757 if self.not_running() {
758 log_not_running(&greeks);
759 return;
760 }
761
762 if let Err(e) = self.on_option_greeks(greeks) {
763 log_error(&e);
764 }
765 }
766
767 fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
769 log_received(&slice);
770
771 if self.not_running() {
772 log_not_running(&slice);
773 return;
774 }
775
776 if let Err(e) = self.on_option_chain(slice) {
777 log_error(&e);
778 }
779 }
780
781 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
783 log_received(&status);
784
785 if self.not_running() {
786 log_not_running(&status);
787 return;
788 }
789
790 if let Err(e) = self.on_instrument_status(status) {
791 log_error(&e);
792 }
793 }
794
795 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
797 log_received(&close);
798
799 if self.not_running() {
800 log_not_running(&close);
801 return;
802 }
803
804 if let Err(e) = self.on_instrument_close(close) {
805 log_error(&e);
806 }
807 }
808
809 fn handle_order_filled(&mut self, event: &OrderFilled) {
811 log_received(&event);
812
813 if event.strategy_id.inner() == self.actor_id().inner() {
817 return;
818 }
819
820 if self.not_running() {
821 log_not_running(&event);
822 return;
823 }
824
825 if let Err(e) = self.on_order_filled(event) {
826 log_error(&e);
827 }
828 }
829
830 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
832 log_received(&event);
833
834 if event.strategy_id.inner() == self.actor_id().inner() {
838 return;
839 }
840
841 if self.not_running() {
842 log_not_running(&event);
843 return;
844 }
845
846 if let Err(e) = self.on_order_canceled(event) {
847 log_error(&e);
848 }
849 }
850
851 #[cfg(feature = "defi")]
852 fn handle_block(&mut self, block: &Block) {
854 log_received(&block);
855
856 if self.not_running() {
857 log_not_running(&block);
858 return;
859 }
860
861 if let Err(e) = self.on_block(block) {
862 log_error(&e);
863 }
864 }
865
866 #[cfg(feature = "defi")]
867 fn handle_pool(&mut self, pool: &Pool) {
869 log_received(&pool);
870
871 if self.not_running() {
872 log_not_running(&pool);
873 return;
874 }
875
876 if let Err(e) = self.on_pool(pool) {
877 log_error(&e);
878 }
879 }
880
881 #[cfg(feature = "defi")]
882 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
884 log_received(&swap);
885
886 if self.not_running() {
887 log_not_running(&swap);
888 return;
889 }
890
891 if let Err(e) = self.on_pool_swap(swap) {
892 log_error(&e);
893 }
894 }
895
896 #[cfg(feature = "defi")]
897 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
899 log_received(&update);
900
901 if self.not_running() {
902 log_not_running(&update);
903 return;
904 }
905
906 if let Err(e) = self.on_pool_liquidity_update(update) {
907 log_error(&e);
908 }
909 }
910
911 #[cfg(feature = "defi")]
912 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
914 log_received(&collect);
915
916 if self.not_running() {
917 log_not_running(&collect);
918 return;
919 }
920
921 if let Err(e) = self.on_pool_fee_collect(collect) {
922 log_error(&e);
923 }
924 }
925
926 #[cfg(feature = "defi")]
927 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
929 log_received(&flash);
930
931 if self.not_running() {
932 log_not_running(&flash);
933 return;
934 }
935
936 if let Err(e) = self.on_pool_flash(flash) {
937 log_error(&e);
938 }
939 }
940
941 fn handle_historical_data(&mut self, data: &dyn Any) {
943 log_received(&data);
944
945 if let Err(e) = self.on_historical_data(data) {
946 log_error(&e);
947 }
948 }
949
950 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
952 log_received(&resp);
953
954 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
955 log_error(&e);
956 }
957 }
958
959 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
961 log_received(&resp);
962
963 if let Err(e) = self.on_instrument(&resp.data) {
964 log_error(&e);
965 }
966 }
967
968 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
970 log_received_bulk("InstrumentsResponse", &resp.correlation_id, resp.data.len());
971 log::trace!("{RECV} {resp:?}");
972
973 for inst in &resp.data {
974 if let Err(e) = self.on_instrument(inst) {
975 log_error(&e);
976 }
977 }
978 }
979
980 fn handle_book_response(&mut self, resp: &BookResponse) {
982 log_received(&resp);
983
984 if let Err(e) = self.on_book(&resp.data) {
985 log_error(&e);
986 }
987 }
988
989 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
991 log_received_bulk("QuotesResponse", &resp.correlation_id, resp.data.len());
992 log::trace!("{RECV} {resp:?}");
993
994 if let Err(e) = self.on_historical_quotes(&resp.data) {
995 log_error(&e);
996 }
997 }
998
999 fn handle_trades_response(&mut self, resp: &TradesResponse) {
1001 log_received_bulk("TradesResponse", &resp.correlation_id, resp.data.len());
1002 log::trace!("{RECV} {resp:?}");
1003
1004 if let Err(e) = self.on_historical_trades(&resp.data) {
1005 log_error(&e);
1006 }
1007 }
1008
1009 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1011 log_received_bulk(
1012 "FundingRatesResponse",
1013 &resp.correlation_id,
1014 resp.data.len(),
1015 );
1016 log::trace!("{RECV} {resp:?}");
1017
1018 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1019 log_error(&e);
1020 }
1021 }
1022
1023 fn handle_bars_response(&mut self, resp: &BarsResponse) {
1025 log_received_bulk("BarsResponse", &resp.correlation_id, resp.data.len());
1026 log::trace!("{RECV} {resp:?}");
1027
1028 if let Err(e) = self.on_historical_bars(&resp.data) {
1029 log_error(&e);
1030 }
1031 }
1032
1033 fn subscribe_data(
1035 &mut self,
1036 data_type: DataType,
1037 client_id: Option<ClientId>,
1038 params: Option<Params>,
1039 ) where
1040 Self: 'static + Debug + Sized,
1041 {
1042 let actor_id = self.actor_id().inner();
1043 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1044 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1045 });
1046
1047 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1048 }
1049
1050 fn subscribe_signal(&mut self, name: &str, priority: Option<u32>)
1065 where
1066 Self: 'static + Debug + Sized,
1067 {
1068 let actor_id = self.actor_id().inner();
1069 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1072 if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1073 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1074 actor.handle_signal(signal);
1075 } else {
1076 log::error!("Actor {actor_id} not found for signal handling");
1077 }
1078 }
1079 });
1080
1081 DataActorCore::subscribe_signal(self, handler, name, priority);
1082 }
1083
1084 fn subscribe_quotes(
1086 &mut self,
1087 instrument_id: InstrumentId,
1088 client_id: Option<ClientId>,
1089 params: Option<Params>,
1090 ) where
1091 Self: 'static + Debug + Sized,
1092 {
1093 let actor_id = self.actor_id().inner();
1094 let topic = get_quotes_topic(instrument_id);
1095
1096 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1097 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1098 actor.handle_quote(quote);
1099 } else {
1100 log::error!("Actor {actor_id} not found for quote handling");
1101 }
1102 });
1103
1104 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1105 }
1106
1107 fn subscribe_instruments(
1109 &mut self,
1110 venue: Venue,
1111 client_id: Option<ClientId>,
1112 params: Option<Params>,
1113 ) where
1114 Self: 'static + Debug + Sized,
1115 {
1116 let actor_id = self.actor_id().inner();
1117 let pattern = get_instruments_pattern(venue);
1118
1119 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1120 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1121 actor.handle_instrument(instrument);
1122 } else {
1123 log::error!("Actor {actor_id} not found for instruments handling");
1124 }
1125 });
1126
1127 DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1128 }
1129
1130 fn subscribe_instrument(
1132 &mut self,
1133 instrument_id: InstrumentId,
1134 client_id: Option<ClientId>,
1135 params: Option<Params>,
1136 ) where
1137 Self: 'static + Debug + Sized,
1138 {
1139 let actor_id = self.actor_id().inner();
1140 let topic = get_instrument_topic(instrument_id);
1141
1142 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
1143 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1144 actor.handle_instrument(instrument);
1145 } else {
1146 log::error!("Actor {actor_id} not found for instrument handling");
1147 }
1148 });
1149
1150 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1151 }
1152
1153 fn subscribe_book_deltas(
1155 &mut self,
1156 instrument_id: InstrumentId,
1157 book_type: BookType,
1158 depth: Option<NonZeroUsize>,
1159 client_id: Option<ClientId>,
1160 managed: bool,
1161 params: Option<Params>,
1162 ) where
1163 Self: 'static + Debug + Sized,
1164 {
1165 let actor_id = self.actor_id().inner();
1166 let is_parent = is_parent_subscription(params.as_ref());
1167 let pattern = if is_parent {
1168 get_book_deltas_pattern(instrument_id)
1169 } else {
1170 get_book_deltas_topic(instrument_id).into()
1171 };
1172
1173 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1174 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1175 });
1176
1177 DataActorCore::subscribe_book_deltas(
1178 self,
1179 pattern,
1180 handler,
1181 instrument_id,
1182 book_type,
1183 depth,
1184 client_id,
1185 managed,
1186 params,
1187 );
1188 }
1189
1190 fn subscribe_book_at_interval(
1192 &mut self,
1193 instrument_id: InstrumentId,
1194 book_type: BookType,
1195 depth: Option<NonZeroUsize>,
1196 interval_ms: NonZeroUsize,
1197 client_id: Option<ClientId>,
1198 params: Option<Params>,
1199 ) where
1200 Self: 'static + Debug + Sized,
1201 {
1202 let actor_id = self.actor_id().inner();
1203 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1204
1205 let handler = TypedHandler::from(move |book: &OrderBook| {
1206 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1207 });
1208
1209 DataActorCore::subscribe_book_at_interval(
1210 self,
1211 topic,
1212 handler,
1213 instrument_id,
1214 book_type,
1215 depth,
1216 interval_ms,
1217 client_id,
1218 params,
1219 );
1220 }
1221
1222 fn subscribe_trades(
1224 &mut self,
1225 instrument_id: InstrumentId,
1226 client_id: Option<ClientId>,
1227 params: Option<Params>,
1228 ) where
1229 Self: 'static + Debug + Sized,
1230 {
1231 let actor_id = self.actor_id().inner();
1232 let topic = get_trades_topic(instrument_id);
1233
1234 let handler = TypedHandler::from(move |trade: &TradeTick| {
1235 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1236 });
1237
1238 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1239 }
1240
1241 fn subscribe_bars(
1243 &mut self,
1244 bar_type: BarType,
1245 client_id: Option<ClientId>,
1246 params: Option<Params>,
1247 ) where
1248 Self: 'static + Debug + Sized,
1249 {
1250 let actor_id = self.actor_id().inner();
1251 let topic = get_bars_topic(bar_type);
1252
1253 let handler = TypedHandler::from(move |bar: &Bar| {
1254 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1255 });
1256
1257 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1258 }
1259
1260 fn subscribe_mark_prices(
1262 &mut self,
1263 instrument_id: InstrumentId,
1264 client_id: Option<ClientId>,
1265 params: Option<Params>,
1266 ) where
1267 Self: 'static + Debug + Sized,
1268 {
1269 let actor_id = self.actor_id().inner();
1270 let topic = get_mark_price_topic(instrument_id);
1271
1272 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1273 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1274 });
1275
1276 DataActorCore::subscribe_mark_prices(
1277 self,
1278 topic,
1279 handler,
1280 instrument_id,
1281 client_id,
1282 params,
1283 );
1284 }
1285
1286 fn subscribe_index_prices(
1288 &mut self,
1289 instrument_id: InstrumentId,
1290 client_id: Option<ClientId>,
1291 params: Option<Params>,
1292 ) where
1293 Self: 'static + Debug + Sized,
1294 {
1295 let actor_id = self.actor_id().inner();
1296 let topic = get_index_price_topic(instrument_id);
1297
1298 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1299 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1300 });
1301
1302 DataActorCore::subscribe_index_prices(
1303 self,
1304 topic,
1305 handler,
1306 instrument_id,
1307 client_id,
1308 params,
1309 );
1310 }
1311
1312 fn subscribe_funding_rates(
1314 &mut self,
1315 instrument_id: InstrumentId,
1316 client_id: Option<ClientId>,
1317 params: Option<Params>,
1318 ) where
1319 Self: 'static + Debug + Sized,
1320 {
1321 let actor_id = self.actor_id().inner();
1322 let topic = get_funding_rate_topic(instrument_id);
1323
1324 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1325 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1326 });
1327
1328 DataActorCore::subscribe_funding_rates(
1329 self,
1330 topic,
1331 handler,
1332 instrument_id,
1333 client_id,
1334 params,
1335 );
1336 }
1337
1338 fn subscribe_option_greeks(
1340 &mut self,
1341 instrument_id: InstrumentId,
1342 client_id: Option<ClientId>,
1343 params: Option<Params>,
1344 ) where
1345 Self: 'static + Debug + Sized,
1346 {
1347 let actor_id = self.actor_id().inner();
1348 let topic = get_option_greeks_topic(instrument_id);
1349
1350 let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1351 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1352 actor.handle_option_greeks(option_greeks);
1353 } else {
1354 log::error!("Actor {actor_id} not found for option greeks handling");
1355 }
1356 });
1357
1358 DataActorCore::subscribe_option_greeks(
1359 self,
1360 topic,
1361 handler,
1362 instrument_id,
1363 client_id,
1364 params,
1365 );
1366 }
1367
1368 fn subscribe_instrument_status(
1370 &mut self,
1371 instrument_id: InstrumentId,
1372 client_id: Option<ClientId>,
1373 params: Option<Params>,
1374 ) where
1375 Self: 'static + Debug + Sized,
1376 {
1377 let actor_id = self.actor_id().inner();
1378 let topic = get_instrument_status_topic(instrument_id);
1379
1380 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1381 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1382 });
1383
1384 DataActorCore::subscribe_instrument_status(
1385 self,
1386 topic,
1387 handler,
1388 instrument_id,
1389 client_id,
1390 params,
1391 );
1392 }
1393
1394 fn subscribe_instrument_close(
1396 &mut self,
1397 instrument_id: InstrumentId,
1398 client_id: Option<ClientId>,
1399 params: Option<Params>,
1400 ) where
1401 Self: 'static + Debug + Sized,
1402 {
1403 let actor_id = self.actor_id().inner();
1404 let topic = get_instrument_close_topic(instrument_id);
1405
1406 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1407 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1408 });
1409
1410 DataActorCore::subscribe_instrument_close(
1411 self,
1412 topic,
1413 handler,
1414 instrument_id,
1415 client_id,
1416 params,
1417 );
1418 }
1419
1420 fn subscribe_option_chain(
1425 &mut self,
1426 series_id: OptionSeriesId,
1427 strike_range: StrikeRange,
1428 snapshot_interval_ms: Option<u64>,
1429 client_id: Option<ClientId>,
1430 params: Option<Params>,
1431 ) where
1432 Self: 'static + Debug + Sized,
1433 {
1434 let actor_id = self.actor_id().inner();
1435 let topic = get_option_chain_topic(series_id);
1436
1437 let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1438 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1439 actor.handle_option_chain(slice);
1440 } else {
1441 log::error!("Actor {actor_id} not found for option chain handling");
1442 }
1443 });
1444
1445 DataActorCore::subscribe_option_chain(
1446 self,
1447 topic,
1448 handler,
1449 series_id,
1450 strike_range,
1451 snapshot_interval_ms,
1452 client_id,
1453 params,
1454 );
1455 }
1456
1457 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1459 where
1460 Self: 'static + Debug + Sized,
1461 {
1462 let actor_id = self.actor_id().inner();
1463 let topic = get_order_fills_topic(instrument_id);
1464
1465 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1466 if let OrderEventAny::Filled(filled) = event {
1467 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1468 }
1469 });
1470
1471 DataActorCore::subscribe_order_fills(self, topic, handler);
1472 }
1473
1474 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1476 where
1477 Self: 'static + Debug + Sized,
1478 {
1479 let actor_id = self.actor_id().inner();
1480 let topic = get_order_cancels_topic(instrument_id);
1481
1482 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1483 if let OrderEventAny::Canceled(canceled) = event {
1484 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1485 }
1486 });
1487
1488 DataActorCore::subscribe_order_cancels(self, topic, handler);
1489 }
1490
1491 #[cfg(feature = "defi")]
1492 fn subscribe_blocks(
1494 &mut self,
1495 chain: Blockchain,
1496 client_id: Option<ClientId>,
1497 params: Option<Params>,
1498 ) where
1499 Self: 'static + Debug + Sized,
1500 {
1501 let actor_id = self.actor_id().inner();
1502 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1503
1504 let handler = TypedHandler::from(move |block: &Block| {
1505 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1506 });
1507
1508 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1509 }
1510
1511 #[cfg(feature = "defi")]
1512 fn subscribe_pool(
1514 &mut self,
1515 instrument_id: InstrumentId,
1516 client_id: Option<ClientId>,
1517 params: Option<Params>,
1518 ) where
1519 Self: 'static + Debug + Sized,
1520 {
1521 let actor_id = self.actor_id().inner();
1522 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1523
1524 let handler = TypedHandler::from(move |pool: &Pool| {
1525 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1526 });
1527
1528 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1529 }
1530
1531 #[cfg(feature = "defi")]
1532 fn subscribe_pool_swaps(
1534 &mut self,
1535 instrument_id: InstrumentId,
1536 client_id: Option<ClientId>,
1537 params: Option<Params>,
1538 ) where
1539 Self: 'static + Debug + Sized,
1540 {
1541 let actor_id = self.actor_id().inner();
1542 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1543
1544 let handler = TypedHandler::from(move |swap: &PoolSwap| {
1545 get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1546 });
1547
1548 DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1549 }
1550
1551 #[cfg(feature = "defi")]
1552 fn subscribe_pool_liquidity_updates(
1554 &mut self,
1555 instrument_id: InstrumentId,
1556 client_id: Option<ClientId>,
1557 params: Option<Params>,
1558 ) where
1559 Self: 'static + Debug + Sized,
1560 {
1561 let actor_id = self.actor_id().inner();
1562 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1563
1564 let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1565 get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1566 });
1567
1568 DataActorCore::subscribe_pool_liquidity_updates(
1569 self,
1570 topic,
1571 handler,
1572 instrument_id,
1573 client_id,
1574 params,
1575 );
1576 }
1577
1578 #[cfg(feature = "defi")]
1579 fn subscribe_pool_fee_collects(
1581 &mut self,
1582 instrument_id: InstrumentId,
1583 client_id: Option<ClientId>,
1584 params: Option<Params>,
1585 ) where
1586 Self: 'static + Debug + Sized,
1587 {
1588 let actor_id = self.actor_id().inner();
1589 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1590
1591 let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1592 get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1593 });
1594
1595 DataActorCore::subscribe_pool_fee_collects(
1596 self,
1597 topic,
1598 handler,
1599 instrument_id,
1600 client_id,
1601 params,
1602 );
1603 }
1604
1605 #[cfg(feature = "defi")]
1606 fn subscribe_pool_flash_events(
1608 &mut self,
1609 instrument_id: InstrumentId,
1610 client_id: Option<ClientId>,
1611 params: Option<Params>,
1612 ) where
1613 Self: 'static + Debug + Sized,
1614 {
1615 let actor_id = self.actor_id().inner();
1616 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1617
1618 let handler = TypedHandler::from(move |flash: &PoolFlash| {
1619 get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1620 });
1621
1622 DataActorCore::subscribe_pool_flash_events(
1623 self,
1624 topic,
1625 handler,
1626 instrument_id,
1627 client_id,
1628 params,
1629 );
1630 }
1631
1632 fn unsubscribe_data(
1634 &mut self,
1635 data_type: DataType,
1636 client_id: Option<ClientId>,
1637 params: Option<Params>,
1638 ) where
1639 Self: 'static + Debug + Sized,
1640 {
1641 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1642 }
1643
1644 fn unsubscribe_signal(&mut self, name: &str)
1646 where
1647 Self: 'static + Debug + Sized,
1648 {
1649 DataActorCore::unsubscribe_signal(self, name);
1650 }
1651
1652 fn unsubscribe_instruments(
1654 &mut self,
1655 venue: Venue,
1656 client_id: Option<ClientId>,
1657 params: Option<Params>,
1658 ) where
1659 Self: 'static + Debug + Sized,
1660 {
1661 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1662 }
1663
1664 fn unsubscribe_instrument(
1666 &mut self,
1667 instrument_id: InstrumentId,
1668 client_id: Option<ClientId>,
1669 params: Option<Params>,
1670 ) where
1671 Self: 'static + Debug + Sized,
1672 {
1673 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1674 }
1675
1676 fn unsubscribe_book_deltas(
1678 &mut self,
1679 instrument_id: InstrumentId,
1680 client_id: Option<ClientId>,
1681 params: Option<Params>,
1682 ) where
1683 Self: 'static + Debug + Sized,
1684 {
1685 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1686 }
1687
1688 fn unsubscribe_book_at_interval(
1690 &mut self,
1691 instrument_id: InstrumentId,
1692 interval_ms: NonZeroUsize,
1693 client_id: Option<ClientId>,
1694 params: Option<Params>,
1695 ) where
1696 Self: 'static + Debug + Sized,
1697 {
1698 DataActorCore::unsubscribe_book_at_interval(
1699 self,
1700 instrument_id,
1701 interval_ms,
1702 client_id,
1703 params,
1704 );
1705 }
1706
1707 fn unsubscribe_quotes(
1709 &mut self,
1710 instrument_id: InstrumentId,
1711 client_id: Option<ClientId>,
1712 params: Option<Params>,
1713 ) where
1714 Self: 'static + Debug + Sized,
1715 {
1716 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1717 }
1718
1719 fn unsubscribe_trades(
1721 &mut self,
1722 instrument_id: InstrumentId,
1723 client_id: Option<ClientId>,
1724 params: Option<Params>,
1725 ) where
1726 Self: 'static + Debug + Sized,
1727 {
1728 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1729 }
1730
1731 fn unsubscribe_bars(
1733 &mut self,
1734 bar_type: BarType,
1735 client_id: Option<ClientId>,
1736 params: Option<Params>,
1737 ) where
1738 Self: 'static + Debug + Sized,
1739 {
1740 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1741 }
1742
1743 fn unsubscribe_mark_prices(
1745 &mut self,
1746 instrument_id: InstrumentId,
1747 client_id: Option<ClientId>,
1748 params: Option<Params>,
1749 ) where
1750 Self: 'static + Debug + Sized,
1751 {
1752 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1753 }
1754
1755 fn unsubscribe_index_prices(
1757 &mut self,
1758 instrument_id: InstrumentId,
1759 client_id: Option<ClientId>,
1760 params: Option<Params>,
1761 ) where
1762 Self: 'static + Debug + Sized,
1763 {
1764 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1765 }
1766
1767 fn unsubscribe_funding_rates(
1769 &mut self,
1770 instrument_id: InstrumentId,
1771 client_id: Option<ClientId>,
1772 params: Option<Params>,
1773 ) where
1774 Self: 'static + Debug + Sized,
1775 {
1776 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1777 }
1778
1779 fn unsubscribe_option_greeks(
1781 &mut self,
1782 instrument_id: InstrumentId,
1783 client_id: Option<ClientId>,
1784 params: Option<Params>,
1785 ) where
1786 Self: 'static + Debug + Sized,
1787 {
1788 DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1789 }
1790
1791 fn unsubscribe_instrument_status(
1793 &mut self,
1794 instrument_id: InstrumentId,
1795 client_id: Option<ClientId>,
1796 params: Option<Params>,
1797 ) where
1798 Self: 'static + Debug + Sized,
1799 {
1800 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1801 }
1802
1803 fn unsubscribe_instrument_close(
1805 &mut self,
1806 instrument_id: InstrumentId,
1807 client_id: Option<ClientId>,
1808 params: Option<Params>,
1809 ) where
1810 Self: 'static + Debug + Sized,
1811 {
1812 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1813 }
1814
1815 fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1817 where
1818 Self: 'static + Debug + Sized,
1819 {
1820 DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1821 }
1822
1823 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1825 where
1826 Self: 'static + Debug + Sized,
1827 {
1828 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1829 }
1830
1831 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1833 where
1834 Self: 'static + Debug + Sized,
1835 {
1836 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1837 }
1838
1839 #[cfg(feature = "defi")]
1840 fn unsubscribe_blocks(
1842 &mut self,
1843 chain: Blockchain,
1844 client_id: Option<ClientId>,
1845 params: Option<Params>,
1846 ) where
1847 Self: 'static + Debug + Sized,
1848 {
1849 DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1850 }
1851
1852 #[cfg(feature = "defi")]
1853 fn unsubscribe_pool(
1855 &mut self,
1856 instrument_id: InstrumentId,
1857 client_id: Option<ClientId>,
1858 params: Option<Params>,
1859 ) where
1860 Self: 'static + Debug + Sized,
1861 {
1862 DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1863 }
1864
1865 #[cfg(feature = "defi")]
1866 fn unsubscribe_pool_swaps(
1868 &mut self,
1869 instrument_id: InstrumentId,
1870 client_id: Option<ClientId>,
1871 params: Option<Params>,
1872 ) where
1873 Self: 'static + Debug + Sized,
1874 {
1875 DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1876 }
1877
1878 #[cfg(feature = "defi")]
1879 fn unsubscribe_pool_liquidity_updates(
1881 &mut self,
1882 instrument_id: InstrumentId,
1883 client_id: Option<ClientId>,
1884 params: Option<Params>,
1885 ) where
1886 Self: 'static + Debug + Sized,
1887 {
1888 DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1889 }
1890
1891 #[cfg(feature = "defi")]
1892 fn unsubscribe_pool_fee_collects(
1894 &mut self,
1895 instrument_id: InstrumentId,
1896 client_id: Option<ClientId>,
1897 params: Option<Params>,
1898 ) where
1899 Self: 'static + Debug + Sized,
1900 {
1901 DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1902 }
1903
1904 #[cfg(feature = "defi")]
1905 fn unsubscribe_pool_flash_events(
1907 &mut self,
1908 instrument_id: InstrumentId,
1909 client_id: Option<ClientId>,
1910 params: Option<Params>,
1911 ) where
1912 Self: 'static + Debug + Sized,
1913 {
1914 DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1915 }
1916
1917 fn request_data(
1923 &mut self,
1924 data_type: DataType,
1925 client_id: ClientId,
1926 start: Option<DateTime<Utc>>,
1927 end: Option<DateTime<Utc>>,
1928 limit: Option<NonZeroUsize>,
1929 params: Option<Params>,
1930 ) -> anyhow::Result<UUID4>
1931 where
1932 Self: 'static + Debug + Sized,
1933 {
1934 let actor_id = self.actor_id().inner();
1935 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1936 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1937 });
1938
1939 DataActorCore::request_data(
1940 self, data_type, client_id, start, end, limit, params, handler,
1941 )
1942 }
1943
1944 fn request_instrument(
1950 &mut self,
1951 instrument_id: InstrumentId,
1952 start: Option<DateTime<Utc>>,
1953 end: Option<DateTime<Utc>>,
1954 client_id: Option<ClientId>,
1955 params: Option<Params>,
1956 ) -> anyhow::Result<UUID4>
1957 where
1958 Self: 'static + Debug + Sized,
1959 {
1960 let actor_id = self.actor_id().inner();
1961 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1962 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1963 });
1964
1965 DataActorCore::request_instrument(
1966 self,
1967 instrument_id,
1968 start,
1969 end,
1970 client_id,
1971 params,
1972 handler,
1973 )
1974 }
1975
1976 fn request_instruments(
1982 &mut self,
1983 venue: Option<Venue>,
1984 start: Option<DateTime<Utc>>,
1985 end: Option<DateTime<Utc>>,
1986 client_id: Option<ClientId>,
1987 params: Option<Params>,
1988 ) -> anyhow::Result<UUID4>
1989 where
1990 Self: 'static + Debug + Sized,
1991 {
1992 let actor_id = self.actor_id().inner();
1993 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1994 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1995 });
1996
1997 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1998 }
1999
2000 fn request_book_snapshot(
2006 &mut self,
2007 instrument_id: InstrumentId,
2008 depth: Option<NonZeroUsize>,
2009 client_id: Option<ClientId>,
2010 params: Option<Params>,
2011 ) -> anyhow::Result<UUID4>
2012 where
2013 Self: 'static + Debug + Sized,
2014 {
2015 let actor_id = self.actor_id().inner();
2016 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
2017 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
2018 });
2019
2020 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
2021 }
2022
2023 fn request_quotes(
2029 &mut self,
2030 instrument_id: InstrumentId,
2031 start: Option<DateTime<Utc>>,
2032 end: Option<DateTime<Utc>>,
2033 limit: Option<NonZeroUsize>,
2034 client_id: Option<ClientId>,
2035 params: Option<Params>,
2036 ) -> anyhow::Result<UUID4>
2037 where
2038 Self: 'static + Debug + Sized,
2039 {
2040 let actor_id = self.actor_id().inner();
2041 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2042 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2043 });
2044
2045 DataActorCore::request_quotes(
2046 self,
2047 instrument_id,
2048 start,
2049 end,
2050 limit,
2051 client_id,
2052 params,
2053 handler,
2054 )
2055 }
2056
2057 fn request_trades(
2063 &mut self,
2064 instrument_id: InstrumentId,
2065 start: Option<DateTime<Utc>>,
2066 end: Option<DateTime<Utc>>,
2067 limit: Option<NonZeroUsize>,
2068 client_id: Option<ClientId>,
2069 params: Option<Params>,
2070 ) -> anyhow::Result<UUID4>
2071 where
2072 Self: 'static + Debug + Sized,
2073 {
2074 let actor_id = self.actor_id().inner();
2075 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2076 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2077 });
2078
2079 DataActorCore::request_trades(
2080 self,
2081 instrument_id,
2082 start,
2083 end,
2084 limit,
2085 client_id,
2086 params,
2087 handler,
2088 )
2089 }
2090
2091 fn request_funding_rates(
2097 &mut self,
2098 instrument_id: InstrumentId,
2099 start: Option<DateTime<Utc>>,
2100 end: Option<DateTime<Utc>>,
2101 limit: Option<NonZeroUsize>,
2102 client_id: Option<ClientId>,
2103 params: Option<Params>,
2104 ) -> anyhow::Result<UUID4>
2105 where
2106 Self: 'static + Debug + Sized,
2107 {
2108 let actor_id = self.actor_id().inner();
2109 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2110 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2111 });
2112
2113 DataActorCore::request_funding_rates(
2114 self,
2115 instrument_id,
2116 start,
2117 end,
2118 limit,
2119 client_id,
2120 params,
2121 handler,
2122 )
2123 }
2124
2125 fn request_bars(
2131 &mut self,
2132 bar_type: BarType,
2133 start: Option<DateTime<Utc>>,
2134 end: Option<DateTime<Utc>>,
2135 limit: Option<NonZeroUsize>,
2136 client_id: Option<ClientId>,
2137 params: Option<Params>,
2138 ) -> anyhow::Result<UUID4>
2139 where
2140 Self: 'static + Debug + Sized,
2141 {
2142 let actor_id = self.actor_id().inner();
2143 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2144 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2145 });
2146
2147 DataActorCore::request_bars(
2148 self, bar_type, start, end, limit, client_id, params, handler,
2149 )
2150 }
2151}
2152
2153impl<T> Actor for T
2155where
2156 T: DataActor + Debug + 'static,
2157{
2158 fn id(&self) -> Ustr {
2159 self.actor_id.inner()
2160 }
2161
2162 #[allow(unused_variables)]
2163 fn handle(&mut self, msg: &dyn Any) {
2164 }
2166
2167 fn as_any(&self) -> &dyn Any {
2168 self
2169 }
2170}
2171
2172impl<T> Component for T
2174where
2175 T: DataActor + Debug + 'static,
2176{
2177 fn component_id(&self) -> ComponentId {
2178 ComponentId::new(self.actor_id.inner().as_str())
2179 }
2180
2181 fn state(&self) -> ComponentState {
2182 self.state
2183 }
2184
2185 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2186 self.state = self.state.transition(&trigger)?;
2187 log::info!("{}", self.state.variant_name());
2188 Ok(())
2189 }
2190
2191 fn register(
2192 &mut self,
2193 trader_id: TraderId,
2194 clock: Rc<RefCell<dyn Clock>>,
2195 cache: Rc<RefCell<Cache>>,
2196 ) -> anyhow::Result<()> {
2197 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2198
2199 let actor_id = self.actor_id().inner();
2201 let callback = TimeEventCallback::from(move |event: TimeEvent| {
2202 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2203 actor.handle_time_event(&event);
2204 } else {
2205 log::error!("Actor {actor_id} not found for time event handling");
2206 }
2207 });
2208
2209 clock.borrow_mut().register_default_handler(callback);
2210
2211 self.initialize()
2212 }
2213
2214 fn on_start(&mut self) -> anyhow::Result<()> {
2215 DataActor::on_start(self)
2216 }
2217
2218 fn on_stop(&mut self) -> anyhow::Result<()> {
2219 DataActor::on_stop(self)
2220 }
2221
2222 fn on_resume(&mut self) -> anyhow::Result<()> {
2223 DataActor::on_resume(self)
2224 }
2225
2226 fn on_degrade(&mut self) -> anyhow::Result<()> {
2227 DataActor::on_degrade(self)
2228 }
2229
2230 fn on_fault(&mut self) -> anyhow::Result<()> {
2231 DataActor::on_fault(self)
2232 }
2233
2234 fn on_reset(&mut self) -> anyhow::Result<()> {
2235 DataActor::on_reset(self)
2236 }
2237
2238 fn on_dispose(&mut self) -> anyhow::Result<()> {
2239 DataActor::on_dispose(self)
2240 }
2241}
2242
2243#[derive(Clone)]
2245#[allow(
2246 dead_code,
2247 reason = "TODO: Under development (pending_requests, signal_classes)"
2248)]
2249pub struct DataActorCore {
2250 pub actor_id: ActorId,
2252 pub config: DataActorConfig,
2254 trader_id: Option<TraderId>,
2255 clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
2258 topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
2259 instrument_handlers: AHashMap<MStr<Pattern>, TypedHandler<InstrumentAny>>,
2260 deltas_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDeltas>>,
2261 depth10_handlers: AHashMap<MStr<Pattern>, TypedHandler<OrderBookDepth10>>,
2262 book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
2263 quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
2264 trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
2265 bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
2266 mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
2267 index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
2268 funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
2269 option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
2270 option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
2271 order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
2272 #[cfg(feature = "defi")]
2273 block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
2274 #[cfg(feature = "defi")]
2275 pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
2276 #[cfg(feature = "defi")]
2277 pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
2278 #[cfg(feature = "defi")]
2279 pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
2280 #[cfg(feature = "defi")]
2281 pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
2282 #[cfg(feature = "defi")]
2283 pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
2284 warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2286 signal_classes: AHashMap<String, String>,
2287 #[cfg(feature = "indicators")]
2288 indicators: Indicators,
2289}
2290
2291impl Debug for DataActorCore {
2292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2293 f.debug_struct(stringify!(DataActorCore))
2294 .field("actor_id", &self.actor_id)
2295 .field("config", &self.config)
2296 .field("state", &self.state)
2297 .field("trader_id", &self.trader_id)
2298 .finish()
2299 }
2300}
2301
2302impl DataActorCore {
2303 pub(crate) fn add_subscription_any(
2307 &mut self,
2308 topic: MStr<Topic>,
2309 handler: ShareableMessageHandler,
2310 ) {
2311 let pattern: MStr<Pattern> = topic.into();
2312 if self.topic_handlers.contains_key(&pattern) {
2313 log::warn!(
2314 "Actor {} attempted duplicate subscription to topic '{topic}'",
2315 self.actor_id,
2316 );
2317 return;
2318 }
2319
2320 self.topic_handlers.insert(pattern, handler.clone());
2321 msgbus::subscribe_any(pattern, handler, None);
2322 }
2323
2324 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2328 let pattern: MStr<Pattern> = topic.into();
2329 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2330 msgbus::unsubscribe_any(pattern, &handler);
2331 } else {
2332 log::warn!(
2333 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2334 self.actor_id,
2335 );
2336 }
2337 }
2338
2339 pub(crate) fn add_quote_subscription(
2340 &mut self,
2341 topic: MStr<Topic>,
2342 handler: TypedHandler<QuoteTick>,
2343 ) {
2344 if self.quote_handlers.contains_key(&topic) {
2345 log::warn!(
2346 "Actor {} attempted duplicate quote subscription to '{topic}'",
2347 self.actor_id
2348 );
2349 return;
2350 }
2351 self.quote_handlers.insert(topic, handler.clone());
2352 msgbus::subscribe_quotes(topic.into(), handler, None);
2353 }
2354
2355 #[allow(dead_code)]
2356 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2357 if let Some(handler) = self.quote_handlers.remove(&topic) {
2358 msgbus::unsubscribe_quotes(topic.into(), &handler);
2359 }
2360 }
2361
2362 pub(crate) fn add_trade_subscription(
2363 &mut self,
2364 topic: MStr<Topic>,
2365 handler: TypedHandler<TradeTick>,
2366 ) {
2367 if self.trade_handlers.contains_key(&topic) {
2368 log::warn!(
2369 "Actor {} attempted duplicate trade subscription to '{topic}'",
2370 self.actor_id
2371 );
2372 return;
2373 }
2374 self.trade_handlers.insert(topic, handler.clone());
2375 msgbus::subscribe_trades(topic.into(), handler, None);
2376 }
2377
2378 #[allow(dead_code)]
2379 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2380 if let Some(handler) = self.trade_handlers.remove(&topic) {
2381 msgbus::unsubscribe_trades(topic.into(), &handler);
2382 }
2383 }
2384
2385 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2386 if self.bar_handlers.contains_key(&topic) {
2387 log::warn!(
2388 "Actor {} attempted duplicate bar subscription to '{topic}'",
2389 self.actor_id
2390 );
2391 return;
2392 }
2393 self.bar_handlers.insert(topic, handler.clone());
2394 msgbus::subscribe_bars(topic.into(), handler, None);
2395 }
2396
2397 #[allow(dead_code)]
2398 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2399 if let Some(handler) = self.bar_handlers.remove(&topic) {
2400 msgbus::unsubscribe_bars(topic.into(), &handler);
2401 }
2402 }
2403
2404 pub(crate) fn add_order_event_subscription(
2405 &mut self,
2406 topic: MStr<Topic>,
2407 handler: TypedHandler<OrderEventAny>,
2408 ) {
2409 if self.order_event_handlers.contains_key(&topic) {
2410 log::warn!(
2411 "Actor {} attempted duplicate order event subscription to '{topic}'",
2412 self.actor_id
2413 );
2414 return;
2415 }
2416 self.order_event_handlers.insert(topic, handler.clone());
2417 msgbus::subscribe_order_events(topic.into(), handler, None);
2418 }
2419
2420 #[allow(dead_code)]
2421 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2422 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2423 msgbus::unsubscribe_order_events(topic.into(), &handler);
2424 }
2425 }
2426
2427 pub(crate) fn add_deltas_subscription(
2428 &mut self,
2429 pattern: MStr<Pattern>,
2430 handler: TypedHandler<OrderBookDeltas>,
2431 ) {
2432 if self.deltas_handlers.contains_key(&pattern) {
2433 log::warn!(
2434 "Actor {} attempted duplicate deltas subscription to '{pattern}'",
2435 self.actor_id
2436 );
2437 return;
2438 }
2439 self.deltas_handlers.insert(pattern, handler.clone());
2440 msgbus::subscribe_book_deltas(pattern, handler, None);
2441 }
2442
2443 #[allow(dead_code)]
2444 pub(crate) fn remove_deltas_subscription(&mut self, pattern: MStr<Pattern>) {
2445 if let Some(handler) = self.deltas_handlers.remove(&pattern) {
2446 msgbus::unsubscribe_book_deltas(pattern, &handler);
2447 }
2448 }
2449
2450 #[allow(dead_code)]
2451 pub(crate) fn add_depth10_subscription(
2452 &mut self,
2453 pattern: MStr<Pattern>,
2454 handler: TypedHandler<OrderBookDepth10>,
2455 ) {
2456 if self.depth10_handlers.contains_key(&pattern) {
2457 log::warn!(
2458 "Actor {} attempted duplicate depth10 subscription to '{pattern}'",
2459 self.actor_id
2460 );
2461 return;
2462 }
2463 self.depth10_handlers.insert(pattern, handler.clone());
2464 msgbus::subscribe_book_depth10(pattern, handler, None);
2465 }
2466
2467 #[allow(dead_code)]
2468 pub(crate) fn remove_depth10_subscription(&mut self, pattern: MStr<Pattern>) {
2469 if let Some(handler) = self.depth10_handlers.remove(&pattern) {
2470 msgbus::unsubscribe_book_depth10(pattern, &handler);
2471 }
2472 }
2473
2474 pub(crate) fn add_instrument_subscription(
2475 &mut self,
2476 pattern: MStr<Pattern>,
2477 handler: TypedHandler<InstrumentAny>,
2478 ) {
2479 if self.instrument_handlers.contains_key(&pattern) {
2480 log::warn!(
2481 "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2482 self.actor_id
2483 );
2484 return;
2485 }
2486 self.instrument_handlers.insert(pattern, handler.clone());
2487 msgbus::subscribe_instruments(pattern, handler, None);
2488 }
2489
2490 #[allow(dead_code)]
2491 pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2492 if let Some(handler) = self.instrument_handlers.remove(&pattern) {
2493 msgbus::unsubscribe_instruments(pattern, &handler);
2494 }
2495 }
2496
2497 pub(crate) fn add_instrument_close_subscription(
2498 &mut self,
2499 topic: MStr<Topic>,
2500 handler: ShareableMessageHandler,
2501 ) {
2502 let pattern: MStr<Pattern> = topic.into();
2503 if self.topic_handlers.contains_key(&pattern) {
2504 log::warn!(
2505 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2506 self.actor_id
2507 );
2508 return;
2509 }
2510 self.topic_handlers.insert(pattern, handler.clone());
2511 msgbus::subscribe_any(pattern, handler, None);
2512 }
2513
2514 #[allow(dead_code)]
2515 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2516 let pattern: MStr<Pattern> = topic.into();
2517 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2518 msgbus::unsubscribe_any(pattern, &handler);
2519 }
2520 }
2521
2522 pub(crate) fn add_book_snapshot_subscription(
2523 &mut self,
2524 topic: MStr<Topic>,
2525 handler: TypedHandler<OrderBook>,
2526 ) {
2527 if self.book_handlers.contains_key(&topic) {
2528 log::warn!(
2529 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2530 self.actor_id
2531 );
2532 return;
2533 }
2534 self.book_handlers.insert(topic, handler.clone());
2535 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2536 }
2537
2538 #[allow(dead_code)]
2539 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2540 if let Some(handler) = self.book_handlers.remove(&topic) {
2541 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2542 }
2543 }
2544
2545 pub(crate) fn add_mark_price_subscription(
2546 &mut self,
2547 topic: MStr<Topic>,
2548 handler: TypedHandler<MarkPriceUpdate>,
2549 ) {
2550 if self.mark_price_handlers.contains_key(&topic) {
2551 log::warn!(
2552 "Actor {} attempted duplicate mark price subscription to '{topic}'",
2553 self.actor_id
2554 );
2555 return;
2556 }
2557 self.mark_price_handlers.insert(topic, handler.clone());
2558 msgbus::subscribe_mark_prices(topic.into(), handler, None);
2559 }
2560
2561 #[allow(dead_code)]
2562 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2563 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2564 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2565 }
2566 }
2567
2568 pub(crate) fn add_index_price_subscription(
2569 &mut self,
2570 topic: MStr<Topic>,
2571 handler: TypedHandler<IndexPriceUpdate>,
2572 ) {
2573 if self.index_price_handlers.contains_key(&topic) {
2574 log::warn!(
2575 "Actor {} attempted duplicate index price subscription to '{topic}'",
2576 self.actor_id
2577 );
2578 return;
2579 }
2580 self.index_price_handlers.insert(topic, handler.clone());
2581 msgbus::subscribe_index_prices(topic.into(), handler, None);
2582 }
2583
2584 #[allow(dead_code)]
2585 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2586 if let Some(handler) = self.index_price_handlers.remove(&topic) {
2587 msgbus::unsubscribe_index_prices(topic.into(), &handler);
2588 }
2589 }
2590
2591 pub(crate) fn add_funding_rate_subscription(
2592 &mut self,
2593 topic: MStr<Topic>,
2594 handler: TypedHandler<FundingRateUpdate>,
2595 ) {
2596 if self.funding_rate_handlers.contains_key(&topic) {
2597 log::warn!(
2598 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2599 self.actor_id
2600 );
2601 return;
2602 }
2603 self.funding_rate_handlers.insert(topic, handler.clone());
2604 msgbus::subscribe_funding_rates(topic.into(), handler, None);
2605 }
2606
2607 #[allow(dead_code)]
2608 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2609 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2610 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2611 }
2612 }
2613
2614 pub(crate) fn add_option_greeks_subscription(
2615 &mut self,
2616 topic: MStr<Topic>,
2617 handler: TypedHandler<OptionGreeks>,
2618 ) {
2619 if self.option_greeks_handlers.contains_key(&topic) {
2620 log::warn!(
2621 "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2622 self.actor_id
2623 );
2624 return;
2625 }
2626 self.option_greeks_handlers.insert(topic, handler.clone());
2627 msgbus::subscribe_option_greeks(topic.into(), handler, None);
2628 }
2629
2630 #[allow(dead_code)]
2631 pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2632 if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2633 msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2634 }
2635 }
2636
2637 pub(crate) fn add_option_chain_subscription(
2638 &mut self,
2639 topic: MStr<Topic>,
2640 handler: TypedHandler<OptionChainSlice>,
2641 ) {
2642 if self.option_chain_handlers.contains_key(&topic) {
2643 log::warn!(
2644 "Actor {} attempted duplicate option chain subscription to '{topic}'",
2645 self.actor_id
2646 );
2647 return;
2648 }
2649 self.option_chain_handlers.insert(topic, handler.clone());
2650 msgbus::subscribe_option_chain(topic.into(), handler, None);
2651 }
2652
2653 pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2654 if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2655 msgbus::unsubscribe_option_chain(topic.into(), &handler);
2656 }
2657 }
2658
2659 #[cfg(feature = "defi")]
2660 pub(crate) fn add_block_subscription(
2661 &mut self,
2662 topic: MStr<Topic>,
2663 handler: TypedHandler<Block>,
2664 ) {
2665 if self.block_handlers.contains_key(&topic) {
2666 log::warn!(
2667 "Actor {} attempted duplicate block subscription to '{topic}'",
2668 self.actor_id
2669 );
2670 return;
2671 }
2672 self.block_handlers.insert(topic, handler.clone());
2673 msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2674 }
2675
2676 #[cfg(feature = "defi")]
2677 #[allow(dead_code)]
2678 pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2679 if let Some(handler) = self.block_handlers.remove(&topic) {
2680 msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2681 }
2682 }
2683
2684 #[cfg(feature = "defi")]
2685 pub(crate) fn add_pool_subscription(
2686 &mut self,
2687 topic: MStr<Topic>,
2688 handler: TypedHandler<Pool>,
2689 ) {
2690 if self.pool_handlers.contains_key(&topic) {
2691 log::warn!(
2692 "Actor {} attempted duplicate pool subscription to '{topic}'",
2693 self.actor_id
2694 );
2695 return;
2696 }
2697 self.pool_handlers.insert(topic, handler.clone());
2698 msgbus::subscribe_defi_pools(topic.into(), handler, None);
2699 }
2700
2701 #[cfg(feature = "defi")]
2702 #[allow(dead_code)]
2703 pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2704 if let Some(handler) = self.pool_handlers.remove(&topic) {
2705 msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2706 }
2707 }
2708
2709 #[cfg(feature = "defi")]
2710 pub(crate) fn add_pool_swap_subscription(
2711 &mut self,
2712 topic: MStr<Topic>,
2713 handler: TypedHandler<PoolSwap>,
2714 ) {
2715 if self.pool_swap_handlers.contains_key(&topic) {
2716 log::warn!(
2717 "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2718 self.actor_id
2719 );
2720 return;
2721 }
2722 self.pool_swap_handlers.insert(topic, handler.clone());
2723 msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2724 }
2725
2726 #[cfg(feature = "defi")]
2727 #[allow(dead_code)]
2728 pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2729 if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2730 msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2731 }
2732 }
2733
2734 #[cfg(feature = "defi")]
2735 pub(crate) fn add_pool_liquidity_subscription(
2736 &mut self,
2737 topic: MStr<Topic>,
2738 handler: TypedHandler<PoolLiquidityUpdate>,
2739 ) {
2740 if self.pool_liquidity_handlers.contains_key(&topic) {
2741 log::warn!(
2742 "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2743 self.actor_id
2744 );
2745 return;
2746 }
2747 self.pool_liquidity_handlers.insert(topic, handler.clone());
2748 msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2749 }
2750
2751 #[cfg(feature = "defi")]
2752 #[allow(dead_code)]
2753 pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2754 if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2755 msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2756 }
2757 }
2758
2759 #[cfg(feature = "defi")]
2760 pub(crate) fn add_pool_collect_subscription(
2761 &mut self,
2762 topic: MStr<Topic>,
2763 handler: TypedHandler<PoolFeeCollect>,
2764 ) {
2765 if self.pool_collect_handlers.contains_key(&topic) {
2766 log::warn!(
2767 "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2768 self.actor_id
2769 );
2770 return;
2771 }
2772 self.pool_collect_handlers.insert(topic, handler.clone());
2773 msgbus::subscribe_defi_collects(topic.into(), handler, None);
2774 }
2775
2776 #[cfg(feature = "defi")]
2777 #[allow(dead_code)]
2778 pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2779 if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2780 msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2781 }
2782 }
2783
2784 #[cfg(feature = "defi")]
2785 pub(crate) fn add_pool_flash_subscription(
2786 &mut self,
2787 topic: MStr<Topic>,
2788 handler: TypedHandler<PoolFlash>,
2789 ) {
2790 if self.pool_flash_handlers.contains_key(&topic) {
2791 log::warn!(
2792 "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2793 self.actor_id
2794 );
2795 return;
2796 }
2797 self.pool_flash_handlers.insert(topic, handler.clone());
2798 msgbus::subscribe_defi_flash(topic.into(), handler, None);
2799 }
2800
2801 #[cfg(feature = "defi")]
2802 #[allow(dead_code)]
2803 pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2804 if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2805 msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2806 }
2807 }
2808
2809 pub fn new(config: DataActorConfig) -> Self {
2811 let actor_id = config
2812 .actor_id
2813 .unwrap_or_else(|| Self::default_actor_id(&config));
2814
2815 Self {
2816 actor_id,
2817 config,
2818 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2822 topic_handlers: AHashMap::new(),
2823 instrument_handlers: AHashMap::new(),
2824 deltas_handlers: AHashMap::new(),
2825 depth10_handlers: AHashMap::new(),
2826 book_handlers: AHashMap::new(),
2827 quote_handlers: AHashMap::new(),
2828 trade_handlers: AHashMap::new(),
2829 bar_handlers: AHashMap::new(),
2830 mark_price_handlers: AHashMap::new(),
2831 index_price_handlers: AHashMap::new(),
2832 funding_rate_handlers: AHashMap::new(),
2833 option_greeks_handlers: AHashMap::new(),
2834 option_chain_handlers: AHashMap::new(),
2835 order_event_handlers: AHashMap::new(),
2836 #[cfg(feature = "defi")]
2837 block_handlers: AHashMap::new(),
2838 #[cfg(feature = "defi")]
2839 pool_handlers: AHashMap::new(),
2840 #[cfg(feature = "defi")]
2841 pool_swap_handlers: AHashMap::new(),
2842 #[cfg(feature = "defi")]
2843 pool_liquidity_handlers: AHashMap::new(),
2844 #[cfg(feature = "defi")]
2845 pool_collect_handlers: AHashMap::new(),
2846 #[cfg(feature = "defi")]
2847 pool_flash_handlers: AHashMap::new(),
2848 warning_events: AHashSet::new(),
2849 pending_requests: AHashMap::new(),
2850 signal_classes: AHashMap::new(),
2851 #[cfg(feature = "indicators")]
2852 indicators: Indicators::default(),
2853 }
2854 }
2855
2856 #[must_use]
2858 pub fn mem_address(&self) -> String {
2859 format!("{self:p}")
2860 }
2861
2862 pub fn state(&self) -> ComponentState {
2864 self.state
2865 }
2866
2867 pub fn trader_id(&self) -> Option<TraderId> {
2869 self.trader_id
2870 }
2871
2872 pub fn actor_id(&self) -> ActorId {
2874 self.actor_id
2875 }
2876
2877 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2878 let memory_address = std::ptr::from_ref(config) as usize;
2879 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2880 }
2881
2882 pub fn timestamp_ns(&self) -> UnixNanos {
2884 self.clock_ref().timestamp_ns()
2885 }
2886
2887 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2893 self.clock
2894 .as_ref()
2895 .unwrap_or_else(|| {
2896 panic!(
2897 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2898 self.actor_id, self.trader_id
2899 )
2900 })
2901 .borrow_mut()
2902 }
2903
2904 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2910 self.clock
2911 .as_ref()
2912 .expect("DataActor must be registered before accessing clock")
2913 .clone()
2914 }
2915
2916 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2917 self.clock
2918 .as_ref()
2919 .unwrap_or_else(|| {
2920 panic!(
2921 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2922 self.actor_id, self.trader_id
2923 )
2924 })
2925 .borrow()
2926 }
2927
2928 pub fn cache(&self) -> Ref<'_, Cache> {
2934 self.cache
2935 .as_ref()
2936 .expect("DataActor must be registered before accessing cache")
2937 .borrow()
2938 }
2939
2940 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2946 self.cache
2947 .as_ref()
2948 .expect("DataActor must be registered before accessing cache")
2949 .clone()
2950 }
2951
2952 pub fn register(
2959 &mut self,
2960 trader_id: TraderId,
2961 clock: Rc<RefCell<dyn Clock>>,
2962 cache: Rc<RefCell<Cache>>,
2963 ) -> anyhow::Result<()> {
2964 if let Some(existing_trader_id) = self.trader_id {
2965 anyhow::bail!(
2966 "DataActor {} already registered with trader {existing_trader_id}",
2967 self.actor_id
2968 );
2969 }
2970
2971 {
2973 let _timestamp = clock.borrow().timestamp_ns();
2974 }
2975
2976 {
2978 let _cache_borrow = cache.borrow();
2979 }
2980
2981 self.trader_id = Some(trader_id);
2982 self.clock = Some(clock);
2983 self.cache = Some(cache);
2984
2985 if !self.is_properly_registered() {
2987 anyhow::bail!(
2988 "DataActor {} registration incomplete - validation failed",
2989 self.actor_id
2990 );
2991 }
2992
2993 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2994 Ok(())
2995 }
2996
2997 pub fn register_warning_event(&mut self, event_type: &str) {
2999 self.warning_events.insert(event_type.to_string());
3000 log::debug!("Registered event type '{event_type}' for warning logs");
3001 }
3002
3003 pub fn deregister_warning_event(&mut self, event_type: &str) {
3005 self.warning_events.remove(event_type);
3006 log::debug!("Deregistered event type '{event_type}' from warning logs");
3007 }
3008
3009 pub fn is_registered(&self) -> bool {
3010 self.trader_id.is_some()
3011 }
3012
3013 pub(crate) fn check_registered(&self) {
3014 assert!(
3015 self.is_registered(),
3016 "Actor has not been registered with a Trader"
3017 );
3018 }
3019
3020 fn is_properly_registered(&self) -> bool {
3022 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
3023 }
3024
3025 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
3026 if self.config.log_commands {
3027 log::info!("{CMD}{SEND} {command:?}");
3028 }
3029
3030 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3031 msgbus::send_data_command(endpoint, command);
3032 }
3033
3034 #[allow(dead_code)]
3035 fn send_data_req(&self, request: &RequestCommand) {
3036 if self.config.log_commands {
3037 log::info!("{REQ}{SEND} {request:?}");
3038 }
3039
3040 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3043 msgbus::send_any(endpoint, request.as_any());
3044 }
3045
3046 pub fn shutdown_system(&self, reason: Option<String>) {
3052 self.check_registered();
3053
3054 let command = ShutdownSystem::new(
3056 self.trader_id().unwrap(),
3057 self.actor_id.inner(),
3058 reason,
3059 UUID4::new(),
3060 self.timestamp_ns(),
3061 );
3062
3063 let topic = MessagingSwitchboard::shutdown_system_topic();
3064 msgbus::publish_any(topic, command.as_any());
3065 }
3066
3067 pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3077 self.check_registered();
3078
3079 let topic = get_custom_topic(data_type);
3080 msgbus::publish_any(topic, data);
3081 }
3082
3083 pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3095 self.check_registered();
3096
3097 let now = self.timestamp_ns();
3098 let ts_event = if ts_event.as_u64() == 0 {
3099 now
3100 } else {
3101 ts_event
3102 };
3103 let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3104
3105 let data_type = DataType::new(
3106 &format!(
3107 "Signal{}",
3108 nautilus_core::string::conversions::title_case(name)
3109 ),
3110 None,
3111 None,
3112 );
3113 let data = CustomData::new(Arc::new(signal), data_type);
3114 let topic = get_custom_topic(&data.data_type);
3115 msgbus::publish_any(topic, &data);
3116 }
3117
3118 pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3126 self.check_registered();
3127
3128 let cache = self.cache_rc();
3129 if cache.borrow().synthetic(&synthetic.id).is_some() {
3130 anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3131 }
3132 cache.borrow_mut().add_synthetic(synthetic)
3133 }
3134
3135 pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3143 self.check_registered();
3144
3145 let cache = self.cache_rc();
3146 if cache.borrow().synthetic(&synthetic.id).is_none() {
3147 anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3148 }
3149 cache.borrow_mut().add_synthetic(synthetic)
3150 }
3151
3152 pub fn subscribe_data(
3158 &mut self,
3159 handler: ShareableMessageHandler,
3160 data_type: DataType,
3161 client_id: Option<ClientId>,
3162 params: Option<Params>,
3163 ) {
3164 assert!(
3165 self.is_properly_registered(),
3166 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3167 self.actor_id,
3168 self.trader_id,
3169 self.clock.is_some(),
3170 self.cache.is_some()
3171 );
3172
3173 let topic = get_custom_topic(&data_type);
3174 self.add_subscription_any(topic, handler);
3175
3176 if client_id.is_none() {
3178 return;
3179 }
3180
3181 let command = SubscribeCommand::Data(SubscribeCustomData {
3182 data_type,
3183 client_id,
3184 venue: None,
3185 command_id: UUID4::new(),
3186 ts_init: self.timestamp_ns(),
3187 correlation_id: None,
3188 params,
3189 });
3190
3191 self.send_data_cmd(DataCommand::Subscribe(command));
3192 }
3193
3194 pub fn subscribe_signal(
3202 &mut self,
3203 handler: ShareableMessageHandler,
3204 name: &str,
3205 priority: Option<u32>,
3206 ) {
3207 self.check_registered();
3208
3209 let pattern = get_signal_pattern(name);
3210 if self.topic_handlers.contains_key(&pattern) {
3211 log::warn!(
3212 "Actor {} attempted duplicate signal subscription to '{pattern}'",
3213 self.actor_id,
3214 );
3215 return;
3216 }
3217 self.topic_handlers.insert(pattern, handler.clone());
3218 msgbus::subscribe_any(pattern, handler, priority);
3219 }
3220
3221 pub fn subscribe_quotes(
3223 &mut self,
3224 topic: MStr<Topic>,
3225 handler: TypedHandler<QuoteTick>,
3226 instrument_id: InstrumentId,
3227 client_id: Option<ClientId>,
3228 params: Option<Params>,
3229 ) {
3230 self.check_registered();
3231
3232 self.add_quote_subscription(topic, handler);
3233
3234 let command = SubscribeCommand::Quotes(SubscribeQuotes {
3235 instrument_id,
3236 client_id,
3237 venue: Some(instrument_id.venue),
3238 command_id: UUID4::new(),
3239 ts_init: self.timestamp_ns(),
3240 correlation_id: None,
3241 params,
3242 });
3243
3244 self.send_data_cmd(DataCommand::Subscribe(command));
3245 }
3246
3247 pub fn subscribe_instruments(
3249 &mut self,
3250 pattern: MStr<Pattern>,
3251 handler: TypedHandler<InstrumentAny>,
3252 venue: Venue,
3253 client_id: Option<ClientId>,
3254 params: Option<Params>,
3255 ) {
3256 self.check_registered();
3257
3258 self.add_instrument_subscription(pattern, handler);
3259
3260 let command = SubscribeCommand::Instruments(SubscribeInstruments {
3261 client_id,
3262 venue,
3263 command_id: UUID4::new(),
3264 ts_init: self.timestamp_ns(),
3265 correlation_id: None,
3266 params,
3267 });
3268
3269 self.send_data_cmd(DataCommand::Subscribe(command));
3270 }
3271
3272 pub fn subscribe_instrument(
3274 &mut self,
3275 topic: MStr<Topic>,
3276 handler: TypedHandler<InstrumentAny>,
3277 instrument_id: InstrumentId,
3278 client_id: Option<ClientId>,
3279 params: Option<Params>,
3280 ) {
3281 self.check_registered();
3282
3283 self.add_instrument_subscription(topic.into(), handler);
3284
3285 let command = SubscribeCommand::Instrument(SubscribeInstrument {
3286 instrument_id,
3287 client_id,
3288 venue: Some(instrument_id.venue),
3289 command_id: UUID4::new(),
3290 ts_init: self.timestamp_ns(),
3291 correlation_id: None,
3292 params,
3293 });
3294
3295 self.send_data_cmd(DataCommand::Subscribe(command));
3296 }
3297
3298 #[expect(clippy::too_many_arguments)]
3300 pub fn subscribe_book_deltas(
3301 &mut self,
3302 pattern: MStr<Pattern>,
3303 handler: TypedHandler<OrderBookDeltas>,
3304 instrument_id: InstrumentId,
3305 book_type: BookType,
3306 depth: Option<NonZeroUsize>,
3307 client_id: Option<ClientId>,
3308 managed: bool,
3309 params: Option<Params>,
3310 ) {
3311 self.check_registered();
3312
3313 self.add_deltas_subscription(pattern, handler);
3314
3315 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3316 instrument_id,
3317 book_type,
3318 client_id,
3319 venue: Some(instrument_id.venue),
3320 command_id: UUID4::new(),
3321 ts_init: self.timestamp_ns(),
3322 depth,
3323 managed,
3324 correlation_id: None,
3325 params,
3326 });
3327
3328 self.send_data_cmd(DataCommand::Subscribe(command));
3329 }
3330
3331 #[expect(clippy::too_many_arguments)]
3333 pub fn subscribe_book_at_interval(
3334 &mut self,
3335 topic: MStr<Topic>,
3336 handler: TypedHandler<OrderBook>,
3337 instrument_id: InstrumentId,
3338 book_type: BookType,
3339 depth: Option<NonZeroUsize>,
3340 interval_ms: NonZeroUsize,
3341 client_id: Option<ClientId>,
3342 params: Option<Params>,
3343 ) {
3344 self.check_registered();
3345
3346 self.add_book_snapshot_subscription(topic, handler);
3347
3348 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3349 instrument_id,
3350 book_type,
3351 client_id,
3352 venue: Some(instrument_id.venue),
3353 command_id: UUID4::new(),
3354 ts_init: self.timestamp_ns(),
3355 depth,
3356 interval_ms,
3357 correlation_id: None,
3358 params,
3359 });
3360
3361 self.send_data_cmd(DataCommand::Subscribe(command));
3362 }
3363
3364 pub fn subscribe_trades(
3366 &mut self,
3367 topic: MStr<Topic>,
3368 handler: TypedHandler<TradeTick>,
3369 instrument_id: InstrumentId,
3370 client_id: Option<ClientId>,
3371 params: Option<Params>,
3372 ) {
3373 self.check_registered();
3374
3375 self.add_trade_subscription(topic, handler);
3376
3377 let command = SubscribeCommand::Trades(SubscribeTrades {
3378 instrument_id,
3379 client_id,
3380 venue: Some(instrument_id.venue),
3381 command_id: UUID4::new(),
3382 ts_init: self.timestamp_ns(),
3383 correlation_id: None,
3384 params,
3385 });
3386
3387 self.send_data_cmd(DataCommand::Subscribe(command));
3388 }
3389
3390 pub fn subscribe_bars(
3392 &mut self,
3393 topic: MStr<Topic>,
3394 handler: TypedHandler<Bar>,
3395 bar_type: BarType,
3396 client_id: Option<ClientId>,
3397 params: Option<Params>,
3398 ) {
3399 self.check_registered();
3400
3401 self.add_bar_subscription(topic, handler);
3402
3403 let command = SubscribeCommand::Bars(SubscribeBars {
3404 bar_type,
3405 client_id,
3406 venue: Some(bar_type.instrument_id().venue),
3407 command_id: UUID4::new(),
3408 ts_init: self.timestamp_ns(),
3409 correlation_id: None,
3410 params,
3411 });
3412
3413 self.send_data_cmd(DataCommand::Subscribe(command));
3414 }
3415
3416 pub fn subscribe_mark_prices(
3418 &mut self,
3419 topic: MStr<Topic>,
3420 handler: TypedHandler<MarkPriceUpdate>,
3421 instrument_id: InstrumentId,
3422 client_id: Option<ClientId>,
3423 params: Option<Params>,
3424 ) {
3425 self.check_registered();
3426
3427 self.add_mark_price_subscription(topic, handler);
3428
3429 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3430 instrument_id,
3431 client_id,
3432 venue: Some(instrument_id.venue),
3433 command_id: UUID4::new(),
3434 ts_init: self.timestamp_ns(),
3435 correlation_id: None,
3436 params,
3437 });
3438
3439 self.send_data_cmd(DataCommand::Subscribe(command));
3440 }
3441
3442 pub fn subscribe_index_prices(
3444 &mut self,
3445 topic: MStr<Topic>,
3446 handler: TypedHandler<IndexPriceUpdate>,
3447 instrument_id: InstrumentId,
3448 client_id: Option<ClientId>,
3449 params: Option<Params>,
3450 ) {
3451 self.check_registered();
3452
3453 self.add_index_price_subscription(topic, handler);
3454
3455 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3456 instrument_id,
3457 client_id,
3458 venue: Some(instrument_id.venue),
3459 command_id: UUID4::new(),
3460 ts_init: self.timestamp_ns(),
3461 correlation_id: None,
3462 params,
3463 });
3464
3465 self.send_data_cmd(DataCommand::Subscribe(command));
3466 }
3467
3468 pub fn subscribe_funding_rates(
3470 &mut self,
3471 topic: MStr<Topic>,
3472 handler: TypedHandler<FundingRateUpdate>,
3473 instrument_id: InstrumentId,
3474 client_id: Option<ClientId>,
3475 params: Option<Params>,
3476 ) {
3477 self.check_registered();
3478
3479 self.add_funding_rate_subscription(topic, handler);
3480
3481 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3482 instrument_id,
3483 client_id,
3484 venue: Some(instrument_id.venue),
3485 command_id: UUID4::new(),
3486 ts_init: self.timestamp_ns(),
3487 correlation_id: None,
3488 params,
3489 });
3490
3491 self.send_data_cmd(DataCommand::Subscribe(command));
3492 }
3493
3494 pub fn subscribe_option_greeks(
3496 &mut self,
3497 topic: MStr<Topic>,
3498 handler: TypedHandler<OptionGreeks>,
3499 instrument_id: InstrumentId,
3500 client_id: Option<ClientId>,
3501 params: Option<Params>,
3502 ) {
3503 self.check_registered();
3504
3505 self.add_option_greeks_subscription(topic, handler);
3506
3507 let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3508 instrument_id,
3509 client_id,
3510 venue: Some(instrument_id.venue),
3511 command_id: UUID4::new(),
3512 ts_init: self.timestamp_ns(),
3513 correlation_id: None,
3514 params,
3515 });
3516
3517 self.send_data_cmd(DataCommand::Subscribe(command));
3518 }
3519
3520 pub fn subscribe_instrument_status(
3522 &mut self,
3523 topic: MStr<Topic>,
3524 handler: ShareableMessageHandler,
3525 instrument_id: InstrumentId,
3526 client_id: Option<ClientId>,
3527 params: Option<Params>,
3528 ) {
3529 self.check_registered();
3530
3531 self.add_subscription_any(topic, handler);
3532
3533 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3534 instrument_id,
3535 client_id,
3536 venue: Some(instrument_id.venue),
3537 command_id: UUID4::new(),
3538 ts_init: self.timestamp_ns(),
3539 correlation_id: None,
3540 params,
3541 });
3542
3543 self.send_data_cmd(DataCommand::Subscribe(command));
3544 }
3545
3546 pub fn subscribe_instrument_close(
3548 &mut self,
3549 topic: MStr<Topic>,
3550 handler: ShareableMessageHandler,
3551 instrument_id: InstrumentId,
3552 client_id: Option<ClientId>,
3553 params: Option<Params>,
3554 ) {
3555 self.check_registered();
3556
3557 self.add_instrument_close_subscription(topic, handler);
3558
3559 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3560 instrument_id,
3561 client_id,
3562 venue: Some(instrument_id.venue),
3563 command_id: UUID4::new(),
3564 ts_init: self.timestamp_ns(),
3565 correlation_id: None,
3566 params,
3567 });
3568
3569 self.send_data_cmd(DataCommand::Subscribe(command));
3570 }
3571
3572 #[allow(clippy::too_many_arguments)]
3574 pub fn subscribe_option_chain(
3575 &mut self,
3576 topic: MStr<Topic>,
3577 handler: TypedHandler<OptionChainSlice>,
3578 series_id: OptionSeriesId,
3579 strike_range: StrikeRange,
3580 snapshot_interval_ms: Option<u64>,
3581 client_id: Option<ClientId>,
3582 params: Option<Params>,
3583 ) {
3584 self.check_registered();
3585
3586 self.add_option_chain_subscription(topic, handler);
3587
3588 let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3589 series_id,
3590 strike_range,
3591 snapshot_interval_ms,
3592 UUID4::new(),
3593 self.timestamp_ns(),
3594 client_id,
3595 Some(series_id.venue),
3596 params,
3597 ));
3598
3599 self.send_data_cmd(DataCommand::Subscribe(command));
3600 }
3601
3602 pub fn subscribe_order_fills(
3604 &mut self,
3605 topic: MStr<Topic>,
3606 handler: TypedHandler<OrderEventAny>,
3607 ) {
3608 self.check_registered();
3609 self.add_order_event_subscription(topic, handler);
3610 }
3611
3612 pub fn subscribe_order_cancels(
3614 &mut self,
3615 topic: MStr<Topic>,
3616 handler: TypedHandler<OrderEventAny>,
3617 ) {
3618 self.check_registered();
3619 self.add_order_event_subscription(topic, handler);
3620 }
3621
3622 pub fn unsubscribe_data(
3624 &mut self,
3625 data_type: DataType,
3626 client_id: Option<ClientId>,
3627 params: Option<Params>,
3628 ) {
3629 self.check_registered();
3630
3631 let topic = get_custom_topic(&data_type);
3632 self.remove_subscription_any(topic);
3633
3634 if client_id.is_none() {
3635 return;
3636 }
3637
3638 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3639 data_type,
3640 client_id,
3641 venue: None,
3642 command_id: UUID4::new(),
3643 ts_init: self.timestamp_ns(),
3644 correlation_id: None,
3645 params,
3646 });
3647
3648 self.send_data_cmd(DataCommand::Unsubscribe(command));
3649 }
3650
3651 pub fn unsubscribe_signal(&mut self, name: &str) {
3657 self.check_registered();
3658
3659 let pattern = get_signal_pattern(name);
3660 if let Some(handler) = self.topic_handlers.remove(&pattern) {
3661 msgbus::unsubscribe_any(pattern, &handler);
3662 } else {
3663 log::warn!(
3664 "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3665 self.actor_id,
3666 );
3667 }
3668 }
3669
3670 pub fn unsubscribe_instruments(
3672 &mut self,
3673 venue: Venue,
3674 client_id: Option<ClientId>,
3675 params: Option<Params>,
3676 ) {
3677 self.check_registered();
3678
3679 let pattern = get_instruments_pattern(venue);
3680 self.remove_instrument_subscription(pattern);
3681
3682 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3683 client_id,
3684 venue,
3685 command_id: UUID4::new(),
3686 ts_init: self.timestamp_ns(),
3687 correlation_id: None,
3688 params,
3689 });
3690
3691 self.send_data_cmd(DataCommand::Unsubscribe(command));
3692 }
3693
3694 pub fn unsubscribe_instrument(
3696 &mut self,
3697 instrument_id: InstrumentId,
3698 client_id: Option<ClientId>,
3699 params: Option<Params>,
3700 ) {
3701 self.check_registered();
3702
3703 let topic = get_instrument_topic(instrument_id);
3704 self.remove_instrument_subscription(topic.into());
3705
3706 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3707 instrument_id,
3708 client_id,
3709 venue: Some(instrument_id.venue),
3710 command_id: UUID4::new(),
3711 ts_init: self.timestamp_ns(),
3712 correlation_id: None,
3713 params,
3714 });
3715
3716 self.send_data_cmd(DataCommand::Unsubscribe(command));
3717 }
3718
3719 pub fn unsubscribe_book_deltas(
3721 &mut self,
3722 instrument_id: InstrumentId,
3723 client_id: Option<ClientId>,
3724 params: Option<Params>,
3725 ) {
3726 self.check_registered();
3727
3728 let pattern = if is_parent_subscription(params.as_ref()) {
3729 get_book_deltas_pattern(instrument_id)
3730 } else {
3731 get_book_deltas_topic(instrument_id).into()
3732 };
3733 self.remove_deltas_subscription(pattern);
3734
3735 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3736 instrument_id,
3737 client_id,
3738 venue: Some(instrument_id.venue),
3739 command_id: UUID4::new(),
3740 ts_init: self.timestamp_ns(),
3741 correlation_id: None,
3742 params,
3743 });
3744
3745 self.send_data_cmd(DataCommand::Unsubscribe(command));
3746 }
3747
3748 pub fn unsubscribe_book_at_interval(
3750 &mut self,
3751 instrument_id: InstrumentId,
3752 interval_ms: NonZeroUsize,
3753 client_id: Option<ClientId>,
3754 params: Option<Params>,
3755 ) {
3756 self.check_registered();
3757
3758 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3759 self.remove_book_snapshot_subscription(topic);
3760
3761 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3762 instrument_id,
3763 interval_ms,
3764 client_id,
3765 venue: Some(instrument_id.venue),
3766 command_id: UUID4::new(),
3767 ts_init: self.timestamp_ns(),
3768 correlation_id: None,
3769 params,
3770 });
3771
3772 self.send_data_cmd(DataCommand::Unsubscribe(command));
3773 }
3774
3775 pub fn unsubscribe_quotes(
3777 &mut self,
3778 instrument_id: InstrumentId,
3779 client_id: Option<ClientId>,
3780 params: Option<Params>,
3781 ) {
3782 self.check_registered();
3783
3784 let topic = get_quotes_topic(instrument_id);
3785 self.remove_quote_subscription(topic);
3786
3787 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3788 instrument_id,
3789 client_id,
3790 venue: Some(instrument_id.venue),
3791 command_id: UUID4::new(),
3792 ts_init: self.timestamp_ns(),
3793 correlation_id: None,
3794 params,
3795 });
3796
3797 self.send_data_cmd(DataCommand::Unsubscribe(command));
3798 }
3799
3800 pub fn unsubscribe_trades(
3802 &mut self,
3803 instrument_id: InstrumentId,
3804 client_id: Option<ClientId>,
3805 params: Option<Params>,
3806 ) {
3807 self.check_registered();
3808
3809 let topic = get_trades_topic(instrument_id);
3810 self.remove_trade_subscription(topic);
3811
3812 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3813 instrument_id,
3814 client_id,
3815 venue: Some(instrument_id.venue),
3816 command_id: UUID4::new(),
3817 ts_init: self.timestamp_ns(),
3818 correlation_id: None,
3819 params,
3820 });
3821
3822 self.send_data_cmd(DataCommand::Unsubscribe(command));
3823 }
3824
3825 pub fn unsubscribe_bars(
3827 &mut self,
3828 bar_type: BarType,
3829 client_id: Option<ClientId>,
3830 params: Option<Params>,
3831 ) {
3832 self.check_registered();
3833
3834 let topic = get_bars_topic(bar_type);
3835 self.remove_bar_subscription(topic);
3836
3837 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3838 bar_type,
3839 client_id,
3840 venue: Some(bar_type.instrument_id().venue),
3841 command_id: UUID4::new(),
3842 ts_init: self.timestamp_ns(),
3843 correlation_id: None,
3844 params,
3845 });
3846
3847 self.send_data_cmd(DataCommand::Unsubscribe(command));
3848 }
3849
3850 pub fn unsubscribe_mark_prices(
3852 &mut self,
3853 instrument_id: InstrumentId,
3854 client_id: Option<ClientId>,
3855 params: Option<Params>,
3856 ) {
3857 self.check_registered();
3858
3859 let topic = get_mark_price_topic(instrument_id);
3860 self.remove_mark_price_subscription(topic);
3861
3862 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3863 instrument_id,
3864 client_id,
3865 venue: Some(instrument_id.venue),
3866 command_id: UUID4::new(),
3867 ts_init: self.timestamp_ns(),
3868 correlation_id: None,
3869 params,
3870 });
3871
3872 self.send_data_cmd(DataCommand::Unsubscribe(command));
3873 }
3874
3875 pub fn unsubscribe_index_prices(
3877 &mut self,
3878 instrument_id: InstrumentId,
3879 client_id: Option<ClientId>,
3880 params: Option<Params>,
3881 ) {
3882 self.check_registered();
3883
3884 let topic = get_index_price_topic(instrument_id);
3885 self.remove_index_price_subscription(topic);
3886
3887 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3888 instrument_id,
3889 client_id,
3890 venue: Some(instrument_id.venue),
3891 command_id: UUID4::new(),
3892 ts_init: self.timestamp_ns(),
3893 correlation_id: None,
3894 params,
3895 });
3896
3897 self.send_data_cmd(DataCommand::Unsubscribe(command));
3898 }
3899
3900 pub fn unsubscribe_funding_rates(
3902 &mut self,
3903 instrument_id: InstrumentId,
3904 client_id: Option<ClientId>,
3905 params: Option<Params>,
3906 ) {
3907 self.check_registered();
3908
3909 let topic = get_funding_rate_topic(instrument_id);
3910 self.remove_funding_rate_subscription(topic);
3911
3912 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3913 instrument_id,
3914 client_id,
3915 venue: Some(instrument_id.venue),
3916 command_id: UUID4::new(),
3917 ts_init: self.timestamp_ns(),
3918 correlation_id: None,
3919 params,
3920 });
3921
3922 self.send_data_cmd(DataCommand::Unsubscribe(command));
3923 }
3924
3925 pub fn unsubscribe_option_greeks(
3927 &mut self,
3928 instrument_id: InstrumentId,
3929 client_id: Option<ClientId>,
3930 params: Option<Params>,
3931 ) {
3932 self.check_registered();
3933
3934 let topic = get_option_greeks_topic(instrument_id);
3935 self.remove_option_greeks_subscription(topic);
3936
3937 let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3938 instrument_id,
3939 client_id,
3940 venue: Some(instrument_id.venue),
3941 command_id: UUID4::new(),
3942 ts_init: self.timestamp_ns(),
3943 correlation_id: None,
3944 params,
3945 });
3946
3947 self.send_data_cmd(DataCommand::Unsubscribe(command));
3948 }
3949
3950 pub fn unsubscribe_instrument_status(
3952 &mut self,
3953 instrument_id: InstrumentId,
3954 client_id: Option<ClientId>,
3955 params: Option<Params>,
3956 ) {
3957 self.check_registered();
3958
3959 let topic = get_instrument_status_topic(instrument_id);
3960 self.remove_subscription_any(topic);
3961
3962 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3963 instrument_id,
3964 client_id,
3965 venue: Some(instrument_id.venue),
3966 command_id: UUID4::new(),
3967 ts_init: self.timestamp_ns(),
3968 correlation_id: None,
3969 params,
3970 });
3971
3972 self.send_data_cmd(DataCommand::Unsubscribe(command));
3973 }
3974
3975 pub fn unsubscribe_instrument_close(
3977 &mut self,
3978 instrument_id: InstrumentId,
3979 client_id: Option<ClientId>,
3980 params: Option<Params>,
3981 ) {
3982 self.check_registered();
3983
3984 let topic = get_instrument_close_topic(instrument_id);
3985 self.remove_instrument_close_subscription(topic);
3986
3987 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3988 instrument_id,
3989 client_id,
3990 venue: Some(instrument_id.venue),
3991 command_id: UUID4::new(),
3992 ts_init: self.timestamp_ns(),
3993 correlation_id: None,
3994 params,
3995 });
3996
3997 self.send_data_cmd(DataCommand::Unsubscribe(command));
3998 }
3999
4000 pub fn unsubscribe_option_chain(
4002 &mut self,
4003 series_id: OptionSeriesId,
4004 client_id: Option<ClientId>,
4005 ) {
4006 self.check_registered();
4007
4008 let topic = get_option_chain_topic(series_id);
4009 self.remove_option_chain_subscription(topic);
4010
4011 let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
4012 series_id,
4013 UUID4::new(),
4014 self.timestamp_ns(),
4015 client_id,
4016 Some(series_id.venue),
4017 ));
4018
4019 self.send_data_cmd(DataCommand::Unsubscribe(command));
4020 }
4021
4022 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
4024 self.check_registered();
4025
4026 let topic = get_order_fills_topic(instrument_id);
4027 self.remove_order_event_subscription(topic);
4028 }
4029
4030 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
4032 self.check_registered();
4033
4034 let topic = get_order_cancels_topic(instrument_id);
4035 self.remove_order_event_subscription(topic);
4036 }
4037
4038 #[expect(clippy::too_many_arguments)]
4044 pub fn request_data(
4045 &self,
4046 data_type: DataType,
4047 client_id: ClientId,
4048 start: Option<DateTime<Utc>>,
4049 end: Option<DateTime<Utc>>,
4050 limit: Option<NonZeroUsize>,
4051 params: Option<Params>,
4052 handler: ShareableMessageHandler,
4053 ) -> anyhow::Result<UUID4> {
4054 self.check_registered();
4055
4056 let now = self.clock_ref().utc_now();
4057 check_timestamps(now, start, end)?;
4058
4059 let request_id = UUID4::new();
4060 let command = RequestCommand::Data(RequestCustomData {
4061 client_id,
4062 data_type,
4063 start,
4064 end,
4065 limit,
4066 request_id,
4067 ts_init: self.timestamp_ns(),
4068 params,
4069 });
4070
4071 get_message_bus()
4072 .borrow_mut()
4073 .register_response_handler(command.request_id(), handler)?;
4074
4075 self.send_data_cmd(DataCommand::Request(command));
4076
4077 Ok(request_id)
4078 }
4079
4080 pub fn request_instrument(
4086 &self,
4087 instrument_id: InstrumentId,
4088 start: Option<DateTime<Utc>>,
4089 end: Option<DateTime<Utc>>,
4090 client_id: Option<ClientId>,
4091 params: Option<Params>,
4092 handler: ShareableMessageHandler,
4093 ) -> anyhow::Result<UUID4> {
4094 self.check_registered();
4095
4096 let now = self.clock_ref().utc_now();
4097 check_timestamps(now, start, end)?;
4098
4099 let request_id = UUID4::new();
4100 let command = RequestCommand::Instrument(RequestInstrument {
4101 instrument_id,
4102 start,
4103 end,
4104 client_id,
4105 request_id,
4106 ts_init: now.into(),
4107 params,
4108 });
4109
4110 get_message_bus()
4111 .borrow_mut()
4112 .register_response_handler(command.request_id(), handler)?;
4113
4114 self.send_data_cmd(DataCommand::Request(command));
4115
4116 Ok(request_id)
4117 }
4118
4119 pub fn request_instruments(
4125 &self,
4126 venue: Option<Venue>,
4127 start: Option<DateTime<Utc>>,
4128 end: Option<DateTime<Utc>>,
4129 client_id: Option<ClientId>,
4130 params: Option<Params>,
4131 handler: ShareableMessageHandler,
4132 ) -> anyhow::Result<UUID4> {
4133 self.check_registered();
4134
4135 let now = self.clock_ref().utc_now();
4136 check_timestamps(now, start, end)?;
4137
4138 let request_id = UUID4::new();
4139 let command = RequestCommand::Instruments(RequestInstruments {
4140 venue,
4141 start,
4142 end,
4143 client_id,
4144 request_id,
4145 ts_init: now.into(),
4146 params,
4147 });
4148
4149 get_message_bus()
4150 .borrow_mut()
4151 .register_response_handler(command.request_id(), handler)?;
4152
4153 self.send_data_cmd(DataCommand::Request(command));
4154
4155 Ok(request_id)
4156 }
4157
4158 pub fn request_book_snapshot(
4164 &self,
4165 instrument_id: InstrumentId,
4166 depth: Option<NonZeroUsize>,
4167 client_id: Option<ClientId>,
4168 params: Option<Params>,
4169 handler: ShareableMessageHandler,
4170 ) -> anyhow::Result<UUID4> {
4171 self.check_registered();
4172
4173 let request_id = UUID4::new();
4174 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4175 instrument_id,
4176 depth,
4177 client_id,
4178 request_id,
4179 ts_init: self.timestamp_ns(),
4180 params,
4181 });
4182
4183 get_message_bus()
4184 .borrow_mut()
4185 .register_response_handler(command.request_id(), handler)?;
4186
4187 self.send_data_cmd(DataCommand::Request(command));
4188
4189 Ok(request_id)
4190 }
4191
4192 #[expect(clippy::too_many_arguments)]
4198 pub fn request_quotes(
4199 &self,
4200 instrument_id: InstrumentId,
4201 start: Option<DateTime<Utc>>,
4202 end: Option<DateTime<Utc>>,
4203 limit: Option<NonZeroUsize>,
4204 client_id: Option<ClientId>,
4205 params: Option<Params>,
4206 handler: ShareableMessageHandler,
4207 ) -> anyhow::Result<UUID4> {
4208 self.check_registered();
4209
4210 let now = self.clock_ref().utc_now();
4211 check_timestamps(now, start, end)?;
4212
4213 let request_id = UUID4::new();
4214 let command = RequestCommand::Quotes(RequestQuotes {
4215 instrument_id,
4216 start,
4217 end,
4218 limit,
4219 client_id,
4220 request_id,
4221 ts_init: now.into(),
4222 params,
4223 });
4224
4225 get_message_bus()
4226 .borrow_mut()
4227 .register_response_handler(command.request_id(), handler)?;
4228
4229 self.send_data_cmd(DataCommand::Request(command));
4230
4231 Ok(request_id)
4232 }
4233
4234 #[expect(clippy::too_many_arguments)]
4240 pub fn request_trades(
4241 &self,
4242 instrument_id: InstrumentId,
4243 start: Option<DateTime<Utc>>,
4244 end: Option<DateTime<Utc>>,
4245 limit: Option<NonZeroUsize>,
4246 client_id: Option<ClientId>,
4247 params: Option<Params>,
4248 handler: ShareableMessageHandler,
4249 ) -> anyhow::Result<UUID4> {
4250 self.check_registered();
4251
4252 let now = self.clock_ref().utc_now();
4253 check_timestamps(now, start, end)?;
4254
4255 let request_id = UUID4::new();
4256 let command = RequestCommand::Trades(RequestTrades {
4257 instrument_id,
4258 start,
4259 end,
4260 limit,
4261 client_id,
4262 request_id,
4263 ts_init: now.into(),
4264 params,
4265 });
4266
4267 get_message_bus()
4268 .borrow_mut()
4269 .register_response_handler(command.request_id(), handler)?;
4270
4271 self.send_data_cmd(DataCommand::Request(command));
4272
4273 Ok(request_id)
4274 }
4275
4276 #[expect(clippy::too_many_arguments)]
4282 pub fn request_funding_rates(
4283 &self,
4284 instrument_id: InstrumentId,
4285 start: Option<DateTime<Utc>>,
4286 end: Option<DateTime<Utc>>,
4287 limit: Option<NonZeroUsize>,
4288 client_id: Option<ClientId>,
4289 params: Option<Params>,
4290 handler: ShareableMessageHandler,
4291 ) -> anyhow::Result<UUID4> {
4292 self.check_registered();
4293
4294 let now = self.clock_ref().utc_now();
4295 check_timestamps(now, start, end)?;
4296
4297 let request_id = UUID4::new();
4298 let command = RequestCommand::FundingRates(RequestFundingRates {
4299 instrument_id,
4300 start,
4301 end,
4302 limit,
4303 client_id,
4304 request_id,
4305 ts_init: now.into(),
4306 params,
4307 });
4308
4309 get_message_bus()
4310 .borrow_mut()
4311 .register_response_handler(command.request_id(), handler)?;
4312
4313 self.send_data_cmd(DataCommand::Request(command));
4314
4315 Ok(request_id)
4316 }
4317
4318 #[expect(clippy::too_many_arguments)]
4324 pub fn request_bars(
4325 &self,
4326 bar_type: BarType,
4327 start: Option<DateTime<Utc>>,
4328 end: Option<DateTime<Utc>>,
4329 limit: Option<NonZeroUsize>,
4330 client_id: Option<ClientId>,
4331 params: Option<Params>,
4332 handler: ShareableMessageHandler,
4333 ) -> anyhow::Result<UUID4> {
4334 self.check_registered();
4335
4336 let now = self.clock_ref().utc_now();
4337 check_timestamps(now, start, end)?;
4338
4339 let request_id = UUID4::new();
4340 let command = RequestCommand::Bars(RequestBars {
4341 bar_type,
4342 start,
4343 end,
4344 limit,
4345 client_id,
4346 request_id,
4347 ts_init: now.into(),
4348 params,
4349 });
4350
4351 get_message_bus()
4352 .borrow_mut()
4353 .register_response_handler(command.request_id(), handler)?;
4354
4355 self.send_data_cmd(DataCommand::Request(command));
4356
4357 Ok(request_id)
4358 }
4359
4360 #[cfg(test)]
4361 pub fn quote_handler_count(&self) -> usize {
4362 self.quote_handlers.len()
4363 }
4364
4365 #[cfg(test)]
4366 pub fn trade_handler_count(&self) -> usize {
4367 self.trade_handlers.len()
4368 }
4369
4370 #[cfg(test)]
4371 pub fn bar_handler_count(&self) -> usize {
4372 self.bar_handlers.len()
4373 }
4374
4375 #[cfg(test)]
4376 pub fn deltas_handler_count(&self) -> usize {
4377 self.deltas_handlers.len()
4378 }
4379
4380 #[cfg(test)]
4381 pub fn has_quote_handler(&self, topic: &str) -> bool {
4382 self.quote_handlers
4383 .contains_key(&MStr::<Topic>::from(topic))
4384 }
4385
4386 #[cfg(test)]
4387 pub fn has_trade_handler(&self, topic: &str) -> bool {
4388 self.trade_handlers
4389 .contains_key(&MStr::<Topic>::from(topic))
4390 }
4391
4392 #[cfg(test)]
4393 pub fn has_bar_handler(&self, topic: &str) -> bool {
4394 self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
4395 }
4396
4397 #[cfg(test)]
4398 pub fn has_deltas_handler(&self, pattern: &str) -> bool {
4399 self.deltas_handlers
4400 .contains_key(&MStr::<Pattern>::from(pattern))
4401 }
4402}
4403
4404fn check_timestamps(
4405 now: DateTime<Utc>,
4406 start: Option<DateTime<Utc>>,
4407 end: Option<DateTime<Utc>>,
4408) -> anyhow::Result<()> {
4409 if let Some(start) = start {
4410 check_predicate_true(start <= now, "start was > now")?;
4411 }
4412
4413 if let Some(end) = end {
4414 check_predicate_true(end <= now, "end was > now")?;
4415 }
4416
4417 if let (Some(start), Some(end)) = (start, end) {
4418 check_predicate_true(start < end, "start was >= end")?;
4419 }
4420
4421 Ok(())
4422}
4423
4424fn log_error(e: &anyhow::Error) {
4425 log::error!("{e}");
4426}
4427
4428fn log_not_running<T>(msg: &T)
4429where
4430 T: Debug,
4431{
4432 log::trace!("Received message when not running - skipping {msg:?}");
4433}
4434
4435fn log_received<T>(msg: &T)
4436where
4437 T: Debug,
4438{
4439 log::debug!("{RECV} {msg:?}");
4440}
4441
4442fn log_received_bulk(kind: &str, correlation_id: &UUID4, records: usize) {
4443 log::debug!("{RECV} {kind} correlation_id={correlation_id} records={records}");
4444}