1use parking_lot::{Mutex, RwLock};
40use std::collections::HashMap;
41use std::hash::Hash;
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use super::context::TenantId;
47
48#[derive(Debug, Clone)]
50pub enum PoolStrategy {
51 Shared {
54 max_connections: usize,
56 },
57
58 PerTenant {
61 max_pools: usize,
63 pool_size: usize,
65 },
66
67 PerDatabase {
70 max_databases: usize,
72 pool_size: usize,
74 },
75}
76
77impl Default for PoolStrategy {
78 fn default() -> Self {
79 Self::Shared {
80 max_connections: 20,
81 }
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct PoolConfig {
88 pub strategy: PoolStrategy,
90 pub warmup_size: usize,
92 pub idle_timeout: Duration,
94 pub max_lifetime: Duration,
96 pub health_check: bool,
98 pub health_check_interval: Duration,
100}
101
102impl Default for PoolConfig {
103 fn default() -> Self {
104 Self {
105 strategy: PoolStrategy::default(),
106 warmup_size: 1,
107 idle_timeout: Duration::from_secs(300),
108 max_lifetime: Duration::from_secs(1800),
109 health_check: true,
110 health_check_interval: Duration::from_secs(30),
111 }
112 }
113}
114
115impl PoolConfig {
116 pub fn builder() -> PoolConfigBuilder {
118 PoolConfigBuilder::default()
119 }
120}
121
122#[derive(Default)]
124pub struct PoolConfigBuilder {
125 strategy: Option<PoolStrategy>,
126 warmup_size: Option<usize>,
127 idle_timeout: Option<Duration>,
128 max_lifetime: Option<Duration>,
129 health_check: Option<bool>,
130 health_check_interval: Option<Duration>,
131}
132
133impl PoolConfigBuilder {
134 pub fn strategy(mut self, strategy: PoolStrategy) -> Self {
136 self.strategy = Some(strategy);
137 self
138 }
139
140 pub fn shared(mut self, max_connections: usize) -> Self {
142 self.strategy = Some(PoolStrategy::Shared { max_connections });
143 self
144 }
145
146 pub fn per_tenant(mut self, max_pools: usize, pool_size: usize) -> Self {
148 self.strategy = Some(PoolStrategy::PerTenant {
149 max_pools,
150 pool_size,
151 });
152 self
153 }
154
155 pub fn per_database(mut self, max_databases: usize, pool_size: usize) -> Self {
157 self.strategy = Some(PoolStrategy::PerDatabase {
158 max_databases,
159 pool_size,
160 });
161 self
162 }
163
164 pub fn warmup_size(mut self, size: usize) -> Self {
166 self.warmup_size = Some(size);
167 self
168 }
169
170 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
172 self.idle_timeout = Some(timeout);
173 self
174 }
175
176 pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
178 self.max_lifetime = Some(lifetime);
179 self
180 }
181
182 pub fn health_check(mut self, enabled: bool) -> Self {
184 self.health_check = Some(enabled);
185 self
186 }
187
188 pub fn health_check_interval(mut self, interval: Duration) -> Self {
190 self.health_check_interval = Some(interval);
191 self
192 }
193
194 pub fn build(self) -> PoolConfig {
196 PoolConfig {
197 strategy: self.strategy.unwrap_or_default(),
198 warmup_size: self.warmup_size.unwrap_or(1),
199 idle_timeout: self.idle_timeout.unwrap_or(Duration::from_secs(300)),
200 max_lifetime: self.max_lifetime.unwrap_or(Duration::from_secs(1800)),
201 health_check: self.health_check.unwrap_or(true),
202 health_check_interval: self
203 .health_check_interval
204 .unwrap_or(Duration::from_secs(30)),
205 }
206 }
207}
208
209#[derive(Debug, Clone, Default)]
211pub struct PoolStats {
212 pub connections_acquired: u64,
214 pub connections_released: u64,
216 pub active_connections: usize,
218 pub idle_connections: usize,
220 pub total_wait_time_ms: u64,
222 pub max_wait_time_ms: u64,
224 pub timeouts: u64,
226 pub health_check_failures: u64,
228 pub created_at: Option<Instant>,
230 pub last_activity: Option<Instant>,
232}
233
234pub struct AtomicPoolStats {
236 connections_acquired: AtomicU64,
237 connections_released: AtomicU64,
238 active_connections: AtomicUsize,
239 idle_connections: AtomicUsize,
240 total_wait_time_ms: AtomicU64,
241 max_wait_time_ms: AtomicU64,
242 timeouts: AtomicU64,
243 health_check_failures: AtomicU64,
244 created_at: Mutex<Option<Instant>>,
245 last_activity: Mutex<Option<Instant>>,
246}
247
248impl Default for AtomicPoolStats {
249 fn default() -> Self {
250 Self::new()
251 }
252}
253
254impl AtomicPoolStats {
255 pub fn new() -> Self {
257 Self {
258 connections_acquired: AtomicU64::new(0),
259 connections_released: AtomicU64::new(0),
260 active_connections: AtomicUsize::new(0),
261 idle_connections: AtomicUsize::new(0),
262 total_wait_time_ms: AtomicU64::new(0),
263 max_wait_time_ms: AtomicU64::new(0),
264 timeouts: AtomicU64::new(0),
265 health_check_failures: AtomicU64::new(0),
266 created_at: Mutex::new(None),
267 last_activity: Mutex::new(None),
268 }
269 }
270
271 pub fn record_acquire(&self, wait_time: Duration) {
273 self.connections_acquired.fetch_add(1, Ordering::Relaxed);
274 self.active_connections.fetch_add(1, Ordering::Relaxed);
275
276 let wait_ms = wait_time.as_millis() as u64;
277 self.total_wait_time_ms.fetch_add(wait_ms, Ordering::Relaxed);
278
279 let mut current = self.max_wait_time_ms.load(Ordering::Relaxed);
281 while wait_ms > current {
282 match self.max_wait_time_ms.compare_exchange_weak(
283 current,
284 wait_ms,
285 Ordering::Relaxed,
286 Ordering::Relaxed,
287 ) {
288 Ok(_) => break,
289 Err(c) => current = c,
290 }
291 }
292
293 *self.last_activity.lock() = Some(Instant::now());
294 }
295
296 pub fn record_release(&self) {
298 self.connections_released.fetch_add(1, Ordering::Relaxed);
299 self.active_connections.fetch_sub(1, Ordering::Relaxed);
300 *self.last_activity.lock() = Some(Instant::now());
301 }
302
303 pub fn record_timeout(&self) {
305 self.timeouts.fetch_add(1, Ordering::Relaxed);
306 }
307
308 pub fn record_health_failure(&self) {
310 self.health_check_failures.fetch_add(1, Ordering::Relaxed);
311 }
312
313 pub fn set_idle(&self, count: usize) {
315 self.idle_connections.store(count, Ordering::Relaxed);
316 }
317
318 pub fn mark_created(&self) {
320 *self.created_at.lock() = Some(Instant::now());
321 }
322
323 pub fn snapshot(&self) -> PoolStats {
325 PoolStats {
326 connections_acquired: self.connections_acquired.load(Ordering::Relaxed),
327 connections_released: self.connections_released.load(Ordering::Relaxed),
328 active_connections: self.active_connections.load(Ordering::Relaxed),
329 idle_connections: self.idle_connections.load(Ordering::Relaxed),
330 total_wait_time_ms: self.total_wait_time_ms.load(Ordering::Relaxed),
331 max_wait_time_ms: self.max_wait_time_ms.load(Ordering::Relaxed),
332 timeouts: self.timeouts.load(Ordering::Relaxed),
333 health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
334 created_at: *self.created_at.lock(),
335 last_activity: *self.last_activity.lock(),
336 }
337 }
338}
339
340struct LruEntry<T> {
342 value: T,
343 last_access: Instant,
344 access_count: u64,
345}
346
347pub struct TenantLruCache<K, V>
349where
350 K: Eq + Hash + Clone,
351{
352 entries: RwLock<HashMap<K, LruEntry<V>>>,
353 max_size: usize,
354 idle_timeout: Duration,
355}
356
357impl<K, V> TenantLruCache<K, V>
358where
359 K: Eq + Hash + Clone,
360{
361 pub fn new(max_size: usize, idle_timeout: Duration) -> Self {
363 Self {
364 entries: RwLock::new(HashMap::with_capacity(max_size)),
365 max_size,
366 idle_timeout,
367 }
368 }
369
370 pub fn get(&self, key: &K) -> Option<V>
372 where
373 V: Clone,
374 {
375 let mut entries = self.entries.write();
376 if let Some(entry) = entries.get_mut(key) {
377 entry.last_access = Instant::now();
378 entry.access_count += 1;
379 Some(entry.value.clone())
380 } else {
381 None
382 }
383 }
384
385 pub fn insert(&self, key: K, value: V) {
387 let mut entries = self.entries.write();
388
389 if entries.len() >= self.max_size && !entries.contains_key(&key) {
391 self.evict_one(&mut entries);
392 }
393
394 entries.insert(
395 key,
396 LruEntry {
397 value,
398 last_access: Instant::now(),
399 access_count: 1,
400 },
401 );
402 }
403
404 pub fn remove(&self, key: &K) -> Option<V> {
406 self.entries.write().remove(key).map(|e| e.value)
407 }
408
409 pub fn evict_expired(&self) -> usize {
411 let mut entries = self.entries.write();
412 let now = Instant::now();
413 let before = entries.len();
414
415 entries.retain(|_, entry| now.duration_since(entry.last_access) < self.idle_timeout);
416
417 before - entries.len()
418 }
419
420 pub fn len(&self) -> usize {
422 self.entries.read().len()
423 }
424
425 pub fn is_empty(&self) -> bool {
427 self.len() == 0
428 }
429
430 fn evict_one(&self, entries: &mut HashMap<K, LruEntry<V>>) {
432 let now = Instant::now();
433
434 let expired_key = entries
436 .iter()
437 .filter(|(_, e)| now.duration_since(e.last_access) >= self.idle_timeout)
438 .map(|(k, _)| k.clone())
439 .next();
440
441 if let Some(key) = expired_key {
442 entries.remove(&key);
443 return;
444 }
445
446 let lru_key = entries
448 .iter()
449 .min_by_key(|(_, e)| e.last_access)
450 .map(|(k, _)| k.clone());
451
452 if let Some(key) = lru_key {
453 entries.remove(&key);
454 }
455 }
456}
457
458pub struct TenantPoolEntry {
460 pub tenant_id: TenantId,
462 pub stats: Arc<AtomicPoolStats>,
464 pub state: PoolState,
466 pub schema: Option<String>,
468 pub database: Option<String>,
470}
471
472#[derive(Debug, Clone, Copy, PartialEq, Eq)]
474pub enum PoolState {
475 Initializing,
477 Ready,
479 WarmingUp,
481 Draining,
483 Closed,
485}
486
487impl TenantPoolEntry {
488 pub fn new(tenant_id: TenantId) -> Self {
490 let stats = Arc::new(AtomicPoolStats::new());
491 stats.mark_created();
492
493 Self {
494 tenant_id,
495 stats,
496 state: PoolState::Initializing,
497 schema: None,
498 database: None,
499 }
500 }
501
502 pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
504 self.schema = Some(schema.into());
505 self
506 }
507
508 pub fn with_database(mut self, database: impl Into<String>) -> Self {
510 self.database = Some(database.into());
511 self
512 }
513
514 pub fn mark_ready(&mut self) {
516 self.state = PoolState::Ready;
517 }
518
519 pub fn is_ready(&self) -> bool {
521 self.state == PoolState::Ready
522 }
523
524 pub fn stats(&self) -> PoolStats {
526 self.stats.snapshot()
527 }
528
529 pub fn should_evict(&self, idle_timeout: Duration) -> bool {
531 if let Some(last) = self.stats.snapshot().last_activity {
532 Instant::now().duration_since(last) > idle_timeout
533 } else {
534 false
535 }
536 }
537}
538
539pub struct TenantPoolManager {
544 config: PoolConfig,
545 pools: TenantLruCache<String, Arc<TenantPoolEntry>>,
546 global_stats: Arc<AtomicPoolStats>,
547}
548
549impl TenantPoolManager {
550 pub fn new(config: PoolConfig) -> Self {
552 let max_pools = match &config.strategy {
553 PoolStrategy::Shared { .. } => 1,
554 PoolStrategy::PerTenant { max_pools, .. } => *max_pools,
555 PoolStrategy::PerDatabase { max_databases, .. } => *max_databases,
556 };
557
558 Self {
559 pools: TenantLruCache::new(max_pools, config.idle_timeout),
560 config,
561 global_stats: Arc::new(AtomicPoolStats::new()),
562 }
563 }
564
565 pub fn builder() -> TenantPoolManagerBuilder {
567 TenantPoolManagerBuilder::default()
568 }
569
570 pub fn get_or_create(&self, tenant_id: &TenantId) -> Arc<TenantPoolEntry> {
572 let key = tenant_id.as_str().to_string();
573
574 if let Some(entry) = self.pools.get(&key) {
576 return entry;
577 }
578
579 let entry = Arc::new(TenantPoolEntry::new(tenant_id.clone()));
581 self.pools.insert(key, entry.clone());
582 entry
583 }
584
585 pub fn global_stats(&self) -> PoolStats {
587 self.global_stats.snapshot()
588 }
589
590 pub fn active_pools(&self) -> usize {
592 self.pools.len()
593 }
594
595 pub fn evict_expired(&self) -> usize {
597 self.pools.evict_expired()
598 }
599
600 pub fn config(&self) -> &PoolConfig {
602 &self.config
603 }
604}
605
606#[derive(Default)]
608pub struct TenantPoolManagerBuilder {
609 config: Option<PoolConfig>,
610}
611
612impl TenantPoolManagerBuilder {
613 pub fn config(mut self, config: PoolConfig) -> Self {
615 self.config = Some(config);
616 self
617 }
618
619 pub fn shared(self, max_connections: usize) -> Self {
621 self.config(
622 PoolConfig::builder()
623 .shared(max_connections)
624 .build(),
625 )
626 }
627
628 pub fn per_tenant(self, max_pools: usize, pool_size: usize) -> Self {
630 self.config(
631 PoolConfig::builder()
632 .per_tenant(max_pools, pool_size)
633 .build(),
634 )
635 }
636
637 pub fn build(self) -> TenantPoolManager {
639 TenantPoolManager::new(self.config.unwrap_or_default())
640 }
641}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646
647 #[test]
648 fn test_pool_config_builder() {
649 let config = PoolConfig::builder()
650 .per_tenant(100, 5)
651 .warmup_size(2)
652 .idle_timeout(Duration::from_secs(600))
653 .build();
654
655 assert!(matches!(config.strategy, PoolStrategy::PerTenant { .. }));
656 assert_eq!(config.warmup_size, 2);
657 assert_eq!(config.idle_timeout, Duration::from_secs(600));
658 }
659
660 #[test]
661 fn test_atomic_stats() {
662 let stats = AtomicPoolStats::new();
663 stats.mark_created();
664
665 stats.record_acquire(Duration::from_millis(5));
666 stats.record_acquire(Duration::from_millis(10));
667 stats.record_release();
668
669 let snapshot = stats.snapshot();
670 assert_eq!(snapshot.connections_acquired, 2);
671 assert_eq!(snapshot.connections_released, 1);
672 assert_eq!(snapshot.active_connections, 1);
673 assert_eq!(snapshot.max_wait_time_ms, 10);
674 }
675
676 #[test]
677 fn test_lru_cache() {
678 let cache: TenantLruCache<String, i32> =
679 TenantLruCache::new(3, Duration::from_secs(60));
680
681 cache.insert("a".to_string(), 1);
682 cache.insert("b".to_string(), 2);
683 cache.insert("c".to_string(), 3);
684
685 assert_eq!(cache.len(), 3);
686 assert_eq!(cache.get(&"a".to_string()), Some(1));
687
688 cache.insert("d".to_string(), 4);
690 assert_eq!(cache.len(), 3);
691
692 assert_eq!(cache.get(&"a".to_string()), Some(1));
694 }
695
696 #[test]
697 fn test_tenant_pool_entry() {
698 let entry = TenantPoolEntry::new(TenantId::new("test"))
699 .with_schema("tenant_test");
700
701 assert_eq!(entry.schema, Some("tenant_test".to_string()));
702 assert_eq!(entry.state, PoolState::Initializing);
703 }
704
705 #[test]
706 fn test_pool_manager_creation() {
707 let manager = TenantPoolManager::builder()
708 .per_tenant(100, 5)
709 .build();
710
711 assert_eq!(manager.active_pools(), 0);
712
713 let entry = manager.get_or_create(&TenantId::new("tenant-1"));
714 assert_eq!(entry.tenant_id.as_str(), "tenant-1");
715 assert_eq!(manager.active_pools(), 1);
716
717 let _entry2 = manager.get_or_create(&TenantId::new("tenant-1"));
719 assert_eq!(manager.active_pools(), 1);
720 }
721}
722