1use parking_lot::{Mutex, RwLock};
40use std::collections::HashMap;
41use std::hash::Hash;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
44use std::time::{Duration, Instant};
45
46use super::context::TenantId;
47
/// Layout of connection pools across tenants.
///
/// `TenantPoolManager::new` derives the number of pool entries it tracks from
/// the variant: `1` for `Shared`, `max_pools` for `PerTenant` and
/// `max_databases` for `PerDatabase`.
#[derive(Debug, Clone)]
pub enum PoolStrategy {
    /// A single pool shared by all tenants.
    Shared {
        /// Connection limit for the shared pool.
        max_connections: usize,
    },

    /// A dedicated pool per tenant.
    PerTenant {
        /// Maximum number of tenant pools tracked at the same time.
        max_pools: usize,
        /// Size of each tenant pool (presumably connections per pool; not
        /// read anywhere in this file).
        pool_size: usize,
    },

    /// A dedicated pool per database.
    PerDatabase {
        /// Maximum number of database pools tracked at the same time.
        max_databases: usize,
        /// Size of each database pool (presumably connections per pool; not
        /// read anywhere in this file).
        pool_size: usize,
    },
}
76
77impl Default for PoolStrategy {
78 fn default() -> Self {
79 Self::Shared {
80 max_connections: 20,
81 }
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct PoolConfig {
88 pub strategy: PoolStrategy,
90 pub warmup_size: usize,
92 pub idle_timeout: Duration,
94 pub max_lifetime: Duration,
96 pub health_check: bool,
98 pub health_check_interval: Duration,
100}
101
102impl Default for PoolConfig {
103 fn default() -> Self {
104 Self {
105 strategy: PoolStrategy::default(),
106 warmup_size: 1,
107 idle_timeout: Duration::from_secs(300),
108 max_lifetime: Duration::from_secs(1800),
109 health_check: true,
110 health_check_interval: Duration::from_secs(30),
111 }
112 }
113}
114
115impl PoolConfig {
116 pub fn builder() -> PoolConfigBuilder {
118 PoolConfigBuilder::default()
119 }
120}
121
122#[derive(Default)]
124pub struct PoolConfigBuilder {
125 strategy: Option<PoolStrategy>,
126 warmup_size: Option<usize>,
127 idle_timeout: Option<Duration>,
128 max_lifetime: Option<Duration>,
129 health_check: Option<bool>,
130 health_check_interval: Option<Duration>,
131}
132
133impl PoolConfigBuilder {
134 pub fn strategy(mut self, strategy: PoolStrategy) -> Self {
136 self.strategy = Some(strategy);
137 self
138 }
139
140 pub fn shared(mut self, max_connections: usize) -> Self {
142 self.strategy = Some(PoolStrategy::Shared { max_connections });
143 self
144 }
145
146 pub fn per_tenant(mut self, max_pools: usize, pool_size: usize) -> Self {
148 self.strategy = Some(PoolStrategy::PerTenant {
149 max_pools,
150 pool_size,
151 });
152 self
153 }
154
155 pub fn per_database(mut self, max_databases: usize, pool_size: usize) -> Self {
157 self.strategy = Some(PoolStrategy::PerDatabase {
158 max_databases,
159 pool_size,
160 });
161 self
162 }
163
164 pub fn warmup_size(mut self, size: usize) -> Self {
166 self.warmup_size = Some(size);
167 self
168 }
169
170 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
172 self.idle_timeout = Some(timeout);
173 self
174 }
175
176 pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
178 self.max_lifetime = Some(lifetime);
179 self
180 }
181
182 pub fn health_check(mut self, enabled: bool) -> Self {
184 self.health_check = Some(enabled);
185 self
186 }
187
188 pub fn health_check_interval(mut self, interval: Duration) -> Self {
190 self.health_check_interval = Some(interval);
191 self
192 }
193
194 pub fn build(self) -> PoolConfig {
196 PoolConfig {
197 strategy: self.strategy.unwrap_or_default(),
198 warmup_size: self.warmup_size.unwrap_or(1),
199 idle_timeout: self.idle_timeout.unwrap_or(Duration::from_secs(300)),
200 max_lifetime: self.max_lifetime.unwrap_or(Duration::from_secs(1800)),
201 health_check: self.health_check.unwrap_or(true),
202 health_check_interval: self
203 .health_check_interval
204 .unwrap_or(Duration::from_secs(30)),
205 }
206 }
207}
208
/// Plain-value copy of pool counters, as returned by
/// `AtomicPoolStats::snapshot`.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Number of acquisitions recorded.
    pub connections_acquired: u64,
    /// Number of releases recorded.
    pub connections_released: u64,
    /// Connections currently checked out (incremented on acquire,
    /// decremented on release).
    pub active_connections: usize,
    /// Idle connection count as last reported through
    /// `AtomicPoolStats::set_idle`.
    pub idle_connections: usize,
    /// Sum of all recorded acquire wait times, in milliseconds.
    pub total_wait_time_ms: u64,
    /// Largest single recorded acquire wait time, in milliseconds.
    pub max_wait_time_ms: u64,
    /// Number of timeouts recorded.
    pub timeouts: u64,
    /// Number of health-check failures recorded.
    pub health_check_failures: u64,
    /// Instant stored by `AtomicPoolStats::mark_created`, if it was called.
    pub created_at: Option<Instant>,
    /// Instant of the most recent recorded acquire or release, if any.
    pub last_activity: Option<Instant>,
}
233
234pub struct AtomicPoolStats {
236 connections_acquired: AtomicU64,
237 connections_released: AtomicU64,
238 active_connections: AtomicUsize,
239 idle_connections: AtomicUsize,
240 total_wait_time_ms: AtomicU64,
241 max_wait_time_ms: AtomicU64,
242 timeouts: AtomicU64,
243 health_check_failures: AtomicU64,
244 created_at: Mutex<Option<Instant>>,
245 last_activity: Mutex<Option<Instant>>,
246}
247
248impl Default for AtomicPoolStats {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
254impl AtomicPoolStats {
255 pub fn new() -> Self {
257 Self {
258 connections_acquired: AtomicU64::new(0),
259 connections_released: AtomicU64::new(0),
260 active_connections: AtomicUsize::new(0),
261 idle_connections: AtomicUsize::new(0),
262 total_wait_time_ms: AtomicU64::new(0),
263 max_wait_time_ms: AtomicU64::new(0),
264 timeouts: AtomicU64::new(0),
265 health_check_failures: AtomicU64::new(0),
266 created_at: Mutex::new(None),
267 last_activity: Mutex::new(None),
268 }
269 }
270
271 pub fn record_acquire(&self, wait_time: Duration) {
273 self.connections_acquired.fetch_add(1, Ordering::Relaxed);
274 self.active_connections.fetch_add(1, Ordering::Relaxed);
275
276 let wait_ms = wait_time.as_millis() as u64;
277 self.total_wait_time_ms
278 .fetch_add(wait_ms, Ordering::Relaxed);
279
280 let mut current = self.max_wait_time_ms.load(Ordering::Relaxed);
282 while wait_ms > current {
283 match self.max_wait_time_ms.compare_exchange_weak(
284 current,
285 wait_ms,
286 Ordering::Relaxed,
287 Ordering::Relaxed,
288 ) {
289 Ok(_) => break,
290 Err(c) => current = c,
291 }
292 }
293
294 *self.last_activity.lock() = Some(Instant::now());
295 }
296
297 pub fn record_release(&self) {
299 self.connections_released.fetch_add(1, Ordering::Relaxed);
300 self.active_connections.fetch_sub(1, Ordering::Relaxed);
301 *self.last_activity.lock() = Some(Instant::now());
302 }
303
304 pub fn record_timeout(&self) {
306 self.timeouts.fetch_add(1, Ordering::Relaxed);
307 }
308
309 pub fn record_health_failure(&self) {
311 self.health_check_failures.fetch_add(1, Ordering::Relaxed);
312 }
313
314 pub fn set_idle(&self, count: usize) {
316 self.idle_connections.store(count, Ordering::Relaxed);
317 }
318
319 pub fn mark_created(&self) {
321 *self.created_at.lock() = Some(Instant::now());
322 }
323
324 pub fn snapshot(&self) -> PoolStats {
326 PoolStats {
327 connections_acquired: self.connections_acquired.load(Ordering::Relaxed),
328 connections_released: self.connections_released.load(Ordering::Relaxed),
329 active_connections: self.active_connections.load(Ordering::Relaxed),
330 idle_connections: self.idle_connections.load(Ordering::Relaxed),
331 total_wait_time_ms: self.total_wait_time_ms.load(Ordering::Relaxed),
332 max_wait_time_ms: self.max_wait_time_ms.load(Ordering::Relaxed),
333 timeouts: self.timeouts.load(Ordering::Relaxed),
334 health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
335 created_at: *self.created_at.lock(),
336 last_activity: *self.last_activity.lock(),
337 }
338 }
339}
340
/// A cached value together with its access bookkeeping.
struct LruEntry<T> {
    /// The cached value.
    value: T,
    /// Instant of the most recent insert or successful lookup.
    last_access: Instant,
    /// Starts at 1 on insert and grows by one per successful lookup.
    access_count: u64,
}
347
348pub struct TenantLruCache<K, V>
350where
351 K: Eq + Hash + Clone,
352{
353 entries: RwLock<HashMap<K, LruEntry<V>>>,
354 max_size: usize,
355 idle_timeout: Duration,
356}
357
358impl<K, V> TenantLruCache<K, V>
359where
360 K: Eq + Hash + Clone,
361{
362 pub fn new(max_size: usize, idle_timeout: Duration) -> Self {
364 Self {
365 entries: RwLock::new(HashMap::with_capacity(max_size)),
366 max_size,
367 idle_timeout,
368 }
369 }
370
371 pub fn get(&self, key: &K) -> Option<V>
373 where
374 V: Clone,
375 {
376 let mut entries = self.entries.write();
377 if let Some(entry) = entries.get_mut(key) {
378 entry.last_access = Instant::now();
379 entry.access_count += 1;
380 Some(entry.value.clone())
381 } else {
382 None
383 }
384 }
385
386 pub fn insert(&self, key: K, value: V) {
388 let mut entries = self.entries.write();
389
390 if entries.len() >= self.max_size && !entries.contains_key(&key) {
392 self.evict_one(&mut entries);
393 }
394
395 entries.insert(
396 key,
397 LruEntry {
398 value,
399 last_access: Instant::now(),
400 access_count: 1,
401 },
402 );
403 }
404
405 pub fn remove(&self, key: &K) -> Option<V> {
407 self.entries.write().remove(key).map(|e| e.value)
408 }
409
410 pub fn evict_expired(&self) -> usize {
412 let mut entries = self.entries.write();
413 let now = Instant::now();
414 let before = entries.len();
415
416 entries.retain(|_, entry| now.duration_since(entry.last_access) < self.idle_timeout);
417
418 before - entries.len()
419 }
420
421 pub fn len(&self) -> usize {
423 self.entries.read().len()
424 }
425
426 pub fn is_empty(&self) -> bool {
428 self.len() == 0
429 }
430
431 fn evict_one(&self, entries: &mut HashMap<K, LruEntry<V>>) {
433 let now = Instant::now();
434
435 let expired_key = entries
437 .iter()
438 .filter(|(_, e)| now.duration_since(e.last_access) >= self.idle_timeout)
439 .map(|(k, _)| k.clone())
440 .next();
441
442 if let Some(key) = expired_key {
443 entries.remove(&key);
444 return;
445 }
446
447 let lru_key = entries
449 .iter()
450 .min_by_key(|(_, e)| e.last_access)
451 .map(|(k, _)| k.clone());
452
453 if let Some(key) = lru_key {
454 entries.remove(&key);
455 }
456 }
457}
458
459pub struct TenantPoolEntry {
461 pub tenant_id: TenantId,
463 pub stats: Arc<AtomicPoolStats>,
465 pub state: PoolState,
467 pub schema: Option<String>,
469 pub database: Option<String>,
471}
472
/// Lifecycle state of a `TenantPoolEntry`.
///
/// Only `Initializing` and `Ready` are assigned in this file; the meaning of
/// the remaining variants is inferred from their names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolState {
    /// Initial state given to a freshly constructed entry.
    Initializing,
    /// State set by `TenantPoolEntry::mark_ready`.
    Ready,
    /// Presumably: the pool is opening its warm-up connections.
    WarmingUp,
    /// Presumably: the pool is finishing outstanding work before closing.
    Draining,
    /// Presumably: the pool has been shut down.
    Closed,
}
487
488impl TenantPoolEntry {
489 pub fn new(tenant_id: TenantId) -> Self {
491 let stats = Arc::new(AtomicPoolStats::new());
492 stats.mark_created();
493
494 Self {
495 tenant_id,
496 stats,
497 state: PoolState::Initializing,
498 schema: None,
499 database: None,
500 }
501 }
502
503 pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
505 self.schema = Some(schema.into());
506 self
507 }
508
509 pub fn with_database(mut self, database: impl Into<String>) -> Self {
511 self.database = Some(database.into());
512 self
513 }
514
515 pub fn mark_ready(&mut self) {
517 self.state = PoolState::Ready;
518 }
519
520 pub fn is_ready(&self) -> bool {
522 self.state == PoolState::Ready
523 }
524
525 pub fn stats(&self) -> PoolStats {
527 self.stats.snapshot()
528 }
529
530 pub fn should_evict(&self, idle_timeout: Duration) -> bool {
532 if let Some(last) = self.stats.snapshot().last_activity {
533 Instant::now().duration_since(last) > idle_timeout
534 } else {
535 false
536 }
537 }
538}
539
540pub struct TenantPoolManager {
545 config: PoolConfig,
546 pools: TenantLruCache<String, Arc<TenantPoolEntry>>,
547 global_stats: Arc<AtomicPoolStats>,
548}
549
550impl TenantPoolManager {
551 pub fn new(config: PoolConfig) -> Self {
553 let max_pools = match &config.strategy {
554 PoolStrategy::Shared { .. } => 1,
555 PoolStrategy::PerTenant { max_pools, .. } => *max_pools,
556 PoolStrategy::PerDatabase { max_databases, .. } => *max_databases,
557 };
558
559 Self {
560 pools: TenantLruCache::new(max_pools, config.idle_timeout),
561 config,
562 global_stats: Arc::new(AtomicPoolStats::new()),
563 }
564 }
565
566 pub fn builder() -> TenantPoolManagerBuilder {
568 TenantPoolManagerBuilder::default()
569 }
570
571 pub fn get_or_create(&self, tenant_id: &TenantId) -> Arc<TenantPoolEntry> {
573 let key = tenant_id.as_str().to_string();
574
575 if let Some(entry) = self.pools.get(&key) {
577 return entry;
578 }
579
580 let entry = Arc::new(TenantPoolEntry::new(tenant_id.clone()));
582 self.pools.insert(key, entry.clone());
583 entry
584 }
585
586 pub fn global_stats(&self) -> PoolStats {
588 self.global_stats.snapshot()
589 }
590
591 pub fn active_pools(&self) -> usize {
593 self.pools.len()
594 }
595
596 pub fn evict_expired(&self) -> usize {
598 self.pools.evict_expired()
599 }
600
601 pub fn config(&self) -> &PoolConfig {
603 &self.config
604 }
605}
606
607#[derive(Default)]
609pub struct TenantPoolManagerBuilder {
610 config: Option<PoolConfig>,
611}
612
613impl TenantPoolManagerBuilder {
614 pub fn config(mut self, config: PoolConfig) -> Self {
616 self.config = Some(config);
617 self
618 }
619
620 pub fn shared(self, max_connections: usize) -> Self {
622 self.config(PoolConfig::builder().shared(max_connections).build())
623 }
624
625 pub fn per_tenant(self, max_pools: usize, pool_size: usize) -> Self {
627 self.config(
628 PoolConfig::builder()
629 .per_tenant(max_pools, pool_size)
630 .build(),
631 )
632 }
633
634 pub fn build(self) -> TenantPoolManager {
636 TenantPoolManager::new(self.config.unwrap_or_default())
637 }
638}
639
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder overrides are applied and the strategy shortcut picks the
    /// matching variant.
    #[test]
    fn test_pool_config_builder() {
        let ten_minutes = Duration::from_secs(600);
        let built = PoolConfig::builder()
            .per_tenant(100, 5)
            .warmup_size(2)
            .idle_timeout(ten_minutes)
            .build();

        assert!(matches!(built.strategy, PoolStrategy::PerTenant { .. }));
        assert_eq!(built.warmup_size, 2);
        assert_eq!(built.idle_timeout, ten_minutes);
    }

    /// Two acquires and one release leave one active connection, and the
    /// maximum wait reflects the slower acquire.
    #[test]
    fn test_atomic_stats() {
        let counters = AtomicPoolStats::new();
        counters.mark_created();

        counters.record_acquire(Duration::from_millis(5));
        counters.record_acquire(Duration::from_millis(10));
        counters.record_release();

        let snap = counters.snapshot();
        assert_eq!(snap.connections_acquired, 2);
        assert_eq!(snap.connections_released, 1);
        assert_eq!(snap.active_connections, 1);
        assert_eq!(snap.max_wait_time_ms, 10);
    }

    /// A recently read key survives the eviction triggered by inserting
    /// into a full cache.
    #[test]
    fn test_lru_cache() {
        let lru = TenantLruCache::<String, i32>::new(3, Duration::from_secs(60));

        lru.insert("a".to_string(), 1);
        lru.insert("b".to_string(), 2);
        lru.insert("c".to_string(), 3);

        assert_eq!(lru.len(), 3);
        // Reading "a" refreshes its last-access time.
        assert_eq!(lru.get(&"a".to_string()), Some(1));

        // The cache is full, so this insert has to evict one entry.
        lru.insert("d".to_string(), 4);
        assert_eq!(lru.len(), 3);

        // "a" was touched most recently among the original keys, so it must
        // still be present.
        assert_eq!(lru.get(&"a".to_string()), Some(1));
    }

    /// A new entry starts in `Initializing` and keeps the schema it was
    /// given.
    #[test]
    fn test_tenant_pool_entry() {
        let pool_entry = TenantPoolEntry::new(TenantId::new("test")).with_schema("tenant_test");

        assert_eq!(pool_entry.schema, Some("tenant_test".to_string()));
        assert_eq!(pool_entry.state, PoolState::Initializing);
    }

    /// The manager creates an entry on first use and reuses it afterwards.
    #[test]
    fn test_pool_manager_creation() {
        let mgr = TenantPoolManager::builder().per_tenant(100, 5).build();

        assert_eq!(mgr.active_pools(), 0);

        let first = mgr.get_or_create(&TenantId::new("tenant-1"));
        assert_eq!(first.tenant_id.as_str(), "tenant-1");
        assert_eq!(mgr.active_pools(), 1);

        // A second request for the same tenant must not add another entry.
        let _second = mgr.get_or_create(&TenantId::new("tenant-1"));
        assert_eq!(mgr.active_pools(), 1);
    }
}