Skip to main content

heliosdb_proxy/multi_tenancy/
pool.rs

1//! Tenant-Aware Connection Pool
2//!
3//! This module provides connection pool management with per-tenant isolation
4//! and resource allocation.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use dashmap::DashMap;
11
12use super::config::{TenantConfig, TenantId, TenantPoolConfig};
13
/// Lifecycle state tracked for a pooled connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    /// The connection is not in use and can be handed out.
    Idle,
    /// The connection is currently in use.
    Active,
    /// The connection is undergoing validation.
    Validating,
    /// The connection has been closed.
    Closed,
}
26
27/// A pooled connection handle
28#[derive(Debug)]
29pub struct PooledConnection {
30    /// Connection identifier
31    id: u64,
32
33    /// Tenant that owns this connection
34    tenant_id: TenantId,
35
36    /// When the connection was created
37    created_at: Instant,
38
39    /// When the connection was last used
40    last_used: Instant,
41
42    /// Current state
43    state: ConnectionState,
44
45    /// Total queries executed
46    queries_executed: u64,
47
48    /// Backend connection info (e.g., socket address)
49    backend_info: String,
50}
51
52impl PooledConnection {
53    /// Create a new pooled connection
54    pub fn new(id: u64, tenant_id: TenantId, backend_info: impl Into<String>) -> Self {
55        let now = Instant::now();
56        Self {
57            id,
58            tenant_id,
59            created_at: now,
60            last_used: now,
61            state: ConnectionState::Idle,
62            queries_executed: 0,
63            backend_info: backend_info.into(),
64        }
65    }
66
67    /// Get connection ID
68    pub fn id(&self) -> u64 {
69        self.id
70    }
71
72    /// Get tenant ID
73    pub fn tenant_id(&self) -> &TenantId {
74        &self.tenant_id
75    }
76
77    /// Get connection age
78    pub fn age(&self) -> Duration {
79        self.created_at.elapsed()
80    }
81
82    /// Get idle time
83    pub fn idle_time(&self) -> Duration {
84        self.last_used.elapsed()
85    }
86
87    /// Get connection state
88    pub fn state(&self) -> ConnectionState {
89        self.state
90    }
91
92    /// Check if connection is available
93    pub fn is_available(&self) -> bool {
94        self.state == ConnectionState::Idle
95    }
96
97    /// Mark connection as active
98    pub fn mark_active(&mut self) {
99        self.state = ConnectionState::Active;
100        self.last_used = Instant::now();
101    }
102
103    /// Mark connection as idle
104    pub fn mark_idle(&mut self) {
105        self.state = ConnectionState::Idle;
106        self.last_used = Instant::now();
107    }
108
109    /// Increment query counter
110    pub fn record_query(&mut self) {
111        self.queries_executed += 1;
112        self.last_used = Instant::now();
113    }
114
115    /// Get backend info
116    pub fn backend_info(&self) -> &str {
117        &self.backend_info
118    }
119
120    /// Get queries executed count
121    pub fn queries_executed(&self) -> u64 {
122        self.queries_executed
123    }
124}
125
126/// Per-tenant connection pool
127#[derive(Debug)]
128pub struct TenantPool {
129    /// Tenant ID
130    tenant_id: TenantId,
131
132    /// Pool configuration
133    config: TenantPoolConfig,
134
135    /// Active connections count
136    active_count: AtomicU32,
137
138    /// Idle connections count
139    idle_count: AtomicU32,
140
141    /// Total connections created
142    total_created: AtomicU64,
143
144    /// Total connections closed
145    total_closed: AtomicU64,
146
147    /// Waiting requests count
148    waiting_count: AtomicU32,
149
150    /// Pool creation time
151    created_at: Instant,
152}
153
154impl TenantPool {
155    /// Create a new tenant pool
156    pub fn new(tenant_id: TenantId, config: TenantPoolConfig) -> Self {
157        Self {
158            tenant_id,
159            config,
160            active_count: AtomicU32::new(0),
161            idle_count: AtomicU32::new(0),
162            total_created: AtomicU64::new(0),
163            total_closed: AtomicU64::new(0),
164            waiting_count: AtomicU32::new(0),
165            created_at: Instant::now(),
166        }
167    }
168
169    /// Get tenant ID
170    pub fn tenant_id(&self) -> &TenantId {
171        &self.tenant_id
172    }
173
174    /// Get pool configuration
175    pub fn config(&self) -> &TenantPoolConfig {
176        &self.config
177    }
178
179    /// Get active connection count
180    pub fn active_count(&self) -> u32 {
181        self.active_count.load(Ordering::Relaxed)
182    }
183
184    /// Get idle connection count
185    pub fn idle_count(&self) -> u32 {
186        self.idle_count.load(Ordering::Relaxed)
187    }
188
189    /// Get total connection count
190    pub fn total_count(&self) -> u32 {
191        self.active_count() + self.idle_count()
192    }
193
194    /// Get waiting requests count
195    pub fn waiting_count(&self) -> u32 {
196        self.waiting_count.load(Ordering::Relaxed)
197    }
198
199    /// Check if pool is at capacity
200    pub fn is_at_capacity(&self) -> bool {
201        self.total_count() >= self.config.max_connections
202    }
203
204    /// Check if pool can accept new connection
205    pub fn can_create_connection(&self) -> bool {
206        self.total_count() < self.config.max_connections
207    }
208
209    /// Check if pool has available connections
210    pub fn has_available(&self) -> bool {
211        self.idle_count() > 0
212    }
213
214    /// Record connection acquired
215    pub fn record_acquire(&self) {
216        self.idle_count.fetch_sub(1, Ordering::Relaxed);
217        self.active_count.fetch_add(1, Ordering::Relaxed);
218    }
219
220    /// Record connection released
221    pub fn record_release(&self) {
222        self.active_count.fetch_sub(1, Ordering::Relaxed);
223        self.idle_count.fetch_add(1, Ordering::Relaxed);
224    }
225
226    /// Record connection created
227    pub fn record_created(&self) {
228        self.idle_count.fetch_add(1, Ordering::Relaxed);
229        self.total_created.fetch_add(1, Ordering::Relaxed);
230    }
231
232    /// Record connection closed
233    pub fn record_closed(&self, was_active: bool) {
234        if was_active {
235            self.active_count.fetch_sub(1, Ordering::Relaxed);
236        } else {
237            self.idle_count.fetch_sub(1, Ordering::Relaxed);
238        }
239        self.total_closed.fetch_add(1, Ordering::Relaxed);
240    }
241
242    /// Record waiting request
243    pub fn record_waiting(&self) {
244        self.waiting_count.fetch_add(1, Ordering::Relaxed);
245    }
246
247    /// Record request no longer waiting
248    pub fn record_not_waiting(&self) {
249        self.waiting_count.fetch_sub(1, Ordering::Relaxed);
250    }
251
252    /// Get pool utilization (0.0 to 1.0)
253    pub fn utilization(&self) -> f32 {
254        let total = self.total_count();
255        if total == 0 {
256            return 0.0;
257        }
258        self.active_count() as f32 / self.config.max_connections as f32
259    }
260
261    /// Get pool age
262    pub fn age(&self) -> Duration {
263        self.created_at.elapsed()
264    }
265
266    /// Get statistics snapshot
267    pub fn stats(&self) -> TenantPoolStats {
268        TenantPoolStats {
269            tenant_id: self.tenant_id.clone(),
270            active: self.active_count(),
271            idle: self.idle_count(),
272            max: self.config.max_connections,
273            waiting: self.waiting_count(),
274            total_created: self.total_created.load(Ordering::Relaxed),
275            total_closed: self.total_closed.load(Ordering::Relaxed),
276            utilization: self.utilization(),
277            age: self.age(),
278        }
279    }
280}
281
282/// Tenant pool statistics
283#[derive(Debug, Clone)]
284pub struct TenantPoolStats {
285    /// Tenant ID
286    pub tenant_id: TenantId,
287
288    /// Active connections
289    pub active: u32,
290
291    /// Idle connections
292    pub idle: u32,
293
294    /// Maximum connections
295    pub max: u32,
296
297    /// Waiting requests
298    pub waiting: u32,
299
300    /// Total connections created
301    pub total_created: u64,
302
303    /// Total connections closed
304    pub total_closed: u64,
305
306    /// Pool utilization (0.0 to 1.0)
307    pub utilization: f32,
308
309    /// Pool age
310    pub age: Duration,
311}
312
313/// Tenant connection pool manager
314///
315/// Manages connection pools across all tenants.
316pub struct TenantConnectionPool {
317    /// Per-tenant pools
318    pools: DashMap<TenantId, Arc<TenantPool>>,
319
320    /// Shared pool for small/unknown tenants
321    shared_pool: Arc<TenantPool>,
322
323    /// Default pool configuration
324    default_config: TenantPoolConfig,
325
326    /// Connection ID counter
327    connection_counter: AtomicU64,
328
329    /// Total acquire count
330    total_acquires: AtomicU64,
331
332    /// Total acquire timeouts
333    total_timeouts: AtomicU64,
334
335    /// Threshold for dedicated pool (connections)
336    dedicated_pool_threshold: u32,
337}
338
339impl TenantConnectionPool {
340    /// Create a new tenant connection pool manager
341    pub fn new(default_config: TenantPoolConfig) -> Self {
342        let shared_pool = Arc::new(TenantPool::new(
343            TenantId::new("__shared__"),
344            TenantPoolConfig {
345                max_connections: 50,
346                ..default_config.clone()
347            },
348        ));
349
350        Self {
351            pools: DashMap::new(),
352            shared_pool,
353            default_config,
354            connection_counter: AtomicU64::new(0),
355            total_acquires: AtomicU64::new(0),
356            total_timeouts: AtomicU64::new(0),
357            dedicated_pool_threshold: 5,
358        }
359    }
360
361    /// Set dedicated pool threshold
362    pub fn with_dedicated_threshold(mut self, threshold: u32) -> Self {
363        self.dedicated_pool_threshold = threshold;
364        self
365    }
366
367    /// Get or create pool for tenant
368    pub fn get_pool(&self, tenant: &TenantId, config: &TenantConfig) -> Arc<TenantPool> {
369        // Check if tenant should use dedicated pool
370        if config.pool.dedicated_pool
371            || config.pool.max_connections >= self.dedicated_pool_threshold
372        {
373            self.pools
374                .entry(tenant.clone())
375                .or_insert_with(|| Arc::new(TenantPool::new(tenant.clone(), config.pool.clone())))
376                .clone()
377        } else {
378            self.shared_pool.clone()
379        }
380    }
381
382    /// Get existing pool for tenant (if any)
383    pub fn get_existing_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
384        self.pools.get(tenant).map(|p| p.clone())
385    }
386
387    /// Create a tenant-specific pool
388    pub fn create_tenant_pool(&self, tenant: &TenantId, config: TenantPoolConfig) {
389        self.pools
390            .insert(tenant.clone(), Arc::new(TenantPool::new(tenant.clone(), config)));
391    }
392
393    /// Remove a tenant pool
394    pub fn remove_tenant_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
395        self.pools.remove(tenant).map(|(_, pool)| pool)
396    }
397
398    /// Generate new connection ID
399    pub fn next_connection_id(&self) -> u64 {
400        self.connection_counter.fetch_add(1, Ordering::Relaxed)
401    }
402
403    /// Record an acquire attempt
404    pub fn record_acquire(&self) {
405        self.total_acquires.fetch_add(1, Ordering::Relaxed);
406    }
407
408    /// Record a timeout
409    pub fn record_timeout(&self) {
410        self.total_timeouts.fetch_add(1, Ordering::Relaxed);
411    }
412
413    /// Get all tenant pool stats
414    pub fn all_stats(&self) -> Vec<TenantPoolStats> {
415        let mut stats: Vec<TenantPoolStats> = self
416            .pools
417            .iter()
418            .map(|entry| entry.value().stats())
419            .collect();
420
421        // Add shared pool stats
422        stats.push(self.shared_pool.stats());
423
424        stats
425    }
426
427    /// Get tenant pool stats
428    pub fn tenant_stats(&self, tenant: &TenantId) -> Option<TenantPoolStats> {
429        self.pools.get(tenant).map(|p| p.stats())
430    }
431
432    /// Get shared pool stats
433    pub fn shared_pool_stats(&self) -> TenantPoolStats {
434        self.shared_pool.stats()
435    }
436
437    /// Get total number of tenant pools
438    pub fn tenant_pool_count(&self) -> usize {
439        self.pools.len()
440    }
441
442    /// Get aggregate statistics
443    pub fn aggregate_stats(&self) -> AggregatePoolStats {
444        let mut total_active = 0u32;
445        let mut total_idle = 0u32;
446        let mut total_max = 0u32;
447        let mut total_waiting = 0u32;
448
449        for pool in self.pools.iter() {
450            total_active += pool.active_count();
451            total_idle += pool.idle_count();
452            total_max += pool.config().max_connections;
453            total_waiting += pool.waiting_count();
454        }
455
456        // Add shared pool
457        total_active += self.shared_pool.active_count();
458        total_idle += self.shared_pool.idle_count();
459        total_max += self.shared_pool.config().max_connections;
460        total_waiting += self.shared_pool.waiting_count();
461
462        AggregatePoolStats {
463            tenant_pools: self.pools.len(),
464            total_active,
465            total_idle,
466            total_max,
467            total_waiting,
468            total_acquires: self.total_acquires.load(Ordering::Relaxed),
469            total_timeouts: self.total_timeouts.load(Ordering::Relaxed),
470            average_utilization: if total_max > 0 {
471                total_active as f32 / total_max as f32
472            } else {
473                0.0
474            },
475        }
476    }
477}
478
/// Totals computed across every pool of a manager.
#[derive(Clone, Debug)]
pub struct AggregatePoolStats {
    /// How many tenant-specific pools exist.
    pub tenant_pools: usize,

    /// Sum of active connections over all pools.
    pub total_active: u32,

    /// Sum of idle connections over all pools.
    pub total_idle: u32,

    /// Sum of the configured connection maximums over all pools.
    pub total_max: u32,

    /// Sum of waiting requests over all pools.
    pub total_waiting: u32,

    /// Number of acquire attempts recorded.
    pub total_acquires: u64,

    /// Number of acquire timeouts recorded.
    pub total_timeouts: u64,

    /// Utilization averaged across all pools.
    pub average_utilization: f32,
}
506
507/// Acquire result
508#[derive(Debug)]
509pub enum AcquireResult {
510    /// Connection acquired successfully
511    Success(PooledConnection),
512
513    /// Waiting for connection
514    Waiting,
515
516    /// Pool is at capacity
517    PoolExhausted,
518
519    /// Tenant not found
520    TenantNotFound,
521
522    /// Acquire timed out
523    Timeout,
524}
525
526/// Connection lease for tenant
527pub struct TenantConnectionLease {
528    /// The pooled connection
529    connection: PooledConnection,
530
531    /// Pool reference for returning connection
532    pool: Arc<TenantPool>,
533
534    /// Lease start time
535    leased_at: Instant,
536
537    /// Whether connection was used
538    used: bool,
539}
540
541impl TenantConnectionLease {
542    /// Create a new lease
543    pub fn new(connection: PooledConnection, pool: Arc<TenantPool>) -> Self {
544        Self {
545            connection,
546            pool,
547            leased_at: Instant::now(),
548            used: false,
549        }
550    }
551
552    /// Get the connection
553    pub fn connection(&self) -> &PooledConnection {
554        &self.connection
555    }
556
557    /// Get mutable connection
558    pub fn connection_mut(&mut self) -> &mut PooledConnection {
559        self.used = true;
560        &mut self.connection
561    }
562
563    /// Get tenant ID
564    pub fn tenant_id(&self) -> &TenantId {
565        self.connection.tenant_id()
566    }
567
568    /// Get lease duration
569    pub fn lease_duration(&self) -> Duration {
570        self.leased_at.elapsed()
571    }
572
573    /// Mark as used
574    pub fn mark_used(&mut self) {
575        self.used = true;
576    }
577
578    /// Check if lease was used
579    pub fn was_used(&self) -> bool {
580        self.used
581    }
582
583    /// Release the lease (return connection to pool)
584    pub fn release(mut self) {
585        self.connection.mark_idle();
586        self.pool.record_release();
587    }
588}
589
590impl Drop for TenantConnectionLease {
591    fn drop(&mut self) {
592        // Auto-release if not explicitly released
593        if self.connection.state() == ConnectionState::Active {
594            self.pool.record_release();
595        }
596    }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool for tenant `"test"` whose only non-default setting is
    /// `max_connections`.
    fn test_pool(max_connections: u32) -> TenantPool {
        let settings = TenantPoolConfig {
            max_connections,
            ..Default::default()
        };
        TenantPool::new(TenantId::new("test"), settings)
    }

    /// A connection starts idle and follows the active/idle transitions.
    #[test]
    fn test_pooled_connection() {
        let mut connection = PooledConnection::new(1, TenantId::new("test"), "127.0.0.1:5432");

        assert_eq!(connection.id(), 1);
        assert_eq!(connection.tenant_id().as_str(), "test");
        assert!(connection.is_available());
        assert_eq!(connection.state(), ConnectionState::Idle);

        connection.mark_active();
        assert!(!connection.is_available());
        assert_eq!(connection.state(), ConnectionState::Active);

        connection.record_query();
        assert_eq!(connection.queries_executed(), 1);

        connection.mark_idle();
        assert!(connection.is_available());
    }

    /// Create/acquire/release move a connection between the counters.
    #[test]
    fn test_tenant_pool() {
        let pool = test_pool(10);

        assert_eq!(pool.tenant_id().as_str(), "test");
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.idle_count(), 0);
        assert!(!pool.is_at_capacity());
        assert!(pool.can_create_connection());

        pool.record_created();
        assert_eq!(pool.idle_count(), 1);

        pool.record_acquire();
        assert_eq!(pool.active_count(), 1);
        assert_eq!(pool.idle_count(), 0);

        pool.record_release();
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.idle_count(), 1);
    }

    /// Utilization is active connections over the configured maximum.
    #[test]
    fn test_tenant_pool_utilization() {
        let pool = test_pool(10);

        assert_eq!(pool.utilization(), 0.0);

        // Five connections exist, three of them are in use.
        (0..5).for_each(|_| pool.record_created());
        (0..3).for_each(|_| pool.record_acquire());

        assert_eq!(pool.active_count(), 3);
        assert_eq!(pool.idle_count(), 2);
        assert!((pool.utilization() - 0.3).abs() < 0.01);
    }

    /// A tenant that asks for a dedicated pool gets one with its own config.
    #[test]
    fn test_tenant_connection_pool() {
        let manager = TenantConnectionPool::new(TenantPoolConfig::default());

        let tenant_id = TenantId::new("tenant_a");
        let dedicated_settings = TenantPoolConfig {
            max_connections: 20,
            dedicated_pool: true,
            ..Default::default()
        };
        let tenant_config = TenantConfig::builder()
            .id("tenant_a")
            .name("Tenant A")
            .database_isolation("tenant_a_db")
            .max_connections(20)
            .pool(dedicated_settings)
            .build();

        let pool = manager.get_pool(&tenant_id, &tenant_config);
        assert_eq!(pool.config().max_connections, 20);

        let totals = manager.aggregate_stats();
        assert_eq!(totals.tenant_pools, 1);
    }

    /// A tenant below the dedicated threshold is routed to the shared pool.
    #[test]
    fn test_shared_pool_usage() {
        let manager = TenantConnectionPool::new(TenantPoolConfig::default());

        let tenant_id = TenantId::new("small_tenant");
        let tenant_config = TenantConfig::builder()
            .id("small_tenant")
            .name("Small Tenant")
            .schema_isolation("shared", "small_tenant")
            .max_connections(2) // Below threshold
            .build();

        let pool = manager.get_pool(&tenant_id, &tenant_config);

        // The shared pool is identified by its reserved tenant id.
        assert_eq!(pool.tenant_id().as_str(), "__shared__");
    }

    /// Explicitly releasing a lease returns the connection to the idle count.
    #[test]
    fn test_tenant_connection_lease() {
        let tenant_id = TenantId::new("test");
        let pool = Arc::new(TenantPool::new(
            tenant_id.clone(),
            TenantPoolConfig::default(),
        ));

        pool.record_created();
        pool.record_acquire();

        let connection = PooledConnection::new(1, tenant_id, "127.0.0.1:5432");
        let mut lease = TenantConnectionLease::new(connection, Arc::clone(&pool));

        assert_eq!(lease.tenant_id().as_str(), "test");
        assert!(!lease.was_used());

        lease.mark_used();
        assert!(lease.was_used());

        // Explicit release hands the slot back to the pool.
        lease.release();
        assert_eq!(pool.active_count(), 0);
        assert_eq!(pool.idle_count(), 1);
    }

    /// The stats snapshot mirrors the pool counters.
    #[test]
    fn test_pool_stats() {
        let pool = test_pool(10);

        pool.record_created();
        pool.record_created();
        pool.record_acquire();

        let snapshot = pool.stats();
        assert_eq!(snapshot.active, 1);
        assert_eq!(snapshot.idle, 1);
        assert_eq!(snapshot.max, 10);
        assert_eq!(snapshot.total_created, 2);
    }
}