Skip to main content

prax_query/tenant/
pool.rs

1//! High-performance tenant-aware connection pool management.
2//!
3//! This module provides efficient connection pooling strategies for multi-tenant
4//! applications with support for:
5//!
6//! - **Per-tenant connection pools** with lazy initialization
7//! - **Shared pools with tenant context** for row-level isolation
8//! - **LRU eviction** for tenant pools to bound memory usage
9//! - **Pool warmup** for latency-sensitive applications
10//! - **Health checking** and automatic pool recovery
11//!
12//! # Performance Characteristics
13//!
14//! | Strategy | Memory | Latency | Isolation |
15//! |----------|--------|---------|-----------|
16//! | Shared Pool | Low | Lowest | Row-level |
17//! | Per-tenant Pool | Medium | Low | Schema/DB |
18//! | Database-per-tenant | High | Medium | Complete |
19//!
20//! # Example
21//!
22//! ```rust,ignore
23//! use prax_query::tenant::pool::{TenantPoolManager, PoolStrategy};
24//!
25//! // Create a tenant pool manager
26//! let manager = TenantPoolManager::builder()
27//!     .strategy(PoolStrategy::PerTenant {
28//!         max_pools: 100,
29//!         pool_size: 5,
30//!     })
31//!     .warmup_size(2)
32//!     .idle_timeout(Duration::from_secs(300))
33//!     .build();
34//!
35//! // Get a connection for a tenant
36//! let conn = manager.get("tenant-123").await?;
37//! ```
38
39use parking_lot::{Mutex, RwLock};
40use std::collections::HashMap;
41use std::hash::Hash;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
44use std::time::{Duration, Instant};
45
46use super::context::TenantId;
47
48/// Strategy for managing tenant connections.
49#[derive(Debug, Clone)]
50pub enum PoolStrategy {
51    /// Single shared pool with tenant context injection (row-level isolation).
52    /// Most memory efficient, lowest latency for first request.
53    Shared {
54        /// Maximum connections in the shared pool.
55        max_connections: usize,
56    },
57
58    /// Per-tenant connection pools (schema-based isolation).
59    /// Medium memory usage, consistent latency per tenant.
60    PerTenant {
61        /// Maximum number of tenant pools to keep alive.
62        max_pools: usize,
63        /// Connections per tenant pool.
64        pool_size: usize,
65    },
66
67    /// Per-tenant databases with dedicated pools (complete isolation).
68    /// Highest memory usage, best isolation.
69    PerDatabase {
70        /// Maximum number of tenant databases.
71        max_databases: usize,
72        /// Connections per database pool.
73        pool_size: usize,
74    },
75}
76
77impl Default for PoolStrategy {
78    fn default() -> Self {
79        Self::Shared {
80            max_connections: 20,
81        }
82    }
83}
84
85/// Configuration for tenant pool manager.
86#[derive(Debug, Clone)]
87pub struct PoolConfig {
88    /// Pool management strategy.
89    pub strategy: PoolStrategy,
90    /// Number of connections to pre-warm per tenant.
91    pub warmup_size: usize,
92    /// Time before idle pools are evicted.
93    pub idle_timeout: Duration,
94    /// Time before connections are recycled.
95    pub max_lifetime: Duration,
96    /// Enable connection health checks.
97    pub health_check: bool,
98    /// Health check interval.
99    pub health_check_interval: Duration,
100}
101
102impl Default for PoolConfig {
103    fn default() -> Self {
104        Self {
105            strategy: PoolStrategy::default(),
106            warmup_size: 1,
107            idle_timeout: Duration::from_secs(300),
108            max_lifetime: Duration::from_secs(1800),
109            health_check: true,
110            health_check_interval: Duration::from_secs(30),
111        }
112    }
113}
114
115impl PoolConfig {
116    /// Create a new config builder.
117    pub fn builder() -> PoolConfigBuilder {
118        PoolConfigBuilder::default()
119    }
120}
121
122/// Builder for pool configuration.
123#[derive(Default)]
124pub struct PoolConfigBuilder {
125    strategy: Option<PoolStrategy>,
126    warmup_size: Option<usize>,
127    idle_timeout: Option<Duration>,
128    max_lifetime: Option<Duration>,
129    health_check: Option<bool>,
130    health_check_interval: Option<Duration>,
131}
132
133impl PoolConfigBuilder {
134    /// Set the pool strategy.
135    pub fn strategy(mut self, strategy: PoolStrategy) -> Self {
136        self.strategy = Some(strategy);
137        self
138    }
139
140    /// Use shared pool strategy.
141    pub fn shared(mut self, max_connections: usize) -> Self {
142        self.strategy = Some(PoolStrategy::Shared { max_connections });
143        self
144    }
145
146    /// Use per-tenant pool strategy.
147    pub fn per_tenant(mut self, max_pools: usize, pool_size: usize) -> Self {
148        self.strategy = Some(PoolStrategy::PerTenant {
149            max_pools,
150            pool_size,
151        });
152        self
153    }
154
155    /// Use per-database pool strategy.
156    pub fn per_database(mut self, max_databases: usize, pool_size: usize) -> Self {
157        self.strategy = Some(PoolStrategy::PerDatabase {
158            max_databases,
159            pool_size,
160        });
161        self
162    }
163
164    /// Set warmup size.
165    pub fn warmup_size(mut self, size: usize) -> Self {
166        self.warmup_size = Some(size);
167        self
168    }
169
170    /// Set idle timeout.
171    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
172        self.idle_timeout = Some(timeout);
173        self
174    }
175
176    /// Set max lifetime.
177    pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
178        self.max_lifetime = Some(lifetime);
179        self
180    }
181
182    /// Enable/disable health checks.
183    pub fn health_check(mut self, enabled: bool) -> Self {
184        self.health_check = Some(enabled);
185        self
186    }
187
188    /// Set health check interval.
189    pub fn health_check_interval(mut self, interval: Duration) -> Self {
190        self.health_check_interval = Some(interval);
191        self
192    }
193
194    /// Build the config.
195    pub fn build(self) -> PoolConfig {
196        PoolConfig {
197            strategy: self.strategy.unwrap_or_default(),
198            warmup_size: self.warmup_size.unwrap_or(1),
199            idle_timeout: self.idle_timeout.unwrap_or(Duration::from_secs(300)),
200            max_lifetime: self.max_lifetime.unwrap_or(Duration::from_secs(1800)),
201            health_check: self.health_check.unwrap_or(true),
202            health_check_interval: self
203                .health_check_interval
204                .unwrap_or(Duration::from_secs(30)),
205        }
206    }
207}
208
209/// Statistics for a tenant pool.
210#[derive(Debug, Clone, Default)]
211pub struct PoolStats {
212    /// Total connections acquired.
213    pub connections_acquired: u64,
214    /// Total connections released.
215    pub connections_released: u64,
216    /// Currently active connections.
217    pub active_connections: usize,
218    /// Idle connections available.
219    pub idle_connections: usize,
220    /// Total wait time for connections (ms).
221    pub total_wait_time_ms: u64,
222    /// Maximum wait time observed (ms).
223    pub max_wait_time_ms: u64,
224    /// Connection timeouts.
225    pub timeouts: u64,
226    /// Failed health checks.
227    pub health_check_failures: u64,
228    /// Pool creation time.
229    pub created_at: Option<Instant>,
230    /// Last activity time.
231    pub last_activity: Option<Instant>,
232}
233
234/// Thread-safe pool statistics.
235pub struct AtomicPoolStats {
236    connections_acquired: AtomicU64,
237    connections_released: AtomicU64,
238    active_connections: AtomicUsize,
239    idle_connections: AtomicUsize,
240    total_wait_time_ms: AtomicU64,
241    max_wait_time_ms: AtomicU64,
242    timeouts: AtomicU64,
243    health_check_failures: AtomicU64,
244    created_at: Mutex<Option<Instant>>,
245    last_activity: Mutex<Option<Instant>>,
246}
247
248impl Default for AtomicPoolStats {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl AtomicPoolStats {
255    /// Create new atomic stats.
256    pub fn new() -> Self {
257        Self {
258            connections_acquired: AtomicU64::new(0),
259            connections_released: AtomicU64::new(0),
260            active_connections: AtomicUsize::new(0),
261            idle_connections: AtomicUsize::new(0),
262            total_wait_time_ms: AtomicU64::new(0),
263            max_wait_time_ms: AtomicU64::new(0),
264            timeouts: AtomicU64::new(0),
265            health_check_failures: AtomicU64::new(0),
266            created_at: Mutex::new(None),
267            last_activity: Mutex::new(None),
268        }
269    }
270
271    /// Record a connection acquisition.
272    pub fn record_acquire(&self, wait_time: Duration) {
273        self.connections_acquired.fetch_add(1, Ordering::Relaxed);
274        self.active_connections.fetch_add(1, Ordering::Relaxed);
275
276        let wait_ms = wait_time.as_millis() as u64;
277        self.total_wait_time_ms
278            .fetch_add(wait_ms, Ordering::Relaxed);
279
280        // Update max wait time (lock-free)
281        let mut current = self.max_wait_time_ms.load(Ordering::Relaxed);
282        while wait_ms > current {
283            match self.max_wait_time_ms.compare_exchange_weak(
284                current,
285                wait_ms,
286                Ordering::Relaxed,
287                Ordering::Relaxed,
288            ) {
289                Ok(_) => break,
290                Err(c) => current = c,
291            }
292        }
293
294        *self.last_activity.lock() = Some(Instant::now());
295    }
296
297    /// Record a connection release.
298    pub fn record_release(&self) {
299        self.connections_released.fetch_add(1, Ordering::Relaxed);
300        self.active_connections.fetch_sub(1, Ordering::Relaxed);
301        *self.last_activity.lock() = Some(Instant::now());
302    }
303
304    /// Record a timeout.
305    pub fn record_timeout(&self) {
306        self.timeouts.fetch_add(1, Ordering::Relaxed);
307    }
308
309    /// Record a health check failure.
310    pub fn record_health_failure(&self) {
311        self.health_check_failures.fetch_add(1, Ordering::Relaxed);
312    }
313
314    /// Set idle connection count.
315    pub fn set_idle(&self, count: usize) {
316        self.idle_connections.store(count, Ordering::Relaxed);
317    }
318
319    /// Mark as created.
320    pub fn mark_created(&self) {
321        *self.created_at.lock() = Some(Instant::now());
322    }
323
324    /// Get a snapshot of the stats.
325    pub fn snapshot(&self) -> PoolStats {
326        PoolStats {
327            connections_acquired: self.connections_acquired.load(Ordering::Relaxed),
328            connections_released: self.connections_released.load(Ordering::Relaxed),
329            active_connections: self.active_connections.load(Ordering::Relaxed),
330            idle_connections: self.idle_connections.load(Ordering::Relaxed),
331            total_wait_time_ms: self.total_wait_time_ms.load(Ordering::Relaxed),
332            max_wait_time_ms: self.max_wait_time_ms.load(Ordering::Relaxed),
333            timeouts: self.timeouts.load(Ordering::Relaxed),
334            health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
335            created_at: *self.created_at.lock(),
336            last_activity: *self.last_activity.lock(),
337        }
338    }
339}
340
341/// LRU entry for tenant pools.
342struct LruEntry<T> {
343    value: T,
344    last_access: Instant,
345    access_count: u64,
346}
347
348/// LRU cache for tenant pools with capacity limits.
349pub struct TenantLruCache<K, V>
350where
351    K: Eq + Hash + Clone,
352{
353    entries: RwLock<HashMap<K, LruEntry<V>>>,
354    max_size: usize,
355    idle_timeout: Duration,
356}
357
358impl<K, V> TenantLruCache<K, V>
359where
360    K: Eq + Hash + Clone,
361{
362    /// Create a new LRU cache.
363    pub fn new(max_size: usize, idle_timeout: Duration) -> Self {
364        Self {
365            entries: RwLock::new(HashMap::with_capacity(max_size)),
366            max_size,
367            idle_timeout,
368        }
369    }
370
371    /// Get a value from the cache, updating access time.
372    pub fn get(&self, key: &K) -> Option<V>
373    where
374        V: Clone,
375    {
376        let mut entries = self.entries.write();
377        if let Some(entry) = entries.get_mut(key) {
378            entry.last_access = Instant::now();
379            entry.access_count += 1;
380            Some(entry.value.clone())
381        } else {
382            None
383        }
384    }
385
386    /// Insert a value into the cache, evicting if necessary.
387    pub fn insert(&self, key: K, value: V) {
388        let mut entries = self.entries.write();
389
390        // Check if we need to evict
391        if entries.len() >= self.max_size && !entries.contains_key(&key) {
392            self.evict_one(&mut entries);
393        }
394
395        entries.insert(
396            key,
397            LruEntry {
398                value,
399                last_access: Instant::now(),
400                access_count: 1,
401            },
402        );
403    }
404
405    /// Remove a value from the cache.
406    pub fn remove(&self, key: &K) -> Option<V> {
407        self.entries.write().remove(key).map(|e| e.value)
408    }
409
410    /// Evict expired entries.
411    pub fn evict_expired(&self) -> usize {
412        let mut entries = self.entries.write();
413        let now = Instant::now();
414        let before = entries.len();
415
416        entries.retain(|_, entry| now.duration_since(entry.last_access) < self.idle_timeout);
417
418        before - entries.len()
419    }
420
421    /// Get cache size.
422    pub fn len(&self) -> usize {
423        self.entries.read().len()
424    }
425
426    /// Check if cache is empty.
427    pub fn is_empty(&self) -> bool {
428        self.len() == 0
429    }
430
431    /// Evict the least recently used entry.
432    fn evict_one(&self, entries: &mut HashMap<K, LruEntry<V>>) {
433        let now = Instant::now();
434
435        // First try to evict expired entries
436        let expired_key = entries
437            .iter()
438            .filter(|(_, e)| now.duration_since(e.last_access) >= self.idle_timeout)
439            .map(|(k, _)| k.clone())
440            .next();
441
442        if let Some(key) = expired_key {
443            entries.remove(&key);
444            return;
445        }
446
447        // Otherwise evict LRU entry
448        let lru_key = entries
449            .iter()
450            .min_by_key(|(_, e)| e.last_access)
451            .map(|(k, _)| k.clone());
452
453        if let Some(key) = lru_key {
454            entries.remove(&key);
455        }
456    }
457}
458
459/// Tenant-specific pool entry.
460pub struct TenantPoolEntry {
461    /// Tenant identifier.
462    pub tenant_id: TenantId,
463    /// Pool statistics.
464    pub stats: Arc<AtomicPoolStats>,
465    /// Pool state.
466    pub state: PoolState,
467    /// Schema name (for schema-based isolation).
468    pub schema: Option<String>,
469    /// Database name (for database-based isolation).
470    pub database: Option<String>,
471}
472
473/// State of a tenant pool.
474#[derive(Debug, Clone, Copy, PartialEq, Eq)]
475pub enum PoolState {
476    /// Pool is initializing.
477    Initializing,
478    /// Pool is ready for connections.
479    Ready,
480    /// Pool is warming up.
481    WarmingUp,
482    /// Pool is draining connections.
483    Draining,
484    /// Pool is closed.
485    Closed,
486}
487
488impl TenantPoolEntry {
489    /// Create a new pool entry.
490    pub fn new(tenant_id: TenantId) -> Self {
491        let stats = Arc::new(AtomicPoolStats::new());
492        stats.mark_created();
493
494        Self {
495            tenant_id,
496            stats,
497            state: PoolState::Initializing,
498            schema: None,
499            database: None,
500        }
501    }
502
503    /// Set the schema.
504    pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
505        self.schema = Some(schema.into());
506        self
507    }
508
509    /// Set the database.
510    pub fn with_database(mut self, database: impl Into<String>) -> Self {
511        self.database = Some(database.into());
512        self
513    }
514
515    /// Mark as ready.
516    pub fn mark_ready(&mut self) {
517        self.state = PoolState::Ready;
518    }
519
520    /// Check if ready.
521    pub fn is_ready(&self) -> bool {
522        self.state == PoolState::Ready
523    }
524
525    /// Get stats snapshot.
526    pub fn stats(&self) -> PoolStats {
527        self.stats.snapshot()
528    }
529
530    /// Check if pool should be evicted (idle too long).
531    pub fn should_evict(&self, idle_timeout: Duration) -> bool {
532        if let Some(last) = self.stats.snapshot().last_activity {
533            Instant::now().duration_since(last) > idle_timeout
534        } else {
535            false
536        }
537    }
538}
539
540/// Manager for tenant connection pools.
541///
542/// This is a placeholder struct that would be implemented with actual database
543/// driver integration (tokio-postgres, sqlx, etc.).
544pub struct TenantPoolManager {
545    config: PoolConfig,
546    pools: TenantLruCache<String, Arc<TenantPoolEntry>>,
547    global_stats: Arc<AtomicPoolStats>,
548}
549
550impl TenantPoolManager {
551    /// Create a new pool manager with the given config.
552    pub fn new(config: PoolConfig) -> Self {
553        let max_pools = match &config.strategy {
554            PoolStrategy::Shared { .. } => 1,
555            PoolStrategy::PerTenant { max_pools, .. } => *max_pools,
556            PoolStrategy::PerDatabase { max_databases, .. } => *max_databases,
557        };
558
559        Self {
560            pools: TenantLruCache::new(max_pools, config.idle_timeout),
561            config,
562            global_stats: Arc::new(AtomicPoolStats::new()),
563        }
564    }
565
566    /// Create a builder.
567    pub fn builder() -> TenantPoolManagerBuilder {
568        TenantPoolManagerBuilder::default()
569    }
570
571    /// Get or create a pool entry for a tenant.
572    pub fn get_or_create(&self, tenant_id: &TenantId) -> Arc<TenantPoolEntry> {
573        let key = tenant_id.as_str().to_string();
574
575        // Try to get existing
576        if let Some(entry) = self.pools.get(&key) {
577            return entry;
578        }
579
580        // Create new entry
581        let entry = Arc::new(TenantPoolEntry::new(tenant_id.clone()));
582        self.pools.insert(key, entry.clone());
583        entry
584    }
585
586    /// Get global statistics.
587    pub fn global_stats(&self) -> PoolStats {
588        self.global_stats.snapshot()
589    }
590
591    /// Get number of active tenant pools.
592    pub fn active_pools(&self) -> usize {
593        self.pools.len()
594    }
595
596    /// Evict expired tenant pools.
597    pub fn evict_expired(&self) -> usize {
598        self.pools.evict_expired()
599    }
600
601    /// Get the pool configuration.
602    pub fn config(&self) -> &PoolConfig {
603        &self.config
604    }
605}
606
607/// Builder for tenant pool manager.
608#[derive(Default)]
609pub struct TenantPoolManagerBuilder {
610    config: Option<PoolConfig>,
611}
612
613impl TenantPoolManagerBuilder {
614    /// Set the pool config.
615    pub fn config(mut self, config: PoolConfig) -> Self {
616        self.config = Some(config);
617        self
618    }
619
620    /// Use shared pool strategy.
621    pub fn shared(self, max_connections: usize) -> Self {
622        self.config(PoolConfig::builder().shared(max_connections).build())
623    }
624
625    /// Use per-tenant strategy.
626    pub fn per_tenant(self, max_pools: usize, pool_size: usize) -> Self {
627        self.config(
628            PoolConfig::builder()
629                .per_tenant(max_pools, pool_size)
630                .build(),
631        )
632    }
633
634    /// Build the manager.
635    pub fn build(self) -> TenantPoolManager {
636        TenantPoolManager::new(self.config.unwrap_or_default())
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643
644    #[test]
645    fn test_pool_config_builder() {
646        let config = PoolConfig::builder()
647            .per_tenant(100, 5)
648            .warmup_size(2)
649            .idle_timeout(Duration::from_secs(600))
650            .build();
651
652        assert!(matches!(config.strategy, PoolStrategy::PerTenant { .. }));
653        assert_eq!(config.warmup_size, 2);
654        assert_eq!(config.idle_timeout, Duration::from_secs(600));
655    }
656
657    #[test]
658    fn test_atomic_stats() {
659        let stats = AtomicPoolStats::new();
660        stats.mark_created();
661
662        stats.record_acquire(Duration::from_millis(5));
663        stats.record_acquire(Duration::from_millis(10));
664        stats.record_release();
665
666        let snapshot = stats.snapshot();
667        assert_eq!(snapshot.connections_acquired, 2);
668        assert_eq!(snapshot.connections_released, 1);
669        assert_eq!(snapshot.active_connections, 1);
670        assert_eq!(snapshot.max_wait_time_ms, 10);
671    }
672
673    #[test]
674    fn test_lru_cache() {
675        let cache: TenantLruCache<String, i32> = TenantLruCache::new(3, Duration::from_secs(60));
676
677        cache.insert("a".to_string(), 1);
678        cache.insert("b".to_string(), 2);
679        cache.insert("c".to_string(), 3);
680
681        assert_eq!(cache.len(), 3);
682        assert_eq!(cache.get(&"a".to_string()), Some(1));
683
684        // Inserting d should evict the LRU (b or c)
685        cache.insert("d".to_string(), 4);
686        assert_eq!(cache.len(), 3);
687
688        // a should still exist (accessed recently)
689        assert_eq!(cache.get(&"a".to_string()), Some(1));
690    }
691
692    #[test]
693    fn test_tenant_pool_entry() {
694        let entry = TenantPoolEntry::new(TenantId::new("test")).with_schema("tenant_test");
695
696        assert_eq!(entry.schema, Some("tenant_test".to_string()));
697        assert_eq!(entry.state, PoolState::Initializing);
698    }
699
700    #[test]
701    fn test_pool_manager_creation() {
702        let manager = TenantPoolManager::builder().per_tenant(100, 5).build();
703
704        assert_eq!(manager.active_pools(), 0);
705
706        let entry = manager.get_or_create(&TenantId::new("tenant-1"));
707        assert_eq!(entry.tenant_id.as_str(), "tenant-1");
708        assert_eq!(manager.active_pools(), 1);
709
710        // Getting same tenant should return same entry
711        let _entry2 = manager.get_or_create(&TenantId::new("tenant-1"));
712        assert_eq!(manager.active_pools(), 1);
713    }
714}