use nautilus_common::{
clients::log_command_error,
messages::defi::{
DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
UnsubscribePoolSwaps,
},
};
use crate::client::DataClientAdapter;
impl DataClientAdapter {
/// Routes a DeFi subscribe command to the matching `subscribe_*` handler.
///
/// Handler errors are not propagated to the caller; a failure is handed to
/// `log_command_error` together with the originating command.
#[inline]
pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) {
    let outcome = match cmd {
        DefiSubscribeCommand::Blocks(inner) => self.subscribe_blocks(inner),
        DefiSubscribeCommand::Pool(inner) => self.subscribe_pool(inner),
        DefiSubscribeCommand::PoolSwaps(inner) => self.subscribe_pool_swaps(inner),
        DefiSubscribeCommand::PoolLiquidityUpdates(inner) => {
            self.subscribe_pool_liquidity_updates(inner)
        }
        DefiSubscribeCommand::PoolFeeCollects(inner) => self.subscribe_pool_fee_collects(inner),
        DefiSubscribeCommand::PoolFlashEvents(inner) => self.subscribe_pool_flash_events(inner),
    };

    if let Err(err) = outcome {
        log_command_error(&cmd, &err);
    }
}
/// Routes a DeFi unsubscribe command to the matching `unsubscribe_*` handler.
///
/// Handler errors are not propagated to the caller; a failure is handed to
/// `log_command_error` together with the originating command.
#[inline]
pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
    let outcome = match cmd {
        DefiUnsubscribeCommand::Blocks(inner) => self.unsubscribe_blocks(inner),
        DefiUnsubscribeCommand::Pool(inner) => self.unsubscribe_pool(inner),
        DefiUnsubscribeCommand::PoolSwaps(inner) => self.unsubscribe_pool_swaps(inner),
        DefiUnsubscribeCommand::PoolLiquidityUpdates(inner) => {
            self.unsubscribe_pool_liquidity_updates(inner)
        }
        DefiUnsubscribeCommand::PoolFeeCollects(inner) => {
            self.unsubscribe_pool_fee_collects(inner)
        }
        DefiUnsubscribeCommand::PoolFlashEvents(inner) => {
            self.unsubscribe_pool_flash_events(inner)
        }
    };

    if let Err(err) = outcome {
        log_command_error(&cmd, &err);
    }
}
/// Routes a DeFi request command to its handler and returns the handler's result.
///
/// # Errors
///
/// Returns whatever error the selected request handler produces.
#[inline]
pub fn execute_defi_request(&self, cmd: DefiRequestCommand) -> anyhow::Result<()> {
    // Kept as an exhaustive `match` so a newly added variant fails to compile here
    // until it is given a handler.
    match cmd {
        DefiRequestCommand::PoolSnapshot(snapshot_req) => self.request_pool_snapshot(snapshot_req),
    }
}
/// Subscribes to blocks for `cmd.chain` unless that chain is already tracked.
///
/// The chain is added to `subscriptions_blocks` only after the client call
/// succeeds, so a failed attempt leaves the set untouched and a later
/// subscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_blocks`.
fn subscribe_blocks(&mut self, cmd: &SubscribeBlocks) -> anyhow::Result<()> {
    if self.subscriptions_blocks.contains(&cmd.chain) {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_blocks(cmd)?;
    self.subscriptions_blocks.insert(cmd.chain);
    Ok(())
}
/// Unsubscribes from blocks for `cmd.chain` if that chain is currently tracked.
///
/// The chain is removed from `subscriptions_blocks` only after the client call
/// succeeds, so a failed attempt keeps the entry and a later unsubscribe
/// command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_blocks`.
fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
    if !self.subscriptions_blocks.contains(&cmd.chain) {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_blocks(cmd)?;
    self.subscriptions_blocks.remove(&cmd.chain);
    Ok(())
}
/// Subscribes to the pool identified by `cmd.instrument_id` unless it is already tracked.
///
/// The instrument is added to `subscriptions_pools` only after the client call
/// succeeds, so a failed attempt leaves the set untouched and a later
/// subscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_pool`.
fn subscribe_pool(&mut self, cmd: &SubscribePool) -> anyhow::Result<()> {
    if self.subscriptions_pools.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_pool(cmd)?;
    self.subscriptions_pools.insert(cmd.instrument_id);
    Ok(())
}
/// Subscribes to pool swaps for `cmd.instrument_id` unless it is already tracked.
///
/// The instrument is added to `subscriptions_pool_swaps` only after the client
/// call succeeds, so a failed attempt leaves the set untouched and a later
/// subscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_pool_swaps`.
fn subscribe_pool_swaps(&mut self, cmd: &SubscribePoolSwaps) -> anyhow::Result<()> {
    if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_pool_swaps(cmd)?;
    self.subscriptions_pool_swaps.insert(cmd.instrument_id);
    Ok(())
}
/// Subscribes to pool liquidity updates for `cmd.instrument_id` unless it is
/// already tracked.
///
/// The instrument is added to `subscriptions_pool_liquidity_updates` only after
/// the client call succeeds, so a failed attempt leaves the set untouched and a
/// later subscribe command is forwarded to the client again instead of being
/// skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_pool_liquidity_updates`.
fn subscribe_pool_liquidity_updates(
    &mut self,
    cmd: &SubscribePoolLiquidityUpdates,
) -> anyhow::Result<()> {
    if self
        .subscriptions_pool_liquidity_updates
        .contains(&cmd.instrument_id)
    {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_pool_liquidity_updates(cmd)?;
    self.subscriptions_pool_liquidity_updates
        .insert(cmd.instrument_id);
    Ok(())
}
/// Subscribes to pool fee collects for `cmd.instrument_id` unless it is already
/// tracked.
///
/// The instrument is added to `subscriptions_pool_fee_collects` only after the
/// client call succeeds, so a failed attempt leaves the set untouched and a
/// later subscribe command is forwarded to the client again instead of being
/// skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_pool_fee_collects`.
fn subscribe_pool_fee_collects(
    &mut self,
    cmd: &SubscribePoolFeeCollects,
) -> anyhow::Result<()> {
    if self
        .subscriptions_pool_fee_collects
        .contains(&cmd.instrument_id)
    {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_pool_fee_collects(cmd)?;
    self.subscriptions_pool_fee_collects
        .insert(cmd.instrument_id);
    Ok(())
}
/// Subscribes to pool flash events for `cmd.instrument_id` unless it is already
/// tracked.
///
/// The instrument is added to `subscriptions_pool_flash` only after the client
/// call succeeds, so a failed attempt leaves the set untouched and a later
/// subscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `subscribe_pool_flash_events`.
fn subscribe_pool_flash_events(
    &mut self,
    cmd: &SubscribePoolFlashEvents,
) -> anyhow::Result<()> {
    if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Commit the bookkeeping only once the client has accepted the command.
    self.client.subscribe_pool_flash_events(cmd)?;
    self.subscriptions_pool_flash.insert(cmd.instrument_id);
    Ok(())
}
/// Unsubscribes from the pool identified by `cmd.instrument_id` if it is
/// currently tracked.
///
/// The instrument is removed from `subscriptions_pools` only after the client
/// call succeeds, so a failed attempt keeps the entry and a later unsubscribe
/// command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_pool`.
fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
    if !self.subscriptions_pools.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_pool(cmd)?;
    self.subscriptions_pools.remove(&cmd.instrument_id);
    Ok(())
}
/// Unsubscribes from pool swaps for `cmd.instrument_id` if it is currently tracked.
///
/// The instrument is removed from `subscriptions_pool_swaps` only after the
/// client call succeeds, so a failed attempt keeps the entry and a later
/// unsubscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_pool_swaps`.
fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
    if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_pool_swaps(cmd)?;
    self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
    Ok(())
}
/// Unsubscribes from pool liquidity updates for `cmd.instrument_id` if it is
/// currently tracked.
///
/// The instrument is removed from `subscriptions_pool_liquidity_updates` only
/// after the client call succeeds, so a failed attempt keeps the entry and a
/// later unsubscribe command is forwarded to the client again instead of being
/// skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_pool_liquidity_updates`.
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    cmd: &UnsubscribePoolLiquidityUpdates,
) -> anyhow::Result<()> {
    if !self
        .subscriptions_pool_liquidity_updates
        .contains(&cmd.instrument_id)
    {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_pool_liquidity_updates(cmd)?;
    self.subscriptions_pool_liquidity_updates
        .remove(&cmd.instrument_id);
    Ok(())
}
/// Unsubscribes from pool fee collects for `cmd.instrument_id` if it is
/// currently tracked.
///
/// The instrument is removed from `subscriptions_pool_fee_collects` only after
/// the client call succeeds, so a failed attempt keeps the entry and a later
/// unsubscribe command is forwarded to the client again instead of being
/// skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_pool_fee_collects`.
fn unsubscribe_pool_fee_collects(
    &mut self,
    cmd: &UnsubscribePoolFeeCollects,
) -> anyhow::Result<()> {
    if !self
        .subscriptions_pool_fee_collects
        .contains(&cmd.instrument_id)
    {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_pool_fee_collects(cmd)?;
    self.subscriptions_pool_fee_collects
        .remove(&cmd.instrument_id);
    Ok(())
}
/// Unsubscribes from pool flash events for `cmd.instrument_id` if it is
/// currently tracked.
///
/// The instrument is removed from `subscriptions_pool_flash` only after the
/// client call succeeds, so a failed attempt keeps the entry and a later
/// unsubscribe command is forwarded to the client again instead of being skipped.
///
/// # Errors
///
/// Returns the error from the client's `unsubscribe_pool_flash_events`.
fn unsubscribe_pool_flash_events(
    &mut self,
    cmd: &UnsubscribePoolFlashEvents,
) -> anyhow::Result<()> {
    if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
        return Ok(());
    }

    // Drop the bookkeeping only once the client has accepted the command.
    self.client.unsubscribe_pool_flash_events(cmd)?;
    self.subscriptions_pool_flash.remove(&cmd.instrument_id);
    Ok(())
}
/// Forwards a pool snapshot request to the wrapped client unchanged.
///
/// No subscription bookkeeping is consulted or modified here.
///
/// # Errors
///
/// Returns the error from the client's `request_pool_snapshot`.
pub fn request_pool_snapshot(&self, req: RequestPoolSnapshot) -> anyhow::Result<()> {
    let client = &self.client;
    client.request_pool_snapshot(req)
}
}