use dashmap::DashMap;
use std::sync::Arc;
use crate::core::traits::CoreConnector;
use crate::core::types::ExchangeId;
#[derive(Clone)]
pub(crate) struct ConnectorPool {
connectors: Arc<DashMap<ExchangeId, Arc<dyn CoreConnector>>>,
}
impl ConnectorPool {
pub(crate) fn new() -> Self {
Self {
connectors: Arc::new(DashMap::new()),
}
}
pub(crate) fn insert(&self, id: ExchangeId, connector: Arc<dyn CoreConnector>) -> Option<Arc<dyn CoreConnector>> {
self.connectors.insert(id, connector)
}
pub(crate) fn get(&self, id: &ExchangeId) -> Option<Arc<dyn CoreConnector>> {
self.connectors.get(id).map(|entry| entry.value().clone())
}
pub(crate) fn remove(&self, id: &ExchangeId) -> Option<Arc<dyn CoreConnector>> {
self.connectors.remove(id).map(|(_, connector)| connector)
}
pub(crate) fn contains(&self, id: &ExchangeId) -> bool {
self.connectors.contains_key(id)
}
pub(crate) fn len(&self) -> usize {
self.connectors.len()
}
pub(crate) fn is_empty(&self) -> bool {
self.connectors.is_empty()
}
pub(crate) fn clear(&self) {
self.connectors.clear();
}
pub(crate) fn ids(&self) -> Vec<ExchangeId> {
self.connectors.iter().map(|entry| *entry.key()).collect()
}
}
impl Default for ConnectorPool {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::l3::open::crypto::cex::okx::OkxConnector;
use std::thread;
fn create_mock_okx() -> Arc<dyn CoreConnector> {
let rt = tokio::runtime::Runtime::new().unwrap();
let connector = rt.block_on(async {
OkxConnector::public(true).await.unwrap()
});
Arc::new(connector) as Arc<dyn CoreConnector>
}
fn create_mock_okx_2() -> Arc<dyn CoreConnector> {
let rt = tokio::runtime::Runtime::new().unwrap();
let connector = rt.block_on(async {
OkxConnector::public(false).await.unwrap()
});
Arc::new(connector) as Arc<dyn CoreConnector>
}
#[test]
fn test_new_pool_is_empty() {
let pool = ConnectorPool::new();
assert!(pool.is_empty());
assert_eq!(pool.len(), 0);
}
#[test]
fn test_insert_and_get() {
let pool = ConnectorPool::new();
let connector = create_mock_okx();
let old = pool.insert(ExchangeId::Binance, connector.clone());
assert!(old.is_none());
assert!(!pool.is_empty());
assert_eq!(pool.len(), 1);
let retrieved = pool.get(&ExchangeId::Binance);
assert!(retrieved.is_some());
}
#[test]
fn test_insert_replace() {
let pool = ConnectorPool::new();
let connector1 = create_mock_okx();
let connector2 = create_mock_okx();
pool.insert(ExchangeId::Binance, connector1);
let old = pool.insert(ExchangeId::Binance, connector2);
assert!(old.is_some());
assert_eq!(pool.len(), 1);
}
#[test]
fn test_get_nonexistent() {
let pool = ConnectorPool::new();
let result = pool.get(&ExchangeId::Binance);
assert!(result.is_none());
}
#[test]
fn test_contains() {
let pool = ConnectorPool::new();
let connector = create_mock_okx();
assert!(!pool.contains(&ExchangeId::Binance));
pool.insert(ExchangeId::Binance, connector);
assert!(pool.contains(&ExchangeId::Binance));
assert!(!pool.contains(&ExchangeId::KuCoin));
}
#[test]
fn test_remove() {
let pool = ConnectorPool::new();
let connector = create_mock_okx();
pool.insert(ExchangeId::Binance, connector);
assert_eq!(pool.len(), 1);
let removed = pool.remove(&ExchangeId::Binance);
assert!(removed.is_some());
assert_eq!(pool.len(), 0);
assert!(pool.is_empty());
let removed_again = pool.remove(&ExchangeId::Binance);
assert!(removed_again.is_none());
}
#[test]
fn test_clear() {
let pool = ConnectorPool::new();
pool.insert(ExchangeId::Binance, create_mock_okx());
pool.insert(ExchangeId::KuCoin, create_mock_okx_2());
assert_eq!(pool.len(), 2);
pool.clear();
assert!(pool.is_empty());
}
#[test]
fn test_ids() {
let pool = ConnectorPool::new();
pool.insert(ExchangeId::Binance, create_mock_okx());
pool.insert(ExchangeId::KuCoin, create_mock_okx_2());
let ids = pool.ids();
assert_eq!(ids.len(), 2);
assert!(ids.contains(&ExchangeId::Binance));
assert!(ids.contains(&ExchangeId::KuCoin));
}
#[test]
fn test_multiple_inserts() {
let pool = ConnectorPool::new();
for i in 0..10 {
let id = if i % 2 == 0 { ExchangeId::Binance } else { ExchangeId::KuCoin };
pool.insert(id, create_mock_okx());
}
assert_eq!(pool.len(), 2);
}
#[test]
fn test_concurrent_inserts() {
let pool = Arc::new(ConnectorPool::new());
let mut handles = vec![];
for i in 0..10 {
let pool_clone = Arc::clone(&pool);
let handle = thread::spawn(move || {
let id = if i % 2 == 0 { ExchangeId::Binance } else { ExchangeId::KuCoin };
pool_clone.insert(id, create_mock_okx());
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(pool.len(), 2);
}
#[test]
fn test_concurrent_reads() {
let pool = Arc::new(ConnectorPool::new());
pool.insert(ExchangeId::Binance, create_mock_okx());
let mut handles = vec![];
for _ in 0..100 {
let pool_clone = Arc::clone(&pool);
let handle = thread::spawn(move || {
let connector = pool_clone.get(&ExchangeId::Binance);
assert!(connector.is_some());
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
#[test]
fn test_pool_clone() {
let pool1 = ConnectorPool::new();
pool1.insert(ExchangeId::Binance, create_mock_okx());
let pool2 = pool1.clone();
assert_eq!(pool1.len(), 1);
assert_eq!(pool2.len(), 1);
pool2.insert(ExchangeId::KuCoin, create_mock_okx_2());
assert_eq!(pool1.len(), 2);
assert_eq!(pool2.len(), 2);
}
#[test]
fn test_default_pool() {
let pool: ConnectorPool = Default::default();
assert!(pool.is_empty());
}
}