use std::{
fmt::Debug,
ops::{Deref, DerefMut},
};
use ahash::AHashSet;
use nautilus_common::{
clients::{DataClient, log_command_error},
messages::data::{
RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData,
RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes,
UnsubscribeTrades,
},
};
#[cfg(feature = "defi")]
use nautilus_model::defi::Blockchain;
use nautilus_model::{
data::{BarType, DataType},
identifiers::{ClientId, InstrumentId, Venue},
};
#[cfg(feature = "defi")]
#[allow(unused_imports)] use crate::defi::client as _;
pub struct DataClientAdapter {
pub(crate) client: Box<dyn DataClient>,
pub client_id: ClientId,
pub venue: Option<Venue>,
pub handles_book_deltas: bool,
pub handles_book_snapshots: bool,
pub subscriptions_custom: AHashSet<DataType>,
pub subscriptions_book_deltas: AHashSet<InstrumentId>,
pub subscriptions_book_depth10: AHashSet<InstrumentId>,
pub subscriptions_quotes: AHashSet<InstrumentId>,
pub subscriptions_trades: AHashSet<InstrumentId>,
pub subscriptions_bars: AHashSet<BarType>,
pub subscriptions_instrument_status: AHashSet<InstrumentId>,
pub subscriptions_instrument_close: AHashSet<InstrumentId>,
pub subscriptions_instrument: AHashSet<InstrumentId>,
pub subscriptions_instrument_venue: AHashSet<Venue>,
pub subscriptions_mark_prices: AHashSet<InstrumentId>,
pub subscriptions_index_prices: AHashSet<InstrumentId>,
pub subscriptions_funding_rates: AHashSet<InstrumentId>,
pub subscriptions_option_greeks: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub subscriptions_blocks: AHashSet<Blockchain>,
#[cfg(feature = "defi")]
pub subscriptions_pools: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
impl Deref for DataClientAdapter {
type Target = Box<dyn DataClient>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl DerefMut for DataClientAdapter {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
impl Debug for DataClientAdapter {
#[rustfmt::skip]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(DataClientAdapter))
.field("client_id", &self.client_id)
.field("venue", &self.venue)
.field("handles_book_deltas", &self.handles_book_deltas)
.field("handles_book_snapshots", &self.handles_book_snapshots)
.field("subscriptions_custom", &self.subscriptions_custom)
.field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
.field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
.field("subscriptions_quotes", &self.subscriptions_quotes)
.field("subscriptions_trades", &self.subscriptions_trades)
.field("subscriptions_bars", &self.subscriptions_bars)
.field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
.field("subscriptions_index_prices", &self.subscriptions_index_prices)
.field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
.field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
.field("subscriptions_instrument", &self.subscriptions_instrument)
.field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
.finish()
}
}
impl DataClientAdapter {
#[must_use]
pub fn new(
client_id: ClientId,
venue: Option<Venue>,
handles_order_book_deltas: bool,
handles_order_book_snapshots: bool,
client: Box<dyn DataClient>,
) -> Self {
Self {
client,
client_id,
venue,
handles_book_deltas: handles_order_book_deltas,
handles_book_snapshots: handles_order_book_snapshots,
subscriptions_custom: AHashSet::new(),
subscriptions_book_deltas: AHashSet::new(),
subscriptions_book_depth10: AHashSet::new(),
subscriptions_quotes: AHashSet::new(),
subscriptions_trades: AHashSet::new(),
subscriptions_mark_prices: AHashSet::new(),
subscriptions_index_prices: AHashSet::new(),
subscriptions_funding_rates: AHashSet::new(),
subscriptions_option_greeks: AHashSet::new(),
subscriptions_bars: AHashSet::new(),
subscriptions_instrument_status: AHashSet::new(),
subscriptions_instrument_close: AHashSet::new(),
subscriptions_instrument: AHashSet::new(),
subscriptions_instrument_venue: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_blocks: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_pools: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_pool_swaps: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_pool_liquidity_updates: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_pool_fee_collects: AHashSet::new(),
#[cfg(feature = "defi")]
subscriptions_pool_flash: AHashSet::new(),
}
}
#[allow(clippy::borrowed_box)]
#[must_use]
pub fn get_client(&self) -> &Box<dyn DataClient> {
&self.client
}
pub async fn connect(&mut self) -> anyhow::Result<()> {
self.client.connect().await
}
pub async fn disconnect(&mut self) -> anyhow::Result<()> {
self.client.disconnect().await
}
#[inline]
pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) {
if let Err(e) = match cmd {
SubscribeCommand::Data(cmd) => self.subscribe(cmd),
SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
SubscribeCommand::BookSnapshots(_) => Ok(()), SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
SubscribeCommand::OptionGreeks(cmd) => self.subscribe_option_greeks(cmd),
SubscribeCommand::OptionChain(_) => Ok(()), } {
log_command_error(&cmd, &e);
}
}
#[inline]
pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
if let Err(e) = match cmd {
UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
UnsubscribeCommand::BookSnapshots(_) => Ok(()), UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
UnsubscribeCommand::OptionGreeks(cmd) => self.unsubscribe_option_greeks(cmd),
UnsubscribeCommand::OptionChain(_) => Ok(()), } {
log_command_error(&cmd, &e);
}
}
pub fn subscribe(&mut self, cmd: &SubscribeCustomData) -> anyhow::Result<()> {
if !self.subscriptions_custom.contains(&cmd.data_type) {
self.subscriptions_custom.insert(cmd.data_type.clone());
self.client.subscribe(cmd)?;
}
Ok(())
}
pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
if self.subscriptions_custom.contains(&cmd.data_type) {
self.subscriptions_custom.remove(&cmd.data_type);
self.client.unsubscribe(cmd)?;
}
Ok(())
}
fn subscribe_instruments(&mut self, cmd: &SubscribeInstruments) -> anyhow::Result<()> {
if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
self.subscriptions_instrument_venue.insert(cmd.venue);
self.client.subscribe_instruments(cmd)?;
}
Ok(())
}
fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
if self.subscriptions_instrument_venue.contains(&cmd.venue) {
self.subscriptions_instrument_venue.remove(&cmd.venue);
self.client.unsubscribe_instruments(cmd)?;
}
Ok(())
}
fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
self.subscriptions_instrument.insert(cmd.instrument_id);
self.client.subscribe_instrument(cmd)?;
}
Ok(())
}
fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
if self.subscriptions_instrument.contains(&cmd.instrument_id) {
self.subscriptions_instrument.remove(&cmd.instrument_id);
self.client.unsubscribe_instrument(cmd)?;
}
Ok(())
}
fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
self.subscriptions_book_deltas.insert(cmd.instrument_id);
self.client.subscribe_book_deltas(cmd)?;
}
Ok(())
}
fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
self.subscriptions_book_deltas.remove(&cmd.instrument_id);
self.client.unsubscribe_book_deltas(cmd)?;
}
Ok(())
}
fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
self.subscriptions_book_depth10.insert(cmd.instrument_id);
self.client.subscribe_book_depth10(cmd)?;
}
Ok(())
}
fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
self.subscriptions_book_depth10.remove(&cmd.instrument_id);
self.client.unsubscribe_book_depth10(cmd)?;
}
Ok(())
}
fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
self.subscriptions_quotes.insert(cmd.instrument_id);
self.client.subscribe_quotes(cmd)?;
}
Ok(())
}
fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
if self.subscriptions_quotes.contains(&cmd.instrument_id) {
self.subscriptions_quotes.remove(&cmd.instrument_id);
self.client.unsubscribe_quotes(cmd)?;
}
Ok(())
}
fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
if !self.subscriptions_trades.contains(&cmd.instrument_id) {
self.subscriptions_trades.insert(cmd.instrument_id);
self.client.subscribe_trades(cmd)?;
}
Ok(())
}
fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
if self.subscriptions_trades.contains(&cmd.instrument_id) {
self.subscriptions_trades.remove(&cmd.instrument_id);
self.client.unsubscribe_trades(cmd)?;
}
Ok(())
}
fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
if !self.subscriptions_bars.contains(&cmd.bar_type) {
self.subscriptions_bars.insert(cmd.bar_type);
self.client.subscribe_bars(cmd)?;
}
Ok(())
}
fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
if self.subscriptions_bars.contains(&cmd.bar_type) {
self.subscriptions_bars.remove(&cmd.bar_type);
self.client.unsubscribe_bars(cmd)?;
}
Ok(())
}
fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
self.subscriptions_mark_prices.insert(cmd.instrument_id);
self.client.subscribe_mark_prices(cmd)?;
}
Ok(())
}
fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
self.subscriptions_mark_prices.remove(&cmd.instrument_id);
self.client.unsubscribe_mark_prices(cmd)?;
}
Ok(())
}
fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
self.subscriptions_index_prices.insert(cmd.instrument_id);
self.client.subscribe_index_prices(cmd)?;
}
Ok(())
}
fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
self.subscriptions_index_prices.remove(&cmd.instrument_id);
self.client.unsubscribe_index_prices(cmd)?;
}
Ok(())
}
fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
if !self
.subscriptions_funding_rates
.contains(&cmd.instrument_id)
{
self.subscriptions_funding_rates.insert(cmd.instrument_id);
self.client.subscribe_funding_rates(cmd)?;
}
Ok(())
}
fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
if self
.subscriptions_funding_rates
.contains(&cmd.instrument_id)
{
self.subscriptions_funding_rates.remove(&cmd.instrument_id);
self.client.unsubscribe_funding_rates(cmd)?;
}
Ok(())
}
fn subscribe_instrument_status(
&mut self,
cmd: &SubscribeInstrumentStatus,
) -> anyhow::Result<()> {
if !self
.subscriptions_instrument_status
.contains(&cmd.instrument_id)
{
self.subscriptions_instrument_status
.insert(cmd.instrument_id);
self.client.subscribe_instrument_status(cmd)?;
}
Ok(())
}
fn unsubscribe_instrument_status(
&mut self,
cmd: &UnsubscribeInstrumentStatus,
) -> anyhow::Result<()> {
if self
.subscriptions_instrument_status
.contains(&cmd.instrument_id)
{
self.subscriptions_instrument_status
.remove(&cmd.instrument_id);
self.client.unsubscribe_instrument_status(cmd)?;
}
Ok(())
}
fn subscribe_instrument_close(&mut self, cmd: &SubscribeInstrumentClose) -> anyhow::Result<()> {
if !self
.subscriptions_instrument_close
.contains(&cmd.instrument_id)
{
self.subscriptions_instrument_close
.insert(cmd.instrument_id);
self.client.subscribe_instrument_close(cmd)?;
}
Ok(())
}
fn unsubscribe_instrument_close(
&mut self,
cmd: &UnsubscribeInstrumentClose,
) -> anyhow::Result<()> {
if self
.subscriptions_instrument_close
.contains(&cmd.instrument_id)
{
self.subscriptions_instrument_close
.remove(&cmd.instrument_id);
self.client.unsubscribe_instrument_close(cmd)?;
}
Ok(())
}
fn subscribe_option_greeks(&mut self, cmd: &SubscribeOptionGreeks) -> anyhow::Result<()> {
if !self
.subscriptions_option_greeks
.contains(&cmd.instrument_id)
{
self.subscriptions_option_greeks.insert(cmd.instrument_id);
self.client.subscribe_option_greeks(cmd)?;
}
Ok(())
}
fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
if self
.subscriptions_option_greeks
.contains(&cmd.instrument_id)
{
self.subscriptions_option_greeks.remove(&cmd.instrument_id);
self.client.unsubscribe_option_greeks(cmd)?;
}
Ok(())
}
pub fn request_data(&self, req: RequestCustomData) -> anyhow::Result<()> {
self.client.request_data(req)
}
pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
self.client.request_instrument(req)
}
pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
self.client.request_instruments(req)
}
pub fn request_book_snapshot(&self, req: RequestBookSnapshot) -> anyhow::Result<()> {
self.client.request_book_snapshot(req)
}
pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
self.client.request_quotes(req)
}
pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
self.client.request_trades(req)
}
pub fn request_funding_rates(&self, req: RequestFundingRates) -> anyhow::Result<()> {
self.client.request_funding_rates(req)
}
pub fn request_forward_prices(&self, req: RequestForwardPrices) -> anyhow::Result<()> {
self.client.request_forward_prices(req)
}
pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
self.client.request_bars(req)
}
pub fn request_book_depth(&self, req: RequestBookDepth) -> anyhow::Result<()> {
self.client.request_book_depth(req)
}
}