use ahash::{AHashMap, AHashSet};
use alloy::primitives::{Address, keccak256};
use nautilus_core::hex;
use nautilus_model::defi::DexType;
/// Tracks, per DEX, the encoded pool event topics and the pool addresses subscribed to each
/// event kind (swap, mint, burn, collect, flash).
///
/// A DEX has to be registered with `register_dex_for_subscriptions` before any of the
/// `subscribe_*` methods will accept addresses for it.
#[derive(Debug)]
pub struct DefiDataSubscriptionManager {
/// Encoded swap event topic for each registered DEX.
pool_swap_event_encoded: AHashMap<DexType, String>,
/// Encoded mint event topic for each registered DEX.
pool_mint_event_encoded: AHashMap<DexType, String>,
/// Encoded burn event topic for each registered DEX.
pool_burn_event_encoded: AHashMap<DexType, String>,
/// Encoded collect event topic for each registered DEX.
pool_collect_event_encoded: AHashMap<DexType, String>,
/// Encoded flash event topic for each DEX that was registered with a flash signature.
pool_flash_event_encoded: AHashMap<DexType, String>,
/// Pool addresses subscribed to swap events, keyed by DEX.
subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
/// Pool addresses subscribed to mint events, keyed by DEX.
subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
/// Pool addresses subscribed to burn events, keyed by DEX.
subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
/// Pool addresses subscribed to collect events, keyed by DEX.
subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
/// Pool addresses subscribed to flash events, keyed by DEX (only present when the DEX was
/// registered with a flash signature).
subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
}
impl Default for DefiDataSubscriptionManager {
/// Returns an empty manager; delegates to [`DefiDataSubscriptionManager::new`].
fn default() -> Self {
Self::new()
}
}
impl DefiDataSubscriptionManager {
#[must_use]
pub fn new() -> Self {
Self {
pool_swap_event_encoded: AHashMap::new(),
pool_burn_event_encoded: AHashMap::new(),
pool_mint_event_encoded: AHashMap::new(),
pool_collect_event_encoded: AHashMap::new(),
pool_flash_event_encoded: AHashMap::new(),
subscribed_pool_burns: AHashMap::new(),
subscribed_pool_mints: AHashMap::new(),
subscribed_pool_swaps: AHashMap::new(),
subscribed_pool_collects: AHashMap::new(),
subscribed_pool_flashes: AHashMap::new(),
}
}
#[must_use]
pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
let mut unique_addresses = AHashSet::new();
if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
unique_addresses.extend(addresses.iter().copied());
}
if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
unique_addresses.extend(addresses.iter().copied());
}
if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
unique_addresses.extend(addresses.iter().copied());
}
if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
unique_addresses.extend(addresses.iter().copied());
}
if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
unique_addresses.extend(addresses.iter().copied());
}
unique_addresses.into_iter().collect()
}
#[must_use]
pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
let mut result = Vec::new();
if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
result.push(swap_event_signature.clone());
}
if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
result.push(mint_event_signature.clone());
}
if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
result.push(burn_event_signature.clone());
}
if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
result.push(collect_event_signature.clone());
}
if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
result.push(flash_event_signature.clone());
}
result
}
#[must_use]
pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
self.pool_swap_event_encoded.get(dex).cloned()
}
#[must_use]
pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
self.pool_mint_event_encoded.get(dex).cloned()
}
#[must_use]
pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
self.pool_burn_event_encoded.get(dex).cloned()
}
#[must_use]
pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
self.pool_collect_event_encoded.get(dex).cloned()
}
fn normalize_topic(sig: &str) -> String {
let s = sig.trim();
if let Some(rest) = s.strip_prefix("0x")
&& rest.len() == 64
&& rest.chars().all(|c| c.is_ascii_hexdigit())
{
return format!("0x{}", rest.to_ascii_lowercase());
}
if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
return format!("0x{}", s.to_ascii_lowercase());
}
hex::encode_prefixed(keccak256(s.as_bytes()))
}
pub fn register_dex_for_subscriptions(
&mut self,
dex: DexType,
swap_event_signature: &str,
mint_event_signature: &str,
burn_event_signature: &str,
collect_event_signature: &str,
flash_event_signature: Option<&str>,
) {
self.subscribed_pool_swaps.insert(dex, AHashSet::new());
self.pool_swap_event_encoded
.insert(dex, Self::normalize_topic(swap_event_signature));
self.subscribed_pool_mints.insert(dex, AHashSet::new());
self.pool_mint_event_encoded
.insert(dex, Self::normalize_topic(mint_event_signature));
self.subscribed_pool_burns.insert(dex, AHashSet::new());
self.pool_burn_event_encoded
.insert(dex, Self::normalize_topic(burn_event_signature));
self.subscribed_pool_collects.insert(dex, AHashSet::new());
self.pool_collect_event_encoded
.insert(dex, Self::normalize_topic(collect_event_signature));
if let Some(flash_event_signature) = flash_event_signature {
self.subscribed_pool_flashes.insert(dex, AHashSet::new());
self.pool_flash_event_encoded
.insert(dex, Self::normalize_topic(flash_event_signature));
}
log::info!("Registered DEX for subscriptions: {dex:?}");
}
pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
pool_set.insert(address);
} else {
log::error!("DEX not registered for swap subscriptions: {dex:?}");
}
}
pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
pool_set.insert(address);
} else {
log::error!("DEX not registered for mint subscriptions: {dex:?}");
}
}
pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
pool_set.insert(address);
} else {
log::warn!("DEX not registered for burn subscriptions: {dex:?}");
}
}
pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
pool_set.remove(&address);
} else {
log::error!("DEX not registered for swap subscriptions: {dex:?}");
}
}
pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
pool_set.remove(&address);
} else {
log::error!("DEX not registered for mint subscriptions: {dex:?}");
}
}
pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
pool_set.remove(&address);
} else {
log::error!("DEX not registered for burn subscriptions: {dex:?}");
}
}
pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
pool_set.insert(address);
} else {
log::error!("DEX not registered for collect subscriptions: {dex:?}");
}
}
pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
pool_set.remove(&address);
} else {
log::error!("DEX not registered for collect subscriptions: {dex:?}");
}
}
pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
pool_set.insert(address);
} else {
log::error!("DEX not registered for flash subscriptions: {dex:?}");
}
}
pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
pool_set.remove(&address);
} else {
log::error!("DEX not registered for flash subscriptions: {dex:?}");
}
}
}
#[cfg(test)]
mod tests {
    use alloy::primitives::address;
    use rstest::{fixture, rstest};

    use super::*;

    // Raw Uniswap V3 pool event signatures.
    const SWAP_SIGNATURE: &str = "Swap(address,address,int256,int256,uint160,uint128,int24)";
    const MINT_SIGNATURE: &str = "Mint(address,address,int24,int24,uint128,uint256,uint256)";
    const BURN_SIGNATURE: &str = "Burn(address,int24,int24,uint128,uint256,uint256)";
    const COLLECT_SIGNATURE: &str = "Collect(address,address,int24,int24,uint128,uint128)";
    const FLASH_SIGNATURE: &str = "Flash(address,address,uint256,uint256,uint256,uint256)";

    // keccak256 topics expected for the swap, mint and burn signatures above.
    const SWAP_TOPIC: &str = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
    const MINT_TOPIC: &str = "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde";
    const BURN_TOPIC: &str = "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c";

    /// Asserts that the swap, mint and burn topics stored for Uniswap V3 are the canonical,
    /// `0x`-prefixed lowercase hashes.
    fn assert_uniswap_v3_topics(manager: &DefiDataSubscriptionManager) {
        assert_eq!(
            manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .as_deref(),
            Some(SWAP_TOPIC)
        );
        assert_eq!(
            manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .as_deref(),
            Some(MINT_TOPIC)
        );
        assert_eq!(
            manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .as_deref(),
            Some(BURN_TOPIC)
        );
    }

    #[fixture]
    fn manager() -> DefiDataSubscriptionManager {
        DefiDataSubscriptionManager::new()
    }

    #[fixture]
    fn registered_manager() -> DefiDataSubscriptionManager {
        let mut manager = DefiDataSubscriptionManager::new();
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );
        manager
    }

    #[rstest]
    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
        assert!(
            manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .is_empty()
        );
        assert!(
            manager
                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
                .is_empty()
        );
        assert!(
            manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_none()
        );
        assert!(
            manager
                .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
                .is_none()
        );
    }

    #[rstest]
    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
        let signatures =
            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
        assert_eq!(signatures.len(), 5);

        assert!(
            registered_manager
                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
                .is_some()
        );
        assert!(
            registered_manager
                .get_dex_pool_collect_event_signature(&DexType::UniswapV3)
                .is_some()
        );
    }

    #[rstest]
    fn test_register_without_flash_event(mut manager: DefiDataSubscriptionManager) {
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            None,
        );

        // Only swap, mint, burn and collect topics exist when no flash signature is given.
        let signatures = manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
        assert_eq!(signatures.len(), 4);

        // Without a flash subscription set, subscribing to flashes is rejected.
        let pool_address = address!("1234567890123456789012345678901234567890");
        manager.subscribe_flashes(DexType::UniswapV3, pool_address);
        assert!(
            manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .is_empty()
        );
    }

    #[rstest]
    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
        assert_eq!(addresses[0], pool_address);
    }

    #[rstest]
    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // None of these may create a subscription for a DEX that was never registered.
        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        manager.subscribe_mints(DexType::UniswapV3, pool_address);
        manager.subscribe_burns(DexType::UniswapV3, pool_address);

        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert!(addresses.is_empty());
    }

    #[rstest]
    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        assert_eq!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .len(),
            1
        );

        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
        assert!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .is_empty()
        );
    }

    #[rstest]
    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);

        assert!(swap_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(mint_sig.is_some_and(|sig| sig.starts_with("0x")));
        assert!(burn_sig.is_some_and(|sig| sig.starts_with("0x")));
    }

    #[rstest]
    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
        let pool_address = address!("1234567890123456789012345678901234567890");

        // Subscribing the same pool twice must not produce a duplicate entry.
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 1);
    }

    #[rstest]
    fn test_get_combined_addresses_from_all_events(
        mut registered_manager: DefiDataSubscriptionManager,
    ) {
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");
        let pool3 = address!("3333333333333333333333333333333333333333");

        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 3);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));
        assert!(addresses.contains(&pool3));
    }

    #[rstest]
    fn test_collect_and_flash_subscriptions(mut registered_manager: DefiDataSubscriptionManager) {
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");

        registered_manager.subscribe_collects(DexType::UniswapV3, pool1);
        registered_manager.subscribe_flashes(DexType::UniswapV3, pool2);

        let addresses =
            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
        assert_eq!(addresses.len(), 2);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));

        registered_manager.unsubscribe_collects(DexType::UniswapV3, pool1);
        registered_manager.unsubscribe_flashes(DexType::UniswapV3, pool2);
        assert!(
            registered_manager
                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
                .is_empty()
        );
    }

    #[rstest]
    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
        let swap_sig = registered_manager
            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
            .unwrap();

        // "0x" followed by the 64 hex digits of a 32-byte hash.
        assert!(swap_sig.starts_with("0x"));
        assert_eq!(swap_sig.len(), 66);

        let hex_part = &swap_sig[2..];
        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[rstest]
    #[case(DexType::UniswapV3)]
    #[case(DexType::UniswapV2)]
    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
        let mut manager = DefiDataSubscriptionManager::new();
        let pool1 = address!("1111111111111111111111111111111111111111");
        let pool2 = address!("2222222222222222222222222222222222222222");

        manager.register_dex_for_subscriptions(
            dex_type,
            "Swap(address,uint256,uint256)",
            "Mint(address,uint256)",
            "Burn(address,uint256)",
            "Collect(address,uint256,uint256)",
            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
        );

        manager.subscribe_swaps(dex_type, pool1);
        manager.subscribe_swaps(dex_type, pool2);
        manager.subscribe_mints(dex_type, pool1);
        manager.subscribe_burns(dex_type, pool2);

        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
        assert_eq!(addresses.len(), 2);
        assert!(addresses.contains(&pool1));
        assert!(addresses.contains(&pool2));

        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
        assert_eq!(signatures.len(), 5);

        manager.unsubscribe_swaps(dex_type, pool1);
        manager.unsubscribe_burns(dex_type, pool2);

        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
        // pool1 is still subscribed to mints.
        assert!(remaining.contains(&pool1));
        // pool2 is still subscribed to swaps.
        assert!(remaining.contains(&pool2));
    }

    #[rstest]
    fn test_register_with_raw_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_SIGNATURE,
            MINT_SIGNATURE,
            BURN_SIGNATURE,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Swap, mint and burn are supplied as already-hashed topics with the 0x prefix; they
        // must be stored as given rather than hashed a second time.
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            SWAP_TOPIC,
            MINT_TOPIC,
            BURN_TOPIC,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures_no_prefix() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Same topics without the 0x prefix; the stored form must gain the prefix.
        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            &SWAP_TOPIC[2..],
            &MINT_TOPIC[2..],
            &BURN_TOPIC[2..],
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }

    #[rstest]
    fn test_register_with_pre_encoded_signatures_normalizes_case() {
        let mut manager = DefiDataSubscriptionManager::new();

        // Uppercase hex digits (lowercase 0x prefix) must be stored in lowercase.
        let swap_upper = format!("0x{}", SWAP_TOPIC[2..].to_ascii_uppercase());
        let mint_upper = format!("0x{}", MINT_TOPIC[2..].to_ascii_uppercase());
        let burn_upper = format!("0x{}", BURN_TOPIC[2..].to_ascii_uppercase());

        manager.register_dex_for_subscriptions(
            DexType::UniswapV3,
            &swap_upper,
            &mint_upper,
            &burn_upper,
            COLLECT_SIGNATURE,
            Some(FLASH_SIGNATURE),
        );

        assert_uniswap_v3_topics(&manager);
    }
}