Skip to main content

nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Provides the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    fmt::Debug,
23    ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28    clients::{DataClient, log_command_error},
29    messages::data::{
30        RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData,
31        RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
32        RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33        SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
34        SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
35        SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
36        SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
37        UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38        UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
39        UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes,
40        UnsubscribeTrades,
41    },
42};
43#[cfg(feature = "defi")]
44use nautilus_model::defi::Blockchain;
45use nautilus_model::{
46    data::{BarType, DataType},
47    identifiers::{ClientId, InstrumentId, Venue},
48};
49
50#[cfg(feature = "defi")]
51#[allow(unused_imports)] // Brings DeFi impl blocks into scope
52use crate::defi::client as _;
53
54/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
55pub struct DataClientAdapter {
56    pub(crate) client: Box<dyn DataClient>,
57    pub client_id: ClientId,
58    pub venue: Option<Venue>,
59    pub handles_book_deltas: bool,
60    pub handles_book_snapshots: bool,
61    pub subscriptions_custom: AHashSet<DataType>,
62    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
63    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
64    pub subscriptions_quotes: AHashSet<InstrumentId>,
65    pub subscriptions_trades: AHashSet<InstrumentId>,
66    pub subscriptions_bars: AHashSet<BarType>,
67    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
68    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
69    pub subscriptions_instrument: AHashSet<InstrumentId>,
70    pub subscriptions_instrument_venue: AHashSet<Venue>,
71    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
72    pub subscriptions_index_prices: AHashSet<InstrumentId>,
73    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
74    pub subscriptions_option_greeks: AHashSet<InstrumentId>,
75    #[cfg(feature = "defi")]
76    pub subscriptions_blocks: AHashSet<Blockchain>,
77    #[cfg(feature = "defi")]
78    pub subscriptions_pools: AHashSet<InstrumentId>,
79    #[cfg(feature = "defi")]
80    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
81    #[cfg(feature = "defi")]
82    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
83    #[cfg(feature = "defi")]
84    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
85    #[cfg(feature = "defi")]
86    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
87}
88
89impl Deref for DataClientAdapter {
90    type Target = Box<dyn DataClient>;
91
92    fn deref(&self) -> &Self::Target {
93        &self.client
94    }
95}
96
97impl DerefMut for DataClientAdapter {
98    fn deref_mut(&mut self) -> &mut Self::Target {
99        &mut self.client
100    }
101}
102
103impl Debug for DataClientAdapter {
104    #[rustfmt::skip]
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct(stringify!(DataClientAdapter))
107            .field("client_id", &self.client_id)
108            .field("venue", &self.venue)
109            .field("handles_book_deltas", &self.handles_book_deltas)
110            .field("handles_book_snapshots", &self.handles_book_snapshots)
111            .field("subscriptions_custom", &self.subscriptions_custom)
112            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
113            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
114            .field("subscriptions_quotes", &self.subscriptions_quotes)
115            .field("subscriptions_trades", &self.subscriptions_trades)
116            .field("subscriptions_bars", &self.subscriptions_bars)
117            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
118            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
119            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
120            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
121            .field("subscriptions_instrument", &self.subscriptions_instrument)
122            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
123            .finish()
124    }
125}
126
127impl DataClientAdapter {
128    /// Creates a new [`DataClientAdapter`] with the given client and clock.
129    #[must_use]
130    pub fn new(
131        client_id: ClientId,
132        venue: Option<Venue>,
133        handles_order_book_deltas: bool,
134        handles_order_book_snapshots: bool,
135        client: Box<dyn DataClient>,
136    ) -> Self {
137        Self {
138            client,
139            client_id,
140            venue,
141            handles_book_deltas: handles_order_book_deltas,
142            handles_book_snapshots: handles_order_book_snapshots,
143            subscriptions_custom: AHashSet::new(),
144            subscriptions_book_deltas: AHashSet::new(),
145            subscriptions_book_depth10: AHashSet::new(),
146            subscriptions_quotes: AHashSet::new(),
147            subscriptions_trades: AHashSet::new(),
148            subscriptions_mark_prices: AHashSet::new(),
149            subscriptions_index_prices: AHashSet::new(),
150            subscriptions_funding_rates: AHashSet::new(),
151            subscriptions_option_greeks: AHashSet::new(),
152            subscriptions_bars: AHashSet::new(),
153            subscriptions_instrument_status: AHashSet::new(),
154            subscriptions_instrument_close: AHashSet::new(),
155            subscriptions_instrument: AHashSet::new(),
156            subscriptions_instrument_venue: AHashSet::new(),
157            #[cfg(feature = "defi")]
158            subscriptions_blocks: AHashSet::new(),
159            #[cfg(feature = "defi")]
160            subscriptions_pools: AHashSet::new(),
161            #[cfg(feature = "defi")]
162            subscriptions_pool_swaps: AHashSet::new(),
163            #[cfg(feature = "defi")]
164            subscriptions_pool_liquidity_updates: AHashSet::new(),
165            #[cfg(feature = "defi")]
166            subscriptions_pool_fee_collects: AHashSet::new(),
167            #[cfg(feature = "defi")]
168            subscriptions_pool_flash: AHashSet::new(),
169        }
170    }
171
172    #[allow(clippy::borrowed_box)]
173    #[must_use]
174    pub fn get_client(&self) -> &Box<dyn DataClient> {
175        &self.client
176    }
177
178    /// Connects the underlying client to the data provider.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the connection fails.
183    pub async fn connect(&mut self) -> anyhow::Result<()> {
184        self.client.connect().await
185    }
186
187    /// Disconnects the underlying client from the data provider.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the disconnection fails.
192    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
193        self.client.disconnect().await
194    }
195
196    #[inline]
197    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
198        if let Err(e) = match cmd {
199            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
200            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
201            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
202            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
203            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
204            SubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
205            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
206            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
207            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
208            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
209            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
210            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
211            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
212            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
213            SubscribeCommand::OptionGreeks(cmd) => self.subscribe_option_greeks(cmd),
214            SubscribeCommand::OptionChain(_) => Ok(()), // Handled internally by engine
215        } {
216            log_command_error(&cmd, &e);
217        }
218    }
219
220    #[inline]
221    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
222        if let Err(e) = match cmd {
223            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
224            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
225            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
226            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
227            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
228            UnsubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
229            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
230            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
231            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
232            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
233            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
234            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
235            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
236            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
237            UnsubscribeCommand::OptionGreeks(cmd) => self.unsubscribe_option_greeks(cmd),
238            UnsubscribeCommand::OptionChain(_) => Ok(()), // Handled internally by engine
239        } {
240            log_command_error(&cmd, &e);
241        }
242    }
243
244    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
245
246    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if the underlying client subscribe operation fails.
251    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
252        if !self.subscriptions_custom.contains(&cmd.data_type) {
253            self.subscriptions_custom.insert(cmd.data_type.clone());
254            self.client.subscribe(cmd)?;
255        }
256        Ok(())
257    }
258
259    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the underlying client unsubscribe operation fails.
264    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
265        if self.subscriptions_custom.contains(&cmd.data_type) {
266            self.subscriptions_custom.remove(&cmd.data_type);
267            self.client.unsubscribe(cmd)?;
268        }
269        Ok(())
270    }
271
272    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the underlying client subscribe operation fails.
277    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
278        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
279            self.subscriptions_instrument_venue.insert(cmd.venue);
280            self.client.subscribe_instruments(cmd)?;
281        }
282
283        Ok(())
284    }
285
286    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if the underlying client unsubscribe operation fails.
291    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
292        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
293            self.subscriptions_instrument_venue.remove(&cmd.venue);
294            self.client.unsubscribe_instruments(cmd)?;
295        }
296
297        Ok(())
298    }
299
300    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the underlying client subscribe operation fails.
305    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
306        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
307            self.subscriptions_instrument.insert(cmd.instrument_id);
308            self.client.subscribe_instrument(cmd)?;
309        }
310
311        Ok(())
312    }
313
314    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if the underlying client unsubscribe operation fails.
319    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
320        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
321            self.subscriptions_instrument.remove(&cmd.instrument_id);
322            self.client.unsubscribe_instrument(cmd)?;
323        }
324
325        Ok(())
326    }
327
328    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if the underlying client subscribe operation fails.
333    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
334        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
335            self.subscriptions_book_deltas.insert(cmd.instrument_id);
336            self.client.subscribe_book_deltas(cmd)?;
337        }
338
339        Ok(())
340    }
341
342    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the underlying client unsubscribe operation fails.
347    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
348        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
349            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
350            self.client.unsubscribe_book_deltas(cmd)?;
351        }
352
353        Ok(())
354    }
355
356    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the underlying client subscribe operation fails.
361    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
362        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
363            self.subscriptions_book_depth10.insert(cmd.instrument_id);
364            self.client.subscribe_book_depth10(cmd)?;
365        }
366
367        Ok(())
368    }
369
370    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the underlying client unsubscribe operation fails.
375    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
376        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
377            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
378            self.client.unsubscribe_book_depth10(cmd)?;
379        }
380
381        Ok(())
382    }
383
384    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if the underlying client subscribe operation fails.
389    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
390        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
391            self.subscriptions_quotes.insert(cmd.instrument_id);
392            self.client.subscribe_quotes(cmd)?;
393        }
394        Ok(())
395    }
396
397    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if the underlying client unsubscribe operation fails.
402    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
403        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
404            self.subscriptions_quotes.remove(&cmd.instrument_id);
405            self.client.unsubscribe_quotes(cmd)?;
406        }
407        Ok(())
408    }
409
410    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the underlying client subscribe operation fails.
415    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
416        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
417            self.subscriptions_trades.insert(cmd.instrument_id);
418            self.client.subscribe_trades(cmd)?;
419        }
420        Ok(())
421    }
422
423    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if the underlying client unsubscribe operation fails.
428    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
429        if self.subscriptions_trades.contains(&cmd.instrument_id) {
430            self.subscriptions_trades.remove(&cmd.instrument_id);
431            self.client.unsubscribe_trades(cmd)?;
432        }
433        Ok(())
434    }
435
436    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the underlying client subscribe operation fails.
441    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
442        if !self.subscriptions_bars.contains(&cmd.bar_type) {
443            self.subscriptions_bars.insert(cmd.bar_type);
444            self.client.subscribe_bars(cmd)?;
445        }
446        Ok(())
447    }
448
449    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
450    ///
451    /// # Errors
452    ///
453    /// Returns an error if the underlying client unsubscribe operation fails.
454    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
455        if self.subscriptions_bars.contains(&cmd.bar_type) {
456            self.subscriptions_bars.remove(&cmd.bar_type);
457            self.client.unsubscribe_bars(cmd)?;
458        }
459        Ok(())
460    }
461
462    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the underlying client subscribe operation fails.
467    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
468        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
469            self.subscriptions_mark_prices.insert(cmd.instrument_id);
470            self.client.subscribe_mark_prices(cmd)?;
471        }
472        Ok(())
473    }
474
475    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the underlying client unsubscribe operation fails.
480    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
481        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
482            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
483            self.client.unsubscribe_mark_prices(cmd)?;
484        }
485        Ok(())
486    }
487
488    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if the underlying client subscribe operation fails.
493    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
494        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
495            self.subscriptions_index_prices.insert(cmd.instrument_id);
496            self.client.subscribe_index_prices(cmd)?;
497        }
498        Ok(())
499    }
500
501    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
502    ///
503    /// # Errors
504    ///
505    /// Returns an error if the underlying client unsubscribe operation fails.
506    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
507        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
508            self.subscriptions_index_prices.remove(&cmd.instrument_id);
509            self.client.unsubscribe_index_prices(cmd)?;
510        }
511        Ok(())
512    }
513
514    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
515    ///
516    /// # Errors
517    ///
518    /// Returns an error if the underlying client subscribe operation fails.
519    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
520        if !self
521            .subscriptions_funding_rates
522            .contains(&cmd.instrument_id)
523        {
524            self.subscriptions_funding_rates.insert(cmd.instrument_id);
525            self.client.subscribe_funding_rates(cmd)?;
526        }
527        Ok(())
528    }
529
530    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if the underlying client unsubscribe operation fails.
535    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
536        if self
537            .subscriptions_funding_rates
538            .contains(&cmd.instrument_id)
539        {
540            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
541            self.client.unsubscribe_funding_rates(cmd)?;
542        }
543        Ok(())
544    }
545
546    /// Subscribes to instrument status updates for the specified instrument.
547    ///
548    /// # Errors
549    ///
550    /// Returns an error if the underlying client subscribe operation fails.
551    fn subscribe_instrument_status(
552        &mut self,
553        cmd: &SubscribeInstrumentStatus,
554    ) -> anyhow::Result<()> {
555        if !self
556            .subscriptions_instrument_status
557            .contains(&cmd.instrument_id)
558        {
559            self.subscriptions_instrument_status
560                .insert(cmd.instrument_id);
561            self.client.subscribe_instrument_status(cmd)?;
562        }
563        Ok(())
564    }
565
566    /// Unsubscribes from instrument status updates for the specified instrument.
567    ///
568    /// # Errors
569    ///
570    /// Returns an error if the underlying client unsubscribe operation fails.
571    fn unsubscribe_instrument_status(
572        &mut self,
573        cmd: &UnsubscribeInstrumentStatus,
574    ) -> anyhow::Result<()> {
575        if self
576            .subscriptions_instrument_status
577            .contains(&cmd.instrument_id)
578        {
579            self.subscriptions_instrument_status
580                .remove(&cmd.instrument_id);
581            self.client.unsubscribe_instrument_status(cmd)?;
582        }
583        Ok(())
584    }
585
586    /// Subscribes to instrument close events for the specified instrument.
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if the underlying client subscribe operation fails.
591    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
592        if !self
593            .subscriptions_instrument_close
594            .contains(&cmd.instrument_id)
595        {
596            self.subscriptions_instrument_close
597                .insert(cmd.instrument_id);
598            self.client.subscribe_instrument_close(cmd)?;
599        }
600        Ok(())
601    }
602
603    /// Unsubscribes from instrument close events for the specified instrument.
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if the underlying client unsubscribe operation fails.
608    fn unsubscribe_instrument_close(
609        &mut self,
610        cmd: &UnsubscribeInstrumentClose,
611    ) -> anyhow::Result<()> {
612        if self
613            .subscriptions_instrument_close
614            .contains(&cmd.instrument_id)
615        {
616            self.subscriptions_instrument_close
617                .remove(&cmd.instrument_id);
618            self.client.unsubscribe_instrument_close(cmd)?;
619        }
620        Ok(())
621    }
622
623    /// Subscribes to option greeks for an instrument, updating internal state and forwarding to the client.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if the underlying client subscribe operation fails.
628    fn subscribe_option_greeks(&mut self, cmd: &SubscribeOptionGreeks) -> anyhow::Result<()> {
629        if !self
630            .subscriptions_option_greeks
631            .contains(&cmd.instrument_id)
632        {
633            self.subscriptions_option_greeks.insert(cmd.instrument_id);
634            self.client.subscribe_option_greeks(cmd)?;
635        }
636        Ok(())
637    }
638
639    /// Unsubscribes from option greeks for an instrument, updating internal state and forwarding to the client.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the underlying client unsubscribe operation fails.
644    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
645        if self
646            .subscriptions_option_greeks
647            .contains(&cmd.instrument_id)
648        {
649            self.subscriptions_option_greeks.remove(&cmd.instrument_id);
650            self.client.unsubscribe_option_greeks(cmd)?;
651        }
652        Ok(())
653    }
654
655    // -- REQUEST HANDLERS ------------------------------------------------------------------------
656
657    /// Sends a data request to the underlying client.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if the client request fails.
662    pub fn request_data(&self, req: RequestCustomData) -> anyhow::Result<()> {
663        self.client.request_data(req)
664    }
665
666    /// Sends a single instrument request to the client.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if the client fails to process the request.
671    pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
672        self.client.request_instrument(req)
673    }
674
675    /// Sends a batch instruments request to the client.
676    ///
677    /// # Errors
678    ///
679    /// Returns an error if the client fails to process the request.
680    pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
681        self.client.request_instruments(req)
682    }
683
684    /// Sends a book snapshot request for a given instrument.
685    ///
686    /// # Errors
687    ///
688    /// Returns an error if the client fails to process the book snapshot request.
689    pub fn request_book_snapshot(&self, req: RequestBookSnapshot) -> anyhow::Result<()> {
690        self.client.request_book_snapshot(req)
691    }
692
693    /// Sends a quotes request for a given instrument.
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if the client fails to process the quotes request.
698    pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
699        self.client.request_quotes(req)
700    }
701
702    /// Sends a trades request for a given instrument.
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if the client fails to process the trades request.
707    pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
708        self.client.request_trades(req)
709    }
710
711    /// Sends a funding rates request for a given instrument.
712    ///
713    /// # Errors
714    ///
715    /// Returns an error if the client fails to process the trades request.
716    pub fn request_funding_rates(&self, req: RequestFundingRates) -> anyhow::Result<()> {
717        self.client.request_funding_rates(req)
718    }
719
720    /// Sends a forward prices request for derivatives instruments.
721    ///
722    /// # Errors
723    ///
724    /// Returns an error if the client fails to process the forward prices request.
725    pub fn request_forward_prices(&self, req: RequestForwardPrices) -> anyhow::Result<()> {
726        self.client.request_forward_prices(req)
727    }
728
729    /// Sends a bars request for a given instrument and bar type.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the client fails to process the bars request.
734    pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
735        self.client.request_bars(req)
736    }
737
738    /// Sends an order book depths request for a given instrument.
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the client fails to process the order book depths request.
743    pub fn request_book_depth(&self, req: RequestBookDepth) -> anyhow::Result<()> {
744        self.client.request_book_depth(req)
745    }
746}