1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use dashmap::DashMap;
11
12use super::config::{TenantConfig, TenantId, TenantPoolConfig};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ConnectionState {
17 Idle,
19 Active,
21 Validating,
23 Closed,
25}
26
27#[derive(Debug)]
29pub struct PooledConnection {
30 id: u64,
32
33 tenant_id: TenantId,
35
36 created_at: Instant,
38
39 last_used: Instant,
41
42 state: ConnectionState,
44
45 queries_executed: u64,
47
48 backend_info: String,
50}
51
52impl PooledConnection {
53 pub fn new(id: u64, tenant_id: TenantId, backend_info: impl Into<String>) -> Self {
55 let now = Instant::now();
56 Self {
57 id,
58 tenant_id,
59 created_at: now,
60 last_used: now,
61 state: ConnectionState::Idle,
62 queries_executed: 0,
63 backend_info: backend_info.into(),
64 }
65 }
66
67 pub fn id(&self) -> u64 {
69 self.id
70 }
71
72 pub fn tenant_id(&self) -> &TenantId {
74 &self.tenant_id
75 }
76
77 pub fn age(&self) -> Duration {
79 self.created_at.elapsed()
80 }
81
82 pub fn idle_time(&self) -> Duration {
84 self.last_used.elapsed()
85 }
86
87 pub fn state(&self) -> ConnectionState {
89 self.state
90 }
91
92 pub fn is_available(&self) -> bool {
94 self.state == ConnectionState::Idle
95 }
96
97 pub fn mark_active(&mut self) {
99 self.state = ConnectionState::Active;
100 self.last_used = Instant::now();
101 }
102
103 pub fn mark_idle(&mut self) {
105 self.state = ConnectionState::Idle;
106 self.last_used = Instant::now();
107 }
108
109 pub fn record_query(&mut self) {
111 self.queries_executed += 1;
112 self.last_used = Instant::now();
113 }
114
115 pub fn backend_info(&self) -> &str {
117 &self.backend_info
118 }
119
120 pub fn queries_executed(&self) -> u64 {
122 self.queries_executed
123 }
124}
125
126#[derive(Debug)]
128pub struct TenantPool {
129 tenant_id: TenantId,
131
132 config: TenantPoolConfig,
134
135 active_count: AtomicU32,
137
138 idle_count: AtomicU32,
140
141 total_created: AtomicU64,
143
144 total_closed: AtomicU64,
146
147 waiting_count: AtomicU32,
149
150 created_at: Instant,
152}
153
154impl TenantPool {
155 pub fn new(tenant_id: TenantId, config: TenantPoolConfig) -> Self {
157 Self {
158 tenant_id,
159 config,
160 active_count: AtomicU32::new(0),
161 idle_count: AtomicU32::new(0),
162 total_created: AtomicU64::new(0),
163 total_closed: AtomicU64::new(0),
164 waiting_count: AtomicU32::new(0),
165 created_at: Instant::now(),
166 }
167 }
168
169 pub fn tenant_id(&self) -> &TenantId {
171 &self.tenant_id
172 }
173
174 pub fn config(&self) -> &TenantPoolConfig {
176 &self.config
177 }
178
179 pub fn active_count(&self) -> u32 {
181 self.active_count.load(Ordering::Relaxed)
182 }
183
184 pub fn idle_count(&self) -> u32 {
186 self.idle_count.load(Ordering::Relaxed)
187 }
188
189 pub fn total_count(&self) -> u32 {
191 self.active_count() + self.idle_count()
192 }
193
194 pub fn waiting_count(&self) -> u32 {
196 self.waiting_count.load(Ordering::Relaxed)
197 }
198
199 pub fn is_at_capacity(&self) -> bool {
201 self.total_count() >= self.config.max_connections
202 }
203
204 pub fn can_create_connection(&self) -> bool {
206 self.total_count() < self.config.max_connections
207 }
208
209 pub fn has_available(&self) -> bool {
211 self.idle_count() > 0
212 }
213
214 pub fn record_acquire(&self) {
216 self.idle_count.fetch_sub(1, Ordering::Relaxed);
217 self.active_count.fetch_add(1, Ordering::Relaxed);
218 }
219
220 pub fn record_release(&self) {
222 self.active_count.fetch_sub(1, Ordering::Relaxed);
223 self.idle_count.fetch_add(1, Ordering::Relaxed);
224 }
225
226 pub fn record_created(&self) {
228 self.idle_count.fetch_add(1, Ordering::Relaxed);
229 self.total_created.fetch_add(1, Ordering::Relaxed);
230 }
231
232 pub fn record_closed(&self, was_active: bool) {
234 if was_active {
235 self.active_count.fetch_sub(1, Ordering::Relaxed);
236 } else {
237 self.idle_count.fetch_sub(1, Ordering::Relaxed);
238 }
239 self.total_closed.fetch_add(1, Ordering::Relaxed);
240 }
241
242 pub fn record_waiting(&self) {
244 self.waiting_count.fetch_add(1, Ordering::Relaxed);
245 }
246
247 pub fn record_not_waiting(&self) {
249 self.waiting_count.fetch_sub(1, Ordering::Relaxed);
250 }
251
252 pub fn utilization(&self) -> f32 {
254 let total = self.total_count();
255 if total == 0 {
256 return 0.0;
257 }
258 self.active_count() as f32 / self.config.max_connections as f32
259 }
260
261 pub fn age(&self) -> Duration {
263 self.created_at.elapsed()
264 }
265
266 pub fn stats(&self) -> TenantPoolStats {
268 TenantPoolStats {
269 tenant_id: self.tenant_id.clone(),
270 active: self.active_count(),
271 idle: self.idle_count(),
272 max: self.config.max_connections,
273 waiting: self.waiting_count(),
274 total_created: self.total_created.load(Ordering::Relaxed),
275 total_closed: self.total_closed.load(Ordering::Relaxed),
276 utilization: self.utilization(),
277 age: self.age(),
278 }
279 }
280}
281
282#[derive(Debug, Clone)]
284pub struct TenantPoolStats {
285 pub tenant_id: TenantId,
287
288 pub active: u32,
290
291 pub idle: u32,
293
294 pub max: u32,
296
297 pub waiting: u32,
299
300 pub total_created: u64,
302
303 pub total_closed: u64,
305
306 pub utilization: f32,
308
309 pub age: Duration,
311}
312
313pub struct TenantConnectionPool {
317 pools: DashMap<TenantId, Arc<TenantPool>>,
319
320 shared_pool: Arc<TenantPool>,
322
323 default_config: TenantPoolConfig,
325
326 connection_counter: AtomicU64,
328
329 total_acquires: AtomicU64,
331
332 total_timeouts: AtomicU64,
334
335 dedicated_pool_threshold: u32,
337}
338
339impl TenantConnectionPool {
340 pub fn new(default_config: TenantPoolConfig) -> Self {
342 let shared_pool = Arc::new(TenantPool::new(
343 TenantId::new("__shared__"),
344 TenantPoolConfig {
345 max_connections: 50,
346 ..default_config.clone()
347 },
348 ));
349
350 Self {
351 pools: DashMap::new(),
352 shared_pool,
353 default_config,
354 connection_counter: AtomicU64::new(0),
355 total_acquires: AtomicU64::new(0),
356 total_timeouts: AtomicU64::new(0),
357 dedicated_pool_threshold: 5,
358 }
359 }
360
361 pub fn with_dedicated_threshold(mut self, threshold: u32) -> Self {
363 self.dedicated_pool_threshold = threshold;
364 self
365 }
366
367 pub fn get_pool(&self, tenant: &TenantId, config: &TenantConfig) -> Arc<TenantPool> {
369 if config.pool.dedicated_pool
371 || config.pool.max_connections >= self.dedicated_pool_threshold
372 {
373 self.pools
374 .entry(tenant.clone())
375 .or_insert_with(|| Arc::new(TenantPool::new(tenant.clone(), config.pool.clone())))
376 .clone()
377 } else {
378 self.shared_pool.clone()
379 }
380 }
381
382 pub fn get_existing_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
384 self.pools.get(tenant).map(|p| p.clone())
385 }
386
387 pub fn create_tenant_pool(&self, tenant: &TenantId, config: TenantPoolConfig) {
389 self.pools
390 .insert(tenant.clone(), Arc::new(TenantPool::new(tenant.clone(), config)));
391 }
392
393 pub fn remove_tenant_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
395 self.pools.remove(tenant).map(|(_, pool)| pool)
396 }
397
398 pub fn next_connection_id(&self) -> u64 {
400 self.connection_counter.fetch_add(1, Ordering::Relaxed)
401 }
402
403 pub fn record_acquire(&self) {
405 self.total_acquires.fetch_add(1, Ordering::Relaxed);
406 }
407
408 pub fn record_timeout(&self) {
410 self.total_timeouts.fetch_add(1, Ordering::Relaxed);
411 }
412
413 pub fn all_stats(&self) -> Vec<TenantPoolStats> {
415 let mut stats: Vec<TenantPoolStats> = self
416 .pools
417 .iter()
418 .map(|entry| entry.value().stats())
419 .collect();
420
421 stats.push(self.shared_pool.stats());
423
424 stats
425 }
426
427 pub fn tenant_stats(&self, tenant: &TenantId) -> Option<TenantPoolStats> {
429 self.pools.get(tenant).map(|p| p.stats())
430 }
431
432 pub fn shared_pool_stats(&self) -> TenantPoolStats {
434 self.shared_pool.stats()
435 }
436
437 pub fn tenant_pool_count(&self) -> usize {
439 self.pools.len()
440 }
441
442 pub fn aggregate_stats(&self) -> AggregatePoolStats {
444 let mut total_active = 0u32;
445 let mut total_idle = 0u32;
446 let mut total_max = 0u32;
447 let mut total_waiting = 0u32;
448
449 for pool in self.pools.iter() {
450 total_active += pool.active_count();
451 total_idle += pool.idle_count();
452 total_max += pool.config().max_connections;
453 total_waiting += pool.waiting_count();
454 }
455
456 total_active += self.shared_pool.active_count();
458 total_idle += self.shared_pool.idle_count();
459 total_max += self.shared_pool.config().max_connections;
460 total_waiting += self.shared_pool.waiting_count();
461
462 AggregatePoolStats {
463 tenant_pools: self.pools.len(),
464 total_active,
465 total_idle,
466 total_max,
467 total_waiting,
468 total_acquires: self.total_acquires.load(Ordering::Relaxed),
469 total_timeouts: self.total_timeouts.load(Ordering::Relaxed),
470 average_utilization: if total_max > 0 {
471 total_active as f32 / total_max as f32
472 } else {
473 0.0
474 },
475 }
476 }
477}
478
479#[derive(Debug, Clone)]
481pub struct AggregatePoolStats {
482 pub tenant_pools: usize,
484
485 pub total_active: u32,
487
488 pub total_idle: u32,
490
491 pub total_max: u32,
493
494 pub total_waiting: u32,
496
497 pub total_acquires: u64,
499
500 pub total_timeouts: u64,
502
503 pub average_utilization: f32,
505}
506
507#[derive(Debug)]
509pub enum AcquireResult {
510 Success(PooledConnection),
512
513 Waiting,
515
516 PoolExhausted,
518
519 TenantNotFound,
521
522 Timeout,
524}
525
526pub struct TenantConnectionLease {
528 connection: PooledConnection,
530
531 pool: Arc<TenantPool>,
533
534 leased_at: Instant,
536
537 used: bool,
539}
540
541impl TenantConnectionLease {
542 pub fn new(connection: PooledConnection, pool: Arc<TenantPool>) -> Self {
544 Self {
545 connection,
546 pool,
547 leased_at: Instant::now(),
548 used: false,
549 }
550 }
551
552 pub fn connection(&self) -> &PooledConnection {
554 &self.connection
555 }
556
557 pub fn connection_mut(&mut self) -> &mut PooledConnection {
559 self.used = true;
560 &mut self.connection
561 }
562
563 pub fn tenant_id(&self) -> &TenantId {
565 self.connection.tenant_id()
566 }
567
568 pub fn lease_duration(&self) -> Duration {
570 self.leased_at.elapsed()
571 }
572
573 pub fn mark_used(&mut self) {
575 self.used = true;
576 }
577
578 pub fn was_used(&self) -> bool {
580 self.used
581 }
582
583 pub fn release(mut self) {
585 self.connection.mark_idle();
586 self.pool.record_release();
587 }
588}
589
590impl Drop for TenantConnectionLease {
591 fn drop(&mut self) {
592 if self.connection.state() == ConnectionState::Active {
594 self.pool.record_release();
595 }
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use super::*;
602
603 #[test]
604 fn test_pooled_connection() {
605 let tenant = TenantId::new("test");
606 let mut conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
607
608 assert_eq!(conn.id(), 1);
609 assert_eq!(conn.tenant_id().as_str(), "test");
610 assert!(conn.is_available());
611 assert_eq!(conn.state(), ConnectionState::Idle);
612
613 conn.mark_active();
614 assert!(!conn.is_available());
615 assert_eq!(conn.state(), ConnectionState::Active);
616
617 conn.record_query();
618 assert_eq!(conn.queries_executed(), 1);
619
620 conn.mark_idle();
621 assert!(conn.is_available());
622 }
623
624 #[test]
625 fn test_tenant_pool() {
626 let tenant = TenantId::new("test");
627 let config = TenantPoolConfig {
628 max_connections: 10,
629 ..Default::default()
630 };
631 let pool = TenantPool::new(tenant.clone(), config);
632
633 assert_eq!(pool.tenant_id().as_str(), "test");
634 assert_eq!(pool.active_count(), 0);
635 assert_eq!(pool.idle_count(), 0);
636 assert!(!pool.is_at_capacity());
637 assert!(pool.can_create_connection());
638
639 pool.record_created();
640 assert_eq!(pool.idle_count(), 1);
641
642 pool.record_acquire();
643 assert_eq!(pool.active_count(), 1);
644 assert_eq!(pool.idle_count(), 0);
645
646 pool.record_release();
647 assert_eq!(pool.active_count(), 0);
648 assert_eq!(pool.idle_count(), 1);
649 }
650
651 #[test]
652 fn test_tenant_pool_utilization() {
653 let tenant = TenantId::new("test");
654 let config = TenantPoolConfig {
655 max_connections: 10,
656 ..Default::default()
657 };
658 let pool = TenantPool::new(tenant, config);
659
660 assert_eq!(pool.utilization(), 0.0);
661
662 for _ in 0..5 {
664 pool.record_created();
665 }
666 for _ in 0..3 {
667 pool.record_acquire();
668 }
669
670 assert_eq!(pool.active_count(), 3);
671 assert_eq!(pool.idle_count(), 2);
672 assert!((pool.utilization() - 0.3).abs() < 0.01);
673 }
674
675 #[test]
676 fn test_tenant_connection_pool() {
677 let config = TenantPoolConfig::default();
678 let manager = TenantConnectionPool::new(config.clone());
679
680 let tenant = TenantId::new("tenant_a");
681 let tenant_config = TenantConfig::builder()
682 .id("tenant_a")
683 .name("Tenant A")
684 .database_isolation("tenant_a_db")
685 .max_connections(20)
686 .pool(TenantPoolConfig {
687 max_connections: 20,
688 dedicated_pool: true,
689 ..Default::default()
690 })
691 .build();
692
693 let pool = manager.get_pool(&tenant, &tenant_config);
694 assert_eq!(pool.config().max_connections, 20);
695
696 let stats = manager.aggregate_stats();
697 assert_eq!(stats.tenant_pools, 1);
698 }
699
700 #[test]
701 fn test_shared_pool_usage() {
702 let config = TenantPoolConfig::default();
703 let manager = TenantConnectionPool::new(config.clone());
704
705 let tenant = TenantId::new("small_tenant");
706 let tenant_config = TenantConfig::builder()
707 .id("small_tenant")
708 .name("Small Tenant")
709 .schema_isolation("shared", "small_tenant")
710 .max_connections(2) .build();
712
713 let pool = manager.get_pool(&tenant, &tenant_config);
714
715 assert_eq!(pool.tenant_id().as_str(), "__shared__");
717 }
718
719 #[test]
720 fn test_tenant_connection_lease() {
721 let tenant = TenantId::new("test");
722 let config = TenantPoolConfig::default();
723 let pool = Arc::new(TenantPool::new(tenant.clone(), config));
724
725 pool.record_created();
726 pool.record_acquire();
727
728 let conn = PooledConnection::new(1, tenant.clone(), "127.0.0.1:5432");
729 let mut lease = TenantConnectionLease::new(conn, pool.clone());
730
731 assert_eq!(lease.tenant_id().as_str(), "test");
732 assert!(!lease.was_used());
733
734 lease.mark_used();
735 assert!(lease.was_used());
736
737 lease.release();
739 assert_eq!(pool.active_count(), 0);
740 assert_eq!(pool.idle_count(), 1);
741 }
742
743 #[test]
744 fn test_pool_stats() {
745 let tenant = TenantId::new("test");
746 let config = TenantPoolConfig {
747 max_connections: 10,
748 ..Default::default()
749 };
750 let pool = TenantPool::new(tenant, config);
751
752 pool.record_created();
753 pool.record_created();
754 pool.record_acquire();
755
756 let stats = pool.stats();
757 assert_eq!(stats.active, 1);
758 assert_eq!(stats.idle, 1);
759 assert_eq!(stats.max, 10);
760 assert_eq!(stats.total_created, 2);
761 }
762}