prax_query/tenant/
pool.rs

1//! High-performance tenant-aware connection pool management.
2//!
3//! This module provides efficient connection pooling strategies for multi-tenant
4//! applications with support for:
5//!
6//! - **Per-tenant connection pools** with lazy initialization
7//! - **Shared pools with tenant context** for row-level isolation
8//! - **LRU eviction** for tenant pools to bound memory usage
9//! - **Pool warmup** for latency-sensitive applications
10//! - **Health checking** and automatic pool recovery
11//!
12//! # Performance Characteristics
13//!
14//! | Strategy | Memory | Latency | Isolation |
15//! |----------|--------|---------|-----------|
16//! | Shared Pool | Low | Lowest | Row-level |
17//! | Per-tenant Pool | Medium | Low | Schema/DB |
18//! | Database-per-tenant | High | Medium | Complete |
19//!
20//! # Example
21//!
22//! ```rust,ignore
//! use std::time::Duration;
//!
//! use prax_query::tenant::context::TenantId;
//! use prax_query::tenant::pool::{PoolConfig, TenantPoolManager};
//!
//! // Create a tenant pool manager
//! let manager = TenantPoolManager::builder()
//!     .config(
//!         PoolConfig::builder()
//!             .per_tenant(100, 5)
//!             .warmup_size(2)
//!             .idle_timeout(Duration::from_secs(300))
//!             .build(),
//!     )
//!     .build();
//!
//! // Get (or lazily create) the pool entry for a tenant
//! let entry = manager.get_or_create(&TenantId::new("tenant-123"));
37//! ```
38
39use parking_lot::{Mutex, RwLock};
40use std::collections::HashMap;
41use std::hash::Hash;
42use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
43use std::sync::Arc;
44use std::time::{Duration, Instant};
45
46use super::context::TenantId;
47
/// How tenants are mapped onto connection pools.
#[derive(Debug, Clone)]
pub enum PoolStrategy {
    /// One pool shared by all tenants, with tenant context injected per
    /// request (row-level isolation).
    ///
    /// The most memory-efficient option, with the lowest first-request latency.
    Shared {
        /// Upper bound on connections in the shared pool.
        max_connections: usize,
    },

    /// A dedicated pool per tenant (schema-based isolation).
    ///
    /// Medium memory usage with consistent per-tenant latency.
    PerTenant {
        /// Upper bound on the number of tenant pools kept alive.
        max_pools: usize,
        /// Number of connections in each tenant pool.
        pool_size: usize,
    },

    /// A dedicated database and pool per tenant (complete isolation).
    ///
    /// The highest memory usage, in exchange for the strongest isolation.
    PerDatabase {
        /// Upper bound on the number of tenant databases.
        max_databases: usize,
        /// Number of connections in each database pool.
        pool_size: usize,
    },
}

impl Default for PoolStrategy {
    /// Falls back to a shared pool capped at 20 connections.
    fn default() -> Self {
        PoolStrategy::Shared { max_connections: 20 }
    }
}
84
85/// Configuration for tenant pool manager.
86#[derive(Debug, Clone)]
87pub struct PoolConfig {
88    /// Pool management strategy.
89    pub strategy: PoolStrategy,
90    /// Number of connections to pre-warm per tenant.
91    pub warmup_size: usize,
92    /// Time before idle pools are evicted.
93    pub idle_timeout: Duration,
94    /// Time before connections are recycled.
95    pub max_lifetime: Duration,
96    /// Enable connection health checks.
97    pub health_check: bool,
98    /// Health check interval.
99    pub health_check_interval: Duration,
100}
101
102impl Default for PoolConfig {
103    fn default() -> Self {
104        Self {
105            strategy: PoolStrategy::default(),
106            warmup_size: 1,
107            idle_timeout: Duration::from_secs(300),
108            max_lifetime: Duration::from_secs(1800),
109            health_check: true,
110            health_check_interval: Duration::from_secs(30),
111        }
112    }
113}
114
115impl PoolConfig {
116    /// Create a new config builder.
117    pub fn builder() -> PoolConfigBuilder {
118        PoolConfigBuilder::default()
119    }
120}
121
122/// Builder for pool configuration.
123#[derive(Default)]
124pub struct PoolConfigBuilder {
125    strategy: Option<PoolStrategy>,
126    warmup_size: Option<usize>,
127    idle_timeout: Option<Duration>,
128    max_lifetime: Option<Duration>,
129    health_check: Option<bool>,
130    health_check_interval: Option<Duration>,
131}
132
133impl PoolConfigBuilder {
134    /// Set the pool strategy.
135    pub fn strategy(mut self, strategy: PoolStrategy) -> Self {
136        self.strategy = Some(strategy);
137        self
138    }
139
140    /// Use shared pool strategy.
141    pub fn shared(mut self, max_connections: usize) -> Self {
142        self.strategy = Some(PoolStrategy::Shared { max_connections });
143        self
144    }
145
146    /// Use per-tenant pool strategy.
147    pub fn per_tenant(mut self, max_pools: usize, pool_size: usize) -> Self {
148        self.strategy = Some(PoolStrategy::PerTenant {
149            max_pools,
150            pool_size,
151        });
152        self
153    }
154
155    /// Use per-database pool strategy.
156    pub fn per_database(mut self, max_databases: usize, pool_size: usize) -> Self {
157        self.strategy = Some(PoolStrategy::PerDatabase {
158            max_databases,
159            pool_size,
160        });
161        self
162    }
163
164    /// Set warmup size.
165    pub fn warmup_size(mut self, size: usize) -> Self {
166        self.warmup_size = Some(size);
167        self
168    }
169
170    /// Set idle timeout.
171    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
172        self.idle_timeout = Some(timeout);
173        self
174    }
175
176    /// Set max lifetime.
177    pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
178        self.max_lifetime = Some(lifetime);
179        self
180    }
181
182    /// Enable/disable health checks.
183    pub fn health_check(mut self, enabled: bool) -> Self {
184        self.health_check = Some(enabled);
185        self
186    }
187
188    /// Set health check interval.
189    pub fn health_check_interval(mut self, interval: Duration) -> Self {
190        self.health_check_interval = Some(interval);
191        self
192    }
193
194    /// Build the config.
195    pub fn build(self) -> PoolConfig {
196        PoolConfig {
197            strategy: self.strategy.unwrap_or_default(),
198            warmup_size: self.warmup_size.unwrap_or(1),
199            idle_timeout: self.idle_timeout.unwrap_or(Duration::from_secs(300)),
200            max_lifetime: self.max_lifetime.unwrap_or(Duration::from_secs(1800)),
201            health_check: self.health_check.unwrap_or(true),
202            health_check_interval: self
203                .health_check_interval
204                .unwrap_or(Duration::from_secs(30)),
205        }
206    }
207}
208
/// Point-in-time statistics for a tenant pool.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Number of connections acquired so far.
    pub connections_acquired: u64,
    /// Number of connections released so far.
    pub connections_released: u64,
    /// Connections currently checked out.
    pub active_connections: usize,
    /// Connections currently idle and available.
    pub idle_connections: usize,
    /// Sum of all connection wait times, in milliseconds.
    pub total_wait_time_ms: u64,
    /// Longest single connection wait observed, in milliseconds.
    pub max_wait_time_ms: u64,
    /// Number of connection timeouts.
    pub timeouts: u64,
    /// Number of failed health checks.
    pub health_check_failures: u64,
    /// When the pool was created, if recorded.
    pub created_at: Option<Instant>,
    /// When the pool last saw activity, if ever.
    pub last_activity: Option<Instant>,
}
233
234/// Thread-safe pool statistics.
235pub struct AtomicPoolStats {
236    connections_acquired: AtomicU64,
237    connections_released: AtomicU64,
238    active_connections: AtomicUsize,
239    idle_connections: AtomicUsize,
240    total_wait_time_ms: AtomicU64,
241    max_wait_time_ms: AtomicU64,
242    timeouts: AtomicU64,
243    health_check_failures: AtomicU64,
244    created_at: Mutex<Option<Instant>>,
245    last_activity: Mutex<Option<Instant>>,
246}
247
248impl Default for AtomicPoolStats {
249    fn default() -> Self {
250        Self::new()
251    }
252}
253
254impl AtomicPoolStats {
255    /// Create new atomic stats.
256    pub fn new() -> Self {
257        Self {
258            connections_acquired: AtomicU64::new(0),
259            connections_released: AtomicU64::new(0),
260            active_connections: AtomicUsize::new(0),
261            idle_connections: AtomicUsize::new(0),
262            total_wait_time_ms: AtomicU64::new(0),
263            max_wait_time_ms: AtomicU64::new(0),
264            timeouts: AtomicU64::new(0),
265            health_check_failures: AtomicU64::new(0),
266            created_at: Mutex::new(None),
267            last_activity: Mutex::new(None),
268        }
269    }
270
271    /// Record a connection acquisition.
272    pub fn record_acquire(&self, wait_time: Duration) {
273        self.connections_acquired.fetch_add(1, Ordering::Relaxed);
274        self.active_connections.fetch_add(1, Ordering::Relaxed);
275
276        let wait_ms = wait_time.as_millis() as u64;
277        self.total_wait_time_ms.fetch_add(wait_ms, Ordering::Relaxed);
278
279        // Update max wait time (lock-free)
280        let mut current = self.max_wait_time_ms.load(Ordering::Relaxed);
281        while wait_ms > current {
282            match self.max_wait_time_ms.compare_exchange_weak(
283                current,
284                wait_ms,
285                Ordering::Relaxed,
286                Ordering::Relaxed,
287            ) {
288                Ok(_) => break,
289                Err(c) => current = c,
290            }
291        }
292
293        *self.last_activity.lock() = Some(Instant::now());
294    }
295
296    /// Record a connection release.
297    pub fn record_release(&self) {
298        self.connections_released.fetch_add(1, Ordering::Relaxed);
299        self.active_connections.fetch_sub(1, Ordering::Relaxed);
300        *self.last_activity.lock() = Some(Instant::now());
301    }
302
303    /// Record a timeout.
304    pub fn record_timeout(&self) {
305        self.timeouts.fetch_add(1, Ordering::Relaxed);
306    }
307
308    /// Record a health check failure.
309    pub fn record_health_failure(&self) {
310        self.health_check_failures.fetch_add(1, Ordering::Relaxed);
311    }
312
313    /// Set idle connection count.
314    pub fn set_idle(&self, count: usize) {
315        self.idle_connections.store(count, Ordering::Relaxed);
316    }
317
318    /// Mark as created.
319    pub fn mark_created(&self) {
320        *self.created_at.lock() = Some(Instant::now());
321    }
322
323    /// Get a snapshot of the stats.
324    pub fn snapshot(&self) -> PoolStats {
325        PoolStats {
326            connections_acquired: self.connections_acquired.load(Ordering::Relaxed),
327            connections_released: self.connections_released.load(Ordering::Relaxed),
328            active_connections: self.active_connections.load(Ordering::Relaxed),
329            idle_connections: self.idle_connections.load(Ordering::Relaxed),
330            total_wait_time_ms: self.total_wait_time_ms.load(Ordering::Relaxed),
331            max_wait_time_ms: self.max_wait_time_ms.load(Ordering::Relaxed),
332            timeouts: self.timeouts.load(Ordering::Relaxed),
333            health_check_failures: self.health_check_failures.load(Ordering::Relaxed),
334            created_at: *self.created_at.lock(),
335            last_activity: *self.last_activity.lock(),
336        }
337    }
338}
339
340/// LRU entry for tenant pools.
341struct LruEntry<T> {
342    value: T,
343    last_access: Instant,
344    access_count: u64,
345}
346
347/// LRU cache for tenant pools with capacity limits.
348pub struct TenantLruCache<K, V>
349where
350    K: Eq + Hash + Clone,
351{
352    entries: RwLock<HashMap<K, LruEntry<V>>>,
353    max_size: usize,
354    idle_timeout: Duration,
355}
356
357impl<K, V> TenantLruCache<K, V>
358where
359    K: Eq + Hash + Clone,
360{
361    /// Create a new LRU cache.
362    pub fn new(max_size: usize, idle_timeout: Duration) -> Self {
363        Self {
364            entries: RwLock::new(HashMap::with_capacity(max_size)),
365            max_size,
366            idle_timeout,
367        }
368    }
369
370    /// Get a value from the cache, updating access time.
371    pub fn get(&self, key: &K) -> Option<V>
372    where
373        V: Clone,
374    {
375        let mut entries = self.entries.write();
376        if let Some(entry) = entries.get_mut(key) {
377            entry.last_access = Instant::now();
378            entry.access_count += 1;
379            Some(entry.value.clone())
380        } else {
381            None
382        }
383    }
384
385    /// Insert a value into the cache, evicting if necessary.
386    pub fn insert(&self, key: K, value: V) {
387        let mut entries = self.entries.write();
388
389        // Check if we need to evict
390        if entries.len() >= self.max_size && !entries.contains_key(&key) {
391            self.evict_one(&mut entries);
392        }
393
394        entries.insert(
395            key,
396            LruEntry {
397                value,
398                last_access: Instant::now(),
399                access_count: 1,
400            },
401        );
402    }
403
404    /// Remove a value from the cache.
405    pub fn remove(&self, key: &K) -> Option<V> {
406        self.entries.write().remove(key).map(|e| e.value)
407    }
408
409    /// Evict expired entries.
410    pub fn evict_expired(&self) -> usize {
411        let mut entries = self.entries.write();
412        let now = Instant::now();
413        let before = entries.len();
414
415        entries.retain(|_, entry| now.duration_since(entry.last_access) < self.idle_timeout);
416
417        before - entries.len()
418    }
419
420    /// Get cache size.
421    pub fn len(&self) -> usize {
422        self.entries.read().len()
423    }
424
425    /// Check if cache is empty.
426    pub fn is_empty(&self) -> bool {
427        self.len() == 0
428    }
429
430    /// Evict the least recently used entry.
431    fn evict_one(&self, entries: &mut HashMap<K, LruEntry<V>>) {
432        let now = Instant::now();
433
434        // First try to evict expired entries
435        let expired_key = entries
436            .iter()
437            .filter(|(_, e)| now.duration_since(e.last_access) >= self.idle_timeout)
438            .map(|(k, _)| k.clone())
439            .next();
440
441        if let Some(key) = expired_key {
442            entries.remove(&key);
443            return;
444        }
445
446        // Otherwise evict LRU entry
447        let lru_key = entries
448            .iter()
449            .min_by_key(|(_, e)| e.last_access)
450            .map(|(k, _)| k.clone());
451
452        if let Some(key) = lru_key {
453            entries.remove(&key);
454        }
455    }
456}
457
458/// Tenant-specific pool entry.
459pub struct TenantPoolEntry {
460    /// Tenant identifier.
461    pub tenant_id: TenantId,
462    /// Pool statistics.
463    pub stats: Arc<AtomicPoolStats>,
464    /// Pool state.
465    pub state: PoolState,
466    /// Schema name (for schema-based isolation).
467    pub schema: Option<String>,
468    /// Database name (for database-based isolation).
469    pub database: Option<String>,
470}
471
472/// State of a tenant pool.
473#[derive(Debug, Clone, Copy, PartialEq, Eq)]
474pub enum PoolState {
475    /// Pool is initializing.
476    Initializing,
477    /// Pool is ready for connections.
478    Ready,
479    /// Pool is warming up.
480    WarmingUp,
481    /// Pool is draining connections.
482    Draining,
483    /// Pool is closed.
484    Closed,
485}
486
487impl TenantPoolEntry {
488    /// Create a new pool entry.
489    pub fn new(tenant_id: TenantId) -> Self {
490        let stats = Arc::new(AtomicPoolStats::new());
491        stats.mark_created();
492
493        Self {
494            tenant_id,
495            stats,
496            state: PoolState::Initializing,
497            schema: None,
498            database: None,
499        }
500    }
501
502    /// Set the schema.
503    pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
504        self.schema = Some(schema.into());
505        self
506    }
507
508    /// Set the database.
509    pub fn with_database(mut self, database: impl Into<String>) -> Self {
510        self.database = Some(database.into());
511        self
512    }
513
514    /// Mark as ready.
515    pub fn mark_ready(&mut self) {
516        self.state = PoolState::Ready;
517    }
518
519    /// Check if ready.
520    pub fn is_ready(&self) -> bool {
521        self.state == PoolState::Ready
522    }
523
524    /// Get stats snapshot.
525    pub fn stats(&self) -> PoolStats {
526        self.stats.snapshot()
527    }
528
529    /// Check if pool should be evicted (idle too long).
530    pub fn should_evict(&self, idle_timeout: Duration) -> bool {
531        if let Some(last) = self.stats.snapshot().last_activity {
532            Instant::now().duration_since(last) > idle_timeout
533        } else {
534            false
535        }
536    }
537}
538
539/// Manager for tenant connection pools.
540///
541/// This is a placeholder struct that would be implemented with actual database
542/// driver integration (tokio-postgres, sqlx, etc.).
543pub struct TenantPoolManager {
544    config: PoolConfig,
545    pools: TenantLruCache<String, Arc<TenantPoolEntry>>,
546    global_stats: Arc<AtomicPoolStats>,
547}
548
549impl TenantPoolManager {
550    /// Create a new pool manager with the given config.
551    pub fn new(config: PoolConfig) -> Self {
552        let max_pools = match &config.strategy {
553            PoolStrategy::Shared { .. } => 1,
554            PoolStrategy::PerTenant { max_pools, .. } => *max_pools,
555            PoolStrategy::PerDatabase { max_databases, .. } => *max_databases,
556        };
557
558        Self {
559            pools: TenantLruCache::new(max_pools, config.idle_timeout),
560            config,
561            global_stats: Arc::new(AtomicPoolStats::new()),
562        }
563    }
564
565    /// Create a builder.
566    pub fn builder() -> TenantPoolManagerBuilder {
567        TenantPoolManagerBuilder::default()
568    }
569
570    /// Get or create a pool entry for a tenant.
571    pub fn get_or_create(&self, tenant_id: &TenantId) -> Arc<TenantPoolEntry> {
572        let key = tenant_id.as_str().to_string();
573
574        // Try to get existing
575        if let Some(entry) = self.pools.get(&key) {
576            return entry;
577        }
578
579        // Create new entry
580        let entry = Arc::new(TenantPoolEntry::new(tenant_id.clone()));
581        self.pools.insert(key, entry.clone());
582        entry
583    }
584
585    /// Get global statistics.
586    pub fn global_stats(&self) -> PoolStats {
587        self.global_stats.snapshot()
588    }
589
590    /// Get number of active tenant pools.
591    pub fn active_pools(&self) -> usize {
592        self.pools.len()
593    }
594
595    /// Evict expired tenant pools.
596    pub fn evict_expired(&self) -> usize {
597        self.pools.evict_expired()
598    }
599
600    /// Get the pool configuration.
601    pub fn config(&self) -> &PoolConfig {
602        &self.config
603    }
604}
605
606/// Builder for tenant pool manager.
607#[derive(Default)]
608pub struct TenantPoolManagerBuilder {
609    config: Option<PoolConfig>,
610}
611
612impl TenantPoolManagerBuilder {
613    /// Set the pool config.
614    pub fn config(mut self, config: PoolConfig) -> Self {
615        self.config = Some(config);
616        self
617    }
618
619    /// Use shared pool strategy.
620    pub fn shared(self, max_connections: usize) -> Self {
621        self.config(
622            PoolConfig::builder()
623                .shared(max_connections)
624                .build(),
625        )
626    }
627
628    /// Use per-tenant strategy.
629    pub fn per_tenant(self, max_pools: usize, pool_size: usize) -> Self {
630        self.config(
631            PoolConfig::builder()
632                .per_tenant(max_pools, pool_size)
633                .build(),
634        )
635    }
636
637    /// Build the manager.
638    pub fn build(self) -> TenantPoolManager {
639        TenantPoolManager::new(self.config.unwrap_or_default())
640    }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters override the corresponding defaults.
    #[test]
    fn test_pool_config_builder() {
        let ten_minutes = Duration::from_secs(600);
        let built = PoolConfig::builder()
            .per_tenant(100, 5)
            .warmup_size(2)
            .idle_timeout(ten_minutes)
            .build();

        assert!(matches!(built.strategy, PoolStrategy::PerTenant { .. }));
        assert_eq!(built.warmup_size, 2);
        assert_eq!(built.idle_timeout, ten_minutes);
    }

    /// Two acquires and one release leave one active connection and record
    /// the larger of the two waits as the maximum.
    #[test]
    fn test_atomic_stats() {
        let counters = AtomicPoolStats::new();
        counters.mark_created();

        for wait_ms in [5, 10] {
            counters.record_acquire(Duration::from_millis(wait_ms));
        }
        counters.record_release();

        let seen = counters.snapshot();
        assert_eq!(seen.connections_acquired, 2);
        assert_eq!(seen.connections_released, 1);
        assert_eq!(seen.active_connections, 1);
        assert_eq!(seen.max_wait_time_ms, 10);
    }

    /// A full cache evicts a stale entry on insert and keeps the one that
    /// was touched most recently.
    #[test]
    fn test_lru_cache() {
        let lru: TenantLruCache<String, i32> = TenantLruCache::new(3, Duration::from_secs(60));
        let key_a = "a".to_string();

        lru.insert(key_a.clone(), 1);
        lru.insert("b".to_string(), 2);
        lru.insert("c".to_string(), 3);

        assert_eq!(lru.len(), 3);
        assert_eq!(lru.get(&key_a), Some(1));

        // Inserting d should evict the LRU (b or c)
        lru.insert("d".to_string(), 4);
        assert_eq!(lru.len(), 3);

        // a should still exist (accessed recently)
        assert_eq!(lru.get(&key_a), Some(1));
    }

    /// A fresh entry carries the configured schema and starts initializing.
    #[test]
    fn test_tenant_pool_entry() {
        let pool_entry = TenantPoolEntry::new(TenantId::new("test")).with_schema("tenant_test");

        assert_eq!(pool_entry.schema, Some("tenant_test".to_string()));
        assert_eq!(pool_entry.state, PoolState::Initializing);
    }

    /// The manager creates a tenant entry on first use and reuses it after.
    #[test]
    fn test_pool_manager_creation() {
        let pool_manager = TenantPoolManager::builder().per_tenant(100, 5).build();
        assert_eq!(pool_manager.active_pools(), 0);

        let tenant = TenantId::new("tenant-1");
        let first = pool_manager.get_or_create(&tenant);
        assert_eq!(first.tenant_id.as_str(), "tenant-1");
        assert_eq!(pool_manager.active_pools(), 1);

        // Getting same tenant should return same entry
        let _second = pool_manager.get_or_create(&TenantId::new("tenant-1"));
        assert_eq!(pool_manager.active_pools(), 1);
    }
}
722