use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use super::config::{TenantConfig, TenantId, TenantPoolConfig};
/// Lifecycle state recorded on a [`PooledConnection`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
/// Initial state of a new connection; the only state for which
/// `PooledConnection::is_available` returns `true`.
Idle,
/// Set by `PooledConnection::mark_active`.
Active,
/// NOTE(review): presumably "health check in progress"; nothing in this file
/// sets this state — confirm against callers.
Validating,
/// NOTE(review): presumably "torn down"; nothing in this file sets this
/// state — confirm against callers.
Closed,
}
/// Bookkeeping record for one pooled connection belonging to a tenant.
///
/// This type holds metadata only; no socket or driver handle is stored here.
#[derive(Debug)]
pub struct PooledConnection {
// Caller-supplied identifier (see `TenantConnectionPool::next_connection_id`).
id: u64,
// Tenant this connection was created for.
tenant_id: TenantId,
// Instant captured in `new`; basis for `age()`.
created_at: Instant,
// Refreshed by `mark_active`, `mark_idle` and `record_query`; basis for `idle_time()`.
last_used: Instant,
// Current lifecycle state; starts as `ConnectionState::Idle`.
state: ConnectionState,
// Incremented once per `record_query` call.
queries_executed: u64,
// Free-form backend description (the tests pass a "host:port" string).
backend_info: String,
}
/// Construction, read-only accessors and state transitions for [`PooledConnection`].
impl PooledConnection {
    /// Creates a connection record in the `Idle` state with no queries recorded.
    ///
    /// `created_at` and `last_used` are initialised to the same instant.
    pub fn new(id: u64, tenant_id: TenantId, backend_info: impl Into<String>) -> Self {
        let born = Instant::now();
        Self {
            id,
            tenant_id,
            created_at: born,
            last_used: born,
            state: ConnectionState::Idle,
            queries_executed: 0,
            backend_info: backend_info.into(),
        }
    }

    /// Identifier supplied at construction.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Tenant this connection belongs to.
    pub fn tenant_id(&self) -> &TenantId {
        &self.tenant_id
    }

    /// Time elapsed since the record was created.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Time elapsed since the last state change or recorded query.
    pub fn idle_time(&self) -> Duration {
        self.last_used.elapsed()
    }

    /// Current lifecycle state.
    pub fn state(&self) -> ConnectionState {
        self.state
    }

    /// `true` only while the connection is `Idle`.
    pub fn is_available(&self) -> bool {
        matches!(self.state, ConnectionState::Idle)
    }

    /// Switches to `Active` and refreshes the last-used timestamp.
    pub fn mark_active(&mut self) {
        self.enter(ConnectionState::Active);
    }

    /// Switches to `Idle` and refreshes the last-used timestamp.
    pub fn mark_idle(&mut self) {
        self.enter(ConnectionState::Idle);
    }

    /// Counts one executed query and refreshes the last-used timestamp.
    pub fn record_query(&mut self) {
        self.queries_executed += 1;
        self.touch();
    }

    /// Backend description supplied at construction.
    pub fn backend_info(&self) -> &str {
        self.backend_info.as_str()
    }

    /// Number of queries recorded through `record_query`.
    pub fn queries_executed(&self) -> u64 {
        self.queries_executed
    }

    /// Stamps `last_used` with the current instant.
    fn touch(&mut self) {
        self.last_used = Instant::now();
    }

    /// Stores the new state, then refreshes `last_used`.
    fn enter(&mut self, next: ConnectionState) {
        self.state = next;
        self.touch();
    }
}
/// Counter-based accounting for the connections of one pool.
///
/// The struct stores no connections itself; it only tracks how many exist and
/// in which state, using atomics so it can be shared behind an `Arc`.
#[derive(Debug)]
pub struct TenantPool {
// Owner of the pool (the shared pool uses the id "__shared__").
tenant_id: TenantId,
// Limits for this pool; `max_connections` drives the capacity checks.
config: TenantPoolConfig,
// Connections currently handed out (raised by `record_acquire`).
active_count: AtomicU32,
// Connections currently available (raised by `record_created` / `record_release`).
idle_count: AtomicU32,
// Lifetime number of `record_created` calls.
total_created: AtomicU64,
// Lifetime number of `record_closed` calls.
total_closed: AtomicU64,
// Callers currently registered through `record_waiting`.
waiting_count: AtomicU32,
// Instant captured in `new`; basis for `age()`.
created_at: Instant,
}
/// Counter bookkeeping and derived metrics for a single [`TenantPool`].
///
/// Every counter is an atomic accessed with `Ordering::Relaxed`. Values derived
/// from more than one counter (`total_count`, `utilization`, `stats`) read each
/// counter separately, so they are only approximate while other threads are
/// updating the pool.
impl TenantPool {
    /// Creates an empty pool for `tenant_id` governed by `config`.
    pub fn new(tenant_id: TenantId, config: TenantPoolConfig) -> Self {
        Self {
            tenant_id,
            config,
            active_count: AtomicU32::new(0),
            idle_count: AtomicU32::new(0),
            total_created: AtomicU64::new(0),
            total_closed: AtomicU64::new(0),
            waiting_count: AtomicU32::new(0),
            created_at: Instant::now(),
        }
    }

    /// Tenant that owns this pool.
    pub fn tenant_id(&self) -> &TenantId {
        &self.tenant_id
    }

    /// Configuration the pool was created with.
    pub fn config(&self) -> &TenantPoolConfig {
        &self.config
    }

    /// Number of connections currently handed out.
    pub fn active_count(&self) -> u32 {
        self.active_count.load(Ordering::Relaxed)
    }

    /// Number of connections currently available.
    pub fn idle_count(&self) -> u32 {
        self.idle_count.load(Ordering::Relaxed)
    }

    /// Active plus idle connections.
    pub fn total_count(&self) -> u32 {
        self.active_count() + self.idle_count()
    }

    /// Number of callers currently registered as waiting.
    pub fn waiting_count(&self) -> u32 {
        self.waiting_count.load(Ordering::Relaxed)
    }

    /// `true` when the pool holds `max_connections` or more connections.
    pub fn is_at_capacity(&self) -> bool {
        self.total_count() >= self.config.max_connections
    }

    /// `true` when the pool holds fewer than `max_connections` connections.
    pub fn can_create_connection(&self) -> bool {
        self.total_count() < self.config.max_connections
    }

    /// `true` when at least one connection is idle.
    pub fn has_available(&self) -> bool {
        self.idle_count() > 0
    }

    /// Moves one connection from idle to active.
    ///
    /// Does nothing when no idle connection is recorded, so an unbalanced call
    /// cannot wrap the idle counter or invent an active connection.
    pub fn record_acquire(&self) {
        if Self::try_decrement(&self.idle_count) {
            self.active_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Moves one connection from active back to idle.
    ///
    /// Does nothing when no active connection is recorded, so a duplicate
    /// release cannot wrap the active counter or inflate the idle count.
    pub fn record_release(&self) {
        if Self::try_decrement(&self.active_count) {
            self.idle_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Registers a newly created connection; it starts out idle.
    pub fn record_created(&self) {
        self.idle_count.fetch_add(1, Ordering::Relaxed);
        self.total_created.fetch_add(1, Ordering::Relaxed);
    }

    /// Registers a closed connection.
    ///
    /// `was_active` selects which counter loses the connection. The counter
    /// saturates at zero; `total_closed` is incremented on every call.
    pub fn record_closed(&self, was_active: bool) {
        let counter = if was_active {
            &self.active_count
        } else {
            &self.idle_count
        };
        Self::try_decrement(counter);
        self.total_closed.fetch_add(1, Ordering::Relaxed);
    }

    /// Registers one more waiting caller.
    pub fn record_waiting(&self) {
        self.waiting_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes one waiting caller; the counter saturates at zero.
    pub fn record_not_waiting(&self) {
        Self::try_decrement(&self.waiting_count);
    }

    /// Fraction of `max_connections` that is currently active.
    ///
    /// Returns `0.0` when `max_connections` is zero, so the result is always
    /// finite instead of `inf`/`NaN` from a division by zero.
    pub fn utilization(&self) -> f32 {
        let limit = self.config.max_connections;
        if limit == 0 {
            return 0.0;
        }
        self.active_count() as f32 / limit as f32
    }

    /// Time elapsed since the pool was created.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Captures the current counters in a [`TenantPoolStats`] value.
    pub fn stats(&self) -> TenantPoolStats {
        TenantPoolStats {
            tenant_id: self.tenant_id.clone(),
            active: self.active_count(),
            idle: self.idle_count(),
            max: self.config.max_connections,
            waiting: self.waiting_count(),
            total_created: self.total_created.load(Ordering::Relaxed),
            total_closed: self.total_closed.load(Ordering::Relaxed),
            utilization: self.utilization(),
            age: self.age(),
        }
    }

    /// Decrements `counter` unless it is already zero; returns whether it changed.
    ///
    /// A bare `fetch_sub` wraps to `u32::MAX` when called on zero, which would
    /// make the pool look permanently full and overflow `total_count`.
    fn try_decrement(counter: &AtomicU32) -> bool {
        counter
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_sub(1)
            })
            .is_ok()
    }
}
/// Point-in-time copy of a [`TenantPool`]'s counters, produced by `TenantPool::stats`.
#[derive(Debug, Clone)]
pub struct TenantPoolStats {
/// Tenant that owns the pool.
pub tenant_id: TenantId,
/// Connections handed out when the snapshot was taken.
pub active: u32,
/// Connections available when the snapshot was taken.
pub idle: u32,
/// The pool's configured `max_connections`.
pub max: u32,
/// Callers registered as waiting.
pub waiting: u32,
/// Lifetime count of created connections.
pub total_created: u64,
/// Lifetime count of closed connections.
pub total_closed: u64,
/// Result of `TenantPool::utilization` (active relative to `max`).
pub utilization: f32,
/// Time since the pool was created.
pub age: Duration,
}
/// Registry of per-tenant pools plus one shared pool for all other tenants.
pub struct TenantConnectionPool {
// Dedicated pools, keyed by tenant.
pools: DashMap<TenantId, Arc<TenantPool>>,
// Pool with the id "__shared__", returned by `get_pool` for tenants that do
// not qualify for a dedicated pool.
shared_pool: Arc<TenantPool>,
// Configuration passed to `new`; the shared pool's config is derived from it.
default_config: TenantPoolConfig,
// Source of ids for `next_connection_id`; starts at 0.
connection_counter: AtomicU64,
// Incremented by `record_acquire`.
total_acquires: AtomicU64,
// Incremented by `record_timeout`.
total_timeouts: AtomicU64,
// `get_pool` gives a tenant its own pool when its configured pool
// `max_connections` is at least this value (default 5).
dedicated_pool_threshold: u32,
}
/// Pool lookup, lifecycle management and statistics for [`TenantConnectionPool`].
impl TenantConnectionPool {
    /// Creates a registry with no dedicated pools.
    ///
    /// The shared pool copies `default_config` but overrides `max_connections`
    /// with 50; the dedicated-pool threshold starts at 5.
    pub fn new(default_config: TenantPoolConfig) -> Self {
        let shared_id = TenantId::new("__shared__");
        let shared_config = TenantPoolConfig {
            max_connections: 50,
            ..default_config.clone()
        };
        let shared_pool = Arc::new(TenantPool::new(shared_id, shared_config));

        Self {
            pools: DashMap::new(),
            shared_pool,
            default_config,
            connection_counter: AtomicU64::new(0),
            total_acquires: AtomicU64::new(0),
            total_timeouts: AtomicU64::new(0),
            dedicated_pool_threshold: 5,
        }
    }

    /// Builder-style setter for the dedicated-pool threshold.
    pub fn with_dedicated_threshold(mut self, threshold: u32) -> Self {
        self.dedicated_pool_threshold = threshold;
        self
    }

    /// Returns the pool a tenant should use.
    ///
    /// A tenant gets a dedicated pool (created on first use from `config.pool`)
    /// when `config.pool.dedicated_pool` is set or its `max_connections`
    /// reaches the threshold; every other tenant receives the shared pool.
    pub fn get_pool(&self, tenant: &TenantId, config: &TenantConfig) -> Arc<TenantPool> {
        let wants_dedicated = config.pool.dedicated_pool
            || config.pool.max_connections >= self.dedicated_pool_threshold;
        if !wants_dedicated {
            return Arc::clone(&self.shared_pool);
        }

        let slot = self
            .pools
            .entry(tenant.clone())
            .or_insert_with(|| Arc::new(TenantPool::new(tenant.clone(), config.pool.clone())));
        Arc::clone(slot.value())
    }

    /// Looks up a tenant's dedicated pool without creating one.
    pub fn get_existing_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
        self.pools
            .get(tenant)
            .map(|entry| Arc::clone(entry.value()))
    }

    /// Registers a fresh dedicated pool for `tenant`, replacing any existing entry.
    pub fn create_tenant_pool(&self, tenant: &TenantId, config: TenantPoolConfig) {
        let fresh = Arc::new(TenantPool::new(tenant.clone(), config));
        self.pools.insert(tenant.clone(), fresh);
    }

    /// Unregisters and returns a tenant's dedicated pool, if it has one.
    pub fn remove_tenant_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
        let (_, removed) = self.pools.remove(tenant)?;
        Some(removed)
    }

    /// Hands out the next connection id; ids start at 0 and increase by one.
    pub fn next_connection_id(&self) -> u64 {
        self.connection_counter.fetch_add(1, Ordering::Relaxed)
    }

    /// Counts one acquire at registry level.
    pub fn record_acquire(&self) {
        self.total_acquires.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one timeout at registry level.
    pub fn record_timeout(&self) {
        self.total_timeouts.fetch_add(1, Ordering::Relaxed);
    }

    /// Snapshots of every dedicated pool, followed by the shared pool.
    pub fn all_stats(&self) -> Vec<TenantPoolStats> {
        let mut snapshots: Vec<TenantPoolStats> = Vec::new();
        for entry in self.pools.iter() {
            snapshots.push(entry.value().stats());
        }
        snapshots.push(self.shared_pool.stats());
        snapshots
    }

    /// Snapshot of a tenant's dedicated pool, if it has one.
    pub fn tenant_stats(&self, tenant: &TenantId) -> Option<TenantPoolStats> {
        self.pools.get(tenant).map(|entry| entry.value().stats())
    }

    /// Snapshot of the shared pool.
    pub fn shared_pool_stats(&self) -> TenantPoolStats {
        self.shared_pool.stats()
    }

    /// Number of dedicated pools currently registered.
    pub fn tenant_pool_count(&self) -> usize {
        self.pools.len()
    }

    /// Sums the counters of all dedicated pools and the shared pool.
    ///
    /// `average_utilization` is total active over total max, or `0.0` when the
    /// total max is zero.
    pub fn aggregate_stats(&self) -> AggregatePoolStats {
        let (mut active, mut idle, mut limit, mut waiting) = (0u32, 0u32, 0u32, 0u32);

        for entry in self.pools.iter() {
            let pool = entry.value();
            active += pool.active_count();
            idle += pool.idle_count();
            limit += pool.config().max_connections;
            waiting += pool.waiting_count();
        }

        let shared = &self.shared_pool;
        active += shared.active_count();
        idle += shared.idle_count();
        limit += shared.config().max_connections;
        waiting += shared.waiting_count();

        let average_utilization = if limit == 0 {
            0.0
        } else {
            active as f32 / limit as f32
        };

        AggregatePoolStats {
            tenant_pools: self.pools.len(),
            total_active: active,
            total_idle: idle,
            total_max: limit,
            total_waiting: waiting,
            total_acquires: self.total_acquires.load(Ordering::Relaxed),
            total_timeouts: self.total_timeouts.load(Ordering::Relaxed),
            average_utilization,
        }
    }
}
/// Totals across all dedicated pools and the shared pool, produced by
/// `TenantConnectionPool::aggregate_stats`.
#[derive(Debug, Clone)]
pub struct AggregatePoolStats {
/// Number of dedicated tenant pools (the shared pool is not counted).
pub tenant_pools: usize,
/// Sum of active connections over every pool.
pub total_active: u32,
/// Sum of idle connections over every pool.
pub total_idle: u32,
/// Sum of the configured `max_connections` over every pool.
pub total_max: u32,
/// Sum of waiting callers over every pool.
pub total_waiting: u32,
/// Registry-level acquire counter.
pub total_acquires: u64,
/// Registry-level timeout counter.
pub total_timeouts: u64,
/// `total_active / total_max`, or `0.0` when `total_max` is zero.
pub average_utilization: f32,
}
/// Possible outcomes of a connection acquire attempt.
///
/// NOTE(review): no code in this file constructs or matches these variants;
/// the per-variant notes below are inferred from the names — confirm against
/// the code that performs the acquire.
#[derive(Debug)]
pub enum AcquireResult {
/// A connection was obtained and is carried in the variant.
Success(PooledConnection),
/// Presumably: the caller was queued for a connection — TODO confirm.
Waiting,
/// Presumably: the pool is at capacity and cannot serve the request — TODO confirm.
PoolExhausted,
/// Presumably: no pool or configuration exists for the tenant — TODO confirm.
TenantNotFound,
/// Presumably: the wait deadline elapsed — TODO confirm.
Timeout,
}
/// A connection paired with the pool whose counters must be updated when the
/// connection is handed back (see `release` and the `Drop` impl).
pub struct TenantConnectionLease {
// The leased connection record.
connection: PooledConnection,
// Pool that receives `record_release` when the lease ends.
pool: Arc<TenantPool>,
// Instant captured in `new`; basis for `lease_duration()`.
leased_at: Instant,
// Set by `mark_used` and by any call to `connection_mut`.
used: bool,
}
/// Construction, accessors and explicit release for [`TenantConnectionLease`].
impl TenantConnectionLease {
    /// Wraps `connection` in a lease tied to `pool`.
    ///
    /// A connection that is still `Idle` is marked `Active` here. The `Drop`
    /// impl only returns the pool slot for an `Active` connection, so without
    /// this step a lease built from an idle connection and dropped without
    /// `release()` would never give its slot back. Connections in any other
    /// state are left untouched.
    ///
    /// The caller is expected to have already recorded the acquire on `pool`
    /// (e.g. via `TenantPool::record_acquire`); this constructor does not
    /// touch the pool counters.
    pub fn new(mut connection: PooledConnection, pool: Arc<TenantPool>) -> Self {
        if connection.is_available() {
            connection.mark_active();
        }
        Self {
            connection,
            pool,
            leased_at: Instant::now(),
            used: false,
        }
    }

    /// Shared access to the leased connection.
    pub fn connection(&self) -> &PooledConnection {
        &self.connection
    }

    /// Mutable access to the leased connection; also flags the lease as used.
    pub fn connection_mut(&mut self) -> &mut PooledConnection {
        self.used = true;
        &mut self.connection
    }

    /// Tenant of the leased connection.
    pub fn tenant_id(&self) -> &TenantId {
        self.connection.tenant_id()
    }

    /// Time elapsed since the lease was created.
    pub fn lease_duration(&self) -> Duration {
        self.leased_at.elapsed()
    }

    /// Flags the lease as used.
    pub fn mark_used(&mut self) {
        self.used = true;
    }

    /// Whether `mark_used` or `connection_mut` has been called.
    pub fn was_used(&self) -> bool {
        self.used
    }

    /// Returns the connection to the pool and consumes the lease.
    ///
    /// The connection is marked idle before the pool is notified, so the
    /// `Drop` impl that runs when `self` goes out of scope sees a non-active
    /// connection and does not release a second time.
    pub fn release(mut self) {
        self.connection.mark_idle();
        self.pool.record_release();
    }
}
/// Safety net for leases that go out of scope without an explicit `release()`.
impl Drop for TenantConnectionLease {
    /// Notifies the pool only when the connection is still `Active`.
    ///
    /// `release()` marks the connection idle before the lease is dropped, so
    /// an explicitly released lease is not counted a second time here.
    fn drop(&mut self) {
        let still_active = matches!(self.connection.state(), ConnectionState::Active);
        if !still_active {
            return;
        }
        self.pool.record_release();
    }
}
// Unit tests for the connection record, the per-pool counters, the registry's
// dedicated/shared pool selection and the lease release path.
#[cfg(test)]
mod tests {
use super::*;
// A new connection starts idle; mark_active/mark_idle flip availability and
// record_query bumps the query counter.
#[test]
fn test_pooled_connection() {
let tenant = TenantId::new("test");
let mut conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
assert_eq!(conn.id(), 1);
assert_eq!(conn.tenant_id().as_str(), "test");
assert!(conn.is_available());
assert_eq!(conn.state(), ConnectionState::Idle);
conn.mark_active();
assert!(!conn.is_available());
assert_eq!(conn.state(), ConnectionState::Active);
conn.record_query();
assert_eq!(conn.queries_executed(), 1);
conn.mark_idle();
assert!(conn.is_available());
}
// Walks one connection through created -> acquired -> released and checks
// that it moves between the idle and active counters.
#[test]
fn test_tenant_pool() {
let tenant = TenantId::new("test");
let config = TenantPoolConfig {
max_connections: 10,
..Default::default()
};
let pool = TenantPool::new(tenant.clone(), config);
assert_eq!(pool.tenant_id().as_str(), "test");
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.idle_count(), 0);
assert!(!pool.is_at_capacity());
assert!(pool.can_create_connection());
pool.record_created();
assert_eq!(pool.idle_count(), 1);
pool.record_acquire();
assert_eq!(pool.active_count(), 1);
assert_eq!(pool.idle_count(), 0);
pool.record_release();
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.idle_count(), 1);
}
// Utilization is active / max_connections: 3 active out of a limit of 10 is 0.3.
#[test]
fn test_tenant_pool_utilization() {
let tenant = TenantId::new("test");
let config = TenantPoolConfig {
max_connections: 10,
..Default::default()
};
let pool = TenantPool::new(tenant, config);
assert_eq!(pool.utilization(), 0.0);
for _ in 0..5 {
pool.record_created();
}
for _ in 0..3 {
pool.record_acquire();
}
assert_eq!(pool.active_count(), 3);
assert_eq!(pool.idle_count(), 2);
assert!((pool.utilization() - 0.3).abs() < 0.01);
}
// A tenant whose pool config sets `dedicated_pool: true` gets its own pool,
// which then shows up in the aggregate pool count.
#[test]
fn test_tenant_connection_pool() {
let config = TenantPoolConfig::default();
let manager = TenantConnectionPool::new(config.clone());
let tenant = TenantId::new("tenant_a");
let tenant_config = TenantConfig::builder()
.id("tenant_a")
.name("Tenant A")
.database_isolation("tenant_a_db")
.max_connections(20)
.pool(TenantPoolConfig {
max_connections: 20,
dedicated_pool: true,
..Default::default()
})
.build();
let pool = manager.get_pool(&tenant, &tenant_config);
assert_eq!(pool.config().max_connections, 20);
let stats = manager.aggregate_stats();
assert_eq!(stats.tenant_pools, 1);
}
// A tenant that does not qualify for a dedicated pool is routed to the
// "__shared__" pool.
// NOTE(review): this relies on the builder leaving `pool.dedicated_pool` false
// and `pool.max_connections` below the default threshold of 5 — confirm
// against the builder's defaults.
#[test]
fn test_shared_pool_usage() {
let config = TenantPoolConfig::default();
let manager = TenantConnectionPool::new(config.clone());
let tenant = TenantId::new("small_tenant");
let tenant_config = TenantConfig::builder()
.id("small_tenant")
.name("Small Tenant")
.schema_isolation("shared", "small_tenant")
.max_connections(2) .build();
let pool = manager.get_pool(&tenant, &tenant_config);
assert_eq!(pool.tenant_id().as_str(), "__shared__");
}
// The pool counters are primed by hand (created + acquired) before the lease
// is built; releasing the lease must move the connection back to idle.
#[test]
fn test_tenant_connection_lease() {
let tenant = TenantId::new("test");
let config = TenantPoolConfig::default();
let pool = Arc::new(TenantPool::new(tenant.clone(), config));
pool.record_created();
pool.record_acquire();
let conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
let mut lease = TenantConnectionLease::new(conn, pool.clone());
assert_eq!(lease.tenant_id().as_str(), "test");
assert!(!lease.was_used());
lease.mark_used();
assert!(lease.was_used());
lease.release();
assert_eq!(pool.active_count(), 0);
assert_eq!(pool.idle_count(), 1);
}
// The stats snapshot mirrors the counters: two created, one of them acquired.
#[test]
fn test_pool_stats() {
let tenant = TenantId::new("test");
let config = TenantPoolConfig {
max_connections: 10,
..Default::default()
};
let pool = TenantPool::new(tenant, config);
pool.record_created();
pool.record_created();
pool.record_acquire();
let stats = pool.stats();
assert_eq!(stats.active, 1);
assert_eq!(stats.idle, 1);
assert_eq!(stats.max, 10);
assert_eq!(stats.total_created, 2);
}
}