use dashmap::DashMap;
use std::sync::Arc;
use crate::connector_manager::AnyConnector;
use crate::core::traits::MarketData;
use crate::core::types::{
AccountCapabilities, AccountType, ExchangeId, MarketDataCapabilities, TradingCapabilities,
};
/// Registry of exchange connectors, keyed by [`ExchangeId`].
///
/// The map itself lives behind an [`Arc`], so the derived `Clone` hands out
/// another handle to the *same* registry: an insert made through one clone is
/// visible through every other clone.
#[derive(Clone)]
pub struct ConnectorPool {
    /// Connector currently registered for each exchange id.
    connectors: Arc<DashMap<ExchangeId, Arc<AnyConnector>>>,
}
impl ConnectorPool {
    /// Creates a pool with no connectors registered.
    pub fn new() -> Self {
        let connectors = Arc::new(DashMap::new());
        Self { connectors }
    }

    /// Registers `connector` under `id`.
    ///
    /// Returns the connector that was previously stored under `id`, or `None`
    /// when the id was not registered yet.
    pub fn insert(
        &self,
        id: ExchangeId,
        connector: Arc<AnyConnector>,
    ) -> Option<Arc<AnyConnector>> {
        self.connectors.insert(id, connector)
    }

    /// Returns a new handle to the connector registered under `id`, if any.
    ///
    /// Only the `Arc` is cloned; the connector itself is shared.
    pub fn get(&self, id: &ExchangeId) -> Option<Arc<AnyConnector>> {
        let entry = self.connectors.get(id)?;
        Some(Arc::clone(entry.value()))
    }

    /// Unregisters `id` and returns the connector that was stored under it,
    /// or `None` when nothing was registered.
    pub fn remove(&self, id: &ExchangeId) -> Option<Arc<AnyConnector>> {
        let (_, connector) = self.connectors.remove(id)?;
        Some(connector)
    }

    /// Reports whether a connector is registered under `id`.
    pub fn contains(&self, id: &ExchangeId) -> bool {
        self.connectors.contains_key(id)
    }

    /// Number of registered connectors.
    pub fn len(&self) -> usize {
        self.connectors.len()
    }

    /// `true` when no connector is registered.
    pub fn is_empty(&self) -> bool {
        self.connectors.is_empty()
    }

    /// Unregisters every connector.
    pub fn clear(&self) {
        self.connectors.clear();
    }

    /// Iterates over the registered `(id, connector)` entries.
    ///
    /// The items are DashMap reference guards into the underlying map.
    /// NOTE(review): DashMap documents that mutating calls may deadlock while
    /// a reference into the map is held by the same thread, so avoid calling
    /// `insert`/`remove`/`clear` on this pool from inside the loop — confirm
    /// against the dashmap version in use.
    pub fn iter(&self) -> dashmap::iter::Iter<'_, ExchangeId, Arc<AnyConnector>> {
        self.connectors.iter()
    }

    /// Collects the ids of all registered connectors into a `Vec`.
    pub fn ids(&self) -> Vec<ExchangeId> {
        self.iter().map(|entry| *entry.key()).collect()
    }

    /// Market-data capabilities reported by the connector registered under
    /// `id` for the given `account_type`.
    ///
    /// Returns `None` when no connector is registered under `id`.
    pub fn market_data_capabilities(
        &self,
        id: &ExchangeId,
        account_type: AccountType,
    ) -> Option<MarketDataCapabilities> {
        let connector = self.connectors.get(id)?;
        Some(connector.market_data_capabilities(account_type))
    }

    /// Trading capabilities reported by the connector registered under `id`
    /// for the given `account_type`.
    ///
    /// Returns `None` when no connector is registered under `id`.
    pub fn trading_capabilities(
        &self,
        id: &ExchangeId,
        account_type: AccountType,
    ) -> Option<TradingCapabilities> {
        let connector = self.connectors.get(id)?;
        Some(connector.trading_capabilities(account_type))
    }

    /// Account capabilities reported by the connector registered under `id`
    /// for the given `account_type`.
    ///
    /// Returns `None` when no connector is registered under `id`.
    pub fn account_capabilities(
        &self,
        id: &ExchangeId,
        account_type: AccountType,
    ) -> Option<AccountCapabilities> {
        let connector = self.connectors.get(id)?;
        Some(connector.account_capabilities(account_type))
    }
}
impl Default for ConnectorPool {
fn default() -> Self {
Self::new()
}
}
/// Builder that assembles a [`ConnectorPool`] through chained calls.
pub struct ConnectorPoolBuilder {
    /// Pool being populated; it is handed to the caller by `build`.
    pool: ConnectorPool,
}
impl ConnectorPoolBuilder {
    /// Starts a builder that wraps an empty pool.
    pub fn new() -> Self {
        let pool = ConnectorPool::new();
        Self { pool }
    }

    /// Registers `connector` under `id` and returns the builder so calls can
    /// be chained.
    ///
    /// Registering the same `id` more than once keeps only the connector from
    /// the latest call; the displaced connector handle is dropped.
    pub fn with_connector(self, id: ExchangeId, connector: Arc<AnyConnector>) -> Self {
        let Self { pool } = self;
        pool.insert(id, connector);
        Self { pool }
    }

    /// Finishes the builder and returns the populated pool.
    pub fn build(self) -> ConnectorPool {
        let Self { pool } = self;
        pool
    }
}
impl Default for ConnectorPoolBuilder {
fn default() -> Self {
Self::new()
}
}
/// Unit tests for [`ConnectorPool`] and [`ConnectorPoolBuilder`].
///
/// The pool does not check that a connector matches the id it is stored under,
/// so OKX connectors are registered under `Binance` / `KuCoin` purely as
/// placeholder values.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::l3::open::crypto::cex::okx::OkxConnector;
    use std::thread;

    /// Builds a public OKX connector on a throwaway Tokio runtime and wraps it
    /// the way the pool stores connectors.
    ///
    /// `public_flag` is forwarded verbatim to `OkxConnector::public`.
    /// NOTE(review): presumably a testnet/sandbox toggle — confirm against
    /// `OkxConnector::public`.
    fn build_okx(public_flag: bool) -> Arc<AnyConnector> {
        let rt = tokio::runtime::Runtime::new().expect("failed to start Tokio runtime");
        let connector = rt.block_on(async move {
            OkxConnector::public(public_flag)
                .await
                .expect("failed to build public OKX connector")
        });
        Arc::new(AnyConnector::OKX(Arc::new(connector)))
    }

    /// Fixture connector built with `OkxConnector::public(true)`.
    fn create_mock_okx() -> Arc<AnyConnector> {
        build_okx(true)
    }

    /// Fixture connector built with `OkxConnector::public(false)`.
    fn create_mock_okx_2() -> Arc<AnyConnector> {
        build_okx(false)
    }

    #[test]
    fn test_new_pool_is_empty() {
        let pool = ConnectorPool::new();
        assert!(pool.is_empty());
        assert_eq!(pool.len(), 0);
    }

    #[test]
    fn test_insert_and_get() {
        let pool = ConnectorPool::new();
        let connector = create_mock_okx();

        let old = pool.insert(ExchangeId::Binance, Arc::clone(&connector));
        assert!(old.is_none());
        assert!(!pool.is_empty());
        assert_eq!(pool.len(), 1);

        // `get` must hand back the very connector that was inserted.
        let retrieved = pool
            .get(&ExchangeId::Binance)
            .expect("connector should be registered");
        assert!(Arc::ptr_eq(&retrieved, &connector));
    }

    #[test]
    fn test_insert_replace() {
        let pool = ConnectorPool::new();
        let first = create_mock_okx();
        let second = create_mock_okx();

        assert!(pool.insert(ExchangeId::Binance, Arc::clone(&first)).is_none());

        // Re-inserting under the same id returns the displaced connector...
        let displaced = pool
            .insert(ExchangeId::Binance, Arc::clone(&second))
            .expect("second insert should displace the first connector");
        assert!(Arc::ptr_eq(&displaced, &first));
        assert_eq!(pool.len(), 1);

        // ...and leaves the new one in place.
        let current = pool
            .get(&ExchangeId::Binance)
            .expect("connector should be registered");
        assert!(Arc::ptr_eq(&current, &second));
    }

    #[test]
    fn test_get_nonexistent() {
        let pool = ConnectorPool::new();
        assert!(pool.get(&ExchangeId::Binance).is_none());
    }

    #[test]
    fn test_contains() {
        let pool = ConnectorPool::new();
        let connector = create_mock_okx();

        assert!(!pool.contains(&ExchangeId::Binance));
        pool.insert(ExchangeId::Binance, connector);
        assert!(pool.contains(&ExchangeId::Binance));
        assert!(!pool.contains(&ExchangeId::KuCoin));
    }

    #[test]
    fn test_remove() {
        let pool = ConnectorPool::new();
        let connector = create_mock_okx();
        pool.insert(ExchangeId::Binance, Arc::clone(&connector));
        assert_eq!(pool.len(), 1);

        let removed = pool
            .remove(&ExchangeId::Binance)
            .expect("registered connector should be returned by remove");
        assert!(Arc::ptr_eq(&removed, &connector));
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());

        // A second removal finds nothing.
        assert!(pool.remove(&ExchangeId::Binance).is_none());
    }

    #[test]
    fn test_clear() {
        let pool = ConnectorPool::new();
        pool.insert(ExchangeId::Binance, create_mock_okx());
        pool.insert(ExchangeId::KuCoin, create_mock_okx_2());
        assert_eq!(pool.len(), 2);

        pool.clear();
        assert!(pool.is_empty());
        assert_eq!(pool.len(), 0);
    }

    #[test]
    fn test_iter() {
        let pool = ConnectorPool::new();
        pool.insert(ExchangeId::Binance, create_mock_okx());
        pool.insert(ExchangeId::KuCoin, create_mock_okx_2());

        let mut count = 0;
        for entry in pool.iter() {
            count += 1;
            assert!(*entry.key() == ExchangeId::Binance || *entry.key() == ExchangeId::KuCoin);
        }
        assert_eq!(count, 2);
    }

    #[test]
    fn test_ids() {
        let pool = ConnectorPool::new();
        pool.insert(ExchangeId::Binance, create_mock_okx());
        pool.insert(ExchangeId::KuCoin, create_mock_okx_2());

        let ids = pool.ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&ExchangeId::Binance));
        assert!(ids.contains(&ExchangeId::KuCoin));
    }

    #[test]
    fn test_multiple_inserts() {
        let pool = ConnectorPool::new();
        for i in 0..10 {
            let id = if i % 2 == 0 {
                ExchangeId::Binance
            } else {
                ExchangeId::KuCoin
            };
            pool.insert(id, create_mock_okx());
        }
        // Ten inserts over two distinct ids leave exactly two entries.
        assert_eq!(pool.len(), 2);
    }

    #[test]
    fn test_builder_empty() {
        let pool = ConnectorPoolBuilder::new().build();
        assert!(pool.is_empty());
    }

    #[test]
    fn test_builder_single_connector() {
        let pool = ConnectorPoolBuilder::new()
            .with_connector(ExchangeId::Binance, create_mock_okx())
            .build();
        assert_eq!(pool.len(), 1);
        assert!(pool.contains(&ExchangeId::Binance));
    }

    #[test]
    fn test_builder_multiple_connectors() {
        let pool = ConnectorPoolBuilder::new()
            .with_connector(ExchangeId::Binance, create_mock_okx())
            .with_connector(ExchangeId::KuCoin, create_mock_okx_2())
            .build();
        assert_eq!(pool.len(), 2);
        assert!(pool.contains(&ExchangeId::Binance));
        assert!(pool.contains(&ExchangeId::KuCoin));
    }

    #[test]
    fn test_builder_with_duplicates() {
        // The second registration replaces the first instead of adding an entry.
        let pool = ConnectorPoolBuilder::new()
            .with_connector(ExchangeId::Binance, create_mock_okx())
            .with_connector(ExchangeId::Binance, create_mock_okx())
            .build();
        assert_eq!(pool.len(), 1);
    }

    #[test]
    fn test_concurrent_inserts() {
        let pool = Arc::new(ConnectorPool::new());

        // Collect eagerly so every thread is running before the first join.
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let pool_clone = Arc::clone(&pool);
                thread::spawn(move || {
                    let id = if i % 2 == 0 {
                        ExchangeId::Binance
                    } else {
                        ExchangeId::KuCoin
                    };
                    pool_clone.insert(id, create_mock_okx());
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(pool.len(), 2);
    }

    #[test]
    fn test_concurrent_reads() {
        let pool = Arc::new(ConnectorPool::new());
        pool.insert(ExchangeId::Binance, create_mock_okx());

        let handles: Vec<_> = (0..100)
            .map(|_| {
                let pool_clone = Arc::clone(&pool);
                thread::spawn(move || {
                    assert!(pool_clone.get(&ExchangeId::Binance).is_some());
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    fn test_concurrent_mixed_operations() {
        let pool = Arc::new(ConnectorPool::new());
        pool.insert(ExchangeId::Binance, create_mock_okx());

        let handles: Vec<_> = (0..50)
            .map(|i| {
                let pool_clone = Arc::clone(&pool);
                thread::spawn(move || match i % 3 {
                    0 => {
                        let _ = pool_clone.get(&ExchangeId::Binance);
                    }
                    1 => {
                        pool_clone.insert(ExchangeId::KuCoin, create_mock_okx_2());
                    }
                    2 => {
                        let _ = pool_clone.contains(&ExchangeId::Binance);
                    }
                    _ => unreachable!(),
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Nothing is ever removed, Binance was registered up front and at
        // least one thread registered KuCoin, so the final state is exact.
        assert!(pool.contains(&ExchangeId::Binance));
        assert!(pool.contains(&ExchangeId::KuCoin));
        assert_eq!(pool.len(), 2);
    }

    #[test]
    fn test_pool_clone() {
        let pool1 = ConnectorPool::new();
        pool1.insert(ExchangeId::Binance, create_mock_okx());

        let pool2 = pool1.clone();
        assert_eq!(pool1.len(), 1);
        assert_eq!(pool2.len(), 1);

        // Clones share one registry: an insert through either is seen by both.
        pool2.insert(ExchangeId::KuCoin, create_mock_okx_2());
        assert_eq!(pool1.len(), 2);
        assert_eq!(pool2.len(), 2);
        assert!(pool1.contains(&ExchangeId::KuCoin));
    }

    #[test]
    fn test_default_pool() {
        let pool: ConnectorPool = Default::default();
        assert!(pool.is_empty());
    }

    #[test]
    fn test_default_builder() {
        let pool = ConnectorPoolBuilder::default().build();
        assert!(pool.is_empty());
    }
}