1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use dashmap::DashMap;
11
12use super::config::{TenantConfig, TenantId, TenantPoolConfig};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ConnectionState {
17 Idle,
19 Active,
21 Validating,
23 Closed,
25}
26
27#[derive(Debug)]
29pub struct PooledConnection {
30 id: u64,
32
33 tenant_id: TenantId,
35
36 created_at: Instant,
38
39 last_used: Instant,
41
42 state: ConnectionState,
44
45 queries_executed: u64,
47
48 backend_info: String,
50}
51
52impl PooledConnection {
53 pub fn new(id: u64, tenant_id: TenantId, backend_info: impl Into<String>) -> Self {
55 let now = Instant::now();
56 Self {
57 id,
58 tenant_id,
59 created_at: now,
60 last_used: now,
61 state: ConnectionState::Idle,
62 queries_executed: 0,
63 backend_info: backend_info.into(),
64 }
65 }
66
67 pub fn id(&self) -> u64 {
69 self.id
70 }
71
72 pub fn tenant_id(&self) -> &TenantId {
74 &self.tenant_id
75 }
76
77 pub fn age(&self) -> Duration {
79 self.created_at.elapsed()
80 }
81
82 pub fn idle_time(&self) -> Duration {
84 self.last_used.elapsed()
85 }
86
87 pub fn state(&self) -> ConnectionState {
89 self.state
90 }
91
92 pub fn is_available(&self) -> bool {
94 self.state == ConnectionState::Idle
95 }
96
97 pub fn mark_active(&mut self) {
99 self.state = ConnectionState::Active;
100 self.last_used = Instant::now();
101 }
102
103 pub fn mark_idle(&mut self) {
105 self.state = ConnectionState::Idle;
106 self.last_used = Instant::now();
107 }
108
109 pub fn record_query(&mut self) {
111 self.queries_executed += 1;
112 self.last_used = Instant::now();
113 }
114
115 pub fn backend_info(&self) -> &str {
117 &self.backend_info
118 }
119
120 pub fn queries_executed(&self) -> u64 {
122 self.queries_executed
123 }
124}
125
126#[derive(Debug)]
128pub struct TenantPool {
129 tenant_id: TenantId,
131
132 config: TenantPoolConfig,
134
135 active_count: AtomicU32,
137
138 idle_count: AtomicU32,
140
141 total_created: AtomicU64,
143
144 total_closed: AtomicU64,
146
147 waiting_count: AtomicU32,
149
150 created_at: Instant,
152}
153
154impl TenantPool {
155 pub fn new(tenant_id: TenantId, config: TenantPoolConfig) -> Self {
157 Self {
158 tenant_id,
159 config,
160 active_count: AtomicU32::new(0),
161 idle_count: AtomicU32::new(0),
162 total_created: AtomicU64::new(0),
163 total_closed: AtomicU64::new(0),
164 waiting_count: AtomicU32::new(0),
165 created_at: Instant::now(),
166 }
167 }
168
169 pub fn tenant_id(&self) -> &TenantId {
171 &self.tenant_id
172 }
173
174 pub fn config(&self) -> &TenantPoolConfig {
176 &self.config
177 }
178
179 pub fn active_count(&self) -> u32 {
181 self.active_count.load(Ordering::Relaxed)
182 }
183
184 pub fn idle_count(&self) -> u32 {
186 self.idle_count.load(Ordering::Relaxed)
187 }
188
189 pub fn total_count(&self) -> u32 {
191 self.active_count() + self.idle_count()
192 }
193
194 pub fn waiting_count(&self) -> u32 {
196 self.waiting_count.load(Ordering::Relaxed)
197 }
198
199 pub fn is_at_capacity(&self) -> bool {
201 self.total_count() >= self.config.max_connections
202 }
203
204 pub fn can_create_connection(&self) -> bool {
206 self.total_count() < self.config.max_connections
207 }
208
209 pub fn has_available(&self) -> bool {
211 self.idle_count() > 0
212 }
213
214 pub fn record_acquire(&self) {
216 self.idle_count.fetch_sub(1, Ordering::Relaxed);
217 self.active_count.fetch_add(1, Ordering::Relaxed);
218 }
219
220 pub fn record_release(&self) {
222 self.active_count.fetch_sub(1, Ordering::Relaxed);
223 self.idle_count.fetch_add(1, Ordering::Relaxed);
224 }
225
226 pub fn record_created(&self) {
228 self.idle_count.fetch_add(1, Ordering::Relaxed);
229 self.total_created.fetch_add(1, Ordering::Relaxed);
230 }
231
232 pub fn record_closed(&self, was_active: bool) {
234 if was_active {
235 self.active_count.fetch_sub(1, Ordering::Relaxed);
236 } else {
237 self.idle_count.fetch_sub(1, Ordering::Relaxed);
238 }
239 self.total_closed.fetch_add(1, Ordering::Relaxed);
240 }
241
242 pub fn record_waiting(&self) {
244 self.waiting_count.fetch_add(1, Ordering::Relaxed);
245 }
246
247 pub fn record_not_waiting(&self) {
249 self.waiting_count.fetch_sub(1, Ordering::Relaxed);
250 }
251
252 pub fn utilization(&self) -> f32 {
254 let total = self.total_count();
255 if total == 0 {
256 return 0.0;
257 }
258 self.active_count() as f32 / self.config.max_connections as f32
259 }
260
261 pub fn age(&self) -> Duration {
263 self.created_at.elapsed()
264 }
265
266 pub fn stats(&self) -> TenantPoolStats {
268 TenantPoolStats {
269 tenant_id: self.tenant_id.clone(),
270 active: self.active_count(),
271 idle: self.idle_count(),
272 max: self.config.max_connections,
273 waiting: self.waiting_count(),
274 total_created: self.total_created.load(Ordering::Relaxed),
275 total_closed: self.total_closed.load(Ordering::Relaxed),
276 utilization: self.utilization(),
277 age: self.age(),
278 }
279 }
280}
281
282#[derive(Debug, Clone)]
284pub struct TenantPoolStats {
285 pub tenant_id: TenantId,
287
288 pub active: u32,
290
291 pub idle: u32,
293
294 pub max: u32,
296
297 pub waiting: u32,
299
300 pub total_created: u64,
302
303 pub total_closed: u64,
305
306 pub utilization: f32,
308
309 pub age: Duration,
311}
312
313pub struct TenantConnectionPool {
317 pools: DashMap<TenantId, Arc<TenantPool>>,
319
320 shared_pool: Arc<TenantPool>,
322
323 #[allow(dead_code)]
325 default_config: TenantPoolConfig,
326
327 connection_counter: AtomicU64,
329
330 total_acquires: AtomicU64,
332
333 total_timeouts: AtomicU64,
335
336 dedicated_pool_threshold: u32,
338}
339
340impl TenantConnectionPool {
341 pub fn new(default_config: TenantPoolConfig) -> Self {
343 let shared_pool = Arc::new(TenantPool::new(
344 TenantId::new("__shared__"),
345 TenantPoolConfig {
346 max_connections: 50,
347 ..default_config.clone()
348 },
349 ));
350
351 Self {
352 pools: DashMap::new(),
353 shared_pool,
354 default_config,
355 connection_counter: AtomicU64::new(0),
356 total_acquires: AtomicU64::new(0),
357 total_timeouts: AtomicU64::new(0),
358 dedicated_pool_threshold: 5,
359 }
360 }
361
362 pub fn with_dedicated_threshold(mut self, threshold: u32) -> Self {
364 self.dedicated_pool_threshold = threshold;
365 self
366 }
367
368 pub fn get_pool(&self, tenant: &TenantId, config: &TenantConfig) -> Arc<TenantPool> {
370 if config.pool.dedicated_pool
372 || config.pool.max_connections >= self.dedicated_pool_threshold
373 {
374 self.pools
375 .entry(tenant.clone())
376 .or_insert_with(|| Arc::new(TenantPool::new(tenant.clone(), config.pool.clone())))
377 .clone()
378 } else {
379 self.shared_pool.clone()
380 }
381 }
382
383 pub fn get_existing_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
385 self.pools.get(tenant).map(|p| p.clone())
386 }
387
388 pub fn create_tenant_pool(&self, tenant: &TenantId, config: TenantPoolConfig) {
390 self.pools.insert(
391 tenant.clone(),
392 Arc::new(TenantPool::new(tenant.clone(), config)),
393 );
394 }
395
396 pub fn remove_tenant_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
398 self.pools.remove(tenant).map(|(_, pool)| pool)
399 }
400
401 pub fn next_connection_id(&self) -> u64 {
403 self.connection_counter.fetch_add(1, Ordering::Relaxed)
404 }
405
406 pub fn record_acquire(&self) {
408 self.total_acquires.fetch_add(1, Ordering::Relaxed);
409 }
410
411 pub fn record_timeout(&self) {
413 self.total_timeouts.fetch_add(1, Ordering::Relaxed);
414 }
415
416 pub fn all_stats(&self) -> Vec<TenantPoolStats> {
418 let mut stats: Vec<TenantPoolStats> = self
419 .pools
420 .iter()
421 .map(|entry| entry.value().stats())
422 .collect();
423
424 stats.push(self.shared_pool.stats());
426
427 stats
428 }
429
430 pub fn tenant_stats(&self, tenant: &TenantId) -> Option<TenantPoolStats> {
432 self.pools.get(tenant).map(|p| p.stats())
433 }
434
435 pub fn shared_pool_stats(&self) -> TenantPoolStats {
437 self.shared_pool.stats()
438 }
439
440 pub fn tenant_pool_count(&self) -> usize {
442 self.pools.len()
443 }
444
445 pub fn aggregate_stats(&self) -> AggregatePoolStats {
447 let mut total_active = 0u32;
448 let mut total_idle = 0u32;
449 let mut total_max = 0u32;
450 let mut total_waiting = 0u32;
451
452 for pool in self.pools.iter() {
453 total_active += pool.active_count();
454 total_idle += pool.idle_count();
455 total_max += pool.config().max_connections;
456 total_waiting += pool.waiting_count();
457 }
458
459 total_active += self.shared_pool.active_count();
461 total_idle += self.shared_pool.idle_count();
462 total_max += self.shared_pool.config().max_connections;
463 total_waiting += self.shared_pool.waiting_count();
464
465 AggregatePoolStats {
466 tenant_pools: self.pools.len(),
467 total_active,
468 total_idle,
469 total_max,
470 total_waiting,
471 total_acquires: self.total_acquires.load(Ordering::Relaxed),
472 total_timeouts: self.total_timeouts.load(Ordering::Relaxed),
473 average_utilization: if total_max > 0 {
474 total_active as f32 / total_max as f32
475 } else {
476 0.0
477 },
478 }
479 }
480}
481
482#[derive(Debug, Clone)]
484pub struct AggregatePoolStats {
485 pub tenant_pools: usize,
487
488 pub total_active: u32,
490
491 pub total_idle: u32,
493
494 pub total_max: u32,
496
497 pub total_waiting: u32,
499
500 pub total_acquires: u64,
502
503 pub total_timeouts: u64,
505
506 pub average_utilization: f32,
508}
509
510#[derive(Debug)]
512pub enum AcquireResult {
513 Success(PooledConnection),
515
516 Waiting,
518
519 PoolExhausted,
521
522 TenantNotFound,
524
525 Timeout,
527}
528
529pub struct TenantConnectionLease {
531 connection: PooledConnection,
533
534 pool: Arc<TenantPool>,
536
537 leased_at: Instant,
539
540 used: bool,
542}
543
544impl TenantConnectionLease {
545 pub fn new(connection: PooledConnection, pool: Arc<TenantPool>) -> Self {
547 Self {
548 connection,
549 pool,
550 leased_at: Instant::now(),
551 used: false,
552 }
553 }
554
555 pub fn connection(&self) -> &PooledConnection {
557 &self.connection
558 }
559
560 pub fn connection_mut(&mut self) -> &mut PooledConnection {
562 self.used = true;
563 &mut self.connection
564 }
565
566 pub fn tenant_id(&self) -> &TenantId {
568 self.connection.tenant_id()
569 }
570
571 pub fn lease_duration(&self) -> Duration {
573 self.leased_at.elapsed()
574 }
575
576 pub fn mark_used(&mut self) {
578 self.used = true;
579 }
580
581 pub fn was_used(&self) -> bool {
583 self.used
584 }
585
586 pub fn release(mut self) {
588 self.connection.mark_idle();
589 self.pool.record_release();
590 }
591}
592
593impl Drop for TenantConnectionLease {
594 fn drop(&mut self) {
595 if self.connection.state() == ConnectionState::Active {
597 self.pool.record_release();
598 }
599 }
600}
601
602#[cfg(test)]
603mod tests {
604 use super::*;
605
606 #[test]
607 fn test_pooled_connection() {
608 let tenant = TenantId::new("test");
609 let mut conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
610
611 assert_eq!(conn.id(), 1);
612 assert_eq!(conn.tenant_id().as_str(), "test");
613 assert!(conn.is_available());
614 assert_eq!(conn.state(), ConnectionState::Idle);
615
616 conn.mark_active();
617 assert!(!conn.is_available());
618 assert_eq!(conn.state(), ConnectionState::Active);
619
620 conn.record_query();
621 assert_eq!(conn.queries_executed(), 1);
622
623 conn.mark_idle();
624 assert!(conn.is_available());
625 }
626
627 #[test]
628 fn test_tenant_pool() {
629 let tenant = TenantId::new("test");
630 let config = TenantPoolConfig {
631 max_connections: 10,
632 ..Default::default()
633 };
634 let pool = TenantPool::new(tenant.clone(), config);
635
636 assert_eq!(pool.tenant_id().as_str(), "test");
637 assert_eq!(pool.active_count(), 0);
638 assert_eq!(pool.idle_count(), 0);
639 assert!(!pool.is_at_capacity());
640 assert!(pool.can_create_connection());
641
642 pool.record_created();
643 assert_eq!(pool.idle_count(), 1);
644
645 pool.record_acquire();
646 assert_eq!(pool.active_count(), 1);
647 assert_eq!(pool.idle_count(), 0);
648
649 pool.record_release();
650 assert_eq!(pool.active_count(), 0);
651 assert_eq!(pool.idle_count(), 1);
652 }
653
654 #[test]
655 fn test_tenant_pool_utilization() {
656 let tenant = TenantId::new("test");
657 let config = TenantPoolConfig {
658 max_connections: 10,
659 ..Default::default()
660 };
661 let pool = TenantPool::new(tenant, config);
662
663 assert_eq!(pool.utilization(), 0.0);
664
665 for _ in 0..5 {
667 pool.record_created();
668 }
669 for _ in 0..3 {
670 pool.record_acquire();
671 }
672
673 assert_eq!(pool.active_count(), 3);
674 assert_eq!(pool.idle_count(), 2);
675 assert!((pool.utilization() - 0.3).abs() < 0.01);
676 }
677
678 #[test]
679 fn test_tenant_connection_pool() {
680 let config = TenantPoolConfig::default();
681 let manager = TenantConnectionPool::new(config.clone());
682
683 let tenant = TenantId::new("tenant_a");
684 let tenant_config = TenantConfig::builder()
685 .id("tenant_a")
686 .name("Tenant A")
687 .database_isolation("tenant_a_db")
688 .max_connections(20)
689 .pool(TenantPoolConfig {
690 max_connections: 20,
691 dedicated_pool: true,
692 ..Default::default()
693 })
694 .build();
695
696 let pool = manager.get_pool(&tenant, &tenant_config);
697 assert_eq!(pool.config().max_connections, 20);
698
699 let stats = manager.aggregate_stats();
700 assert_eq!(stats.tenant_pools, 1);
701 }
702
703 #[test]
704 fn test_shared_pool_usage() {
705 let config = TenantPoolConfig::default();
706 let manager = TenantConnectionPool::new(config.clone());
707
708 let tenant = TenantId::new("small_tenant");
709 let tenant_config = TenantConfig::builder()
710 .id("small_tenant")
711 .name("Small Tenant")
712 .schema_isolation("shared", "small_tenant")
713 .max_connections(2) .build();
715
716 let pool = manager.get_pool(&tenant, &tenant_config);
717
718 assert_eq!(pool.tenant_id().as_str(), "__shared__");
720 }
721
722 #[test]
723 fn test_tenant_connection_lease() {
724 let tenant = TenantId::new("test");
725 let config = TenantPoolConfig::default();
726 let pool = Arc::new(TenantPool::new(tenant.clone(), config));
727
728 pool.record_created();
729 pool.record_acquire();
730
731 let conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
732 let mut lease = TenantConnectionLease::new(conn, pool.clone());
733
734 assert_eq!(lease.tenant_id().as_str(), "test");
735 assert!(!lease.was_used());
736
737 lease.mark_used();
738 assert!(lease.was_used());
739
740 lease.release();
742 assert_eq!(pool.active_count(), 0);
743 assert_eq!(pool.idle_count(), 1);
744 }
745
746 #[test]
747 fn test_pool_stats() {
748 let tenant = TenantId::new("test");
749 let config = TenantPoolConfig {
750 max_connections: 10,
751 ..Default::default()
752 };
753 let pool = TenantPool::new(tenant, config);
754
755 pool.record_created();
756 pool.record_created();
757 pool.record_acquire();
758
759 let stats = pool.stats();
760 assert_eq!(stats.active, 1);
761 assert_eq!(stats.idle, 1);
762 assert_eq!(stats.max, 10);
763 assert_eq!(stats.total_created, 2);
764 }
765}