heliosdb_proxy/multi_tenancy/
mod.rs1pub mod config;
72pub mod identifier;
73pub mod isolation;
74pub mod metrics;
75pub mod pool;
76pub mod transformer;
77
78use std::sync::Arc;
79
80use dashmap::DashMap;
81
82pub use config::{
83 IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantAiConfig,
84 TenantConfig, TenantConfigBuilder, TenantId, TenantPermissions, TenantPoolConfig,
85 TenantRateLimits,
86};
87pub use identifier::{
88 create_identifier, CompositeIdentifier, DatabaseNameIdentifier, HeaderTenantIdentifier,
89 JwtClaimIdentifier, RequestContext, SqlContextIdentifier, TenantIdentifier,
90 UsernamePrefixIdentifier,
91};
92pub use isolation::{
93 create_handler, BranchIsolationHandler, DatabaseIsolationHandler, IsolationHandler,
94 IsolationRouter, RowIsolationHandler, RoutingDecision, SchemaIsolationHandler,
95 TenantProvisioner,
96};
97pub use metrics::{
98 AggregateMetricsSnapshot, TenantCostEntry, TenantCostReport, TenantCostTracker,
99 TenantMetrics, TenantMetricsSnapshot, TenantStats,
100};
101pub use pool::{
102 AcquireResult, AggregatePoolStats, ConnectionState, PooledConnection, TenantConnectionLease,
103 TenantConnectionPool, TenantPool, TenantPoolStats,
104};
105pub use transformer::{validate_query, QueryValidation, TenantQueryTransformer, TransformResult};
106
107pub struct TenantManager {
112 config: MultiTenancyConfig,
114
115 tenants: DashMap<TenantId, TenantConfig>,
117
118 identifier: Arc<dyn TenantIdentifier>,
120
121 isolation_router: IsolationRouter,
123
124 pool_manager: TenantConnectionPool,
126
127 query_transformer: TenantQueryTransformer,
129
130 metrics: TenantMetrics,
132
133 cost_tracker: TenantCostTracker,
135
136 provisioner: TenantProvisioner,
138}
139
140impl TenantManager {
141 pub fn new() -> Self {
143 Self::with_config(MultiTenancyConfig::default())
144 }
145
146 pub fn with_config(config: MultiTenancyConfig) -> Self {
148 let identifier = create_identifier(&config.identification);
149
150 Self {
151 config: config.clone(),
152 tenants: DashMap::new(),
153 identifier: Arc::from(identifier),
154 isolation_router: IsolationRouter::new(),
155 pool_manager: TenantConnectionPool::new(TenantPoolConfig::default()),
156 query_transformer: TenantQueryTransformer::new(),
157 metrics: TenantMetrics::new(),
158 cost_tracker: TenantCostTracker::new(),
159 provisioner: TenantProvisioner::new(),
160 }
161 }
162
163 pub fn is_enabled(&self) -> bool {
165 self.config.enabled
166 }
167
168 pub fn register_tenant(&self, config: TenantConfig) {
170 let tenant_id = config.id.clone();
171
172 self.isolation_router
174 .register_from_config(&config);
175
176 self.pool_manager
178 .create_tenant_pool(&tenant_id, config.pool.clone());
179
180 self.tenants.insert(tenant_id, config);
182 }
183
184 pub fn unregister_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
186 self.pool_manager.remove_tenant_pool(tenant);
187 self.tenants.remove(tenant).map(|(_, c)| c)
188 }
189
190 pub fn get_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
192 self.tenants.get(tenant).map(|e| e.clone())
193 }
194
195 pub fn has_tenant(&self, tenant: &TenantId) -> bool {
197 self.tenants.contains_key(tenant)
198 }
199
200 pub fn tenant_ids(&self) -> Vec<TenantId> {
202 self.tenants.iter().map(|e| e.key().clone()).collect()
203 }
204
205 pub fn tenant_count(&self) -> usize {
207 self.tenants.len()
208 }
209
210 pub fn identify_tenant(&self, request: &RequestContext) -> Option<TenantId> {
212 let tenant_id = self.identifier.identify(request)?;
213
214 if self.has_tenant(&tenant_id) {
216 Some(tenant_id)
217 } else if self.config.allow_unknown_tenants {
218 if self.config.auto_create_tenants {
220 let config = self.create_default_tenant_config(&tenant_id);
221 self.register_tenant(config);
222 }
223 Some(tenant_id)
224 } else {
225 None
226 }
227 }
228
229 fn create_default_tenant_config(&self, tenant: &TenantId) -> TenantConfig {
231 let isolation = self.provisioner.generate_isolation(
232 tenant,
233 self.config.default_config.isolation.strategy_name(),
234 self.config.default_config.isolation.database_name(),
235 );
236
237 TenantConfig::builder()
238 .id(tenant.clone())
239 .name(tenant.0.clone())
240 .isolation(isolation)
241 .rate_limits(self.config.default_config.rate_limits.clone())
242 .pool(self.config.default_config.pool.clone())
243 .build()
244 }
245
246 pub fn get_routing(&self, tenant: &TenantId) -> Option<RoutingDecision> {
248 let config = self.get_tenant(tenant)?;
249 Some(self.isolation_router.get_routing(tenant, &config))
250 }
251
252 pub fn transform_query(&self, query: &str, tenant: &TenantId) -> TransformResult {
254 if let Some(config) = self.get_tenant(tenant) {
255 self.query_transformer.transform(query, tenant, &config)
256 } else {
257 TransformResult::passthrough(query)
258 }
259 }
260
261 pub fn validate_query(&self, query: &str, tenant: &TenantId) -> QueryValidation {
263 if let Some(config) = self.get_tenant(tenant) {
264 validate_query(query, tenant, &config)
265 } else {
266 QueryValidation {
267 valid: false,
268 violations: vec!["Unknown tenant".to_string()],
269 }
270 }
271 }
272
273 pub fn get_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
275 let config = self.get_tenant(tenant)?;
276 Some(self.pool_manager.get_pool(tenant, &config))
277 }
278
279 pub fn record_query(
281 &self,
282 tenant: &TenantId,
283 duration: std::time::Duration,
284 rows: u64,
285 bytes_read: u64,
286 bytes_written: u64,
287 success: bool,
288 ) {
289 self.metrics.record_query(tenant, duration, rows, success);
290 self.metrics.record_bytes(tenant, bytes_read, bytes_written);
291 self.cost_tracker
292 .record_query_cost(tenant, rows, bytes_read, bytes_written);
293 }
294
295 pub fn tenant_metrics(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
297 self.metrics.snapshot(tenant)
298 }
299
300 pub fn aggregate_metrics(&self) -> AggregateMetricsSnapshot {
302 self.metrics.aggregate_snapshot()
303 }
304
305 pub fn top_tenants_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
307 self.metrics.top_by_queries(limit)
308 }
309
310 pub fn tenant_cost(&self, tenant: &TenantId) -> Option<f64> {
312 self.cost_tracker.get_cost(tenant)
313 }
314
315 pub fn cost_report(&self) -> TenantCostReport {
317 self.cost_tracker.cost_report()
318 }
319
320 pub fn pool_stats(&self) -> Vec<TenantPoolStats> {
322 self.pool_manager.all_stats()
323 }
324
325 pub fn aggregate_pool_stats(&self) -> AggregatePoolStats {
327 self.pool_manager.aggregate_stats()
328 }
329
330 pub fn provisioner(&self) -> &TenantProvisioner {
332 &self.provisioner
333 }
334
335 pub fn query_transformer(&self) -> &TenantQueryTransformer {
337 &self.query_transformer
338 }
339
340 pub fn metrics(&self) -> &TenantMetrics {
342 &self.metrics
343 }
344
345 pub fn is_admin_request(&self, request: &RequestContext) -> bool {
347 if let Some(pattern) = &self.config.admin_user_pattern {
348 if let Some(username) = &request.username {
349 return username.starts_with(pattern) || username == pattern;
351 }
352 }
353 false
354 }
355
356 pub fn update_tenant(&self, tenant: &TenantId, config: TenantConfig) -> bool {
358 if self.tenants.contains_key(tenant) {
359 self.isolation_router.register_from_config(&config);
360 self.pool_manager
361 .create_tenant_pool(tenant, config.pool.clone());
362 self.tenants.insert(tenant.clone(), config);
363 true
364 } else {
365 false
366 }
367 }
368
369 pub fn enable_tenant(&self, tenant: &TenantId) -> bool {
371 if let Some(mut entry) = self.tenants.get_mut(tenant) {
372 entry.enabled = true;
373 true
374 } else {
375 false
376 }
377 }
378
379 pub fn disable_tenant(&self, tenant: &TenantId) -> bool {
381 if let Some(mut entry) = self.tenants.get_mut(tenant) {
382 entry.enabled = false;
383 true
384 } else {
385 false
386 }
387 }
388
389 pub fn is_tenant_enabled(&self, tenant: &TenantId) -> bool {
391 self.tenants
392 .get(tenant)
393 .map(|c| c.enabled)
394 .unwrap_or(false)
395 }
396}
397
398impl Default for TenantManager {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404pub struct TenantManagerBuilder {
406 config: MultiTenancyConfig,
407 identifier: Option<Arc<dyn TenantIdentifier>>,
408 query_transformer: Option<TenantQueryTransformer>,
409 provisioner: Option<TenantProvisioner>,
410}
411
412impl TenantManagerBuilder {
413 pub fn new() -> Self {
415 Self {
416 config: MultiTenancyConfig::enabled(),
417 identifier: None,
418 query_transformer: None,
419 provisioner: None,
420 }
421 }
422
423 pub fn config(mut self, config: MultiTenancyConfig) -> Self {
425 self.config = config;
426 self
427 }
428
429 pub fn identifier(mut self, identifier: Arc<dyn TenantIdentifier>) -> Self {
431 self.identifier = Some(identifier);
432 self
433 }
434
435 pub fn header_identification(mut self, header: impl Into<String>) -> Self {
437 self.config.identification = IdentificationMethod::header(header);
438 self
439 }
440
441 pub fn username_prefix_identification(mut self, separator: char) -> Self {
443 self.config.identification = IdentificationMethod::username_prefix(separator);
444 self
445 }
446
447 pub fn query_transformer(mut self, transformer: TenantQueryTransformer) -> Self {
449 self.query_transformer = Some(transformer);
450 self
451 }
452
453 pub fn provisioner(mut self, provisioner: TenantProvisioner) -> Self {
455 self.provisioner = Some(provisioner);
456 self
457 }
458
459 pub fn allow_unknown_tenants(mut self) -> Self {
461 self.config.allow_unknown_tenants = true;
462 self
463 }
464
465 pub fn auto_create_tenants(mut self) -> Self {
467 self.config.auto_create_tenants = true;
468 self
469 }
470
471 pub fn default_tenant_config(mut self, config: TenantConfig) -> Self {
473 self.config.default_config = config;
474 self
475 }
476
477 pub fn build(self) -> TenantManager {
479 let mut manager = TenantManager::with_config(self.config);
480
481 if let Some(identifier) = self.identifier {
482 manager.identifier = identifier;
483 }
484
485 if let Some(transformer) = self.query_transformer {
486 manager.query_transformer = transformer;
487 }
488
489 if let Some(provisioner) = self.provisioner {
490 manager.provisioner = provisioner;
491 }
492
493 manager
494 }
495}
496
497impl Default for TenantManagerBuilder {
498 fn default() -> Self {
499 Self::new()
500 }
501}
502
503#[cfg(test)]
504mod tests {
505 use super::*;
506 use std::time::Duration;
507
508 #[test]
509 fn test_tenant_manager_creation() {
510 let manager = TenantManager::new();
511 assert_eq!(manager.tenant_count(), 0);
512 }
513
514 #[test]
515 fn test_register_and_get_tenant() {
516 let manager = TenantManager::new();
517
518 let config = TenantConfig::builder()
519 .id("acme")
520 .name("Acme Corp")
521 .schema_isolation("shared", "acme")
522 .build();
523
524 manager.register_tenant(config.clone());
525
526 assert!(manager.has_tenant(&TenantId::new("acme")));
527 assert_eq!(manager.tenant_count(), 1);
528
529 let retrieved = manager.get_tenant(&TenantId::new("acme")).unwrap();
530 assert_eq!(retrieved.name, "Acme Corp");
531 }
532
533 #[test]
534 fn test_identify_tenant() {
535 let manager = TenantManagerBuilder::new()
536 .header_identification("X-Tenant-Id")
537 .build();
538
539 let config = TenantConfig::builder()
540 .id("acme")
541 .name("Acme")
542 .database_isolation("acme_db")
543 .build();
544
545 manager.register_tenant(config);
546
547 let ctx = RequestContext::new().with_header("X-Tenant-Id", "acme");
548 let tenant = manager.identify_tenant(&ctx);
549
550 assert!(tenant.is_some());
551 assert_eq!(tenant.unwrap().as_str(), "acme");
552 }
553
554 #[test]
555 fn test_unknown_tenant_rejected() {
556 let manager = TenantManager::new();
557
558 let ctx = RequestContext::new().with_header("X-Tenant-Id", "unknown");
559 let tenant = manager.identify_tenant(&ctx);
560
561 assert!(tenant.is_none());
562 }
563
564 #[test]
565 fn test_auto_create_tenants() {
566 let manager = TenantManagerBuilder::new()
567 .header_identification("X-Tenant-Id")
568 .allow_unknown_tenants()
569 .auto_create_tenants()
570 .build();
571
572 let ctx = RequestContext::new().with_header("X-Tenant-Id", "new_tenant");
573 let tenant = manager.identify_tenant(&ctx);
574
575 assert!(tenant.is_some());
576 assert!(manager.has_tenant(&TenantId::new("new_tenant")));
577 }
578
579 #[test]
580 fn test_routing_decision() {
581 let manager = TenantManager::new();
582
583 let config = TenantConfig::builder()
584 .id("acme")
585 .name("Acme")
586 .schema_isolation("shared_db", "acme_schema")
587 .build();
588
589 manager.register_tenant(config);
590
591 let routing = manager.get_routing(&TenantId::new("acme")).unwrap();
592 assert_eq!(routing.database, Some("shared_db".to_string()));
593 assert_eq!(routing.search_path, Some("acme_schema".to_string()));
594 }
595
596 #[test]
597 fn test_query_transformation() {
598 let transformer = TenantQueryTransformer::new()
599 .register_table("users", "tenant_id");
600
601 let mut manager = TenantManager::new();
602 manager.query_transformer = transformer;
603
604 let config = TenantConfig::builder()
605 .id("acme")
606 .name("Acme")
607 .row_isolation("shared_db", "tenant_id")
608 .build();
609
610 manager.register_tenant(config);
611
612 let result = manager.transform_query(
613 "SELECT * FROM users",
614 &TenantId::new("acme"),
615 );
616
617 assert!(result.transformed);
618 assert!(result.query.contains("tenant_id = 'acme'"));
619 }
620
621 #[test]
622 fn test_metrics_recording() {
623 let manager = TenantManager::new();
624
625 let config = TenantConfig::builder()
626 .id("acme")
627 .name("Acme")
628 .database_isolation("acme_db")
629 .build();
630
631 manager.register_tenant(config);
632
633 let tenant = TenantId::new("acme");
634 manager.record_query(&tenant, Duration::from_millis(10), 100, 1024, 512, true);
635 manager.record_query(&tenant, Duration::from_millis(20), 200, 2048, 1024, false);
636
637 let snapshot = manager.tenant_metrics(&tenant).unwrap();
638 assert_eq!(snapshot.queries, 2);
639 assert_eq!(snapshot.errors, 1);
640 assert_eq!(snapshot.rows_processed, 300);
641 }
642
643 #[test]
644 fn test_enable_disable_tenant() {
645 let manager = TenantManager::new();
646
647 let config = TenantConfig::builder()
648 .id("acme")
649 .name("Acme")
650 .database_isolation("acme_db")
651 .build();
652
653 manager.register_tenant(config);
654
655 assert!(manager.is_tenant_enabled(&TenantId::new("acme")));
656
657 manager.disable_tenant(&TenantId::new("acme"));
658 assert!(!manager.is_tenant_enabled(&TenantId::new("acme")));
659
660 manager.enable_tenant(&TenantId::new("acme"));
661 assert!(manager.is_tenant_enabled(&TenantId::new("acme")));
662 }
663
664 #[test]
665 fn test_tenant_manager_builder() {
666 let default_config = TenantConfig::builder()
667 .id("default")
668 .name("Default")
669 .schema_isolation("shared", "default")
670 .max_connections(10)
671 .build();
672
673 let manager = TenantManagerBuilder::new()
674 .header_identification("X-Org-Id")
675 .allow_unknown_tenants()
676 .auto_create_tenants()
677 .default_tenant_config(default_config)
678 .build();
679
680 assert!(manager.is_enabled());
681 }
682
683 #[test]
684 fn test_unregister_tenant() {
685 let manager = TenantManager::new();
686
687 let config = TenantConfig::builder()
688 .id("acme")
689 .name("Acme")
690 .database_isolation("acme_db")
691 .build();
692
693 manager.register_tenant(config);
694 assert!(manager.has_tenant(&TenantId::new("acme")));
695
696 let removed = manager.unregister_tenant(&TenantId::new("acme"));
697 assert!(removed.is_some());
698 assert!(!manager.has_tenant(&TenantId::new("acme")));
699 }
700}