use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use super::context::TenantId;
#[derive(Debug, Clone)]
pub enum PoolStrategy {
Shared {
max_connections: usize,
},
PerTenant {
max_pools: usize,
pool_size: usize,
},
PerDatabase {
max_databases: usize,
pool_size: usize,
},
}
impl Default for PoolStrategy {
fn default() -> Self {
Self::Shared {
max_connections: 20,
}
}
}
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub strategy: PoolStrategy,
pub warmup_size: usize,
pub idle_timeout: Duration,
pub max_lifetime: Duration,
pub health_check: bool,
pub health_check_interval: Duration,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
strategy: PoolStrategy::default(),
warmup_size: 1,
idle_timeout: Duration::from_secs(300),
max_lifetime: Duration::from_secs(1800),
health_check: true,
health_check_interval: Duration::from_secs(30),
}
}
}
impl PoolConfig {
pub fn builder() -> PoolConfigBuilder {
PoolConfigBuilder::default()
}
}
#[derive(Default)]
pub struct PoolConfigBuilder {
strategy: Option<PoolStrategy>,
warmup_size: Option<usize>,
idle_timeout: Option<Duration>,
max_lifetime: Option<Duration>,
health_check: Option<bool>,
health_check_interval: Option<Duration>,
}
impl PoolConfigBuilder {
pub fn strategy(mut self, strategy: PoolStrategy) -> Self {
self.strategy = Some(strategy);
self
}
pub fn shared(mut self, max_connections: usize) -> Self {
self.strategy = Some(PoolStrategy::Shared { max_connections });
self
}
pub fn per_tenant(mut self, max_pools: usize, pool_size: usize) -> Self {
self.strategy = Some(PoolStrategy::PerTenant {
max_pools,
pool_size,
});
self
}
pub fn per_database(mut self, max_databases: usize, pool_size: usize) -> Self {
self.strategy = Some(PoolStrategy::PerDatabase {
max_databases,
pool_size,
});
self
}
pub fn warmup_size(mut self, size: usize) -> Self {
self.warmup_size = Some(size);
self
}
pub fn idle_timeout(mut self, timeout: Duration) -> Self {
self.idle_timeout = Some(timeout);
self
}
pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
self.max_lifetime = Some(lifetime);
self
}
pub fn health_check(mut self, enabled: bool) -> Self {
self.health_check = Some(enabled);
self
}
pub fn health_check_interval(mut self, interval: Duration) -> Self {
self.health_check_interval = Some(interval);
self
}
pub fn build(self) -> PoolConfig {
PoolConfig {
strategy: self.strategy.unwrap_or_default(),
warmup_size: self.warmup_size.unwrap_or(1),
idle_timeout: self.idle_timeout.unwrap_or(Duration::from_secs(300)),
max_lifetime: self.max_lifetime.unwrap_or(Duration::from_secs(1800)),
health_check: self.health_check.unwrap_or(true),
health_check_interval: self
.health_check_interval
.unwrap_or(Duration::from_secs(30)),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
pub connections_acquired: u64,
pub connections_released: u64,
pub active_connections: usize,
pub idle_connections: usize,
pub total_wait_time_ms: u64,
pub max_wait_time_ms: u64,
pub timeouts: u64,
pub health_check_failures: u64,
pub created_at: Option<Instant>,
pub last_activity: Option<Instant>,
}
pub struct AtomicPoolStats {
connections_acquired: AtomicU64,
connections_released: AtomicU64,
active_connections: AtomicUsize,
idle_connections: AtomicUsize,
total_wait_time_ms: AtomicU64,
max_wait_time_ms: AtomicU64,
timeouts: AtomicU64,
health_check_failures: AtomicU64,
created_at: Mutex<Option<Instant>>,
last_activity: Mutex<Option<Instant>>,
}
impl Default for AtomicPoolStats {
fn default() -> Self {
Self::new()
}
}
impl AtomicPoolStats {
pub fn new() -> Self {
Self {
connections_acquired: AtomicU64::new(0),
connections_released: AtomicU64::new(0),
active_connections: AtomicUsize::new(0),
idle_connections: AtomicUsize::new(0),
total_wait_time_ms: AtomicU64::new(0),
max_wait_time_ms: AtomicU64::new(0),
timeouts: AtomicU64::new(0),
health_check_failures: AtomicU64::new(0),
created_at: Mutex::new(None),
last_activity: Mutex::new(None),
}
}
pub fn record_acquire(&self, wait_time: Duration) {
self.connections_acquired.fetch_add(1, Ordering::Relaxed);
self.active_connections.fetch_add(1, Ordering::Relaxed);
let wait_ms = wait_time.as_millis() as u64;
self.total_wait_time_ms
.fetch_add(wait_ms, Ordering::Relaxed);
let mut current = self.max_wait_time_ms.load(Ordering::Relaxed);
while wait_ms > current {
match self.max_wait_time_ms.compare_exchange_weak(
current,
wait_ms,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(c) => current = c,
}
}
*self.last_activity.lock() = Some(Instant::now());
}
pub fn record_release(&self) {
self.connections_released.fetch_add(1, Ordering::Relaxed);
self.active_connections.fetch_sub(1, Ordering::Relaxed);
*self.last_activity.lock() = Some(Instant::now());
}
pub fn record_timeout(&self) {
self.timeouts.fetch_add(1, Ordering::Relaxed);
}
pub fn record_health_failure(&self) {
self.health_check_failures.fetch_add(1, Ordering::Relaxed);
}
pub fn set_idle(&self, count: usize) {
self.idle_connections.store(count, Ordering::Relaxed);
}
pub fn mark_created(&self) {
*self.created_at.lock() = Some(Instant::now());
}
pub fn snapshot(&self) -> PoolStats {
PoolStats {
connections_acquired: self.connections_acquired.load(Ordering::Relaxed),
connections_released: self.connections_released.load(Ordering::Relaxed),
active_connections: self.active_connections.load(Ordering::Relaxed),
idle_connections: self.idle_connections.load(Ordering::Relaxed),
total_wait_time_ms: self.total_wait_time_ms.load(Ordering::Relaxed),
max_wait_time_ms: self.max_wait_time_ms.load(Ordering::Relaxed),
timeouts: self.timeouts.load(Ordering::Relaxed),
health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
created_at: *self.created_at.lock(),
last_activity: *self.last_activity.lock(),
}
}
}
struct LruEntry<T> {
value: T,
last_access: Instant,
access_count: u64,
}
pub struct TenantLruCache<K, V>
where
K: Eq + Hash + Clone,
{
entries: RwLock<HashMap<K, LruEntry<V>>>,
max_size: usize,
idle_timeout: Duration,
}
impl<K, V> TenantLruCache<K, V>
where
K: Eq + Hash + Clone,
{
pub fn new(max_size: usize, idle_timeout: Duration) -> Self {
Self {
entries: RwLock::new(HashMap::with_capacity(max_size)),
max_size,
idle_timeout,
}
}
pub fn get(&self, key: &K) -> Option<V>
where
V: Clone,
{
let mut entries = self.entries.write();
if let Some(entry) = entries.get_mut(key) {
entry.last_access = Instant::now();
entry.access_count += 1;
Some(entry.value.clone())
} else {
None
}
}
pub fn insert(&self, key: K, value: V) {
let mut entries = self.entries.write();
if entries.len() >= self.max_size && !entries.contains_key(&key) {
self.evict_one(&mut entries);
}
entries.insert(
key,
LruEntry {
value,
last_access: Instant::now(),
access_count: 1,
},
);
}
pub fn remove(&self, key: &K) -> Option<V> {
self.entries.write().remove(key).map(|e| e.value)
}
pub fn evict_expired(&self) -> usize {
let mut entries = self.entries.write();
let now = Instant::now();
let before = entries.len();
entries.retain(|_, entry| now.duration_since(entry.last_access) < self.idle_timeout);
before - entries.len()
}
pub fn len(&self) -> usize {
self.entries.read().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
fn evict_one(&self, entries: &mut HashMap<K, LruEntry<V>>) {
let now = Instant::now();
let expired_key = entries
.iter()
.filter(|(_, e)| now.duration_since(e.last_access) >= self.idle_timeout)
.map(|(k, _)| k.clone())
.next();
if let Some(key) = expired_key {
entries.remove(&key);
return;
}
let lru_key = entries
.iter()
.min_by_key(|(_, e)| e.last_access)
.map(|(k, _)| k.clone());
if let Some(key) = lru_key {
entries.remove(&key);
}
}
}
pub struct TenantPoolEntry {
pub tenant_id: TenantId,
pub stats: Arc<AtomicPoolStats>,
pub state: PoolState,
pub schema: Option<String>,
pub database: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolState {
Initializing,
Ready,
WarmingUp,
Draining,
Closed,
}
impl TenantPoolEntry {
pub fn new(tenant_id: TenantId) -> Self {
let stats = Arc::new(AtomicPoolStats::new());
stats.mark_created();
Self {
tenant_id,
stats,
state: PoolState::Initializing,
schema: None,
database: None,
}
}
pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
self.schema = Some(schema.into());
self
}
pub fn with_database(mut self, database: impl Into<String>) -> Self {
self.database = Some(database.into());
self
}
pub fn mark_ready(&mut self) {
self.state = PoolState::Ready;
}
pub fn is_ready(&self) -> bool {
self.state == PoolState::Ready
}
pub fn stats(&self) -> PoolStats {
self.stats.snapshot()
}
pub fn should_evict(&self, idle_timeout: Duration) -> bool {
if let Some(last) = self.stats.snapshot().last_activity {
Instant::now().duration_since(last) > idle_timeout
} else {
false
}
}
}
pub struct TenantPoolManager {
config: PoolConfig,
pools: TenantLruCache<String, Arc<TenantPoolEntry>>,
global_stats: Arc<AtomicPoolStats>,
}
impl TenantPoolManager {
pub fn new(config: PoolConfig) -> Self {
let max_pools = match &config.strategy {
PoolStrategy::Shared { .. } => 1,
PoolStrategy::PerTenant { max_pools, .. } => *max_pools,
PoolStrategy::PerDatabase { max_databases, .. } => *max_databases,
};
Self {
pools: TenantLruCache::new(max_pools, config.idle_timeout),
config,
global_stats: Arc::new(AtomicPoolStats::new()),
}
}
pub fn builder() -> TenantPoolManagerBuilder {
TenantPoolManagerBuilder::default()
}
pub fn get_or_create(&self, tenant_id: &TenantId) -> Arc<TenantPoolEntry> {
let key = tenant_id.as_str().to_string();
if let Some(entry) = self.pools.get(&key) {
return entry;
}
let entry = Arc::new(TenantPoolEntry::new(tenant_id.clone()));
self.pools.insert(key, entry.clone());
entry
}
pub fn global_stats(&self) -> PoolStats {
self.global_stats.snapshot()
}
pub fn active_pools(&self) -> usize {
self.pools.len()
}
pub fn evict_expired(&self) -> usize {
self.pools.evict_expired()
}
pub fn config(&self) -> &PoolConfig {
&self.config
}
}
#[derive(Default)]
pub struct TenantPoolManagerBuilder {
config: Option<PoolConfig>,
}
impl TenantPoolManagerBuilder {
pub fn config(mut self, config: PoolConfig) -> Self {
self.config = Some(config);
self
}
pub fn shared(self, max_connections: usize) -> Self {
self.config(PoolConfig::builder().shared(max_connections).build())
}
pub fn per_tenant(self, max_pools: usize, pool_size: usize) -> Self {
self.config(
PoolConfig::builder()
.per_tenant(max_pools, pool_size)
.build(),
)
}
pub fn build(self) -> TenantPoolManager {
TenantPoolManager::new(self.config.unwrap_or_default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_config_builder() {
let config = PoolConfig::builder()
.per_tenant(100, 5)
.warmup_size(2)
.idle_timeout(Duration::from_secs(600))
.build();
assert!(matches!(config.strategy, PoolStrategy::PerTenant { .. }));
assert_eq!(config.warmup_size, 2);
assert_eq!(config.idle_timeout, Duration::from_secs(600));
}
#[test]
fn test_atomic_stats() {
let stats = AtomicPoolStats::new();
stats.mark_created();
stats.record_acquire(Duration::from_millis(5));
stats.record_acquire(Duration::from_millis(10));
stats.record_release();
let snapshot = stats.snapshot();
assert_eq!(snapshot.connections_acquired, 2);
assert_eq!(snapshot.connections_released, 1);
assert_eq!(snapshot.active_connections, 1);
assert_eq!(snapshot.max_wait_time_ms, 10);
}
#[test]
fn test_lru_cache() {
let cache: TenantLruCache<String, i32> = TenantLruCache::new(3, Duration::from_secs(60));
cache.insert("a".to_string(), 1);
cache.insert("b".to_string(), 2);
cache.insert("c".to_string(), 3);
assert_eq!(cache.len(), 3);
assert_eq!(cache.get(&"a".to_string()), Some(1));
cache.insert("d".to_string(), 4);
assert_eq!(cache.len(), 3);
assert_eq!(cache.get(&"a".to_string()), Some(1));
}
#[test]
fn test_tenant_pool_entry() {
let entry = TenantPoolEntry::new(TenantId::new("test")).with_schema("tenant_test");
assert_eq!(entry.schema, Some("tenant_test".to_string()));
assert_eq!(entry.state, PoolState::Initializing);
}
#[test]
fn test_pool_manager_creation() {
let manager = TenantPoolManager::builder().per_tenant(100, 5).build();
assert_eq!(manager.active_pools(), 0);
let entry = manager.get_or_create(&TenantId::new("tenant-1"));
assert_eq!(entry.tenant_id.as_str(), "tenant-1");
assert_eq!(manager.active_pools(), 1);
let _entry2 = manager.get_or_create(&TenantId::new("tenant-1"));
assert_eq!(manager.active_pools(), 1);
}
}