nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, close::InstrumentClose,
39    },
40    enums::BookType,
41    events::order::{canceled::OrderCanceled, filled::OrderFilled},
42    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, TraderId, Venue},
43    instruments::InstrumentAny,
44    orderbook::OrderBook,
45};
46use ustr::Ustr;
47
48#[cfg(feature = "indicators")]
49use super::indicators::Indicators;
50use super::{
51    Actor,
52    registry::{get_actor_unchecked, try_get_actor_unchecked},
53};
54#[cfg(feature = "defi")]
55use crate::defi;
56#[cfg(feature = "defi")]
57#[allow(unused_imports)]
58use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
59use crate::{
60    cache::Cache,
61    clock::Clock,
62    component::Component,
63    enums::{ComponentState, ComponentTrigger},
64    logging::{CMD, RECV, REQ, SEND},
65    messages::{
66        data::{
67            BarsResponse, BookResponse, CustomDataResponse, DataCommand, InstrumentResponse,
68            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookSnapshot, RequestCommand,
69            RequestCustomData, RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
70            SubscribeBars, SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand,
71            SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
72            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
73            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
74            UnsubscribeBookDeltas, UnsubscribeBookSnapshots, UnsubscribeCommand,
75            UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
76            UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
77            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
78        },
79        system::ShutdownSystem,
80    },
81    msgbus::{
82        self, MStr, Topic, get_message_bus,
83        handler::{ShareableMessageHandler, TypedMessageHandler},
84        switchboard::{
85            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
86            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
87            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
88            get_instruments_topic, get_mark_price_topic, get_order_cancels_topic,
89            get_order_fills_topic, get_quotes_topic, get_trades_topic,
90        },
91    },
92    signal::Signal,
93    timer::{TimeEvent, TimeEventCallback},
94};
95
96/// Common configuration for [`DataActor`] based components.
97#[derive(Debug, Clone)]
98#[cfg_attr(
99    feature = "python",
100    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", subclass)
101)]
102pub struct DataActorConfig {
103    /// The custom identifier for the Actor.
104    pub actor_id: Option<ActorId>,
105    /// If events should be logged.
106    pub log_events: bool,
107    /// If commands should be logged.
108    pub log_commands: bool,
109}
110
111impl Default for DataActorConfig {
112    fn default() -> Self {
113        Self {
114            actor_id: None,
115            log_events: true,
116            log_commands: true,
117        }
118    }
119}
120
121/// Configuration for creating actors from importable paths.
122#[derive(Debug, Clone)]
123#[cfg_attr(
124    feature = "python",
125    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
126)]
127pub struct ImportableActorConfig {
128    /// The fully qualified name of the Actor class.
129    pub actor_path: String,
130    /// The fully qualified name of the Actor config class.
131    pub config_path: String,
132    /// The actor configuration as a dictionary.
133    pub config: HashMap<String, serde_json::Value>,
134}
135
136type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
137
138pub trait DataActor:
139    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
140{
141    /// Actions to be performed when the actor state is saved.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if saving the actor state fails.
146    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
147        Ok(IndexMap::new())
148    }
149
150    /// Actions to be performed when the actor state is loaded.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if loading the actor state fails.
155    #[allow(unused_variables)]
156    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
157        Ok(())
158    }
159
160    /// Actions to be performed on start.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if starting the actor fails.
165    fn on_start(&mut self) -> anyhow::Result<()> {
166        log::warn!(
167            "The `on_start` handler was called when not overridden, \
168            it's expected that any actions required when starting the actor \
169            occur here, such as subscribing/requesting data"
170        );
171        Ok(())
172    }
173
174    /// Actions to be performed on stop.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if stopping the actor fails.
179    fn on_stop(&mut self) -> anyhow::Result<()> {
180        log::warn!(
181            "The `on_stop` handler was called when not overridden, \
182            it's expected that any actions required when stopping the actor \
183            occur here, such as unsubscribing from data",
184        );
185        Ok(())
186    }
187
188    /// Actions to be performed on resume.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if resuming the actor fails.
193    fn on_resume(&mut self) -> anyhow::Result<()> {
194        log::warn!(
195            "The `on_resume` handler was called when not overridden, \
196            it's expected that any actions required when resuming the actor \
197            following a stop occur here"
198        );
199        Ok(())
200    }
201
202    /// Actions to be performed on reset.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if resetting the actor fails.
207    fn on_reset(&mut self) -> anyhow::Result<()> {
208        log::warn!(
209            "The `on_reset` handler was called when not overridden, \
210            it's expected that any actions required when resetting the actor \
211            occur here, such as resetting indicators and other state"
212        );
213        Ok(())
214    }
215
216    /// Actions to be performed on dispose.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if disposing the actor fails.
221    fn on_dispose(&mut self) -> anyhow::Result<()> {
222        Ok(())
223    }
224
225    /// Actions to be performed on degrade.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if degrading the actor fails.
230    fn on_degrade(&mut self) -> anyhow::Result<()> {
231        Ok(())
232    }
233
234    /// Actions to be performed on fault.
235    ///
236    /// # Errors
237    ///
238    /// Returns an error if faulting the actor fails.
239    fn on_fault(&mut self) -> anyhow::Result<()> {
240        Ok(())
241    }
242
243    /// Actions to be performed when receiving a time event.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if handling the time event fails.
248    #[allow(unused_variables)]
249    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
250        Ok(())
251    }
252
253    /// Actions to be performed when receiving custom data.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if handling the data fails.
258    #[allow(unused_variables)]
259    fn on_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
260        Ok(())
261    }
262
263    /// Actions to be performed when receiving a signal.
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if handling the signal fails.
268    #[allow(unused_variables)]
269    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
270        Ok(())
271    }
272
273    /// Actions to be performed when receiving an instrument.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if handling the instrument fails.
278    #[allow(unused_variables)]
279    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
280        Ok(())
281    }
282
283    /// Actions to be performed when receiving order book deltas.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if handling the book deltas fails.
288    #[allow(unused_variables)]
289    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
290        Ok(())
291    }
292
293    /// Actions to be performed when receiving an order book.
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if handling the book fails.
298    #[allow(unused_variables)]
299    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
300        Ok(())
301    }
302
303    /// Actions to be performed when receiving a quote.
304    ///
305    /// # Errors
306    ///
307    /// Returns an error if handling the quote fails.
308    #[allow(unused_variables)]
309    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
310        Ok(())
311    }
312
313    /// Actions to be performed when receiving a trade.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if handling the trade fails.
318    #[allow(unused_variables)]
319    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
320        Ok(())
321    }
322
323    /// Actions to be performed when receiving a bar.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if handling the bar fails.
328    #[allow(unused_variables)]
329    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
330        Ok(())
331    }
332
333    /// Actions to be performed when receiving a mark price update.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if handling the mark price update fails.
338    #[allow(unused_variables)]
339    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
340        Ok(())
341    }
342
343    /// Actions to be performed when receiving an index price update.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if handling the index price update fails.
348    #[allow(unused_variables)]
349    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
350        Ok(())
351    }
352
353    /// Actions to be performed when receiving a funding rate update.
354    ///
355    /// # Errors
356    ///
357    /// Returns an error if handling the funding rate update fails.
358    #[allow(unused_variables)]
359    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
360        Ok(())
361    }
362
363    /// Actions to be performed when receiving an instrument status update.
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if handling the instrument status update fails.
368    #[allow(unused_variables)]
369    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
370        Ok(())
371    }
372
373    /// Actions to be performed when receiving an instrument close update.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if handling the instrument close update fails.
378    #[allow(unused_variables)]
379    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
380        Ok(())
381    }
382
383    /// Actions to be performed when receiving an order filled event.
384    ///
385    /// # Errors
386    ///
387    /// Returns an error if handling the order filled event fails.
388    #[allow(unused_variables)]
389    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
390        Ok(())
391    }
392
393    /// Actions to be performed when receiving an order canceled event.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if handling the order canceled event fails.
398    #[allow(unused_variables)]
399    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
400        Ok(())
401    }
402
403    #[cfg(feature = "defi")]
404    /// Actions to be performed when receiving a block.
405    ///
406    /// # Errors
407    ///
408    /// Returns an error if handling the block fails.
409    #[allow(unused_variables)]
410    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
411        Ok(())
412    }
413
414    #[cfg(feature = "defi")]
415    /// Actions to be performed when receiving a pool.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if handling the pool fails.
420    #[allow(unused_variables)]
421    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
422        Ok(())
423    }
424
425    #[cfg(feature = "defi")]
426    /// Actions to be performed when receiving a pool swap.
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if handling the pool swap fails.
431    #[allow(unused_variables)]
432    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
433        Ok(())
434    }
435
436    #[cfg(feature = "defi")]
437    /// Actions to be performed when receiving a pool liquidity update.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if handling the pool liquidity update fails.
442    #[allow(unused_variables)]
443    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
444        Ok(())
445    }
446
447    #[cfg(feature = "defi")]
448    /// Actions to be performed when receiving a pool fee collect event.
449    ///
450    /// # Errors
451    ///
452    /// Returns an error if handling the pool fee collect fails.
453    #[allow(unused_variables)]
454    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
455        Ok(())
456    }
457
458    #[cfg(feature = "defi")]
459    /// Actions to be performed when receiving a pool flash event.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if handling the pool flash fails.
464    #[allow(unused_variables)]
465    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
466        Ok(())
467    }
468
469    /// Actions to be performed when receiving historical data.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if handling the historical data fails.
474    #[allow(unused_variables)]
475    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
476        Ok(())
477    }
478
479    /// Actions to be performed when receiving historical quotes.
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if handling the historical quotes fails.
484    #[allow(unused_variables)]
485    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
486        Ok(())
487    }
488
489    /// Actions to be performed when receiving historical trades.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if handling the historical trades fails.
494    #[allow(unused_variables)]
495    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
496        Ok(())
497    }
498
499    /// Actions to be performed when receiving historical bars.
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if handling the historical bars fails.
504    #[allow(unused_variables)]
505    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
506        Ok(())
507    }
508
509    /// Actions to be performed when receiving historical mark prices.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error if handling the historical mark prices fails.
514    #[allow(unused_variables)]
515    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
516        Ok(())
517    }
518
519    /// Actions to be performed when receiving historical index prices.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if handling the historical index prices fails.
524    #[allow(unused_variables)]
525    fn on_historical_index_prices(
526        &mut self,
527        index_prices: &[IndexPriceUpdate],
528    ) -> anyhow::Result<()> {
529        Ok(())
530    }
531
532    /// Handles a received time event.
533    fn handle_time_event(&mut self, event: &TimeEvent) {
534        log_received(&event);
535
536        if let Err(e) = DataActor::on_time_event(self, event) {
537            log_error(&e);
538        }
539    }
540
541    /// Handles a received custom data point.
542    fn handle_data(&mut self, data: &dyn Any) {
543        log_received(&data);
544
545        if self.not_running() {
546            log_not_running(&data);
547            return;
548        }
549
550        if let Err(e) = self.on_data(data) {
551            log_error(&e);
552        }
553    }
554
555    /// Handles a received signal.
556    fn handle_signal(&mut self, signal: &Signal) {
557        log_received(&signal);
558
559        if self.not_running() {
560            log_not_running(&signal);
561            return;
562        }
563
564        if let Err(e) = self.on_signal(signal) {
565            log_error(&e);
566        }
567    }
568
569    /// Handles a received instrument.
570    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
571        log_received(&instrument);
572
573        if self.not_running() {
574            log_not_running(&instrument);
575            return;
576        }
577
578        if let Err(e) = self.on_instrument(instrument) {
579            log_error(&e);
580        }
581    }
582
583    /// Handles received order book deltas.
584    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
585        log_received(&deltas);
586
587        if self.not_running() {
588            log_not_running(&deltas);
589            return;
590        }
591
592        if let Err(e) = self.on_book_deltas(deltas) {
593            log_error(&e);
594        }
595    }
596
597    /// Handles a received order book reference.
598    fn handle_book(&mut self, book: &OrderBook) {
599        log_received(&book);
600
601        if self.not_running() {
602            log_not_running(&book);
603            return;
604        }
605
606        if let Err(e) = self.on_book(book) {
607            log_error(&e);
608        };
609    }
610
611    /// Handles a received quote.
612    fn handle_quote(&mut self, quote: &QuoteTick) {
613        log_received(&quote);
614
615        if self.not_running() {
616            log_not_running(&quote);
617            return;
618        }
619
620        if let Err(e) = self.on_quote(quote) {
621            log_error(&e);
622        }
623    }
624
625    /// Handles a received trade.
626    fn handle_trade(&mut self, trade: &TradeTick) {
627        log_received(&trade);
628
629        if self.not_running() {
630            log_not_running(&trade);
631            return;
632        }
633
634        if let Err(e) = self.on_trade(trade) {
635            log_error(&e);
636        }
637    }
638
639    /// Handles a receiving bar.
640    fn handle_bar(&mut self, bar: &Bar) {
641        log_received(&bar);
642
643        if self.not_running() {
644            log_not_running(&bar);
645            return;
646        }
647
648        if let Err(e) = self.on_bar(bar) {
649            log_error(&e);
650        }
651    }
652
653    /// Handles a received mark price update.
654    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
655        log_received(&mark_price);
656
657        if self.not_running() {
658            log_not_running(&mark_price);
659            return;
660        }
661
662        if let Err(e) = self.on_mark_price(mark_price) {
663            log_error(&e);
664        }
665    }
666
667    /// Handles a received index price update.
668    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
669        log_received(&index_price);
670
671        if self.not_running() {
672            log_not_running(&index_price);
673            return;
674        }
675
676        if let Err(e) = self.on_index_price(index_price) {
677            log_error(&e);
678        }
679    }
680
681    /// Handles a received funding rate update.
682    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
683        log_received(&funding_rate);
684
685        if self.not_running() {
686            log_not_running(&funding_rate);
687            return;
688        }
689
690        if let Err(e) = self.on_funding_rate(funding_rate) {
691            log_error(&e);
692        }
693    }
694
695    /// Handles a received instrument status.
696    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
697        log_received(&status);
698
699        if self.not_running() {
700            log_not_running(&status);
701            return;
702        }
703
704        if let Err(e) = self.on_instrument_status(status) {
705            log_error(&e);
706        }
707    }
708
709    /// Handles a received instrument close.
710    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
711        log_received(&close);
712
713        if self.not_running() {
714            log_not_running(&close);
715            return;
716        }
717
718        if let Err(e) = self.on_instrument_close(close) {
719            log_error(&e);
720        }
721    }
722
723    /// Handles a received order filled event.
724    fn handle_order_filled(&mut self, event: &OrderFilled) {
725        log_received(&event);
726
727        // Check for double-handling: if the event's strategy_id matches this actor's id,
728        // it means a Strategy is receiving its own fill event through both automatic
729        // subscription and manual subscribe_order_fills, so skip the manual handler.
730        if event.strategy_id.inner() == self.actor_id().inner() {
731            return;
732        }
733
734        if self.not_running() {
735            log_not_running(&event);
736            return;
737        }
738
739        if let Err(e) = self.on_order_filled(event) {
740            log_error(&e);
741        }
742    }
743
744    /// Handles a received order canceled event.
745    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
746        log_received(&event);
747
748        // Check for double-handling: if the event's strategy_id matches this actor's id,
749        // it means a Strategy is receiving its own cancel event through both automatic
750        // subscription and manual subscribe_order_cancels, so skip the manual handler.
751        if event.strategy_id.inner() == self.actor_id().inner() {
752            return;
753        }
754
755        if self.not_running() {
756            log_not_running(&event);
757            return;
758        }
759
760        if let Err(e) = self.on_order_canceled(event) {
761            log_error(&e);
762        }
763    }
764
765    #[cfg(feature = "defi")]
766    /// Handles a received block.
767    fn handle_block(&mut self, block: &Block) {
768        log_received(&block);
769
770        if self.not_running() {
771            log_not_running(&block);
772            return;
773        }
774
775        if let Err(e) = self.on_block(block) {
776            log_error(&e);
777        }
778    }
779
780    #[cfg(feature = "defi")]
781    /// Handles a received pool definition update.
782    fn handle_pool(&mut self, pool: &Pool) {
783        log_received(&pool);
784
785        if self.not_running() {
786            log_not_running(&pool);
787            return;
788        }
789
790        if let Err(e) = self.on_pool(pool) {
791            log_error(&e);
792        }
793    }
794
795    #[cfg(feature = "defi")]
796    /// Handles a received pool swap.
797    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
798        log_received(&swap);
799
800        if self.not_running() {
801            log_not_running(&swap);
802            return;
803        }
804
805        if let Err(e) = self.on_pool_swap(swap) {
806            log_error(&e);
807        }
808    }
809
810    #[cfg(feature = "defi")]
811    /// Handles a received pool liquidity update.
812    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
813        log_received(&update);
814
815        if self.not_running() {
816            log_not_running(&update);
817            return;
818        }
819
820        if let Err(e) = self.on_pool_liquidity_update(update) {
821            log_error(&e);
822        }
823    }
824
825    #[cfg(feature = "defi")]
826    /// Handles a received pool fee collect.
827    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
828        log_received(&collect);
829
830        if self.not_running() {
831            log_not_running(&collect);
832            return;
833        }
834
835        if let Err(e) = self.on_pool_fee_collect(collect) {
836            log_error(&e);
837        }
838    }
839
840    #[cfg(feature = "defi")]
841    /// Handles a received pool flash event.
842    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
843        log_received(&flash);
844
845        if self.not_running() {
846            log_not_running(&flash);
847            return;
848        }
849
850        if let Err(e) = self.on_pool_flash(flash) {
851            log_error(&e);
852        }
853    }
854
855    /// Handles received historical data.
856    fn handle_historical_data(&mut self, data: &dyn Any) {
857        log_received(&data);
858
859        if let Err(e) = self.on_historical_data(data) {
860            log_error(&e);
861        }
862    }
863
864    /// Handles a data response.
865    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
866        log_received(&resp);
867
868        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
869            log_error(&e);
870        }
871    }
872
873    /// Handles an instrument response.
874    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
875        log_received(&resp);
876
877        if let Err(e) = self.on_instrument(&resp.data) {
878            log_error(&e);
879        }
880    }
881
882    /// Handles an instruments response.
883    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
884        log_received(&resp);
885
886        for inst in &resp.data {
887            if let Err(e) = self.on_instrument(inst) {
888                log_error(&e);
889            }
890        }
891    }
892
893    /// Handles a book response.
894    fn handle_book_response(&mut self, resp: &BookResponse) {
895        log_received(&resp);
896
897        if let Err(e) = self.on_book(&resp.data) {
898            log_error(&e);
899        }
900    }
901
902    /// Handles a quotes response.
903    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
904        log_received(&resp);
905
906        if let Err(e) = self.on_historical_quotes(&resp.data) {
907            log_error(&e);
908        }
909    }
910
911    /// Handles a trades response.
912    fn handle_trades_response(&mut self, resp: &TradesResponse) {
913        log_received(&resp);
914
915        if let Err(e) = self.on_historical_trades(&resp.data) {
916            log_error(&e);
917        }
918    }
919
920    /// Handles a bars response.
921    fn handle_bars_response(&mut self, resp: &BarsResponse) {
922        log_received(&resp);
923
924        if let Err(e) = self.on_historical_bars(&resp.data) {
925            log_error(&e);
926        }
927    }
928
929    /// Subscribe to streaming `data_type` data.
930    fn subscribe_data(
931        &mut self,
932        data_type: DataType,
933        client_id: Option<ClientId>,
934        params: Option<IndexMap<String, String>>,
935    ) where
936        Self: 'static + Debug + Sized,
937    {
938        let actor_id = self.actor_id().inner();
939        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::with_any(
940            move |data: &dyn Any| {
941                get_actor_unchecked::<Self>(&actor_id).handle_data(data);
942            },
943        )));
944
945        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
946    }
947
948    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
949    fn subscribe_quotes(
950        &mut self,
951        instrument_id: InstrumentId,
952        client_id: Option<ClientId>,
953        params: Option<IndexMap<String, String>>,
954    ) where
955        Self: 'static + Debug + Sized,
956    {
957        let actor_id = self.actor_id().inner();
958        let topic = get_quotes_topic(instrument_id);
959
960        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
961            move |quote: &QuoteTick| {
962                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
963                    actor.handle_quote(quote);
964                } else {
965                    log::error!("Actor {actor_id} not found for quote handling");
966                }
967            },
968        )));
969
970        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
971    }
972
973    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
974    fn subscribe_instruments(
975        &mut self,
976        venue: Venue,
977        client_id: Option<ClientId>,
978        params: Option<IndexMap<String, String>>,
979    ) where
980        Self: 'static + Debug + Sized,
981    {
982        let actor_id = self.actor_id().inner();
983        let topic = get_instruments_topic(venue);
984
985        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
986            move |instrument: &InstrumentAny| {
987                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
988                    actor.handle_instrument(instrument);
989                } else {
990                    log::error!("Actor {actor_id} not found for instruments handling");
991                }
992            },
993        )));
994
995        DataActorCore::subscribe_instruments(self, topic, handler, venue, client_id, params);
996    }
997
998    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
999    fn subscribe_instrument(
1000        &mut self,
1001        instrument_id: InstrumentId,
1002        client_id: Option<ClientId>,
1003        params: Option<IndexMap<String, String>>,
1004    ) where
1005        Self: 'static + Debug + Sized,
1006    {
1007        let actor_id = self.actor_id().inner();
1008        let topic = get_instrument_topic(instrument_id);
1009
1010        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1011            move |instrument: &InstrumentAny| {
1012                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1013                    actor.handle_instrument(instrument);
1014                } else {
1015                    log::error!("Actor {actor_id} not found for instrument handling");
1016                }
1017            },
1018        )));
1019
1020        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1021    }
1022
1023    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1024    fn subscribe_book_deltas(
1025        &mut self,
1026        instrument_id: InstrumentId,
1027        book_type: BookType,
1028        depth: Option<NonZeroUsize>,
1029        client_id: Option<ClientId>,
1030        managed: bool,
1031        params: Option<IndexMap<String, String>>,
1032    ) where
1033        Self: 'static + Debug + Sized,
1034    {
1035        let actor_id = self.actor_id().inner();
1036        let topic = get_book_deltas_topic(instrument_id);
1037
1038        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1039            move |deltas: &OrderBookDeltas| {
1040                get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1041            },
1042        )));
1043
1044        DataActorCore::subscribe_book_deltas(
1045            self,
1046            topic,
1047            handler,
1048            instrument_id,
1049            book_type,
1050            depth,
1051            client_id,
1052            managed,
1053            params,
1054        );
1055    }
1056
1057    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1058    fn subscribe_book_at_interval(
1059        &mut self,
1060        instrument_id: InstrumentId,
1061        book_type: BookType,
1062        depth: Option<NonZeroUsize>,
1063        interval_ms: NonZeroUsize,
1064        client_id: Option<ClientId>,
1065        params: Option<IndexMap<String, String>>,
1066    ) where
1067        Self: 'static + Debug + Sized,
1068    {
1069        let actor_id = self.actor_id().inner();
1070        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1071
1072        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1073            move |book: &OrderBook| {
1074                get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1075            },
1076        )));
1077
1078        DataActorCore::subscribe_book_at_interval(
1079            self,
1080            topic,
1081            handler,
1082            instrument_id,
1083            book_type,
1084            depth,
1085            interval_ms,
1086            client_id,
1087            params,
1088        );
1089    }
1090
1091    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1092    fn subscribe_trades(
1093        &mut self,
1094        instrument_id: InstrumentId,
1095        client_id: Option<ClientId>,
1096        params: Option<IndexMap<String, String>>,
1097    ) where
1098        Self: 'static + Debug + Sized,
1099    {
1100        let actor_id = self.actor_id().inner();
1101        let topic = get_trades_topic(instrument_id);
1102
1103        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1104            move |trade: &TradeTick| {
1105                get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1106            },
1107        )));
1108
1109        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1110    }
1111
1112    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1113    fn subscribe_bars(
1114        &mut self,
1115        bar_type: BarType,
1116        client_id: Option<ClientId>,
1117        params: Option<IndexMap<String, String>>,
1118    ) where
1119        Self: 'static + Debug + Sized,
1120    {
1121        let actor_id = self.actor_id().inner();
1122        let topic = get_bars_topic(bar_type);
1123
1124        let handler =
1125            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |bar: &Bar| {
1126                get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1127            })));
1128
1129        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1130    }
1131
1132    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1133    fn subscribe_mark_prices(
1134        &mut self,
1135        instrument_id: InstrumentId,
1136        client_id: Option<ClientId>,
1137        params: Option<IndexMap<String, String>>,
1138    ) where
1139        Self: 'static + Debug + Sized,
1140    {
1141        let actor_id = self.actor_id().inner();
1142        let topic = get_mark_price_topic(instrument_id);
1143
1144        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1145            move |mark_price: &MarkPriceUpdate| {
1146                get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1147            },
1148        )));
1149
1150        DataActorCore::subscribe_mark_prices(
1151            self,
1152            topic,
1153            handler,
1154            instrument_id,
1155            client_id,
1156            params,
1157        );
1158    }
1159
1160    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1161    fn subscribe_index_prices(
1162        &mut self,
1163        instrument_id: InstrumentId,
1164        client_id: Option<ClientId>,
1165        params: Option<IndexMap<String, String>>,
1166    ) where
1167        Self: 'static + Debug + Sized,
1168    {
1169        let actor_id = self.actor_id().inner();
1170        let topic = get_index_price_topic(instrument_id);
1171
1172        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1173            move |index_price: &IndexPriceUpdate| {
1174                get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1175            },
1176        )));
1177
1178        DataActorCore::subscribe_index_prices(
1179            self,
1180            topic,
1181            handler,
1182            instrument_id,
1183            client_id,
1184            params,
1185        );
1186    }
1187
1188    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1189    fn subscribe_funding_rates(
1190        &mut self,
1191        instrument_id: InstrumentId,
1192        client_id: Option<ClientId>,
1193        params: Option<IndexMap<String, String>>,
1194    ) where
1195        Self: 'static + Debug + Sized,
1196    {
1197        let actor_id = self.actor_id().inner();
1198        let topic = get_funding_rate_topic(instrument_id);
1199
1200        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1201            move |funding_rate: &FundingRateUpdate| {
1202                get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1203            },
1204        )));
1205
1206        DataActorCore::subscribe_funding_rates(
1207            self,
1208            topic,
1209            handler,
1210            instrument_id,
1211            client_id,
1212            params,
1213        );
1214    }
1215
1216    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1217    fn subscribe_instrument_status(
1218        &mut self,
1219        instrument_id: InstrumentId,
1220        client_id: Option<ClientId>,
1221        params: Option<IndexMap<String, String>>,
1222    ) where
1223        Self: 'static + Debug + Sized,
1224    {
1225        let actor_id = self.actor_id().inner();
1226        let topic = get_instrument_status_topic(instrument_id);
1227
1228        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1229            move |status: &InstrumentStatus| {
1230                get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1231            },
1232        )));
1233
1234        DataActorCore::subscribe_instrument_status(
1235            self,
1236            topic,
1237            handler,
1238            instrument_id,
1239            client_id,
1240            params,
1241        );
1242    }
1243
1244    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1245    fn subscribe_instrument_close(
1246        &mut self,
1247        instrument_id: InstrumentId,
1248        client_id: Option<ClientId>,
1249        params: Option<IndexMap<String, String>>,
1250    ) where
1251        Self: 'static + Debug + Sized,
1252    {
1253        let actor_id = self.actor_id().inner();
1254        let topic = get_instrument_close_topic(instrument_id);
1255
1256        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1257            move |close: &InstrumentClose| {
1258                get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1259            },
1260        )));
1261
1262        DataActorCore::subscribe_instrument_close(
1263            self,
1264            topic,
1265            handler,
1266            instrument_id,
1267            client_id,
1268            params,
1269        );
1270    }
1271
1272    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1273    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1274    where
1275        Self: 'static + Debug + Sized,
1276    {
1277        let actor_id = self.actor_id().inner();
1278        let topic = get_order_fills_topic(instrument_id);
1279
1280        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1281            move |event: &OrderFilled| {
1282                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(event);
1283            },
1284        )));
1285
1286        DataActorCore::subscribe_order_fills(self, topic, handler);
1287    }
1288
1289    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1290    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1291    where
1292        Self: 'static + Debug + Sized,
1293    {
1294        let actor_id = self.actor_id().inner();
1295        let topic = get_order_cancels_topic(instrument_id);
1296
1297        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1298            move |event: &OrderCanceled| {
1299                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(event);
1300            },
1301        )));
1302
1303        DataActorCore::subscribe_order_cancels(self, topic, handler);
1304    }
1305
1306    #[cfg(feature = "defi")]
1307    /// Subscribe to streaming [`Block`] data for the `chain`.
1308    fn subscribe_blocks(
1309        &mut self,
1310        chain: Blockchain,
1311        client_id: Option<ClientId>,
1312        params: Option<IndexMap<String, String>>,
1313    ) where
1314        Self: 'static + Debug + Sized,
1315    {
1316        let actor_id = self.actor_id().inner();
1317        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1318
1319        let handler =
1320            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |block: &Block| {
1321                get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1322            })));
1323
1324        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1325    }
1326
1327    #[cfg(feature = "defi")]
1328    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1329    fn subscribe_pool(
1330        &mut self,
1331        instrument_id: InstrumentId,
1332        client_id: Option<ClientId>,
1333        params: Option<IndexMap<String, String>>,
1334    ) where
1335        Self: 'static + Debug + Sized,
1336    {
1337        let actor_id = self.actor_id().inner();
1338        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1339
1340        let handler =
1341            ShareableMessageHandler(Rc::new(TypedMessageHandler::from(move |pool: &Pool| {
1342                get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1343            })));
1344
1345        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1346    }
1347
1348    #[cfg(feature = "defi")]
1349    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1350    fn subscribe_pool_swaps(
1351        &mut self,
1352        instrument_id: InstrumentId,
1353        client_id: Option<ClientId>,
1354        params: Option<IndexMap<String, String>>,
1355    ) where
1356        Self: 'static + Debug + Sized,
1357    {
1358        let actor_id = self.actor_id().inner();
1359        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1360
1361        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1362            move |swap: &PoolSwap| {
1363                get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1364            },
1365        )));
1366
1367        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1368    }
1369
1370    #[cfg(feature = "defi")]
1371    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1372    fn subscribe_pool_liquidity_updates(
1373        &mut self,
1374        instrument_id: InstrumentId,
1375        client_id: Option<ClientId>,
1376        params: Option<IndexMap<String, String>>,
1377    ) where
1378        Self: 'static + Debug + Sized,
1379    {
1380        let actor_id = self.actor_id().inner();
1381        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1382
1383        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1384            move |update: &PoolLiquidityUpdate| {
1385                get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1386            },
1387        )));
1388
1389        DataActorCore::subscribe_pool_liquidity_updates(
1390            self,
1391            topic,
1392            handler,
1393            instrument_id,
1394            client_id,
1395            params,
1396        );
1397    }
1398
1399    #[cfg(feature = "defi")]
1400    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1401    fn subscribe_pool_fee_collects(
1402        &mut self,
1403        instrument_id: InstrumentId,
1404        client_id: Option<ClientId>,
1405        params: Option<IndexMap<String, String>>,
1406    ) where
1407        Self: 'static + Debug + Sized,
1408    {
1409        let actor_id = self.actor_id().inner();
1410        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1411
1412        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1413            move |collect: &PoolFeeCollect| {
1414                get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1415            },
1416        )));
1417
1418        DataActorCore::subscribe_pool_fee_collects(
1419            self,
1420            topic,
1421            handler,
1422            instrument_id,
1423            client_id,
1424            params,
1425        );
1426    }
1427
1428    #[cfg(feature = "defi")]
1429    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1430    fn subscribe_pool_flash_events(
1431        &mut self,
1432        instrument_id: InstrumentId,
1433        client_id: Option<ClientId>,
1434        params: Option<IndexMap<String, String>>,
1435    ) where
1436        Self: 'static + Debug + Sized,
1437    {
1438        let actor_id = self.actor_id().inner();
1439        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1440
1441        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1442            move |flash: &PoolFlash| {
1443                get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1444            },
1445        )));
1446
1447        DataActorCore::subscribe_pool_flash_events(
1448            self,
1449            topic,
1450            handler,
1451            instrument_id,
1452            client_id,
1453            params,
1454        );
1455    }
1456
1457    /// Unsubscribe from streaming `data_type` data.
1458    fn unsubscribe_data(
1459        &mut self,
1460        data_type: DataType,
1461        client_id: Option<ClientId>,
1462        params: Option<IndexMap<String, String>>,
1463    ) where
1464        Self: 'static + Debug + Sized,
1465    {
1466        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1467    }
1468
1469    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1470    fn unsubscribe_instruments(
1471        &mut self,
1472        venue: Venue,
1473        client_id: Option<ClientId>,
1474        params: Option<IndexMap<String, String>>,
1475    ) where
1476        Self: 'static + Debug + Sized,
1477    {
1478        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1479    }
1480
1481    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1482    fn unsubscribe_instrument(
1483        &mut self,
1484        instrument_id: InstrumentId,
1485        client_id: Option<ClientId>,
1486        params: Option<IndexMap<String, String>>,
1487    ) where
1488        Self: 'static + Debug + Sized,
1489    {
1490        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1491    }
1492
1493    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1494    fn unsubscribe_book_deltas(
1495        &mut self,
1496        instrument_id: InstrumentId,
1497        client_id: Option<ClientId>,
1498        params: Option<IndexMap<String, String>>,
1499    ) where
1500        Self: 'static + Debug + Sized,
1501    {
1502        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1503    }
1504
1505    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1506    fn unsubscribe_book_at_interval(
1507        &mut self,
1508        instrument_id: InstrumentId,
1509        interval_ms: NonZeroUsize,
1510        client_id: Option<ClientId>,
1511        params: Option<IndexMap<String, String>>,
1512    ) where
1513        Self: 'static + Debug + Sized,
1514    {
1515        DataActorCore::unsubscribe_book_at_interval(
1516            self,
1517            instrument_id,
1518            interval_ms,
1519            client_id,
1520            params,
1521        );
1522    }
1523
1524    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1525    fn unsubscribe_quotes(
1526        &mut self,
1527        instrument_id: InstrumentId,
1528        client_id: Option<ClientId>,
1529        params: Option<IndexMap<String, String>>,
1530    ) where
1531        Self: 'static + Debug + Sized,
1532    {
1533        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1534    }
1535
1536    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1537    fn unsubscribe_trades(
1538        &mut self,
1539        instrument_id: InstrumentId,
1540        client_id: Option<ClientId>,
1541        params: Option<IndexMap<String, String>>,
1542    ) where
1543        Self: 'static + Debug + Sized,
1544    {
1545        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1546    }
1547
1548    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1549    fn unsubscribe_bars(
1550        &mut self,
1551        bar_type: BarType,
1552        client_id: Option<ClientId>,
1553        params: Option<IndexMap<String, String>>,
1554    ) where
1555        Self: 'static + Debug + Sized,
1556    {
1557        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1558    }
1559
1560    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1561    fn unsubscribe_mark_prices(
1562        &mut self,
1563        instrument_id: InstrumentId,
1564        client_id: Option<ClientId>,
1565        params: Option<IndexMap<String, String>>,
1566    ) where
1567        Self: 'static + Debug + Sized,
1568    {
1569        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1570    }
1571
1572    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1573    fn unsubscribe_index_prices(
1574        &mut self,
1575        instrument_id: InstrumentId,
1576        client_id: Option<ClientId>,
1577        params: Option<IndexMap<String, String>>,
1578    ) where
1579        Self: 'static + Debug + Sized,
1580    {
1581        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1582    }
1583
1584    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1585    fn unsubscribe_funding_rates(
1586        &mut self,
1587        instrument_id: InstrumentId,
1588        client_id: Option<ClientId>,
1589        params: Option<IndexMap<String, String>>,
1590    ) where
1591        Self: 'static + Debug + Sized,
1592    {
1593        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1594    }
1595
1596    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1597    fn unsubscribe_instrument_status(
1598        &mut self,
1599        instrument_id: InstrumentId,
1600        client_id: Option<ClientId>,
1601        params: Option<IndexMap<String, String>>,
1602    ) where
1603        Self: 'static + Debug + Sized,
1604    {
1605        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1606    }
1607
1608    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1609    fn unsubscribe_instrument_close(
1610        &mut self,
1611        instrument_id: InstrumentId,
1612        client_id: Option<ClientId>,
1613        params: Option<IndexMap<String, String>>,
1614    ) where
1615        Self: 'static + Debug + Sized,
1616    {
1617        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1618    }
1619
1620    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1621    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1622    where
1623        Self: 'static + Debug + Sized,
1624    {
1625        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1626    }
1627
1628    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1629    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1630    where
1631        Self: 'static + Debug + Sized,
1632    {
1633        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1634    }
1635
1636    #[cfg(feature = "defi")]
1637    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1638    fn unsubscribe_blocks(
1639        &mut self,
1640        chain: Blockchain,
1641        client_id: Option<ClientId>,
1642        params: Option<IndexMap<String, String>>,
1643    ) where
1644        Self: 'static + Debug + Sized,
1645    {
1646        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1647    }
1648
1649    #[cfg(feature = "defi")]
1650    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1651    fn unsubscribe_pool(
1652        &mut self,
1653        instrument_id: InstrumentId,
1654        client_id: Option<ClientId>,
1655        params: Option<IndexMap<String, String>>,
1656    ) where
1657        Self: 'static + Debug + Sized,
1658    {
1659        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1660    }
1661
1662    #[cfg(feature = "defi")]
1663    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1664    fn unsubscribe_pool_swaps(
1665        &mut self,
1666        instrument_id: InstrumentId,
1667        client_id: Option<ClientId>,
1668        params: Option<IndexMap<String, String>>,
1669    ) where
1670        Self: 'static + Debug + Sized,
1671    {
1672        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1673    }
1674
1675    #[cfg(feature = "defi")]
1676    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1677    fn unsubscribe_pool_liquidity_updates(
1678        &mut self,
1679        instrument_id: InstrumentId,
1680        client_id: Option<ClientId>,
1681        params: Option<IndexMap<String, String>>,
1682    ) where
1683        Self: 'static + Debug + Sized,
1684    {
1685        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1686    }
1687
1688    #[cfg(feature = "defi")]
1689    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1690    fn unsubscribe_pool_fee_collects(
1691        &mut self,
1692        instrument_id: InstrumentId,
1693        client_id: Option<ClientId>,
1694        params: Option<IndexMap<String, String>>,
1695    ) where
1696        Self: 'static + Debug + Sized,
1697    {
1698        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1699    }
1700
1701    #[cfg(feature = "defi")]
1702    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1703    fn unsubscribe_pool_flash_events(
1704        &mut self,
1705        instrument_id: InstrumentId,
1706        client_id: Option<ClientId>,
1707        params: Option<IndexMap<String, String>>,
1708    ) where
1709        Self: 'static + Debug + Sized,
1710    {
1711        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1712    }
1713
1714    /// Request historical custom data of the given `data_type`.
1715    ///
1716    /// # Errors
1717    ///
1718    /// Returns an error if input parameters are invalid.
1719    fn request_data(
1720        &mut self,
1721        data_type: DataType,
1722        client_id: ClientId,
1723        start: Option<DateTime<Utc>>,
1724        end: Option<DateTime<Utc>>,
1725        limit: Option<NonZeroUsize>,
1726        params: Option<IndexMap<String, String>>,
1727    ) -> anyhow::Result<UUID4>
1728    where
1729        Self: 'static + Debug + Sized,
1730    {
1731        let actor_id = self.actor_id().inner();
1732        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1733            move |resp: &CustomDataResponse| {
1734                get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1735            },
1736        )));
1737
1738        DataActorCore::request_data(
1739            self, data_type, client_id, start, end, limit, params, handler,
1740        )
1741    }
1742
1743    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1744    ///
1745    /// # Errors
1746    ///
1747    /// Returns an error if input parameters are invalid.
1748    fn request_instrument(
1749        &mut self,
1750        instrument_id: InstrumentId,
1751        start: Option<DateTime<Utc>>,
1752        end: Option<DateTime<Utc>>,
1753        client_id: Option<ClientId>,
1754        params: Option<IndexMap<String, String>>,
1755    ) -> anyhow::Result<UUID4>
1756    where
1757        Self: 'static + Debug + Sized,
1758    {
1759        let actor_id = self.actor_id().inner();
1760        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1761            move |resp: &InstrumentResponse| {
1762                get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1763            },
1764        )));
1765
1766        DataActorCore::request_instrument(
1767            self,
1768            instrument_id,
1769            start,
1770            end,
1771            client_id,
1772            params,
1773            handler,
1774        )
1775    }
1776
1777    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1778    ///
1779    /// # Errors
1780    ///
1781    /// Returns an error if input parameters are invalid.
1782    fn request_instruments(
1783        &mut self,
1784        venue: Option<Venue>,
1785        start: Option<DateTime<Utc>>,
1786        end: Option<DateTime<Utc>>,
1787        client_id: Option<ClientId>,
1788        params: Option<IndexMap<String, String>>,
1789    ) -> anyhow::Result<UUID4>
1790    where
1791        Self: 'static + Debug + Sized,
1792    {
1793        let actor_id = self.actor_id().inner();
1794        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1795            move |resp: &InstrumentsResponse| {
1796                get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1797            },
1798        )));
1799
1800        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1801    }
1802
1803    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1804    ///
1805    /// # Errors
1806    ///
1807    /// Returns an error if input parameters are invalid.
1808    fn request_book_snapshot(
1809        &mut self,
1810        instrument_id: InstrumentId,
1811        depth: Option<NonZeroUsize>,
1812        client_id: Option<ClientId>,
1813        params: Option<IndexMap<String, String>>,
1814    ) -> anyhow::Result<UUID4>
1815    where
1816        Self: 'static + Debug + Sized,
1817    {
1818        let actor_id = self.actor_id().inner();
1819        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1820            move |resp: &BookResponse| {
1821                get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1822            },
1823        )));
1824
1825        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1826    }
1827
1828    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1829    ///
1830    /// # Errors
1831    ///
1832    /// Returns an error if input parameters are invalid.
1833    fn request_quotes(
1834        &mut self,
1835        instrument_id: InstrumentId,
1836        start: Option<DateTime<Utc>>,
1837        end: Option<DateTime<Utc>>,
1838        limit: Option<NonZeroUsize>,
1839        client_id: Option<ClientId>,
1840        params: Option<IndexMap<String, String>>,
1841    ) -> anyhow::Result<UUID4>
1842    where
1843        Self: 'static + Debug + Sized,
1844    {
1845        let actor_id = self.actor_id().inner();
1846        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1847            move |resp: &QuotesResponse| {
1848                get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
1849            },
1850        )));
1851
1852        DataActorCore::request_quotes(
1853            self,
1854            instrument_id,
1855            start,
1856            end,
1857            limit,
1858            client_id,
1859            params,
1860            handler,
1861        )
1862    }
1863
1864    /// Request historical [`TradeTick`] data for the given `instrument_id`.
1865    ///
1866    /// # Errors
1867    ///
1868    /// Returns an error if input parameters are invalid.
1869    fn request_trades(
1870        &mut self,
1871        instrument_id: InstrumentId,
1872        start: Option<DateTime<Utc>>,
1873        end: Option<DateTime<Utc>>,
1874        limit: Option<NonZeroUsize>,
1875        client_id: Option<ClientId>,
1876        params: Option<IndexMap<String, String>>,
1877    ) -> anyhow::Result<UUID4>
1878    where
1879        Self: 'static + Debug + Sized,
1880    {
1881        let actor_id = self.actor_id().inner();
1882        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1883            move |resp: &TradesResponse| {
1884                get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
1885            },
1886        )));
1887
1888        DataActorCore::request_trades(
1889            self,
1890            instrument_id,
1891            start,
1892            end,
1893            limit,
1894            client_id,
1895            params,
1896            handler,
1897        )
1898    }
1899
1900    /// Request historical [`Bar`] data for the given `bar_type`.
1901    ///
1902    /// # Errors
1903    ///
1904    /// Returns an error if input parameters are invalid.
1905    fn request_bars(
1906        &mut self,
1907        bar_type: BarType,
1908        start: Option<DateTime<Utc>>,
1909        end: Option<DateTime<Utc>>,
1910        limit: Option<NonZeroUsize>,
1911        client_id: Option<ClientId>,
1912        params: Option<IndexMap<String, String>>,
1913    ) -> anyhow::Result<UUID4>
1914    where
1915        Self: 'static + Debug + Sized,
1916    {
1917        let actor_id = self.actor_id().inner();
1918        let handler = ShareableMessageHandler(Rc::new(TypedMessageHandler::from(
1919            move |resp: &BarsResponse| {
1920                get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
1921            },
1922        )));
1923
1924        DataActorCore::request_bars(
1925            self, bar_type, start, end, limit, client_id, params, handler,
1926        )
1927    }
1928}
1929
1930// Blanket implementation: any DataActor automatically implements Actor
1931impl<T> Actor for T
1932where
1933    T: DataActor + Debug + 'static,
1934{
1935    fn id(&self) -> Ustr {
1936        self.actor_id.inner()
1937    }
1938
1939    #[allow(unused_variables)]
1940    fn handle(&mut self, msg: &dyn Any) {
1941        // Default empty implementation - concrete actors can override if needed
1942    }
1943
1944    fn as_any(&self) -> &dyn Any {
1945        self
1946    }
1947}
1948
1949// Blanket implementation: any DataActor automatically implements Component
1950impl<T> Component for T
1951where
1952    T: DataActor + Debug + 'static,
1953{
1954    fn component_id(&self) -> ComponentId {
1955        ComponentId::new(self.actor_id.inner().as_str())
1956    }
1957
1958    fn state(&self) -> ComponentState {
1959        self.state
1960    }
1961
1962    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1963        self.state = self.state.transition(&trigger)?;
1964        log::info!("{}", self.state.variant_name());
1965        Ok(())
1966    }
1967
1968    fn register(
1969        &mut self,
1970        trader_id: TraderId,
1971        clock: Rc<RefCell<dyn Clock>>,
1972        cache: Rc<RefCell<Cache>>,
1973    ) -> anyhow::Result<()> {
1974        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
1975
1976        // Register default time event handler for this actor
1977        let actor_id = self.actor_id().inner();
1978        let callback = TimeEventCallback::from(move |event: TimeEvent| {
1979            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1980                actor.handle_time_event(&event);
1981            } else {
1982                log::error!("Actor {actor_id} not found for time event handling");
1983            }
1984        });
1985
1986        clock.borrow_mut().register_default_handler(callback);
1987
1988        self.initialize()
1989    }
1990
1991    fn on_start(&mut self) -> anyhow::Result<()> {
1992        DataActor::on_start(self)
1993    }
1994
1995    fn on_stop(&mut self) -> anyhow::Result<()> {
1996        DataActor::on_stop(self)
1997    }
1998
1999    fn on_resume(&mut self) -> anyhow::Result<()> {
2000        DataActor::on_resume(self)
2001    }
2002
2003    fn on_degrade(&mut self) -> anyhow::Result<()> {
2004        DataActor::on_degrade(self)
2005    }
2006
2007    fn on_fault(&mut self) -> anyhow::Result<()> {
2008        DataActor::on_fault(self)
2009    }
2010
2011    fn on_reset(&mut self) -> anyhow::Result<()> {
2012        DataActor::on_reset(self)
2013    }
2014
2015    fn on_dispose(&mut self) -> anyhow::Result<()> {
2016        DataActor::on_dispose(self)
2017    }
2018}
2019
2020/// Core functionality for all actors.
2021#[derive(Clone)]
2022#[allow(
2023    dead_code,
2024    reason = "TODO: Under development (pending_requests, signal_classes)"
2025)]
2026pub struct DataActorCore {
2027    /// The actor identifier.
2028    pub actor_id: ActorId,
2029    /// The actors configuration.
2030    pub config: DataActorConfig,
2031    trader_id: Option<TraderId>,
2032    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
2033    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
2034    state: ComponentState,
2035    topic_handlers: AHashMap<MStr<Topic>, ShareableMessageHandler>,
2036    warning_events: AHashSet<String>, // TODO: TBD
2037    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
2038    signal_classes: AHashMap<String, String>,
2039    #[cfg(feature = "indicators")]
2040    indicators: Indicators,
2041}
2042
2043impl Debug for DataActorCore {
2044    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2045        f.debug_struct(stringify!(DataActorCore))
2046            .field("actor_id", &self.actor_id)
2047            .field("config", &self.config)
2048            .field("state", &self.state)
2049            .field("trader_id", &self.trader_id)
2050            .finish()
2051    }
2052}
2053
2054impl DataActorCore {
2055    /// Adds a subscription handler for the `topic`.
2056    ///
2057    //// Logs a warning if the actor is already subscribed to the topic.
2058    pub(crate) fn add_subscription(
2059        &mut self,
2060        topic: MStr<Topic>,
2061        handler: ShareableMessageHandler,
2062    ) {
2063        if self.topic_handlers.contains_key(&topic) {
2064            log::warn!(
2065                "Actor {} attempted duplicate subscription to topic '{topic}'",
2066                self.actor_id,
2067            );
2068            return;
2069        }
2070
2071        self.topic_handlers.insert(topic, handler.clone());
2072        msgbus::subscribe_topic(topic, handler, None);
2073    }
2074
2075    /// Removes a subscription handler for the `topic` if present.
2076    ///
2077    /// Logs a warning if the actor is not currently subscribed to the topic.
2078    pub(crate) fn remove_subscription(&mut self, topic: MStr<Topic>) {
2079        if let Some(handler) = self.topic_handlers.remove(&topic) {
2080            msgbus::unsubscribe_topic(topic, handler);
2081        } else {
2082            log::warn!(
2083                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2084                self.actor_id,
2085            );
2086        }
2087    }
2088
2089    /// Creates a new [`DataActorCore`] instance.
2090    pub fn new(config: DataActorConfig) -> Self {
2091        let actor_id = config
2092            .actor_id
2093            .unwrap_or_else(|| Self::default_actor_id(&config));
2094
2095        Self {
2096            actor_id,
2097            config,
2098            trader_id: None, // None until registered
2099            clock: None,     // None until registered
2100            cache: None,     // None until registered
2101            state: ComponentState::default(),
2102            topic_handlers: AHashMap::new(),
2103            warning_events: AHashSet::new(),
2104            pending_requests: AHashMap::new(),
2105            signal_classes: AHashMap::new(),
2106            #[cfg(feature = "indicators")]
2107            indicators: Indicators::default(),
2108        }
2109    }
2110
2111    /// Returns the memory address of this instance as a hexadecimal string.
2112    #[must_use]
2113    pub fn mem_address(&self) -> String {
2114        format!("{self:p}")
2115    }
2116
2117    /// Returns the actors state.
2118    pub fn state(&self) -> ComponentState {
2119        self.state
2120    }
2121
2122    /// Returns the trader ID this actor is registered to.
2123    pub fn trader_id(&self) -> Option<TraderId> {
2124        self.trader_id
2125    }
2126
2127    /// Returns the actors ID.
2128    pub fn actor_id(&self) -> ActorId {
2129        self.actor_id
2130    }
2131
2132    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2133        let memory_address = std::ptr::from_ref(config) as *const _ as usize;
2134        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2135    }
2136
2137    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2138    pub fn timestamp_ns(&self) -> UnixNanos {
2139        self.clock_ref().timestamp_ns()
2140    }
2141
2142    /// Returns the clock for the actor (if registered).
2143    ///
2144    /// # Panics
2145    ///
2146    /// Panics if the actor has not been registered with a trader.
2147    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2148        self.clock
2149            .as_ref()
2150            .unwrap_or_else(|| {
2151                panic!(
2152                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2153                    self.actor_id, self.trader_id
2154                )
2155            })
2156            .borrow_mut()
2157    }
2158
2159    /// Returns a clone of the reference-counted clock.
2160    ///
2161    /// # Panics
2162    ///
2163    /// Panics if the actor has not yet been registered (clock is `None`).
2164    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2165        self.clock
2166            .as_ref()
2167            .expect("DataActor must be registered before accessing clock")
2168            .clone()
2169    }
2170
2171    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2172        self.clock
2173            .as_ref()
2174            .unwrap_or_else(|| {
2175                panic!(
2176                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2177                    self.actor_id, self.trader_id
2178                )
2179            })
2180            .borrow()
2181    }
2182
2183    /// Returns a read-only reference to the cache.
2184    ///
2185    /// # Panics
2186    ///
2187    /// Panics if the actor has not yet been registered (cache is `None`).
2188    pub fn cache(&self) -> Ref<'_, Cache> {
2189        self.cache
2190            .as_ref()
2191            .expect("DataActor must be registered before accessing cache")
2192            .borrow()
2193    }
2194
2195    /// Returns a clone of the reference-counted cache.
2196    ///
2197    /// # Panics
2198    ///
2199    /// Panics if the actor has not yet been registered (cache is `None`).
2200    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2201        self.cache
2202            .as_ref()
2203            .expect("DataActor must be registered before accessing cache")
2204            .clone()
2205    }
2206
2207    // -- REGISTRATION ----------------------------------------------------------------------------
2208
2209    /// Register the data actor with a trader.
2210    ///
2211    /// # Errors
2212    ///
2213    /// Returns an error if the actor has already been registered with a trader
2214    /// or if the provided dependencies are invalid.
2215    pub fn register(
2216        &mut self,
2217        trader_id: TraderId,
2218        clock: Rc<RefCell<dyn Clock>>,
2219        cache: Rc<RefCell<Cache>>,
2220    ) -> anyhow::Result<()> {
2221        if let Some(existing_trader_id) = self.trader_id {
2222            anyhow::bail!(
2223                "DataActor {} already registered with trader {existing_trader_id}",
2224                self.actor_id
2225            );
2226        }
2227
2228        // Validate clock by attempting to access it
2229        {
2230            let _timestamp = clock.borrow().timestamp_ns();
2231        }
2232
2233        // Validate cache by attempting to access it
2234        {
2235            let _cache_borrow = cache.borrow();
2236        }
2237
2238        self.trader_id = Some(trader_id);
2239        self.clock = Some(clock);
2240        self.cache = Some(cache);
2241
2242        // Verify complete registration
2243        if !self.is_properly_registered() {
2244            anyhow::bail!(
2245                "DataActor {} registration incomplete - validation failed",
2246                self.actor_id
2247            );
2248        }
2249
2250        log::debug!("Registered '{}' with trader {trader_id}", self.actor_id);
2251        Ok(())
2252    }
2253
2254    /// Register an event type for warning log levels.
2255    pub fn register_warning_event(&mut self, event_type: &str) {
2256        self.warning_events.insert(event_type.to_string());
2257        log::debug!("Registered event type '{event_type}' for warning logs");
2258    }
2259
2260    /// Deregister an event type from warning log levels.
2261    pub fn deregister_warning_event(&mut self, event_type: &str) {
2262        self.warning_events.remove(event_type);
2263        log::debug!("Deregistered event type '{event_type}' from warning logs");
2264    }
2265
2266    pub fn is_registered(&self) -> bool {
2267        self.trader_id.is_some()
2268    }
2269
2270    pub(crate) fn check_registered(&self) {
2271        assert!(
2272            self.is_registered(),
2273            "Actor has not been registered with a Trader"
2274        );
2275    }
2276
2277    /// Validates registration state without panicking.
2278    fn is_properly_registered(&self) -> bool {
2279        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2280    }
2281
2282    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2283        if self.config.log_commands {
2284            log::info!("{CMD}{SEND} {command:?}");
2285        }
2286
2287        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2288        msgbus::send_any(endpoint, command.as_any());
2289    }
2290
2291    #[allow(dead_code)]
2292    fn send_data_req(&self, request: RequestCommand) {
2293        if self.config.log_commands {
2294            log::info!("{REQ}{SEND} {request:?}");
2295        }
2296
2297        // For now, simplified approach - data requests without dynamic handlers
2298        // TODO: Implement proper dynamic dispatch for response handlers
2299        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
2300        msgbus::send_any(endpoint, request.as_any());
2301    }
2302
2303    /// Sends a shutdown command to the system with an optional reason.
2304    ///
2305    /// # Panics
2306    ///
2307    /// Panics if the actor is not registered or has no trader ID.
2308    pub fn shutdown_system(&self, reason: Option<String>) {
2309        self.check_registered();
2310
2311        // SAFETY: Checked registered before unwrapping trader ID
2312        let command = ShutdownSystem::new(
2313            self.trader_id().unwrap(),
2314            self.actor_id.inner(),
2315            reason,
2316            UUID4::new(),
2317            self.timestamp_ns(),
2318        );
2319
2320        let endpoint = "command.system.shutdown".into();
2321        msgbus::send_any(endpoint, command.as_any());
2322    }
2323
2324    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
2325
2326    /// Helper method for registering data subscriptions from the trait.
2327    ///
2328    /// # Panics
2329    ///
2330    /// Panics if the actor is not properly registered.
2331    pub fn subscribe_data(
2332        &mut self,
2333        handler: ShareableMessageHandler,
2334        data_type: DataType,
2335        client_id: Option<ClientId>,
2336        params: Option<IndexMap<String, String>>,
2337    ) {
2338        if !self.is_properly_registered() {
2339            panic!(
2340                "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
2341                self.actor_id,
2342                self.trader_id,
2343                self.clock.is_some(),
2344                self.cache.is_some()
2345            );
2346        }
2347
2348        let topic = get_custom_topic(&data_type);
2349        self.add_subscription(topic, handler);
2350
2351        // If no client ID specified, just subscribe to the topic
2352        if client_id.is_none() {
2353            return;
2354        }
2355
2356        let command = SubscribeCommand::Data(SubscribeCustomData {
2357            data_type,
2358            client_id,
2359            venue: None,
2360            command_id: UUID4::new(),
2361            ts_init: self.timestamp_ns(),
2362            params,
2363        });
2364
2365        self.send_data_cmd(DataCommand::Subscribe(command));
2366    }
2367
2368    /// Helper method for registering quotes subscriptions from the trait.
2369    pub fn subscribe_quotes(
2370        &mut self,
2371        topic: MStr<Topic>,
2372        handler: ShareableMessageHandler,
2373        instrument_id: InstrumentId,
2374        client_id: Option<ClientId>,
2375        params: Option<IndexMap<String, String>>,
2376    ) {
2377        self.check_registered();
2378
2379        self.add_subscription(topic, handler);
2380
2381        let command = SubscribeCommand::Quotes(SubscribeQuotes {
2382            instrument_id,
2383            client_id,
2384            venue: Some(instrument_id.venue),
2385            command_id: UUID4::new(),
2386            ts_init: self.timestamp_ns(),
2387            params,
2388        });
2389
2390        self.send_data_cmd(DataCommand::Subscribe(command));
2391    }
2392
2393    /// Helper method for registering instruments subscriptions from the trait.
2394    pub fn subscribe_instruments(
2395        &mut self,
2396        topic: MStr<Topic>,
2397        handler: ShareableMessageHandler,
2398        venue: Venue,
2399        client_id: Option<ClientId>,
2400        params: Option<IndexMap<String, String>>,
2401    ) {
2402        self.check_registered();
2403
2404        self.add_subscription(topic, handler);
2405
2406        let command = SubscribeCommand::Instruments(SubscribeInstruments {
2407            client_id,
2408            venue,
2409            command_id: UUID4::new(),
2410            ts_init: self.timestamp_ns(),
2411            params,
2412        });
2413
2414        self.send_data_cmd(DataCommand::Subscribe(command));
2415    }
2416
2417    /// Helper method for registering instrument subscriptions from the trait.
2418    pub fn subscribe_instrument(
2419        &mut self,
2420        topic: MStr<Topic>,
2421        handler: ShareableMessageHandler,
2422        instrument_id: InstrumentId,
2423        client_id: Option<ClientId>,
2424        params: Option<IndexMap<String, String>>,
2425    ) {
2426        self.check_registered();
2427
2428        self.add_subscription(topic, handler);
2429
2430        let command = SubscribeCommand::Instrument(SubscribeInstrument {
2431            instrument_id,
2432            client_id,
2433            venue: Some(instrument_id.venue),
2434            command_id: UUID4::new(),
2435            ts_init: self.timestamp_ns(),
2436            params,
2437        });
2438
2439        self.send_data_cmd(DataCommand::Subscribe(command));
2440    }
2441
2442    /// Helper method for registering book deltas subscriptions from the trait.
2443    #[allow(clippy::too_many_arguments)]
2444    pub fn subscribe_book_deltas(
2445        &mut self,
2446        topic: MStr<Topic>,
2447        handler: ShareableMessageHandler,
2448        instrument_id: InstrumentId,
2449        book_type: BookType,
2450        depth: Option<NonZeroUsize>,
2451        client_id: Option<ClientId>,
2452        managed: bool,
2453        params: Option<IndexMap<String, String>>,
2454    ) {
2455        self.check_registered();
2456
2457        self.add_subscription(topic, handler);
2458
2459        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
2460            instrument_id,
2461            book_type,
2462            client_id,
2463            venue: Some(instrument_id.venue),
2464            command_id: UUID4::new(),
2465            ts_init: self.timestamp_ns(),
2466            depth,
2467            managed,
2468            params,
2469        });
2470
2471        self.send_data_cmd(DataCommand::Subscribe(command));
2472    }
2473
2474    /// Helper method for registering book snapshots subscriptions from the trait.
2475    #[allow(clippy::too_many_arguments)]
2476    pub fn subscribe_book_at_interval(
2477        &mut self,
2478        topic: MStr<Topic>,
2479        handler: ShareableMessageHandler,
2480        instrument_id: InstrumentId,
2481        book_type: BookType,
2482        depth: Option<NonZeroUsize>,
2483        interval_ms: NonZeroUsize,
2484        client_id: Option<ClientId>,
2485        params: Option<IndexMap<String, String>>,
2486    ) {
2487        self.check_registered();
2488
2489        self.add_subscription(topic, handler);
2490
2491        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
2492            instrument_id,
2493            book_type,
2494            client_id,
2495            venue: Some(instrument_id.venue),
2496            command_id: UUID4::new(),
2497            ts_init: self.timestamp_ns(),
2498            depth,
2499            interval_ms,
2500            params,
2501        });
2502
2503        self.send_data_cmd(DataCommand::Subscribe(command));
2504    }
2505
2506    /// Helper method for registering trades subscriptions from the trait.
2507    pub fn subscribe_trades(
2508        &mut self,
2509        topic: MStr<Topic>,
2510        handler: ShareableMessageHandler,
2511        instrument_id: InstrumentId,
2512        client_id: Option<ClientId>,
2513        params: Option<IndexMap<String, String>>,
2514    ) {
2515        self.check_registered();
2516
2517        self.add_subscription(topic, handler);
2518
2519        let command = SubscribeCommand::Trades(SubscribeTrades {
2520            instrument_id,
2521            client_id,
2522            venue: Some(instrument_id.venue),
2523            command_id: UUID4::new(),
2524            ts_init: self.timestamp_ns(),
2525            params,
2526        });
2527
2528        self.send_data_cmd(DataCommand::Subscribe(command));
2529    }
2530
2531    /// Helper method for registering bars subscriptions from the trait.
2532    pub fn subscribe_bars(
2533        &mut self,
2534        topic: MStr<Topic>,
2535        handler: ShareableMessageHandler,
2536        bar_type: BarType,
2537        client_id: Option<ClientId>,
2538        params: Option<IndexMap<String, String>>,
2539    ) {
2540        self.check_registered();
2541
2542        self.add_subscription(topic, handler);
2543
2544        let command = SubscribeCommand::Bars(SubscribeBars {
2545            bar_type,
2546            client_id,
2547            venue: Some(bar_type.instrument_id().venue),
2548            command_id: UUID4::new(),
2549            ts_init: self.timestamp_ns(),
2550            params,
2551        });
2552
2553        self.send_data_cmd(DataCommand::Subscribe(command));
2554    }
2555
2556    /// Helper method for registering mark prices subscriptions from the trait.
2557    pub fn subscribe_mark_prices(
2558        &mut self,
2559        topic: MStr<Topic>,
2560        handler: ShareableMessageHandler,
2561        instrument_id: InstrumentId,
2562        client_id: Option<ClientId>,
2563        params: Option<IndexMap<String, String>>,
2564    ) {
2565        self.check_registered();
2566
2567        self.add_subscription(topic, handler);
2568
2569        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
2570            instrument_id,
2571            client_id,
2572            venue: Some(instrument_id.venue),
2573            command_id: UUID4::new(),
2574            ts_init: self.timestamp_ns(),
2575            params,
2576        });
2577
2578        self.send_data_cmd(DataCommand::Subscribe(command));
2579    }
2580
2581    /// Helper method for registering index prices subscriptions from the trait.
2582    pub fn subscribe_index_prices(
2583        &mut self,
2584        topic: MStr<Topic>,
2585        handler: ShareableMessageHandler,
2586        instrument_id: InstrumentId,
2587        client_id: Option<ClientId>,
2588        params: Option<IndexMap<String, String>>,
2589    ) {
2590        self.check_registered();
2591
2592        self.add_subscription(topic, handler);
2593
2594        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
2595            instrument_id,
2596            client_id,
2597            venue: Some(instrument_id.venue),
2598            command_id: UUID4::new(),
2599            ts_init: self.timestamp_ns(),
2600            params,
2601        });
2602
2603        self.send_data_cmd(DataCommand::Subscribe(command));
2604    }
2605
2606    /// Helper method for registering funding rates subscriptions from the trait.
2607    pub fn subscribe_funding_rates(
2608        &mut self,
2609        topic: MStr<Topic>,
2610        handler: ShareableMessageHandler,
2611        instrument_id: InstrumentId,
2612        client_id: Option<ClientId>,
2613        params: Option<IndexMap<String, String>>,
2614    ) {
2615        self.check_registered();
2616
2617        self.add_subscription(topic, handler);
2618
2619        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
2620            instrument_id,
2621            client_id,
2622            venue: Some(instrument_id.venue),
2623            command_id: UUID4::new(),
2624            ts_init: self.timestamp_ns(),
2625            params,
2626        });
2627
2628        self.send_data_cmd(DataCommand::Subscribe(command));
2629    }
2630
2631    /// Helper method for registering instrument status subscriptions from the trait.
2632    pub fn subscribe_instrument_status(
2633        &mut self,
2634        topic: MStr<Topic>,
2635        handler: ShareableMessageHandler,
2636        instrument_id: InstrumentId,
2637        client_id: Option<ClientId>,
2638        params: Option<IndexMap<String, String>>,
2639    ) {
2640        self.check_registered();
2641
2642        self.add_subscription(topic, handler);
2643
2644        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
2645            instrument_id,
2646            client_id,
2647            venue: Some(instrument_id.venue),
2648            command_id: UUID4::new(),
2649            ts_init: self.timestamp_ns(),
2650            params,
2651        });
2652
2653        self.send_data_cmd(DataCommand::Subscribe(command));
2654    }
2655
2656    /// Helper method for registering instrument close subscriptions from the trait.
2657    pub fn subscribe_instrument_close(
2658        &mut self,
2659        topic: MStr<Topic>,
2660        handler: ShareableMessageHandler,
2661        instrument_id: InstrumentId,
2662        client_id: Option<ClientId>,
2663        params: Option<IndexMap<String, String>>,
2664    ) {
2665        self.check_registered();
2666
2667        self.add_subscription(topic, handler);
2668
2669        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
2670            instrument_id,
2671            client_id,
2672            venue: Some(instrument_id.venue),
2673            command_id: UUID4::new(),
2674            ts_init: self.timestamp_ns(),
2675            params,
2676        });
2677
2678        self.send_data_cmd(DataCommand::Subscribe(command));
2679    }
2680
2681    /// Helper method for registering order fills subscriptions from the trait.
2682    pub fn subscribe_order_fills(&mut self, topic: MStr<Topic>, handler: ShareableMessageHandler) {
2683        self.check_registered();
2684        self.add_subscription(topic, handler);
2685    }
2686
2687    /// Helper method for registering order cancels subscriptions from the trait.
2688    pub fn subscribe_order_cancels(
2689        &mut self,
2690        topic: MStr<Topic>,
2691        handler: ShareableMessageHandler,
2692    ) {
2693        self.check_registered();
2694        self.add_subscription(topic, handler);
2695    }
2696
2697    /// Helper method for unsubscribing from data.
2698    pub fn unsubscribe_data(
2699        &mut self,
2700        data_type: DataType,
2701        client_id: Option<ClientId>,
2702        params: Option<IndexMap<String, String>>,
2703    ) {
2704        self.check_registered();
2705
2706        let topic = get_custom_topic(&data_type);
2707        self.remove_subscription(topic);
2708
2709        if client_id.is_none() {
2710            return;
2711        }
2712
2713        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
2714            data_type,
2715            client_id,
2716            venue: None,
2717            command_id: UUID4::new(),
2718            ts_init: self.timestamp_ns(),
2719            params,
2720        });
2721
2722        self.send_data_cmd(DataCommand::Unsubscribe(command));
2723    }
2724
2725    /// Helper method for unsubscribing from instruments.
2726    pub fn unsubscribe_instruments(
2727        &mut self,
2728        venue: Venue,
2729        client_id: Option<ClientId>,
2730        params: Option<IndexMap<String, String>>,
2731    ) {
2732        self.check_registered();
2733
2734        let topic = get_instruments_topic(venue);
2735        self.remove_subscription(topic);
2736
2737        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
2738            client_id,
2739            venue,
2740            command_id: UUID4::new(),
2741            ts_init: self.timestamp_ns(),
2742            params,
2743        });
2744
2745        self.send_data_cmd(DataCommand::Unsubscribe(command));
2746    }
2747
2748    /// Helper method for unsubscribing from instrument.
2749    pub fn unsubscribe_instrument(
2750        &mut self,
2751        instrument_id: InstrumentId,
2752        client_id: Option<ClientId>,
2753        params: Option<IndexMap<String, String>>,
2754    ) {
2755        self.check_registered();
2756
2757        let topic = get_instrument_topic(instrument_id);
2758        self.remove_subscription(topic);
2759
2760        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
2761            instrument_id,
2762            client_id,
2763            venue: Some(instrument_id.venue),
2764            command_id: UUID4::new(),
2765            ts_init: self.timestamp_ns(),
2766            params,
2767        });
2768
2769        self.send_data_cmd(DataCommand::Unsubscribe(command));
2770    }
2771
2772    /// Helper method for unsubscribing from book deltas.
2773    pub fn unsubscribe_book_deltas(
2774        &mut self,
2775        instrument_id: InstrumentId,
2776        client_id: Option<ClientId>,
2777        params: Option<IndexMap<String, String>>,
2778    ) {
2779        self.check_registered();
2780
2781        let topic = get_book_deltas_topic(instrument_id);
2782        self.remove_subscription(topic);
2783
2784        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
2785            instrument_id,
2786            client_id,
2787            venue: Some(instrument_id.venue),
2788            command_id: UUID4::new(),
2789            ts_init: self.timestamp_ns(),
2790            params,
2791        });
2792
2793        self.send_data_cmd(DataCommand::Unsubscribe(command));
2794    }
2795
2796    /// Helper method for unsubscribing from book snapshots at interval.
2797    pub fn unsubscribe_book_at_interval(
2798        &mut self,
2799        instrument_id: InstrumentId,
2800        interval_ms: NonZeroUsize,
2801        client_id: Option<ClientId>,
2802        params: Option<IndexMap<String, String>>,
2803    ) {
2804        self.check_registered();
2805
2806        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
2807        self.remove_subscription(topic);
2808
2809        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
2810            instrument_id,
2811            client_id,
2812            venue: Some(instrument_id.venue),
2813            command_id: UUID4::new(),
2814            ts_init: self.timestamp_ns(),
2815            params,
2816        });
2817
2818        self.send_data_cmd(DataCommand::Unsubscribe(command));
2819    }
2820
2821    /// Helper method for unsubscribing from quotes.
2822    pub fn unsubscribe_quotes(
2823        &mut self,
2824        instrument_id: InstrumentId,
2825        client_id: Option<ClientId>,
2826        params: Option<IndexMap<String, String>>,
2827    ) {
2828        self.check_registered();
2829
2830        let topic = get_quotes_topic(instrument_id);
2831        self.remove_subscription(topic);
2832
2833        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
2834            instrument_id,
2835            client_id,
2836            venue: Some(instrument_id.venue),
2837            command_id: UUID4::new(),
2838            ts_init: self.timestamp_ns(),
2839            params,
2840        });
2841
2842        self.send_data_cmd(DataCommand::Unsubscribe(command));
2843    }
2844
2845    /// Helper method for unsubscribing from trades.
2846    pub fn unsubscribe_trades(
2847        &mut self,
2848        instrument_id: InstrumentId,
2849        client_id: Option<ClientId>,
2850        params: Option<IndexMap<String, String>>,
2851    ) {
2852        self.check_registered();
2853
2854        let topic = get_trades_topic(instrument_id);
2855        self.remove_subscription(topic);
2856
2857        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
2858            instrument_id,
2859            client_id,
2860            venue: Some(instrument_id.venue),
2861            command_id: UUID4::new(),
2862            ts_init: self.timestamp_ns(),
2863            params,
2864        });
2865
2866        self.send_data_cmd(DataCommand::Unsubscribe(command));
2867    }
2868
2869    /// Helper method for unsubscribing from bars.
2870    pub fn unsubscribe_bars(
2871        &mut self,
2872        bar_type: BarType,
2873        client_id: Option<ClientId>,
2874        params: Option<IndexMap<String, String>>,
2875    ) {
2876        self.check_registered();
2877
2878        let topic = get_bars_topic(bar_type);
2879        self.remove_subscription(topic);
2880
2881        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
2882            bar_type,
2883            client_id,
2884            venue: Some(bar_type.instrument_id().venue),
2885            command_id: UUID4::new(),
2886            ts_init: self.timestamp_ns(),
2887            params,
2888        });
2889
2890        self.send_data_cmd(DataCommand::Unsubscribe(command));
2891    }
2892
2893    /// Helper method for unsubscribing from mark prices.
2894    pub fn unsubscribe_mark_prices(
2895        &mut self,
2896        instrument_id: InstrumentId,
2897        client_id: Option<ClientId>,
2898        params: Option<IndexMap<String, String>>,
2899    ) {
2900        self.check_registered();
2901
2902        let topic = get_mark_price_topic(instrument_id);
2903        self.remove_subscription(topic);
2904
2905        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
2906            instrument_id,
2907            client_id,
2908            venue: Some(instrument_id.venue),
2909            command_id: UUID4::new(),
2910            ts_init: self.timestamp_ns(),
2911            params,
2912        });
2913
2914        self.send_data_cmd(DataCommand::Unsubscribe(command));
2915    }
2916
2917    /// Helper method for unsubscribing from index prices.
2918    pub fn unsubscribe_index_prices(
2919        &mut self,
2920        instrument_id: InstrumentId,
2921        client_id: Option<ClientId>,
2922        params: Option<IndexMap<String, String>>,
2923    ) {
2924        self.check_registered();
2925
2926        let topic = get_index_price_topic(instrument_id);
2927        self.remove_subscription(topic);
2928
2929        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
2930            instrument_id,
2931            client_id,
2932            venue: Some(instrument_id.venue),
2933            command_id: UUID4::new(),
2934            ts_init: self.timestamp_ns(),
2935            params,
2936        });
2937
2938        self.send_data_cmd(DataCommand::Unsubscribe(command));
2939    }
2940
2941    /// Helper method for unsubscribing from funding rates.
2942    pub fn unsubscribe_funding_rates(
2943        &mut self,
2944        instrument_id: InstrumentId,
2945        client_id: Option<ClientId>,
2946        params: Option<IndexMap<String, String>>,
2947    ) {
2948        self.check_registered();
2949
2950        let topic = get_funding_rate_topic(instrument_id);
2951        self.remove_subscription(topic);
2952
2953        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
2954            instrument_id,
2955            client_id,
2956            venue: Some(instrument_id.venue),
2957            command_id: UUID4::new(),
2958            ts_init: self.timestamp_ns(),
2959            params,
2960        });
2961
2962        self.send_data_cmd(DataCommand::Unsubscribe(command));
2963    }
2964
2965    /// Helper method for unsubscribing from instrument status.
2966    pub fn unsubscribe_instrument_status(
2967        &mut self,
2968        instrument_id: InstrumentId,
2969        client_id: Option<ClientId>,
2970        params: Option<IndexMap<String, String>>,
2971    ) {
2972        self.check_registered();
2973
2974        let topic = get_instrument_status_topic(instrument_id);
2975        self.remove_subscription(topic);
2976
2977        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
2978            instrument_id,
2979            client_id,
2980            venue: Some(instrument_id.venue),
2981            command_id: UUID4::new(),
2982            ts_init: self.timestamp_ns(),
2983            params,
2984        });
2985
2986        self.send_data_cmd(DataCommand::Unsubscribe(command));
2987    }
2988
2989    /// Helper method for unsubscribing from instrument close.
2990    pub fn unsubscribe_instrument_close(
2991        &mut self,
2992        instrument_id: InstrumentId,
2993        client_id: Option<ClientId>,
2994        params: Option<IndexMap<String, String>>,
2995    ) {
2996        self.check_registered();
2997
2998        let topic = get_instrument_close_topic(instrument_id);
2999        self.remove_subscription(topic);
3000
3001        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3002            instrument_id,
3003            client_id,
3004            venue: Some(instrument_id.venue),
3005            command_id: UUID4::new(),
3006            ts_init: self.timestamp_ns(),
3007            params,
3008        });
3009
3010        self.send_data_cmd(DataCommand::Unsubscribe(command));
3011    }
3012
3013    /// Helper method for unsubscribing from order fills.
3014    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3015        self.check_registered();
3016
3017        let topic = get_order_fills_topic(instrument_id);
3018        self.remove_subscription(topic);
3019    }
3020
3021    /// Helper method for unsubscribing from order cancels.
3022    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3023        self.check_registered();
3024
3025        let topic = get_order_cancels_topic(instrument_id);
3026        self.remove_subscription(topic);
3027    }
3028
3029    /// Helper method for requesting data.
3030    ///
3031    /// # Errors
3032    ///
3033    /// Returns an error if input parameters are invalid.
3034    #[allow(clippy::too_many_arguments)]
3035    pub fn request_data(
3036        &self,
3037        data_type: DataType,
3038        client_id: ClientId,
3039        start: Option<DateTime<Utc>>,
3040        end: Option<DateTime<Utc>>,
3041        limit: Option<NonZeroUsize>,
3042        params: Option<IndexMap<String, String>>,
3043        handler: ShareableMessageHandler,
3044    ) -> anyhow::Result<UUID4> {
3045        self.check_registered();
3046
3047        let now = self.clock_ref().utc_now();
3048        check_timestamps(now, start, end)?;
3049
3050        let request_id = UUID4::new();
3051        let command = RequestCommand::Data(RequestCustomData {
3052            client_id,
3053            data_type,
3054            start,
3055            end,
3056            limit,
3057            request_id,
3058            ts_init: self.timestamp_ns(),
3059            params,
3060        });
3061
3062        get_message_bus()
3063            .borrow_mut()
3064            .register_response_handler(command.request_id(), handler)?;
3065
3066        self.send_data_cmd(DataCommand::Request(command));
3067
3068        Ok(request_id)
3069    }
3070
3071    /// Helper method for requesting instrument.
3072    ///
3073    /// # Errors
3074    ///
3075    /// Returns an error if input parameters are invalid.
3076    pub fn request_instrument(
3077        &self,
3078        instrument_id: InstrumentId,
3079        start: Option<DateTime<Utc>>,
3080        end: Option<DateTime<Utc>>,
3081        client_id: Option<ClientId>,
3082        params: Option<IndexMap<String, String>>,
3083        handler: ShareableMessageHandler,
3084    ) -> anyhow::Result<UUID4> {
3085        self.check_registered();
3086
3087        let now = self.clock_ref().utc_now();
3088        check_timestamps(now, start, end)?;
3089
3090        let request_id = UUID4::new();
3091        let command = RequestCommand::Instrument(RequestInstrument {
3092            instrument_id,
3093            start,
3094            end,
3095            client_id,
3096            request_id,
3097            ts_init: now.into(),
3098            params,
3099        });
3100
3101        get_message_bus()
3102            .borrow_mut()
3103            .register_response_handler(command.request_id(), handler)?;
3104
3105        self.send_data_cmd(DataCommand::Request(command));
3106
3107        Ok(request_id)
3108    }
3109
3110    /// Helper method for requesting instruments.
3111    ///
3112    /// # Errors
3113    ///
3114    /// Returns an error if input parameters are invalid.
3115    pub fn request_instruments(
3116        &self,
3117        venue: Option<Venue>,
3118        start: Option<DateTime<Utc>>,
3119        end: Option<DateTime<Utc>>,
3120        client_id: Option<ClientId>,
3121        params: Option<IndexMap<String, String>>,
3122        handler: ShareableMessageHandler,
3123    ) -> anyhow::Result<UUID4> {
3124        self.check_registered();
3125
3126        let now = self.clock_ref().utc_now();
3127        check_timestamps(now, start, end)?;
3128
3129        let request_id = UUID4::new();
3130        let command = RequestCommand::Instruments(RequestInstruments {
3131            venue,
3132            start,
3133            end,
3134            client_id,
3135            request_id,
3136            ts_init: now.into(),
3137            params,
3138        });
3139
3140        get_message_bus()
3141            .borrow_mut()
3142            .register_response_handler(command.request_id(), handler)?;
3143
3144        self.send_data_cmd(DataCommand::Request(command));
3145
3146        Ok(request_id)
3147    }
3148
3149    /// Helper method for requesting book snapshot.
3150    ///
3151    /// # Errors
3152    ///
3153    /// Returns an error if input parameters are invalid.
3154    pub fn request_book_snapshot(
3155        &self,
3156        instrument_id: InstrumentId,
3157        depth: Option<NonZeroUsize>,
3158        client_id: Option<ClientId>,
3159        params: Option<IndexMap<String, String>>,
3160        handler: ShareableMessageHandler,
3161    ) -> anyhow::Result<UUID4> {
3162        self.check_registered();
3163
3164        let request_id = UUID4::new();
3165        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
3166            instrument_id,
3167            depth,
3168            client_id,
3169            request_id,
3170            ts_init: self.timestamp_ns(),
3171            params,
3172        });
3173
3174        get_message_bus()
3175            .borrow_mut()
3176            .register_response_handler(command.request_id(), handler)?;
3177
3178        self.send_data_cmd(DataCommand::Request(command));
3179
3180        Ok(request_id)
3181    }
3182
3183    /// Helper method for requesting quotes.
3184    ///
3185    /// # Errors
3186    ///
3187    /// Returns an error if input parameters are invalid.
3188    #[allow(clippy::too_many_arguments)]
3189    pub fn request_quotes(
3190        &self,
3191        instrument_id: InstrumentId,
3192        start: Option<DateTime<Utc>>,
3193        end: Option<DateTime<Utc>>,
3194        limit: Option<NonZeroUsize>,
3195        client_id: Option<ClientId>,
3196        params: Option<IndexMap<String, String>>,
3197        handler: ShareableMessageHandler,
3198    ) -> anyhow::Result<UUID4> {
3199        self.check_registered();
3200
3201        let now = self.clock_ref().utc_now();
3202        check_timestamps(now, start, end)?;
3203
3204        let request_id = UUID4::new();
3205        let command = RequestCommand::Quotes(RequestQuotes {
3206            instrument_id,
3207            start,
3208            end,
3209            limit,
3210            client_id,
3211            request_id,
3212            ts_init: now.into(),
3213            params,
3214        });
3215
3216        get_message_bus()
3217            .borrow_mut()
3218            .register_response_handler(command.request_id(), handler)?;
3219
3220        self.send_data_cmd(DataCommand::Request(command));
3221
3222        Ok(request_id)
3223    }
3224
3225    /// Helper method for requesting trades.
3226    ///
3227    /// # Errors
3228    ///
3229    /// Returns an error if input parameters are invalid.
3230    #[allow(clippy::too_many_arguments)]
3231    pub fn request_trades(
3232        &self,
3233        instrument_id: InstrumentId,
3234        start: Option<DateTime<Utc>>,
3235        end: Option<DateTime<Utc>>,
3236        limit: Option<NonZeroUsize>,
3237        client_id: Option<ClientId>,
3238        params: Option<IndexMap<String, String>>,
3239        handler: ShareableMessageHandler,
3240    ) -> anyhow::Result<UUID4> {
3241        self.check_registered();
3242
3243        let now = self.clock_ref().utc_now();
3244        check_timestamps(now, start, end)?;
3245
3246        let request_id = UUID4::new();
3247        let command = RequestCommand::Trades(RequestTrades {
3248            instrument_id,
3249            start,
3250            end,
3251            limit,
3252            client_id,
3253            request_id,
3254            ts_init: now.into(),
3255            params,
3256        });
3257
3258        get_message_bus()
3259            .borrow_mut()
3260            .register_response_handler(command.request_id(), handler)?;
3261
3262        self.send_data_cmd(DataCommand::Request(command));
3263
3264        Ok(request_id)
3265    }
3266
3267    /// Helper method for requesting bars.
3268    ///
3269    /// # Errors
3270    ///
3271    /// Returns an error if input parameters are invalid.
3272    #[allow(clippy::too_many_arguments)]
3273    pub fn request_bars(
3274        &self,
3275        bar_type: BarType,
3276        start: Option<DateTime<Utc>>,
3277        end: Option<DateTime<Utc>>,
3278        limit: Option<NonZeroUsize>,
3279        client_id: Option<ClientId>,
3280        params: Option<IndexMap<String, String>>,
3281        handler: ShareableMessageHandler,
3282    ) -> anyhow::Result<UUID4> {
3283        self.check_registered();
3284
3285        let now = self.clock_ref().utc_now();
3286        check_timestamps(now, start, end)?;
3287
3288        let request_id = UUID4::new();
3289        let command = RequestCommand::Bars(RequestBars {
3290            bar_type,
3291            start,
3292            end,
3293            limit,
3294            client_id,
3295            request_id,
3296            ts_init: now.into(),
3297            params,
3298        });
3299
3300        get_message_bus()
3301            .borrow_mut()
3302            .register_response_handler(command.request_id(), handler)?;
3303
3304        self.send_data_cmd(DataCommand::Request(command));
3305
3306        Ok(request_id)
3307    }
3308}
3309
3310fn check_timestamps(
3311    now: DateTime<Utc>,
3312    start: Option<DateTime<Utc>>,
3313    end: Option<DateTime<Utc>>,
3314) -> anyhow::Result<()> {
3315    if let Some(start) = start {
3316        check_predicate_true(start <= now, "start was > now")?;
3317    }
3318    if let Some(end) = end {
3319        check_predicate_true(end <= now, "end was > now")?;
3320    }
3321
3322    if let (Some(start), Some(end)) = (start, end) {
3323        check_predicate_true(start < end, "start was >= end")?;
3324    }
3325
3326    Ok(())
3327}
3328
3329fn log_error(e: &anyhow::Error) {
3330    log::error!("{e}");
3331}
3332
3333fn log_not_running<T>(msg: &T)
3334where
3335    T: Debug,
3336{
3337    // TODO: Potentially temporary for development? drop level at some stage
3338    log::warn!("Received message when not running - skipping {msg:?}");
3339}
3340
3341fn log_received<T>(msg: &T)
3342where
3343    T: Debug,
3344{
3345    log::debug!("{RECV} {msg:?}");
3346}