1use std::{
22 any::Any,
23 fmt::{Debug, Display},
24 ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29 RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
30 RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
31 SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeIndexPrices,
32 SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
33 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
34 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
35 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
36 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
37 UnsubscribeTrades,
38};
39use nautilus_model::{
40 data::{BarType, DataType},
41 identifiers::{ClientId, InstrumentId, Venue},
42};
43
44#[async_trait::async_trait]
46pub trait DataClient: Any + Sync + Send {
47 fn client_id(&self) -> ClientId;
49
50 fn venue(&self) -> Option<Venue>;
52
53 fn start(&self) -> anyhow::Result<()>;
59
60 fn stop(&self) -> anyhow::Result<()>;
66
67 fn reset(&self) -> anyhow::Result<()>;
73
74 fn dispose(&self) -> anyhow::Result<()>;
80
81 async fn connect(&self) -> anyhow::Result<()>;
87
88 async fn disconnect(&self) -> anyhow::Result<()>;
94
95 fn is_connected(&self) -> bool;
97
98 fn is_disconnected(&self) -> bool;
100
101 fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
107 log_not_implemented(&cmd);
108 Ok(())
109 }
110
111 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
117 log_not_implemented(&cmd);
118 Ok(())
119 }
120
121 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
127 log_not_implemented(&cmd);
128 Ok(())
129 }
130
131 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
137 log_not_implemented(&cmd);
138 Ok(())
139 }
140
141 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
147 log_not_implemented(&cmd);
148 Ok(())
149 }
150
151 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
157 log_not_implemented(&cmd);
158 Ok(())
159 }
160
161 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
167 log_not_implemented(&cmd);
168 Ok(())
169 }
170
171 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
177 log_not_implemented(&cmd);
178 Ok(())
179 }
180
181 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
187 log_not_implemented(&cmd);
188 Ok(())
189 }
190
191 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
197 log_not_implemented(&cmd);
198 Ok(())
199 }
200
201 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
207 log_not_implemented(&cmd);
208 Ok(())
209 }
210
211 fn subscribe_instrument_status(
217 &mut self,
218 cmd: &SubscribeInstrumentStatus,
219 ) -> anyhow::Result<()> {
220 log_not_implemented(&cmd);
221 Ok(())
222 }
223
224 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
230 log_not_implemented(&cmd);
231 Ok(())
232 }
233
234 fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
240 log_not_implemented(&cmd);
241 Ok(())
242 }
243
244 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
250 log_not_implemented(&cmd);
251 Ok(())
252 }
253
254 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
260 log_not_implemented(&cmd);
261 Ok(())
262 }
263
264 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
270 log_not_implemented(&cmd);
271 Ok(())
272 }
273
274 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
280 log_not_implemented(&cmd);
281 Ok(())
282 }
283
284 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
290 log_not_implemented(&cmd);
291 Ok(())
292 }
293
294 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
300 log_not_implemented(&cmd);
301 Ok(())
302 }
303
304 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
310 log_not_implemented(&cmd);
311 Ok(())
312 }
313
314 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
320 log_not_implemented(&cmd);
321 Ok(())
322 }
323
324 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
330 log_not_implemented(&cmd);
331 Ok(())
332 }
333
334 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
340 log_not_implemented(&cmd);
341 Ok(())
342 }
343
344 fn unsubscribe_instrument_status(
350 &mut self,
351 cmd: &UnsubscribeInstrumentStatus,
352 ) -> anyhow::Result<()> {
353 log_not_implemented(&cmd);
354 Ok(())
355 }
356
357 fn unsubscribe_instrument_close(
363 &mut self,
364 cmd: &UnsubscribeInstrumentClose,
365 ) -> anyhow::Result<()> {
366 log_not_implemented(&cmd);
367 Ok(())
368 }
369
370 fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
376 log_not_implemented(&request);
377 Ok(())
378 }
379
380 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
386 log_not_implemented(&request);
387 Ok(())
388 }
389
390 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
396 log_not_implemented(&request);
397 Ok(())
398 }
399
400 fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
406 log_not_implemented(&request);
407 Ok(())
408 }
409
410 fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
416 log_not_implemented(&request);
417 Ok(())
418 }
419
420 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
426 log_not_implemented(&request);
427 Ok(())
428 }
429
430 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
436 log_not_implemented(&request);
437 Ok(())
438 }
439}
440
441pub struct DataClientAdapter {
443 client: Box<dyn DataClient>,
444 pub client_id: ClientId,
445 pub venue: Option<Venue>,
446 pub handles_book_deltas: bool,
447 pub handles_book_snapshots: bool,
448 pub subscriptions_custom: AHashSet<DataType>,
449 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
450 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
451 pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
452 pub subscriptions_quotes: AHashSet<InstrumentId>,
453 pub subscriptions_trades: AHashSet<InstrumentId>,
454 pub subscriptions_bars: AHashSet<BarType>,
455 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
456 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
457 pub subscriptions_instrument: AHashSet<InstrumentId>,
458 pub subscriptions_instrument_venue: AHashSet<Venue>,
459 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
460 pub subscriptions_index_prices: AHashSet<InstrumentId>,
461}
462
463impl Deref for DataClientAdapter {
464 type Target = Box<dyn DataClient>;
465
466 fn deref(&self) -> &Self::Target {
467 &self.client
468 }
469}
470
471impl DerefMut for DataClientAdapter {
472 fn deref_mut(&mut self) -> &mut Self::Target {
473 &mut self.client
474 }
475}
476
477impl Debug for DataClientAdapter {
478 #[rustfmt::skip]
479 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480 f.debug_struct(stringify!(DataClientAdapter))
481 .field("client_id", &self.client_id)
482 .field("venue", &self.venue)
483 .field("handles_book_deltas", &self.handles_book_deltas)
484 .field("handles_book_snapshots", &self.handles_book_snapshots)
485 .field("subscriptions_custom", &self.subscriptions_custom)
486 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
487 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
488 .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
489 .field("subscriptions_quotes", &self.subscriptions_quotes)
490 .field("subscriptions_trades", &self.subscriptions_trades)
491 .field("subscriptions_bars", &self.subscriptions_bars)
492 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
493 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
494 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
495 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
496 .field("subscriptions_instrument", &self.subscriptions_instrument)
497 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
498 .finish()
499 }
500}
501
502impl DataClientAdapter {
503 #[must_use]
505 pub fn new(
506 client_id: ClientId,
507 venue: Option<Venue>,
508 handles_order_book_deltas: bool,
509 handles_order_book_snapshots: bool,
510 client: Box<dyn DataClient>,
511 ) -> Self {
512 Self {
513 client,
514 client_id,
515 venue,
516 handles_book_deltas: handles_order_book_deltas,
517 handles_book_snapshots: handles_order_book_snapshots,
518 subscriptions_custom: AHashSet::new(),
519 subscriptions_book_deltas: AHashSet::new(),
520 subscriptions_book_depth10: AHashSet::new(),
521 subscriptions_book_snapshots: AHashSet::new(),
522 subscriptions_quotes: AHashSet::new(),
523 subscriptions_trades: AHashSet::new(),
524 subscriptions_mark_prices: AHashSet::new(),
525 subscriptions_index_prices: AHashSet::new(),
526 subscriptions_bars: AHashSet::new(),
527 subscriptions_instrument_status: AHashSet::new(),
528 subscriptions_instrument_close: AHashSet::new(),
529 subscriptions_instrument: AHashSet::new(),
530 subscriptions_instrument_venue: AHashSet::new(),
531 }
532 }
533
534 #[allow(clippy::borrowed_box)]
535 #[must_use]
536 pub fn get_client(&self) -> &Box<dyn DataClient> {
537 &self.client
538 }
539
540 #[inline]
541 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
542 if let Err(e) = match cmd {
543 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
544 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
545 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
546 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
547 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
548 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
549 SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
550 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
551 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
552 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
553 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
554 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
555 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
556 } {
557 log_command_error(&cmd, &e);
558 }
559 }
560
561 #[inline]
562 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
563 if let Err(e) = match cmd {
564 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
565 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
566 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
567 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
568 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
569 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
570 UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
571 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
572 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
573 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
574 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
575 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
576 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
577 } {
578 log_command_error(&cmd, &e);
579 }
580 }
581
582 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
590 if !self.subscriptions_custom.contains(&cmd.data_type) {
591 self.subscriptions_custom.insert(cmd.data_type.clone());
592 self.client.subscribe(cmd)?;
593 }
594 Ok(())
595 }
596
597 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
603 if self.subscriptions_custom.contains(&cmd.data_type) {
604 self.subscriptions_custom.remove(&cmd.data_type);
605 self.client.unsubscribe(cmd)?;
606 }
607 Ok(())
608 }
609
610 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
616 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
617 self.subscriptions_instrument_venue.insert(cmd.venue);
618 self.client.subscribe_instruments(cmd)?;
619 }
620
621 Ok(())
622 }
623
624 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
630 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
631 self.subscriptions_instrument_venue.remove(&cmd.venue);
632 self.client.unsubscribe_instruments(cmd)?;
633 }
634
635 Ok(())
636 }
637
638 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
644 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
645 self.subscriptions_instrument.insert(cmd.instrument_id);
646 self.client.subscribe_instrument(cmd)?;
647 }
648
649 Ok(())
650 }
651
652 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
658 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
659 self.subscriptions_instrument.remove(&cmd.instrument_id);
660 self.client.unsubscribe_instrument(cmd)?;
661 }
662
663 Ok(())
664 }
665
666 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
672 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
673 self.subscriptions_book_deltas.insert(cmd.instrument_id);
674 self.client.subscribe_book_deltas(cmd)?;
675 }
676
677 Ok(())
678 }
679
680 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
686 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
687 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
688 self.client.unsubscribe_book_deltas(cmd)?;
689 }
690
691 Ok(())
692 }
693
694 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
700 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
701 self.subscriptions_book_depth10.insert(cmd.instrument_id);
702 self.client.subscribe_book_depth10(cmd)?;
703 }
704
705 Ok(())
706 }
707
708 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
714 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
715 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
716 self.client.unsubscribe_book_depth10(cmd)?;
717 }
718
719 Ok(())
720 }
721
722 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
728 if !self
729 .subscriptions_book_snapshots
730 .contains(&cmd.instrument_id)
731 {
732 self.subscriptions_book_snapshots.insert(cmd.instrument_id);
733 self.client.subscribe_book_snapshots(cmd)?;
734 }
735
736 Ok(())
737 }
738
739 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
745 if self
746 .subscriptions_book_snapshots
747 .contains(&cmd.instrument_id)
748 {
749 self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
750 self.client.unsubscribe_book_snapshots(cmd)?;
751 }
752
753 Ok(())
754 }
755
756 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
762 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
763 self.subscriptions_quotes.insert(cmd.instrument_id);
764 self.client.subscribe_quotes(cmd)?;
765 }
766 Ok(())
767 }
768
769 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
775 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
776 self.subscriptions_quotes.remove(&cmd.instrument_id);
777 self.client.unsubscribe_quotes(cmd)?;
778 }
779 Ok(())
780 }
781
782 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
788 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
789 self.subscriptions_trades.insert(cmd.instrument_id);
790 self.client.subscribe_trades(cmd)?;
791 }
792 Ok(())
793 }
794
795 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
801 if self.subscriptions_trades.contains(&cmd.instrument_id) {
802 self.subscriptions_trades.remove(&cmd.instrument_id);
803 self.client.unsubscribe_trades(cmd)?;
804 }
805 Ok(())
806 }
807
808 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
814 if !self.subscriptions_bars.contains(&cmd.bar_type) {
815 self.subscriptions_bars.insert(cmd.bar_type);
816 self.client.subscribe_bars(cmd)?;
817 }
818 Ok(())
819 }
820
821 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
827 if self.subscriptions_bars.contains(&cmd.bar_type) {
828 self.subscriptions_bars.remove(&cmd.bar_type);
829 self.client.unsubscribe_bars(cmd)?;
830 }
831 Ok(())
832 }
833
834 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
840 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
841 self.subscriptions_mark_prices.insert(cmd.instrument_id);
842 self.client.subscribe_mark_prices(cmd)?;
843 }
844 Ok(())
845 }
846
847 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
853 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
854 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
855 self.client.unsubscribe_mark_prices(cmd)?;
856 }
857 Ok(())
858 }
859
860 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
866 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
867 self.subscriptions_index_prices.insert(cmd.instrument_id);
868 self.client.subscribe_index_prices(cmd)?;
869 }
870 Ok(())
871 }
872
873 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
879 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
880 self.subscriptions_index_prices.remove(&cmd.instrument_id);
881 self.client.unsubscribe_index_prices(cmd)?;
882 }
883 Ok(())
884 }
885
886 fn subscribe_instrument_status(
892 &mut self,
893 cmd: &SubscribeInstrumentStatus,
894 ) -> anyhow::Result<()> {
895 if !self
896 .subscriptions_instrument_status
897 .contains(&cmd.instrument_id)
898 {
899 self.subscriptions_instrument_status
900 .insert(cmd.instrument_id);
901 self.client.subscribe_instrument_status(cmd)?;
902 }
903 Ok(())
904 }
905
906 fn unsubscribe_instrument_status(
912 &mut self,
913 cmd: &UnsubscribeInstrumentStatus,
914 ) -> anyhow::Result<()> {
915 if self
916 .subscriptions_instrument_status
917 .contains(&cmd.instrument_id)
918 {
919 self.subscriptions_instrument_status
920 .remove(&cmd.instrument_id);
921 self.client.unsubscribe_instrument_status(cmd)?;
922 }
923 Ok(())
924 }
925
926 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
932 if !self
933 .subscriptions_instrument_close
934 .contains(&cmd.instrument_id)
935 {
936 self.subscriptions_instrument_close
937 .insert(cmd.instrument_id);
938 self.client.subscribe_instrument_close(cmd)?;
939 }
940 Ok(())
941 }
942
943 fn unsubscribe_instrument_close(
949 &mut self,
950 cmd: &UnsubscribeInstrumentClose,
951 ) -> anyhow::Result<()> {
952 if self
953 .subscriptions_instrument_close
954 .contains(&cmd.instrument_id)
955 {
956 self.subscriptions_instrument_close
957 .remove(&cmd.instrument_id);
958 self.client.unsubscribe_instrument_close(cmd)?;
959 }
960 Ok(())
961 }
962
963 pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
971 self.client.request_data(req)
972 }
973
974 pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
980 self.client.request_instrument(req)
981 }
982
983 pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
989 self.client.request_instruments(req)
990 }
991
992 pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
998 self.client.request_quotes(req)
999 }
1000
1001 pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1007 self.client.request_trades(req)
1008 }
1009
1010 pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1016 self.client.request_bars(req)
1017 }
1018}
1019
1020#[inline(always)]
1021fn log_not_implemented<T: Debug>(msg: &T) {
1022 log::warn!("{msg:?} – handler not implemented");
1023}
1024
1025#[inline(always)]
1026fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1027 log::error!("Error on {cmd:?}: {e}");
1028}