use nautilus_core::{Params, UUID4};
use nautilus_model::{
defi::{Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap},
identifiers::{ClientId, InstrumentId},
};
use crate::{
actor::DataActorCore,
defi::{
DefiSubscribeCommand, DefiUnsubscribeCommand, SubscribeBlocks, SubscribePool,
SubscribePoolFeeCollects, SubscribePoolFlashEvents, SubscribePoolLiquidityUpdates,
SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool, UnsubscribePoolFeeCollects,
UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
switchboard::{
get_defi_blocks_topic, get_defi_collect_topic, get_defi_flash_topic,
get_defi_liquidity_topic, get_defi_pool_swaps_topic, get_defi_pool_topic,
},
},
messages::data::DataCommand,
msgbus::{MStr, Topic, TypedHandler},
};
impl DataActorCore {
/// Subscribes to block events for `chain`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_block_subscription`, and then sends a [`SubscribeBlocks`] command wrapped in
/// [`DataCommand::DefiSubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_blocks(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Block>,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_block_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribeBlocks {
        chain,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::Blocks(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Subscribes to pool definition updates for `instrument_id`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_pool_subscription`, and then sends a [`SubscribePool`] command wrapped in
/// [`DataCommand::DefiSubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_pool(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Pool>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_pool_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribePool {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::Pool(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Subscribes to pool swap events for `instrument_id`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_pool_swap_subscription`, and then sends a [`SubscribePoolSwaps`] command wrapped
/// in [`DataCommand::DefiSubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_pool_swaps(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolSwap>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_pool_swap_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribePoolSwaps {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::PoolSwaps(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Subscribes to pool liquidity update events for `instrument_id`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_pool_liquidity_subscription`, and then sends a
/// [`SubscribePoolLiquidityUpdates`] command wrapped in [`DataCommand::DefiSubscribe`]
/// through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_pool_liquidity_updates(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_pool_liquidity_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribePoolLiquidityUpdates {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::PoolLiquidityUpdates(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Subscribes to pool fee collect events for `instrument_id`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_pool_collect_subscription`, and then sends a [`SubscribePoolFeeCollects`]
/// command wrapped in [`DataCommand::DefiSubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_pool_fee_collects(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFeeCollect>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_pool_collect_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribePoolFeeCollects {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::PoolFeeCollects(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Subscribes to pool flash events for `instrument_id`.
///
/// Calls `check_registered` first, stores `handler` under `topic` via
/// `add_pool_flash_subscription`, and then sends a [`SubscribePoolFlashEvents`] command
/// wrapped in [`DataCommand::DefiSubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn subscribe_pool_flash_events(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFlash>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_pool_flash_subscription(topic, handler);

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = SubscribePoolFlashEvents {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiSubscribeCommand::PoolFlashEvents(request);
    self.send_data_cmd(DataCommand::DefiSubscribe(cmd));
}
/// Unsubscribes from block events for `chain`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_blocks_topic(chain)` to `remove_block_subscription`, and finally sends an
/// [`UnsubscribeBlocks`] command wrapped in [`DataCommand::DefiUnsubscribe`] through
/// `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_block_subscription(get_defi_blocks_topic(chain));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribeBlocks {
        chain,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::Blocks(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
/// Unsubscribes from pool definition updates for `instrument_id`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_pool_topic(instrument_id)` to `remove_pool_subscription`, and finally sends
/// an [`UnsubscribePool`] command wrapped in [`DataCommand::DefiUnsubscribe`] through
/// `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_pool_subscription(get_defi_pool_topic(instrument_id));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribePool {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::Pool(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
/// Unsubscribes from pool swap events for `instrument_id`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_pool_swaps_topic(instrument_id)` to `remove_pool_swap_subscription`, and
/// finally sends an [`UnsubscribePoolSwaps`] command wrapped in
/// [`DataCommand::DefiUnsubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_pool_swap_subscription(get_defi_pool_swaps_topic(instrument_id));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribePoolSwaps {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::PoolSwaps(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
/// Unsubscribes from pool liquidity update events for `instrument_id`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_liquidity_topic(instrument_id)` to `remove_pool_liquidity_subscription`,
/// and finally sends an [`UnsubscribePoolLiquidityUpdates`] command wrapped in
/// [`DataCommand::DefiUnsubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_pool_liquidity_subscription(get_defi_liquidity_topic(instrument_id));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribePoolLiquidityUpdates {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::PoolLiquidityUpdates(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
/// Unsubscribes from pool fee collect events for `instrument_id`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_collect_topic(instrument_id)` to `remove_pool_collect_subscription`, and
/// finally sends an [`UnsubscribePoolFeeCollects`] command wrapped in
/// [`DataCommand::DefiUnsubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_pool_collect_subscription(get_defi_collect_topic(instrument_id));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribePoolFeeCollects {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::PoolFeeCollects(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
/// Unsubscribes from pool flash events for `instrument_id`.
///
/// Calls `check_registered` first, then passes the topic returned by
/// `get_defi_flash_topic(instrument_id)` to `remove_pool_flash_subscription`, and
/// finally sends an [`UnsubscribePoolFlashEvents`] command wrapped in
/// [`DataCommand::DefiUnsubscribe`] through `send_data_cmd`.
///
/// `client_id` and `params` are forwarded unchanged in the command. The command ID comes
/// from `UUID4::new()` and `ts_init` from `timestamp_ns()`.
pub fn unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_pool_flash_subscription(get_defi_flash_topic(instrument_id));

    // Same evaluation order as a struct literal would use: ID first, then timestamp.
    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let request = UnsubscribePoolFlashEvents {
        instrument_id,
        client_id,
        command_id,
        ts_init,
        params,
    };

    let cmd = DefiUnsubscribeCommand::PoolFlashEvents(request);
    self.send_data_cmd(DataCommand::DefiUnsubscribe(cmd));
}
}