nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Defines the `DataClient` trait, the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    any::Any,
23    fmt::{Debug, Display},
24    ops::{Deref, DerefMut},
25};
26
27use ahash::AHashSet;
28use nautilus_common::messages::data::{
29    RequestBars, RequestBookSnapshot, RequestCustomData, RequestInstrument, RequestInstruments,
30    RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
31    SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData, SubscribeIndexPrices,
32    SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
33    SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas,
34    UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
35    UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeInstrumentClose,
36    UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes,
37    UnsubscribeTrades,
38};
39use nautilus_model::{
40    data::{BarType, DataType},
41    identifiers::{ClientId, InstrumentId, Venue},
42};
43
/// Defines the interface for a data client, managing connections, subscriptions, and requests.
///
/// Implementors must provide the identity, lifecycle and connection methods. Every
/// subscribe, unsubscribe and request method has a default body that hands the command to
/// `log_not_implemented` and returns `Ok(())`, so an implementor only needs to override
/// the operations it supports.
///
/// Subscribe and unsubscribe methods take `&mut self`, whereas request methods take `&self`.
#[async_trait::async_trait]
pub trait DataClient: Any + Sync + Send {
    /// Returns the unique identifier for this data client.
    fn client_id(&self) -> ClientId;

    /// Returns the optional venue this client is associated with.
    fn venue(&self) -> Option<Venue>;

    /// Starts the data client.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    fn start(&self) -> anyhow::Result<()>;

    /// Stops the data client.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    fn stop(&self) -> anyhow::Result<()>;

    /// Resets the data client to its initial state.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    fn reset(&self) -> anyhow::Result<()>;

    /// Disposes of client resources and cleans up.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    fn dispose(&self) -> anyhow::Result<()>;

    /// Connects to external APIs if needed.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    async fn connect(&self) -> anyhow::Result<()>;

    /// Disconnects from external APIs if needed.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    async fn disconnect(&self) -> anyhow::Result<()>;

    /// Returns `true` if the client is currently connected.
    fn is_connected(&self) -> bool;

    /// Returns `true` if the client is currently disconnected.
    fn is_disconnected(&self) -> bool;

    /// Subscribes to custom data types according to the command.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to instruments list for the specified venue.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to data for a single instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to order book delta updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to top 10 order book depth updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to periodic order book snapshots for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to quote updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to trade updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to mark price updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to index price updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to bar updates of the specified bar type.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to status updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_instrument_status(
        &mut self,
        cmd: &SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Subscribes to instrument close events for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe operation fails.
    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from custom data types according to the command.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from instruments list for the specified venue.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from data for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from order book delta updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from top 10 order book depth updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from periodic order book snapshots for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from quote updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from trade updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from mark price updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from index price updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from bar updates of the specified bar type.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from instrument status updates for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_instrument_status(
        &mut self,
        cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Unsubscribes from instrument close events for the specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the unsubscribe operation fails.
    fn unsubscribe_instrument_close(
        &mut self,
        cmd: &UnsubscribeInstrumentClose,
    ) -> anyhow::Result<()> {
        log_not_implemented(&cmd);
        Ok(())
    }

    /// Sends a custom data request to the provider.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the data request fails.
    fn request_data(&self, request: &RequestCustomData) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests a list of instruments from the provider for a given venue.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the instruments request fails.
    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests detailed data for a single instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the instrument request fails.
    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests a snapshot of the order book for a specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the book snapshot request fails.
    fn request_book_snapshot(&self, request: &RequestBookSnapshot) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests historical or streaming quote data for a specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the quotes request fails.
    fn request_quotes(&self, request: &RequestQuotes) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests historical or streaming trade data for a specified instrument.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the trades request fails.
    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }

    /// Requests historical or streaming bar data for a specified instrument and bar type.
    ///
    /// The default implementation only calls `log_not_implemented` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the bars request fails.
    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
        log_not_implemented(&request);
        Ok(())
    }
}
440
/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
///
/// Each `subscriptions_*` set holds the keys for which the adapter's matching subscribe
/// handler has been invoked and not yet undone by the corresponding unsubscribe handler.
pub struct DataClientAdapter {
    /// The wrapped client that commands are forwarded to.
    client: Box<dyn DataClient>,
    /// Client identifier supplied at construction.
    pub client_id: ClientId,
    /// Optional venue supplied at construction.
    pub venue: Option<Venue>,
    /// Flag supplied at construction.
    // NOTE(review): presumably whether the client itself handles book deltas — confirm with callers.
    pub handles_book_deltas: bool,
    /// Flag supplied at construction.
    // NOTE(review): presumably whether the client itself handles book snapshots — confirm with callers.
    pub handles_book_snapshots: bool,
    /// Custom data types currently subscribed.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments with an active book deltas subscription.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments with an active book depth10 subscription.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments with an active book snapshots subscription.
    pub subscriptions_book_snapshots: AHashSet<InstrumentId>,
    /// Instruments with an active quotes subscription.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments with an active trades subscription.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types with an active bars subscription.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments with an active instrument status subscription.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments with an active instrument close subscription.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments with an active single-instrument subscription.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues with an active instruments (venue-wide) subscription.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments with an active mark prices subscription.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments with an active index prices subscription.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
}
462
/// Exposes the wrapped client so [`DataClient`] methods can be called directly on the adapter.
///
/// Inherent methods defined on `DataClientAdapter` take precedence over same-named trait
/// methods that would otherwise be reached through this dereference.
impl Deref for DataClientAdapter {
    type Target = Box<dyn DataClient>;

    /// Returns a shared reference to the boxed client.
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
470
/// Gives mutable access to the wrapped client, which the `&mut self` methods of
/// [`DataClient`] require when called through the adapter.
impl DerefMut for DataClientAdapter {
    /// Returns a mutable reference to the boxed client.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client
    }
}
476
477impl Debug for DataClientAdapter {
478    #[rustfmt::skip]
479    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
480        f.debug_struct(stringify!(DataClientAdapter))
481            .field("client_id", &self.client_id)
482            .field("venue", &self.venue)
483            .field("handles_book_deltas", &self.handles_book_deltas)
484            .field("handles_book_snapshots", &self.handles_book_snapshots)
485            .field("subscriptions_custom", &self.subscriptions_custom)
486            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
487            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
488            .field("subscriptions_book_snapshot", &self.subscriptions_book_snapshots)
489            .field("subscriptions_quotes", &self.subscriptions_quotes)
490            .field("subscriptions_trades", &self.subscriptions_trades)
491            .field("subscriptions_bars", &self.subscriptions_bars)
492            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
493            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
494            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
495            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
496            .field("subscriptions_instrument", &self.subscriptions_instrument)
497            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
498            .finish()
499    }
500}
501
502impl DataClientAdapter {
503    /// Creates a new [`DataClientAdapter`] with the given client and clock.
504    #[must_use]
505    pub fn new(
506        client_id: ClientId,
507        venue: Option<Venue>,
508        handles_order_book_deltas: bool,
509        handles_order_book_snapshots: bool,
510        client: Box<dyn DataClient>,
511    ) -> Self {
512        Self {
513            client,
514            client_id,
515            venue,
516            handles_book_deltas: handles_order_book_deltas,
517            handles_book_snapshots: handles_order_book_snapshots,
518            subscriptions_custom: AHashSet::new(),
519            subscriptions_book_deltas: AHashSet::new(),
520            subscriptions_book_depth10: AHashSet::new(),
521            subscriptions_book_snapshots: AHashSet::new(),
522            subscriptions_quotes: AHashSet::new(),
523            subscriptions_trades: AHashSet::new(),
524            subscriptions_mark_prices: AHashSet::new(),
525            subscriptions_index_prices: AHashSet::new(),
526            subscriptions_bars: AHashSet::new(),
527            subscriptions_instrument_status: AHashSet::new(),
528            subscriptions_instrument_close: AHashSet::new(),
529            subscriptions_instrument: AHashSet::new(),
530            subscriptions_instrument_venue: AHashSet::new(),
531        }
532    }
533
    /// Returns a shared reference to the boxed client wrapped by this adapter.
    // The `&Box<_>` return type is part of the public signature, hence the lint allowance.
    #[allow(clippy::borrowed_box)]
    #[must_use]
    pub fn get_client(&self) -> &Box<dyn DataClient> {
        &self.client
    }
539
540    #[inline]
541    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
542        if let Err(e) = match cmd {
543            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
544            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
545            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
546            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
547            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
548            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd),
549            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
550            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
551            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
552            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
553            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
554            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
555            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
556        } {
557            log_command_error(&cmd, &e);
558        }
559    }
560
561    #[inline]
562    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
563        if let Err(e) = match cmd {
564            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
565            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
566            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
567            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
568            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
569            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd),
570            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
571            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
572            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
573            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
574            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
575            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
576            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
577        } {
578            log_command_error(&cmd, &e);
579        }
580    }
581
582    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
583
584    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the underlying client subscribe operation fails.
589    pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
590        if !self.subscriptions_custom.contains(&cmd.data_type) {
591            self.subscriptions_custom.insert(cmd.data_type.clone());
592            self.client.subscribe(cmd)?;
593        }
594        Ok(())
595    }
596
597    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the underlying client unsubscribe operation fails.
602    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
603        if self.subscriptions_custom.contains(&cmd.data_type) {
604            self.subscriptions_custom.remove(&cmd.data_type);
605            self.client.unsubscribe(cmd)?;
606        }
607        Ok(())
608    }
609
610    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if the underlying client subscribe operation fails.
615    fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
616        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
617            self.subscriptions_instrument_venue.insert(cmd.venue);
618            self.client.subscribe_instruments(cmd)?;
619        }
620
621        Ok(())
622    }
623
624    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the underlying client unsubscribe operation fails.
629    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
630        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
631            self.subscriptions_instrument_venue.remove(&cmd.venue);
632            self.client.unsubscribe_instruments(cmd)?;
633        }
634
635        Ok(())
636    }
637
638    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the underlying client subscribe operation fails.
643    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
644        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
645            self.subscriptions_instrument.insert(cmd.instrument_id);
646            self.client.subscribe_instrument(cmd)?;
647        }
648
649        Ok(())
650    }
651
652    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if the underlying client unsubscribe operation fails.
657    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
658        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
659            self.subscriptions_instrument.remove(&cmd.instrument_id);
660            self.client.unsubscribe_instrument(cmd)?;
661        }
662
663        Ok(())
664    }
665
666    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
667    ///
668    /// # Errors
669    ///
670    /// Returns an error if the underlying client subscribe operation fails.
671    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
672        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
673            self.subscriptions_book_deltas.insert(cmd.instrument_id);
674            self.client.subscribe_book_deltas(cmd)?;
675        }
676
677        Ok(())
678    }
679
680    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the underlying client unsubscribe operation fails.
685    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
686        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
687            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
688            self.client.unsubscribe_book_deltas(cmd)?;
689        }
690
691        Ok(())
692    }
693
694    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
695    ///
696    /// # Errors
697    ///
698    /// Returns an error if the underlying client subscribe operation fails.
699    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
700        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
701            self.subscriptions_book_depth10.insert(cmd.instrument_id);
702            self.client.subscribe_book_depth10(cmd)?;
703        }
704
705        Ok(())
706    }
707
708    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the underlying client unsubscribe operation fails.
713    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
714        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
715            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
716            self.client.unsubscribe_book_depth10(cmd)?;
717        }
718
719        Ok(())
720    }
721
722    /// Subscribes to book snapshots for an instrument, updating internal state and forwarding to the client.
723    ///
724    /// # Errors
725    ///
726    /// Returns an error if the underlying client subscribe operation fails.
727    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
728        if !self
729            .subscriptions_book_snapshots
730            .contains(&cmd.instrument_id)
731        {
732            self.subscriptions_book_snapshots.insert(cmd.instrument_id);
733            self.client.subscribe_book_snapshots(cmd)?;
734        }
735
736        Ok(())
737    }
738
739    /// Unsubscribes from book snapshots for an instrument, updating internal state and forwarding to the client.
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if the underlying client unsubscribe operation fails.
744    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
745        if self
746            .subscriptions_book_snapshots
747            .contains(&cmd.instrument_id)
748        {
749            self.subscriptions_book_snapshots.remove(&cmd.instrument_id);
750            self.client.unsubscribe_book_snapshots(cmd)?;
751        }
752
753        Ok(())
754    }
755
756    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the underlying client subscribe operation fails.
761    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
762        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
763            self.subscriptions_quotes.insert(cmd.instrument_id);
764            self.client.subscribe_quotes(cmd)?;
765        }
766        Ok(())
767    }
768
769    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
770    ///
771    /// # Errors
772    ///
773    /// Returns an error if the underlying client unsubscribe operation fails.
774    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
775        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
776            self.subscriptions_quotes.remove(&cmd.instrument_id);
777            self.client.unsubscribe_quotes(cmd)?;
778        }
779        Ok(())
780    }
781
782    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
783    ///
784    /// # Errors
785    ///
786    /// Returns an error if the underlying client subscribe operation fails.
787    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
788        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
789            self.subscriptions_trades.insert(cmd.instrument_id);
790            self.client.subscribe_trades(cmd)?;
791        }
792        Ok(())
793    }
794
795    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if the underlying client unsubscribe operation fails.
800    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
801        if self.subscriptions_trades.contains(&cmd.instrument_id) {
802            self.subscriptions_trades.remove(&cmd.instrument_id);
803            self.client.unsubscribe_trades(cmd)?;
804        }
805        Ok(())
806    }
807
808    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
809    ///
810    /// # Errors
811    ///
812    /// Returns an error if the underlying client subscribe operation fails.
813    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
814        if !self.subscriptions_bars.contains(&cmd.bar_type) {
815            self.subscriptions_bars.insert(cmd.bar_type);
816            self.client.subscribe_bars(cmd)?;
817        }
818        Ok(())
819    }
820
821    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the underlying client unsubscribe operation fails.
826    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
827        if self.subscriptions_bars.contains(&cmd.bar_type) {
828            self.subscriptions_bars.remove(&cmd.bar_type);
829            self.client.unsubscribe_bars(cmd)?;
830        }
831        Ok(())
832    }
833
834    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if the underlying client subscribe operation fails.
839    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
840        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
841            self.subscriptions_mark_prices.insert(cmd.instrument_id);
842            self.client.subscribe_mark_prices(cmd)?;
843        }
844        Ok(())
845    }
846
847    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the underlying client unsubscribe operation fails.
852    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
853        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
854            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
855            self.client.unsubscribe_mark_prices(cmd)?;
856        }
857        Ok(())
858    }
859
860    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
861    ///
862    /// # Errors
863    ///
864    /// Returns an error if the underlying client subscribe operation fails.
865    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
866        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
867            self.subscriptions_index_prices.insert(cmd.instrument_id);
868            self.client.subscribe_index_prices(cmd)?;
869        }
870        Ok(())
871    }
872
873    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
874    ///
875    /// # Errors
876    ///
877    /// Returns an error if the underlying client unsubscribe operation fails.
878    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
879        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
880            self.subscriptions_index_prices.remove(&cmd.instrument_id);
881            self.client.unsubscribe_index_prices(cmd)?;
882        }
883        Ok(())
884    }
885
886    /// Subscribes to instrument status updates for the specified instrument.
887    ///
888    /// # Errors
889    ///
890    /// Returns an error if the underlying client subscribe operation fails.
891    fn subscribe_instrument_status(
892        &mut self,
893        cmd: &SubscribeInstrumentStatus,
894    ) -> anyhow::Result<()> {
895        if !self
896            .subscriptions_instrument_status
897            .contains(&cmd.instrument_id)
898        {
899            self.subscriptions_instrument_status
900                .insert(cmd.instrument_id);
901            self.client.subscribe_instrument_status(cmd)?;
902        }
903        Ok(())
904    }
905
906    /// Unsubscribes from instrument status updates for the specified instrument.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if the underlying client unsubscribe operation fails.
911    fn unsubscribe_instrument_status(
912        &mut self,
913        cmd: &UnsubscribeInstrumentStatus,
914    ) -> anyhow::Result<()> {
915        if self
916            .subscriptions_instrument_status
917            .contains(&cmd.instrument_id)
918        {
919            self.subscriptions_instrument_status
920                .remove(&cmd.instrument_id);
921            self.client.unsubscribe_instrument_status(cmd)?;
922        }
923        Ok(())
924    }
925
926    /// Subscribes to instrument close events for the specified instrument.
927    ///
928    /// # Errors
929    ///
930    /// Returns an error if the underlying client subscribe operation fails.
931    fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
932        if !self
933            .subscriptions_instrument_close
934            .contains(&cmd.instrument_id)
935        {
936            self.subscriptions_instrument_close
937                .insert(cmd.instrument_id);
938            self.client.subscribe_instrument_close(cmd)?;
939        }
940        Ok(())
941    }
942
943    /// Unsubscribes from instrument close events for the specified instrument.
944    ///
945    /// # Errors
946    ///
947    /// Returns an error if the underlying client unsubscribe operation fails.
948    fn unsubscribe_instrument_close(
949        &mut self,
950        cmd: &UnsubscribeInstrumentClose,
951    ) -> anyhow::Result<()> {
952        if self
953            .subscriptions_instrument_close
954            .contains(&cmd.instrument_id)
955        {
956            self.subscriptions_instrument_close
957                .remove(&cmd.instrument_id);
958            self.client.unsubscribe_instrument_close(cmd)?;
959        }
960        Ok(())
961    }
962
963    // -- REQUEST HANDLERS ------------------------------------------------------------------------
964
965    /// Sends a data request to the underlying client.
966    ///
967    /// # Errors
968    ///
969    /// Returns an error if the client request fails.
970    pub fn request_data(&self, req: &RequestCustomData) -> anyhow::Result<()> {
971        self.client.request_data(req)
972    }
973
974    /// Sends a single instrument request to the client.
975    ///
976    /// # Errors
977    ///
978    /// Returns an error if the client fails to process the request.
979    pub fn request_instrument(&self, req: &RequestInstrument) -> anyhow::Result<()> {
980        self.client.request_instrument(req)
981    }
982
983    /// Sends a batch instruments request to the client.
984    ///
985    /// # Errors
986    ///
987    /// Returns an error if the client fails to process the request.
988    pub fn request_instruments(&self, req: &RequestInstruments) -> anyhow::Result<()> {
989        self.client.request_instruments(req)
990    }
991
992    /// Sends a quotes request for a given instrument.
993    ///
994    /// # Errors
995    ///
996    /// Returns an error if the client fails to process the quotes request.
997    pub fn request_quotes(&self, req: &RequestQuotes) -> anyhow::Result<()> {
998        self.client.request_quotes(req)
999    }
1000
1001    /// Sends a trades request for a given instrument.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if the client fails to process the trades request.
1006    pub fn request_trades(&self, req: &RequestTrades) -> anyhow::Result<()> {
1007        self.client.request_trades(req)
1008    }
1009
1010    /// Sends a bars request for a given instrument and bar type.
1011    ///
1012    /// # Errors
1013    ///
1014    /// Returns an error if the client fails to process the bars request.
1015    pub fn request_bars(&self, req: &RequestBars) -> anyhow::Result<()> {
1016        self.client.request_bars(req)
1017    }
1018}
1019
1020#[inline(always)]
1021fn log_not_implemented<T: Debug>(msg: &T) {
1022    log::warn!("{msg:?} – handler not implemented");
1023}
1024
1025#[inline(always)]
1026fn log_command_error<C: Debug, E: Display>(cmd: &C, e: &E) {
1027    log::error!("Error on {cmd:?}: {e}");
1028}