Skip to main content

heliosdb_proxy/multi_tenancy/
pool.rs

1//! Tenant-Aware Connection Pool
2//!
3//! This module provides connection pool management with per-tenant isolation
4//! and resource allocation.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use dashmap::DashMap;
11
12use super::config::{TenantConfig, TenantId, TenantPoolConfig};
13
14/// Connection state
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ConnectionState {
17    /// Connection is idle and available
18    Idle,
19    /// Connection is in use
20    Active,
21    /// Connection is being validated
22    Validating,
23    /// Connection is closed
24    Closed,
25}
26
27/// A pooled connection handle
28#[derive(Debug)]
29pub struct PooledConnection {
30    /// Connection identifier
31    id: u64,
32
33    /// Tenant that owns this connection
34    tenant_id: TenantId,
35
36    /// When the connection was created
37    created_at: Instant,
38
39    /// When the connection was last used
40    last_used: Instant,
41
42    /// Current state
43    state: ConnectionState,
44
45    /// Total queries executed
46    queries_executed: u64,
47
48    /// Backend connection info (e.g., socket address)
49    backend_info: String,
50}
51
52impl PooledConnection {
53    /// Create a new pooled connection
54    pub fn new(id: u64, tenant_id: TenantId, backend_info: impl Into<String>) -> Self {
55        let now = Instant::now();
56        Self {
57            id,
58            tenant_id,
59            created_at: now,
60            last_used: now,
61            state: ConnectionState::Idle,
62            queries_executed: 0,
63            backend_info: backend_info.into(),
64        }
65    }
66
67    /// Get connection ID
68    pub fn id(&self) -> u64 {
69        self.id
70    }
71
72    /// Get tenant ID
73    pub fn tenant_id(&self) -> &TenantId {
74        &self.tenant_id
75    }
76
77    /// Get connection age
78    pub fn age(&self) -> Duration {
79        self.created_at.elapsed()
80    }
81
82    /// Get idle time
83    pub fn idle_time(&self) -> Duration {
84        self.last_used.elapsed()
85    }
86
87    /// Get connection state
88    pub fn state(&self) -> ConnectionState {
89        self.state
90    }
91
92    /// Check if connection is available
93    pub fn is_available(&self) -> bool {
94        self.state == ConnectionState::Idle
95    }
96
97    /// Mark connection as active
98    pub fn mark_active(&mut self) {
99        self.state = ConnectionState::Active;
100        self.last_used = Instant::now();
101    }
102
103    /// Mark connection as idle
104    pub fn mark_idle(&mut self) {
105        self.state = ConnectionState::Idle;
106        self.last_used = Instant::now();
107    }
108
109    /// Increment query counter
110    pub fn record_query(&mut self) {
111        self.queries_executed += 1;
112        self.last_used = Instant::now();
113    }
114
115    /// Get backend info
116    pub fn backend_info(&self) -> &str {
117        &self.backend_info
118    }
119
120    /// Get queries executed count
121    pub fn queries_executed(&self) -> u64 {
122        self.queries_executed
123    }
124}
125
126/// Per-tenant connection pool
127#[derive(Debug)]
128pub struct TenantPool {
129    /// Tenant ID
130    tenant_id: TenantId,
131
132    /// Pool configuration
133    config: TenantPoolConfig,
134
135    /// Active connections count
136    active_count: AtomicU32,
137
138    /// Idle connections count
139    idle_count: AtomicU32,
140
141    /// Total connections created
142    total_created: AtomicU64,
143
144    /// Total connections closed
145    total_closed: AtomicU64,
146
147    /// Waiting requests count
148    waiting_count: AtomicU32,
149
150    /// Pool creation time
151    created_at: Instant,
152}
153
154impl TenantPool {
155    /// Create a new tenant pool
156    pub fn new(tenant_id: TenantId, config: TenantPoolConfig) -> Self {
157        Self {
158            tenant_id,
159            config,
160            active_count: AtomicU32::new(0),
161            idle_count: AtomicU32::new(0),
162            total_created: AtomicU64::new(0),
163            total_closed: AtomicU64::new(0),
164            waiting_count: AtomicU32::new(0),
165            created_at: Instant::now(),
166        }
167    }
168
169    /// Get tenant ID
170    pub fn tenant_id(&self) -> &TenantId {
171        &self.tenant_id
172    }
173
174    /// Get pool configuration
175    pub fn config(&self) -> &TenantPoolConfig {
176        &self.config
177    }
178
179    /// Get active connection count
180    pub fn active_count(&self) -> u32 {
181        self.active_count.load(Ordering::Relaxed)
182    }
183
184    /// Get idle connection count
185    pub fn idle_count(&self) -> u32 {
186        self.idle_count.load(Ordering::Relaxed)
187    }
188
189    /// Get total connection count
190    pub fn total_count(&self) -> u32 {
191        self.active_count() + self.idle_count()
192    }
193
194    /// Get waiting requests count
195    pub fn waiting_count(&self) -> u32 {
196        self.waiting_count.load(Ordering::Relaxed)
197    }
198
199    /// Check if pool is at capacity
200    pub fn is_at_capacity(&self) -> bool {
201        self.total_count() >= self.config.max_connections
202    }
203
204    /// Check if pool can accept new connection
205    pub fn can_create_connection(&self) -> bool {
206        self.total_count() < self.config.max_connections
207    }
208
209    /// Check if pool has available connections
210    pub fn has_available(&self) -> bool {
211        self.idle_count() > 0
212    }
213
214    /// Record connection acquired
215    pub fn record_acquire(&self) {
216        self.idle_count.fetch_sub(1, Ordering::Relaxed);
217        self.active_count.fetch_add(1, Ordering::Relaxed);
218    }
219
220    /// Record connection released
221    pub fn record_release(&self) {
222        self.active_count.fetch_sub(1, Ordering::Relaxed);
223        self.idle_count.fetch_add(1, Ordering::Relaxed);
224    }
225
226    /// Record connection created
227    pub fn record_created(&self) {
228        self.idle_count.fetch_add(1, Ordering::Relaxed);
229        self.total_created.fetch_add(1, Ordering::Relaxed);
230    }
231
232    /// Record connection closed
233    pub fn record_closed(&self, was_active: bool) {
234        if was_active {
235            self.active_count.fetch_sub(1, Ordering::Relaxed);
236        } else {
237            self.idle_count.fetch_sub(1, Ordering::Relaxed);
238        }
239        self.total_closed.fetch_add(1, Ordering::Relaxed);
240    }
241
242    /// Record waiting request
243    pub fn record_waiting(&self) {
244        self.waiting_count.fetch_add(1, Ordering::Relaxed);
245    }
246
247    /// Record request no longer waiting
248    pub fn record_not_waiting(&self) {
249        self.waiting_count.fetch_sub(1, Ordering::Relaxed);
250    }
251
252    /// Get pool utilization (0.0 to 1.0)
253    pub fn utilization(&self) -> f32 {
254        let total = self.total_count();
255        if total == 0 {
256            return 0.0;
257        }
258        self.active_count() as f32 / self.config.max_connections as f32
259    }
260
261    /// Get pool age
262    pub fn age(&self) -> Duration {
263        self.created_at.elapsed()
264    }
265
266    /// Get statistics snapshot
267    pub fn stats(&self) -> TenantPoolStats {
268        TenantPoolStats {
269            tenant_id: self.tenant_id.clone(),
270            active: self.active_count(),
271            idle: self.idle_count(),
272            max: self.config.max_connections,
273            waiting: self.waiting_count(),
274            total_created: self.total_created.load(Ordering::Relaxed),
275            total_closed: self.total_closed.load(Ordering::Relaxed),
276            utilization: self.utilization(),
277            age: self.age(),
278        }
279    }
280}
281
282/// Tenant pool statistics
283#[derive(Debug, Clone)]
284pub struct TenantPoolStats {
285    /// Tenant ID
286    pub tenant_id: TenantId,
287
288    /// Active connections
289    pub active: u32,
290
291    /// Idle connections
292    pub idle: u32,
293
294    /// Maximum connections
295    pub max: u32,
296
297    /// Waiting requests
298    pub waiting: u32,
299
300    /// Total connections created
301    pub total_created: u64,
302
303    /// Total connections closed
304    pub total_closed: u64,
305
306    /// Pool utilization (0.0 to 1.0)
307    pub utilization: f32,
308
309    /// Pool age
310    pub age: Duration,
311}
312
313/// Tenant connection pool manager
314///
315/// Manages connection pools across all tenants.
316pub struct TenantConnectionPool {
317    /// Per-tenant pools
318    pools: DashMap<TenantId, Arc<TenantPool>>,
319
320    /// Shared pool for small/unknown tenants
321    shared_pool: Arc<TenantPool>,
322
323    /// Default pool configuration
324    #[allow(dead_code)]
325    default_config: TenantPoolConfig,
326
327    /// Connection ID counter
328    connection_counter: AtomicU64,
329
330    /// Total acquire count
331    total_acquires: AtomicU64,
332
333    /// Total acquire timeouts
334    total_timeouts: AtomicU64,
335
336    /// Threshold for dedicated pool (connections)
337    dedicated_pool_threshold: u32,
338}
339
340impl TenantConnectionPool {
341    /// Create a new tenant connection pool manager
342    pub fn new(default_config: TenantPoolConfig) -> Self {
343        let shared_pool = Arc::new(TenantPool::new(
344            TenantId::new("__shared__"),
345            TenantPoolConfig {
346                max_connections: 50,
347                ..default_config.clone()
348            },
349        ));
350
351        Self {
352            pools: DashMap::new(),
353            shared_pool,
354            default_config,
355            connection_counter: AtomicU64::new(0),
356            total_acquires: AtomicU64::new(0),
357            total_timeouts: AtomicU64::new(0),
358            dedicated_pool_threshold: 5,
359        }
360    }
361
362    /// Set dedicated pool threshold
363    pub fn with_dedicated_threshold(mut self, threshold: u32) -> Self {
364        self.dedicated_pool_threshold = threshold;
365        self
366    }
367
368    /// Get or create pool for tenant
369    pub fn get_pool(&self, tenant: &TenantId, config: &TenantConfig) -> Arc<TenantPool> {
370        // Check if tenant should use dedicated pool
371        if config.pool.dedicated_pool
372            || config.pool.max_connections >= self.dedicated_pool_threshold
373        {
374            self.pools
375                .entry(tenant.clone())
376                .or_insert_with(|| Arc::new(TenantPool::new(tenant.clone(), config.pool.clone())))
377                .clone()
378        } else {
379            self.shared_pool.clone()
380        }
381    }
382
383    /// Get existing pool for tenant (if any)
384    pub fn get_existing_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
385        self.pools.get(tenant).map(|p| p.clone())
386    }
387
388    /// Create a tenant-specific pool
389    pub fn create_tenant_pool(&self, tenant: &TenantId, config: TenantPoolConfig) {
390        self.pools.insert(
391            tenant.clone(),
392            Arc::new(TenantPool::new(tenant.clone(), config)),
393        );
394    }
395
396    /// Remove a tenant pool
397    pub fn remove_tenant_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
398        self.pools.remove(tenant).map(|(_, pool)| pool)
399    }
400
401    /// Generate new connection ID
402    pub fn next_connection_id(&self) -> u64 {
403        self.connection_counter.fetch_add(1, Ordering::Relaxed)
404    }
405
406    /// Record an acquire attempt
407    pub fn record_acquire(&self) {
408        self.total_acquires.fetch_add(1, Ordering::Relaxed);
409    }
410
411    /// Record a timeout
412    pub fn record_timeout(&self) {
413        self.total_timeouts.fetch_add(1, Ordering::Relaxed);
414    }
415
416    /// Get all tenant pool stats
417    pub fn all_stats(&self) -> Vec<TenantPoolStats> {
418        let mut stats: Vec<TenantPoolStats> = self
419            .pools
420            .iter()
421            .map(|entry| entry.value().stats())
422            .collect();
423
424        // Add shared pool stats
425        stats.push(self.shared_pool.stats());
426
427        stats
428    }
429
430    /// Get tenant pool stats
431    pub fn tenant_stats(&self, tenant: &TenantId) -> Option<TenantPoolStats> {
432        self.pools.get(tenant).map(|p| p.stats())
433    }
434
435    /// Get shared pool stats
436    pub fn shared_pool_stats(&self) -> TenantPoolStats {
437        self.shared_pool.stats()
438    }
439
440    /// Get total number of tenant pools
441    pub fn tenant_pool_count(&self) -> usize {
442        self.pools.len()
443    }
444
445    /// Get aggregate statistics
446    pub fn aggregate_stats(&self) -> AggregatePoolStats {
447        let mut total_active = 0u32;
448        let mut total_idle = 0u32;
449        let mut total_max = 0u32;
450        let mut total_waiting = 0u32;
451
452        for pool in self.pools.iter() {
453            total_active += pool.active_count();
454            total_idle += pool.idle_count();
455            total_max += pool.config().max_connections;
456            total_waiting += pool.waiting_count();
457        }
458
459        // Add shared pool
460        total_active += self.shared_pool.active_count();
461        total_idle += self.shared_pool.idle_count();
462        total_max += self.shared_pool.config().max_connections;
463        total_waiting += self.shared_pool.waiting_count();
464
465        AggregatePoolStats {
466            tenant_pools: self.pools.len(),
467            total_active,
468            total_idle,
469            total_max,
470            total_waiting,
471            total_acquires: self.total_acquires.load(Ordering::Relaxed),
472            total_timeouts: self.total_timeouts.load(Ordering::Relaxed),
473            average_utilization: if total_max > 0 {
474                total_active as f32 / total_max as f32
475            } else {
476                0.0
477            },
478        }
479    }
480}
481
482/// Aggregate pool statistics
483#[derive(Debug, Clone)]
484pub struct AggregatePoolStats {
485    /// Number of tenant-specific pools
486    pub tenant_pools: usize,
487
488    /// Total active connections across all pools
489    pub total_active: u32,
490
491    /// Total idle connections across all pools
492    pub total_idle: u32,
493
494    /// Total maximum connections across all pools
495    pub total_max: u32,
496
497    /// Total waiting requests
498    pub total_waiting: u32,
499
500    /// Total acquire attempts
501    pub total_acquires: u64,
502
503    /// Total timeout occurrences
504    pub total_timeouts: u64,
505
506    /// Average utilization across all pools
507    pub average_utilization: f32,
508}
509
510/// Acquire result
511#[derive(Debug)]
512pub enum AcquireResult {
513    /// Connection acquired successfully
514    Success(PooledConnection),
515
516    /// Waiting for connection
517    Waiting,
518
519    /// Pool is at capacity
520    PoolExhausted,
521
522    /// Tenant not found
523    TenantNotFound,
524
525    /// Acquire timed out
526    Timeout,
527}
528
529/// Connection lease for tenant
530pub struct TenantConnectionLease {
531    /// The pooled connection
532    connection: PooledConnection,
533
534    /// Pool reference for returning connection
535    pool: Arc<TenantPool>,
536
537    /// Lease start time
538    leased_at: Instant,
539
540    /// Whether connection was used
541    used: bool,
542}
543
544impl TenantConnectionLease {
545    /// Create a new lease
546    pub fn new(connection: PooledConnection, pool: Arc<TenantPool>) -> Self {
547        Self {
548            connection,
549            pool,
550            leased_at: Instant::now(),
551            used: false,
552        }
553    }
554
555    /// Get the connection
556    pub fn connection(&self) -> &PooledConnection {
557        &self.connection
558    }
559
560    /// Get mutable connection
561    pub fn connection_mut(&mut self) -> &mut PooledConnection {
562        self.used = true;
563        &mut self.connection
564    }
565
566    /// Get tenant ID
567    pub fn tenant_id(&self) -> &TenantId {
568        self.connection.tenant_id()
569    }
570
571    /// Get lease duration
572    pub fn lease_duration(&self) -> Duration {
573        self.leased_at.elapsed()
574    }
575
576    /// Mark as used
577    pub fn mark_used(&mut self) {
578        self.used = true;
579    }
580
581    /// Check if lease was used
582    pub fn was_used(&self) -> bool {
583        self.used
584    }
585
586    /// Release the lease (return connection to pool)
587    pub fn release(mut self) {
588        self.connection.mark_idle();
589        self.pool.record_release();
590    }
591}
592
593impl Drop for TenantConnectionLease {
594    fn drop(&mut self) {
595        // Auto-release if not explicitly released
596        if self.connection.state() == ConnectionState::Active {
597            self.pool.record_release();
598        }
599    }
600}
601
602#[cfg(test)]
603mod tests {
604    use super::*;
605
606    #[test]
607    fn test_pooled_connection() {
608        let tenant = TenantId::new("test");
609        let mut conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
610
611        assert_eq!(conn.id(), 1);
612        assert_eq!(conn.tenant_id().as_str(), "test");
613        assert!(conn.is_available());
614        assert_eq!(conn.state(), ConnectionState::Idle);
615
616        conn.mark_active();
617        assert!(!conn.is_available());
618        assert_eq!(conn.state(), ConnectionState::Active);
619
620        conn.record_query();
621        assert_eq!(conn.queries_executed(), 1);
622
623        conn.mark_idle();
624        assert!(conn.is_available());
625    }
626
627    #[test]
628    fn test_tenant_pool() {
629        let tenant = TenantId::new("test");
630        let config = TenantPoolConfig {
631            max_connections: 10,
632            ..Default::default()
633        };
634        let pool = TenantPool::new(tenant.clone(), config);
635
636        assert_eq!(pool.tenant_id().as_str(), "test");
637        assert_eq!(pool.active_count(), 0);
638        assert_eq!(pool.idle_count(), 0);
639        assert!(!pool.is_at_capacity());
640        assert!(pool.can_create_connection());
641
642        pool.record_created();
643        assert_eq!(pool.idle_count(), 1);
644
645        pool.record_acquire();
646        assert_eq!(pool.active_count(), 1);
647        assert_eq!(pool.idle_count(), 0);
648
649        pool.record_release();
650        assert_eq!(pool.active_count(), 0);
651        assert_eq!(pool.idle_count(), 1);
652    }
653
654    #[test]
655    fn test_tenant_pool_utilization() {
656        let tenant = TenantId::new("test");
657        let config = TenantPoolConfig {
658            max_connections: 10,
659            ..Default::default()
660        };
661        let pool = TenantPool::new(tenant, config);
662
663        assert_eq!(pool.utilization(), 0.0);
664
665        // Create 5 connections, use 3
666        for _ in 0..5 {
667            pool.record_created();
668        }
669        for _ in 0..3 {
670            pool.record_acquire();
671        }
672
673        assert_eq!(pool.active_count(), 3);
674        assert_eq!(pool.idle_count(), 2);
675        assert!((pool.utilization() - 0.3).abs() < 0.01);
676    }
677
678    #[test]
679    fn test_tenant_connection_pool() {
680        let config = TenantPoolConfig::default();
681        let manager = TenantConnectionPool::new(config.clone());
682
683        let tenant = TenantId::new("tenant_a");
684        let tenant_config = TenantConfig::builder()
685            .id("tenant_a")
686            .name("Tenant A")
687            .database_isolation("tenant_a_db")
688            .max_connections(20)
689            .pool(TenantPoolConfig {
690                max_connections: 20,
691                dedicated_pool: true,
692                ..Default::default()
693            })
694            .build();
695
696        let pool = manager.get_pool(&tenant, &tenant_config);
697        assert_eq!(pool.config().max_connections, 20);
698
699        let stats = manager.aggregate_stats();
700        assert_eq!(stats.tenant_pools, 1);
701    }
702
703    #[test]
704    fn test_shared_pool_usage() {
705        let config = TenantPoolConfig::default();
706        let manager = TenantConnectionPool::new(config.clone());
707
708        let tenant = TenantId::new("small_tenant");
709        let tenant_config = TenantConfig::builder()
710            .id("small_tenant")
711            .name("Small Tenant")
712            .schema_isolation("shared", "small_tenant")
713            .max_connections(2) // Below threshold
714            .build();
715
716        let pool = manager.get_pool(&tenant, &tenant_config);
717
718        // Should use shared pool
719        assert_eq!(pool.tenant_id().as_str(), "__shared__");
720    }
721
722    #[test]
723    fn test_tenant_connection_lease() {
724        let tenant = TenantId::new("test");
725        let config = TenantPoolConfig::default();
726        let pool = Arc::new(TenantPool::new(tenant.clone(), config));
727
728        pool.record_created();
729        pool.record_acquire();
730
731        let conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
732        let mut lease = TenantConnectionLease::new(conn, pool.clone());
733
734        assert_eq!(lease.tenant_id().as_str(), "test");
735        assert!(!lease.was_used());
736
737        lease.mark_used();
738        assert!(lease.was_used());
739
740        // Release explicitly
741        lease.release();
742        assert_eq!(pool.active_count(), 0);
743        assert_eq!(pool.idle_count(), 1);
744    }
745
746    #[test]
747    fn test_pool_stats() {
748        let tenant = TenantId::new("test");
749        let config = TenantPoolConfig {
750            max_connections: 10,
751            ..Default::default()
752        };
753        let pool = TenantPool::new(tenant, config);
754
755        pool.record_created();
756        pool.record_created();
757        pool.record_acquire();
758
759        let stats = pool.stats();
760        assert_eq!(stats.active, 1);
761        assert_eq!(stats.idle, 1);
762        assert_eq!(stats.max, 10);
763        assert_eq!(stats.total_created, 2);
764    }
765}