1use std::{
22 fmt::Debug,
23 ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28 clients::{DataClient, log_command_error},
29 messages::data::{
30 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData,
31 RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
32 RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33 SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
34 SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
35 SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
36 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
37 UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
39 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes,
40 UnsubscribeTrades,
41 },
42};
43#[cfg(feature = "defi")]
44use nautilus_model::defi::Blockchain;
45use nautilus_model::{
46 data::{BarType, DataType},
47 identifiers::{ClientId, InstrumentId, Venue},
48};
49
50#[cfg(feature = "defi")]
51#[allow(unused_imports)] use crate::defi::client as _;
53
54pub struct DataClientAdapter {
56 pub(crate) client: Box<dyn DataClient>,
57 pub client_id: ClientId,
58 pub venue: Option<Venue>,
59 pub handles_book_deltas: bool,
60 pub handles_book_snapshots: bool,
61 pub subscriptions_custom: AHashSet<DataType>,
62 pub subscriptions_book_deltas: AHashSet<InstrumentId>,
63 pub subscriptions_book_depth10: AHashSet<InstrumentId>,
64 pub subscriptions_quotes: AHashSet<InstrumentId>,
65 pub subscriptions_trades: AHashSet<InstrumentId>,
66 pub subscriptions_bars: AHashSet<BarType>,
67 pub subscriptions_instrument_status: AHashSet<InstrumentId>,
68 pub subscriptions_instrument_close: AHashSet<InstrumentId>,
69 pub subscriptions_instrument: AHashSet<InstrumentId>,
70 pub subscriptions_instrument_venue: AHashSet<Venue>,
71 pub subscriptions_mark_prices: AHashSet<InstrumentId>,
72 pub subscriptions_index_prices: AHashSet<InstrumentId>,
73 pub subscriptions_funding_rates: AHashSet<InstrumentId>,
74 pub subscriptions_option_greeks: AHashSet<InstrumentId>,
75 #[cfg(feature = "defi")]
76 pub subscriptions_blocks: AHashSet<Blockchain>,
77 #[cfg(feature = "defi")]
78 pub subscriptions_pools: AHashSet<InstrumentId>,
79 #[cfg(feature = "defi")]
80 pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
81 #[cfg(feature = "defi")]
82 pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
83 #[cfg(feature = "defi")]
84 pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
85 #[cfg(feature = "defi")]
86 pub subscriptions_pool_flash: AHashSet<InstrumentId>,
87}
88
89impl Deref for DataClientAdapter {
90 type Target = Box<dyn DataClient>;
91
92 fn deref(&self) -> &Self::Target {
93 &self.client
94 }
95}
96
97impl DerefMut for DataClientAdapter {
98 fn deref_mut(&mut self) -> &mut Self::Target {
99 &mut self.client
100 }
101}
102
103impl Debug for DataClientAdapter {
104 #[rustfmt::skip]
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct(stringify!(DataClientAdapter))
107 .field("client_id", &self.client_id)
108 .field("venue", &self.venue)
109 .field("handles_book_deltas", &self.handles_book_deltas)
110 .field("handles_book_snapshots", &self.handles_book_snapshots)
111 .field("subscriptions_custom", &self.subscriptions_custom)
112 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
113 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
114 .field("subscriptions_quotes", &self.subscriptions_quotes)
115 .field("subscriptions_trades", &self.subscriptions_trades)
116 .field("subscriptions_bars", &self.subscriptions_bars)
117 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
118 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
119 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
120 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
121 .field("subscriptions_instrument", &self.subscriptions_instrument)
122 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
123 .finish()
124 }
125}
126
127impl DataClientAdapter {
128 #[must_use]
130 pub fn new(
131 client_id: ClientId,
132 venue: Option<Venue>,
133 handles_order_book_deltas: bool,
134 handles_order_book_snapshots: bool,
135 client: Box<dyn DataClient>,
136 ) -> Self {
137 Self {
138 client,
139 client_id,
140 venue,
141 handles_book_deltas: handles_order_book_deltas,
142 handles_book_snapshots: handles_order_book_snapshots,
143 subscriptions_custom: AHashSet::new(),
144 subscriptions_book_deltas: AHashSet::new(),
145 subscriptions_book_depth10: AHashSet::new(),
146 subscriptions_quotes: AHashSet::new(),
147 subscriptions_trades: AHashSet::new(),
148 subscriptions_mark_prices: AHashSet::new(),
149 subscriptions_index_prices: AHashSet::new(),
150 subscriptions_funding_rates: AHashSet::new(),
151 subscriptions_option_greeks: AHashSet::new(),
152 subscriptions_bars: AHashSet::new(),
153 subscriptions_instrument_status: AHashSet::new(),
154 subscriptions_instrument_close: AHashSet::new(),
155 subscriptions_instrument: AHashSet::new(),
156 subscriptions_instrument_venue: AHashSet::new(),
157 #[cfg(feature = "defi")]
158 subscriptions_blocks: AHashSet::new(),
159 #[cfg(feature = "defi")]
160 subscriptions_pools: AHashSet::new(),
161 #[cfg(feature = "defi")]
162 subscriptions_pool_swaps: AHashSet::new(),
163 #[cfg(feature = "defi")]
164 subscriptions_pool_liquidity_updates: AHashSet::new(),
165 #[cfg(feature = "defi")]
166 subscriptions_pool_fee_collects: AHashSet::new(),
167 #[cfg(feature = "defi")]
168 subscriptions_pool_flash: AHashSet::new(),
169 }
170 }
171
172 #[allow(clippy::borrowed_box)]
173 #[must_use]
174 pub fn get_client(&self) -> &Box<dyn DataClient> {
175 &self.client
176 }
177
178 pub async fn connect(&mut self) -> anyhow::Result<()> {
184 self.client.connect().await
185 }
186
187 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
193 self.client.disconnect().await
194 }
195
196 #[inline]
197 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
198 if let Err(e) = match cmd {
199 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
200 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
201 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
202 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
203 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
204 SubscribeCommand::BookSnapshots(_) => Ok(()), SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
206 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
207 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
208 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
209 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
210 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
211 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
212 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
213 SubscribeCommand::OptionGreeks(cmd) => self.subscribe_option_greeks(cmd),
214 SubscribeCommand::OptionChain(_) => Ok(()), } {
216 log_command_error(&cmd, &e);
217 }
218 }
219
220 #[inline]
221 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
222 if let Err(e) = match cmd {
223 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
224 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
225 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
226 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
227 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
228 UnsubscribeCommand::BookSnapshots(_) => Ok(()), UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
230 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
231 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
232 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
233 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
234 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
235 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
236 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
237 UnsubscribeCommand::OptionGreeks(cmd) => self.unsubscribe_option_greeks(cmd),
238 UnsubscribeCommand::OptionChain(_) => Ok(()), } {
240 log_command_error(&cmd, &e);
241 }
242 }
243
244 pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
252 if !self.subscriptions_custom.contains(&cmd.data_type) {
253 self.subscriptions_custom.insert(cmd.data_type.clone());
254 self.client.subscribe(cmd)?;
255 }
256 Ok(())
257 }
258
259 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
265 if self.subscriptions_custom.contains(&cmd.data_type) {
266 self.subscriptions_custom.remove(&cmd.data_type);
267 self.client.unsubscribe(cmd)?;
268 }
269 Ok(())
270 }
271
272 fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
278 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
279 self.subscriptions_instrument_venue.insert(cmd.venue);
280 self.client.subscribe_instruments(cmd)?;
281 }
282
283 Ok(())
284 }
285
286 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
292 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
293 self.subscriptions_instrument_venue.remove(&cmd.venue);
294 self.client.unsubscribe_instruments(cmd)?;
295 }
296
297 Ok(())
298 }
299
300 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
306 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
307 self.subscriptions_instrument.insert(cmd.instrument_id);
308 self.client.subscribe_instrument(cmd)?;
309 }
310
311 Ok(())
312 }
313
314 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
320 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
321 self.subscriptions_instrument.remove(&cmd.instrument_id);
322 self.client.unsubscribe_instrument(cmd)?;
323 }
324
325 Ok(())
326 }
327
328 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
334 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
335 self.subscriptions_book_deltas.insert(cmd.instrument_id);
336 self.client.subscribe_book_deltas(cmd)?;
337 }
338
339 Ok(())
340 }
341
342 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
348 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
349 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
350 self.client.unsubscribe_book_deltas(cmd)?;
351 }
352
353 Ok(())
354 }
355
356 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
362 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
363 self.subscriptions_book_depth10.insert(cmd.instrument_id);
364 self.client.subscribe_book_depth10(cmd)?;
365 }
366
367 Ok(())
368 }
369
370 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
376 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
377 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
378 self.client.unsubscribe_book_depth10(cmd)?;
379 }
380
381 Ok(())
382 }
383
384 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
390 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
391 self.subscriptions_quotes.insert(cmd.instrument_id);
392 self.client.subscribe_quotes(cmd)?;
393 }
394 Ok(())
395 }
396
397 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
403 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
404 self.subscriptions_quotes.remove(&cmd.instrument_id);
405 self.client.unsubscribe_quotes(cmd)?;
406 }
407 Ok(())
408 }
409
410 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
416 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
417 self.subscriptions_trades.insert(cmd.instrument_id);
418 self.client.subscribe_trades(cmd)?;
419 }
420 Ok(())
421 }
422
423 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
429 if self.subscriptions_trades.contains(&cmd.instrument_id) {
430 self.subscriptions_trades.remove(&cmd.instrument_id);
431 self.client.unsubscribe_trades(cmd)?;
432 }
433 Ok(())
434 }
435
436 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
442 if !self.subscriptions_bars.contains(&cmd.bar_type) {
443 self.subscriptions_bars.insert(cmd.bar_type);
444 self.client.subscribe_bars(cmd)?;
445 }
446 Ok(())
447 }
448
449 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
455 if self.subscriptions_bars.contains(&cmd.bar_type) {
456 self.subscriptions_bars.remove(&cmd.bar_type);
457 self.client.unsubscribe_bars(cmd)?;
458 }
459 Ok(())
460 }
461
462 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
468 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
469 self.subscriptions_mark_prices.insert(cmd.instrument_id);
470 self.client.subscribe_mark_prices(cmd)?;
471 }
472 Ok(())
473 }
474
475 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
481 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
482 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
483 self.client.unsubscribe_mark_prices(cmd)?;
484 }
485 Ok(())
486 }
487
488 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
494 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
495 self.subscriptions_index_prices.insert(cmd.instrument_id);
496 self.client.subscribe_index_prices(cmd)?;
497 }
498 Ok(())
499 }
500
501 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
507 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
508 self.subscriptions_index_prices.remove(&cmd.instrument_id);
509 self.client.unsubscribe_index_prices(cmd)?;
510 }
511 Ok(())
512 }
513
514 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
520 if !self
521 .subscriptions_funding_rates
522 .contains(&cmd.instrument_id)
523 {
524 self.subscriptions_funding_rates.insert(cmd.instrument_id);
525 self.client.subscribe_funding_rates(cmd)?;
526 }
527 Ok(())
528 }
529
530 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
536 if self
537 .subscriptions_funding_rates
538 .contains(&cmd.instrument_id)
539 {
540 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
541 self.client.unsubscribe_funding_rates(cmd)?;
542 }
543 Ok(())
544 }
545
546 fn subscribe_instrument_status(
552 &mut self,
553 cmd: &SubscribeInstrumentStatus,
554 ) -> anyhow::Result<()> {
555 if !self
556 .subscriptions_instrument_status
557 .contains(&cmd.instrument_id)
558 {
559 self.subscriptions_instrument_status
560 .insert(cmd.instrument_id);
561 self.client.subscribe_instrument_status(cmd)?;
562 }
563 Ok(())
564 }
565
566 fn unsubscribe_instrument_status(
572 &mut self,
573 cmd: &UnsubscribeInstrumentStatus,
574 ) -> anyhow::Result<()> {
575 if self
576 .subscriptions_instrument_status
577 .contains(&cmd.instrument_id)
578 {
579 self.subscriptions_instrument_status
580 .remove(&cmd.instrument_id);
581 self.client.unsubscribe_instrument_status(cmd)?;
582 }
583 Ok(())
584 }
585
586 fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
592 if !self
593 .subscriptions_instrument_close
594 .contains(&cmd.instrument_id)
595 {
596 self.subscriptions_instrument_close
597 .insert(cmd.instrument_id);
598 self.client.subscribe_instrument_close(cmd)?;
599 }
600 Ok(())
601 }
602
603 fn unsubscribe_instrument_close(
609 &mut self,
610 cmd: &UnsubscribeInstrumentClose,
611 ) -> anyhow::Result<()> {
612 if self
613 .subscriptions_instrument_close
614 .contains(&cmd.instrument_id)
615 {
616 self.subscriptions_instrument_close
617 .remove(&cmd.instrument_id);
618 self.client.unsubscribe_instrument_close(cmd)?;
619 }
620 Ok(())
621 }
622
623 fn subscribe_option_greeks(&mut self, cmd: &SubscribeOptionGreeks) -> anyhow::Result<()> {
629 if !self
630 .subscriptions_option_greeks
631 .contains(&cmd.instrument_id)
632 {
633 self.subscriptions_option_greeks.insert(cmd.instrument_id);
634 self.client.subscribe_option_greeks(cmd)?;
635 }
636 Ok(())
637 }
638
639 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
645 if self
646 .subscriptions_option_greeks
647 .contains(&cmd.instrument_id)
648 {
649 self.subscriptions_option_greeks.remove(&cmd.instrument_id);
650 self.client.unsubscribe_option_greeks(cmd)?;
651 }
652 Ok(())
653 }
654
655 pub fn request_data(&self, req: RequestCustomData) -> anyhow::Result<()> {
663 self.client.request_data(req)
664 }
665
666 pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
672 self.client.request_instrument(req)
673 }
674
675 pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
681 self.client.request_instruments(req)
682 }
683
684 pub fn request_book_snapshot(&self, req: RequestBookSnapshot) -> anyhow::Result<()> {
690 self.client.request_book_snapshot(req)
691 }
692
693 pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
699 self.client.request_quotes(req)
700 }
701
702 pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
708 self.client.request_trades(req)
709 }
710
711 pub fn request_funding_rates(&self, req: RequestFundingRates) -> anyhow::Result<()> {
717 self.client.request_funding_rates(req)
718 }
719
720 pub fn request_forward_prices(&self, req: RequestForwardPrices) -> anyhow::Result<()> {
726 self.client.request_forward_prices(req)
727 }
728
729 pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
735 self.client.request_bars(req)
736 }
737
738 pub fn request_book_depth(&self, req: RequestBookDepth) -> anyhow::Result<()> {
744 self.client.request_book_depth(req)
745 }
746}