heliosdb_proxy/multi_tenancy/
mod.rs1pub mod config;
72pub mod identifier;
73pub mod isolation;
74pub mod metrics;
75pub mod pool;
76pub mod transformer;
77
78use std::sync::Arc;
79
80use dashmap::DashMap;
81
82pub use config::{
83 IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantAiConfig, TenantConfig,
84 TenantConfigBuilder, TenantId, TenantPermissions, TenantPoolConfig, TenantRateLimits,
85};
86pub use identifier::{
87 create_identifier, CompositeIdentifier, DatabaseNameIdentifier, HeaderTenantIdentifier,
88 JwtClaimIdentifier, RequestContext, SqlContextIdentifier, TenantIdentifier,
89 UsernamePrefixIdentifier,
90};
91pub use isolation::{
92 create_handler, BranchIsolationHandler, DatabaseIsolationHandler, IsolationHandler,
93 IsolationRouter, RoutingDecision, RowIsolationHandler, SchemaIsolationHandler,
94 TenantProvisioner,
95};
96pub use metrics::{
97 AggregateMetricsSnapshot, TenantCostEntry, TenantCostReport, TenantCostTracker, TenantMetrics,
98 TenantMetricsSnapshot, TenantStats,
99};
100pub use pool::{
101 AcquireResult, AggregatePoolStats, ConnectionState, PooledConnection, TenantConnectionLease,
102 TenantConnectionPool, TenantPool, TenantPoolStats,
103};
104pub use transformer::{validate_query, QueryValidation, TenantQueryTransformer, TransformResult};
105
106pub struct TenantManager {
111 config: MultiTenancyConfig,
113
114 tenants: DashMap<TenantId, TenantConfig>,
116
117 identifier: Arc<dyn TenantIdentifier>,
119
120 isolation_router: IsolationRouter,
122
123 pool_manager: TenantConnectionPool,
125
126 query_transformer: TenantQueryTransformer,
128
129 metrics: TenantMetrics,
131
132 cost_tracker: TenantCostTracker,
134
135 provisioner: TenantProvisioner,
137}
138
139impl TenantManager {
140 pub fn new() -> Self {
142 Self::with_config(MultiTenancyConfig::default())
143 }
144
145 pub fn with_config(config: MultiTenancyConfig) -> Self {
147 let identifier = create_identifier(&config.identification);
148
149 Self {
150 config: config.clone(),
151 tenants: DashMap::new(),
152 identifier: Arc::from(identifier),
153 isolation_router: IsolationRouter::new(),
154 pool_manager: TenantConnectionPool::new(TenantPoolConfig::default()),
155 query_transformer: TenantQueryTransformer::new(),
156 metrics: TenantMetrics::new(),
157 cost_tracker: TenantCostTracker::new(),
158 provisioner: TenantProvisioner::new(),
159 }
160 }
161
162 pub fn is_enabled(&self) -> bool {
164 self.config.enabled
165 }
166
167 pub fn register_tenant(&self, config: TenantConfig) {
169 let tenant_id = config.id.clone();
170
171 self.isolation_router.register_from_config(&config);
173
174 self.pool_manager
176 .create_tenant_pool(&tenant_id, config.pool.clone());
177
178 self.tenants.insert(tenant_id, config);
180 }
181
182 pub fn unregister_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
184 self.pool_manager.remove_tenant_pool(tenant);
185 self.tenants.remove(tenant).map(|(_, c)| c)
186 }
187
188 pub fn get_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
190 self.tenants.get(tenant).map(|e| e.clone())
191 }
192
193 pub fn has_tenant(&self, tenant: &TenantId) -> bool {
195 self.tenants.contains_key(tenant)
196 }
197
198 pub fn tenant_ids(&self) -> Vec<TenantId> {
200 self.tenants.iter().map(|e| e.key().clone()).collect()
201 }
202
203 pub fn tenant_count(&self) -> usize {
205 self.tenants.len()
206 }
207
208 pub fn identify_tenant(&self, request: &RequestContext) -> Option<TenantId> {
210 let tenant_id = self.identifier.identify(request)?;
211
212 if self.has_tenant(&tenant_id) {
214 Some(tenant_id)
215 } else if self.config.allow_unknown_tenants {
216 if self.config.auto_create_tenants {
218 let config = self.create_default_tenant_config(&tenant_id);
219 self.register_tenant(config);
220 }
221 Some(tenant_id)
222 } else {
223 None
224 }
225 }
226
227 fn create_default_tenant_config(&self, tenant: &TenantId) -> TenantConfig {
229 let isolation = self.provisioner.generate_isolation(
230 tenant,
231 self.config.default_config.isolation.strategy_name(),
232 self.config.default_config.isolation.database_name(),
233 );
234
235 TenantConfig::builder()
236 .id(tenant.clone())
237 .name(tenant.0.clone())
238 .isolation(isolation)
239 .rate_limits(self.config.default_config.rate_limits.clone())
240 .pool(self.config.default_config.pool.clone())
241 .build()
242 }
243
244 pub fn get_routing(&self, tenant: &TenantId) -> Option<RoutingDecision> {
246 let config = self.get_tenant(tenant)?;
247 Some(self.isolation_router.get_routing(tenant, &config))
248 }
249
250 pub fn transform_query(&self, query: &str, tenant: &TenantId) -> TransformResult {
252 if let Some(config) = self.get_tenant(tenant) {
253 self.query_transformer.transform(query, tenant, &config)
254 } else {
255 TransformResult::passthrough(query)
256 }
257 }
258
259 pub fn validate_query(&self, query: &str, tenant: &TenantId) -> QueryValidation {
261 if let Some(config) = self.get_tenant(tenant) {
262 validate_query(query, tenant, &config)
263 } else {
264 QueryValidation {
265 valid: false,
266 violations: vec!["Unknown tenant".to_string()],
267 }
268 }
269 }
270
271 pub fn get_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
273 let config = self.get_tenant(tenant)?;
274 Some(self.pool_manager.get_pool(tenant, &config))
275 }
276
277 pub fn record_query(
279 &self,
280 tenant: &TenantId,
281 duration: std::time::Duration,
282 rows: u64,
283 bytes_read: u64,
284 bytes_written: u64,
285 success: bool,
286 ) {
287 self.metrics.record_query(tenant, duration, rows, success);
288 self.metrics.record_bytes(tenant, bytes_read, bytes_written);
289 self.cost_tracker
290 .record_query_cost(tenant, rows, bytes_read, bytes_written);
291 }
292
293 pub fn tenant_metrics(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
295 self.metrics.snapshot(tenant)
296 }
297
298 pub fn aggregate_metrics(&self) -> AggregateMetricsSnapshot {
300 self.metrics.aggregate_snapshot()
301 }
302
303 pub fn top_tenants_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
305 self.metrics.top_by_queries(limit)
306 }
307
308 pub fn tenant_cost(&self, tenant: &TenantId) -> Option<f64> {
310 self.cost_tracker.get_cost(tenant)
311 }
312
313 pub fn cost_report(&self) -> TenantCostReport {
315 self.cost_tracker.cost_report()
316 }
317
318 pub fn pool_stats(&self) -> Vec<TenantPoolStats> {
320 self.pool_manager.all_stats()
321 }
322
323 pub fn aggregate_pool_stats(&self) -> AggregatePoolStats {
325 self.pool_manager.aggregate_stats()
326 }
327
328 pub fn provisioner(&self) -> &TenantProvisioner {
330 &self.provisioner
331 }
332
333 pub fn query_transformer(&self) -> &TenantQueryTransformer {
335 &self.query_transformer
336 }
337
338 pub fn metrics(&self) -> &TenantMetrics {
340 &self.metrics
341 }
342
343 pub fn is_admin_request(&self, request: &RequestContext) -> bool {
345 if let Some(pattern) = &self.config.admin_user_pattern {
346 if let Some(username) = &request.username {
347 return username.starts_with(pattern) || username == pattern;
349 }
350 }
351 false
352 }
353
354 pub fn update_tenant(&self, tenant: &TenantId, config: TenantConfig) -> bool {
356 if self.tenants.contains_key(tenant) {
357 self.isolation_router.register_from_config(&config);
358 self.pool_manager
359 .create_tenant_pool(tenant, config.pool.clone());
360 self.tenants.insert(tenant.clone(), config);
361 true
362 } else {
363 false
364 }
365 }
366
367 pub fn enable_tenant(&self, tenant: &TenantId) -> bool {
369 if let Some(mut entry) = self.tenants.get_mut(tenant) {
370 entry.enabled = true;
371 true
372 } else {
373 false
374 }
375 }
376
377 pub fn disable_tenant(&self, tenant: &TenantId) -> bool {
379 if let Some(mut entry) = self.tenants.get_mut(tenant) {
380 entry.enabled = false;
381 true
382 } else {
383 false
384 }
385 }
386
387 pub fn is_tenant_enabled(&self, tenant: &TenantId) -> bool {
389 self.tenants.get(tenant).map(|c| c.enabled).unwrap_or(false)
390 }
391}
392
393impl Default for TenantManager {
394 fn default() -> Self {
395 Self::new()
396 }
397}
398
399pub struct TenantManagerBuilder {
401 config: MultiTenancyConfig,
402 identifier: Option<Arc<dyn TenantIdentifier>>,
403 query_transformer: Option<TenantQueryTransformer>,
404 provisioner: Option<TenantProvisioner>,
405}
406
407impl TenantManagerBuilder {
408 pub fn new() -> Self {
410 Self {
411 config: MultiTenancyConfig::enabled(),
412 identifier: None,
413 query_transformer: None,
414 provisioner: None,
415 }
416 }
417
418 pub fn config(mut self, config: MultiTenancyConfig) -> Self {
420 self.config = config;
421 self
422 }
423
424 pub fn identifier(mut self, identifier: Arc<dyn TenantIdentifier>) -> Self {
426 self.identifier = Some(identifier);
427 self
428 }
429
430 pub fn header_identification(mut self, header: impl Into<String>) -> Self {
432 self.config.identification = IdentificationMethod::header(header);
433 self
434 }
435
436 pub fn username_prefix_identification(mut self, separator: char) -> Self {
438 self.config.identification = IdentificationMethod::username_prefix(separator);
439 self
440 }
441
442 pub fn query_transformer(mut self, transformer: TenantQueryTransformer) -> Self {
444 self.query_transformer = Some(transformer);
445 self
446 }
447
448 pub fn provisioner(mut self, provisioner: TenantProvisioner) -> Self {
450 self.provisioner = Some(provisioner);
451 self
452 }
453
454 pub fn allow_unknown_tenants(mut self) -> Self {
456 self.config.allow_unknown_tenants = true;
457 self
458 }
459
460 pub fn auto_create_tenants(mut self) -> Self {
462 self.config.auto_create_tenants = true;
463 self
464 }
465
466 pub fn default_tenant_config(mut self, config: TenantConfig) -> Self {
468 self.config.default_config = config;
469 self
470 }
471
472 pub fn build(self) -> TenantManager {
474 let mut manager = TenantManager::with_config(self.config);
475
476 if let Some(identifier) = self.identifier {
477 manager.identifier = identifier;
478 }
479
480 if let Some(transformer) = self.query_transformer {
481 manager.query_transformer = transformer;
482 }
483
484 if let Some(provisioner) = self.provisioner {
485 manager.provisioner = provisioner;
486 }
487
488 manager
489 }
490}
491
492impl Default for TenantManagerBuilder {
493 fn default() -> Self {
494 Self::new()
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Id of the tenant used by most tests.
    fn acme() -> TenantId {
        TenantId::new("acme")
    }

    /// Registers the "acme" tenant with database isolation on `acme_db`.
    fn register_acme_db(manager: &TenantManager) {
        let tenant_config = TenantConfig::builder()
            .id("acme")
            .name("Acme")
            .database_isolation("acme_db")
            .build();
        manager.register_tenant(tenant_config);
    }

    /// A fresh manager has no tenants.
    #[test]
    fn test_tenant_manager_creation() {
        assert_eq!(TenantManager::new().tenant_count(), 0);
    }

    /// A registered tenant can be looked up again.
    #[test]
    fn test_register_and_get_tenant() {
        let manager = TenantManager::new();

        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme Corp")
                .schema_isolation("shared", "acme")
                .build(),
        );

        assert!(manager.has_tenant(&acme()));
        assert_eq!(manager.tenant_count(), 1);

        let stored = manager.get_tenant(&acme()).unwrap();
        assert_eq!(stored.name, "Acme Corp");
    }

    /// Header identification resolves a registered tenant.
    #[test]
    fn test_identify_tenant() {
        let manager = TenantManagerBuilder::new()
            .header_identification("X-Tenant-Id")
            .build();
        register_acme_db(&manager);

        let request = RequestContext::new().with_header("X-Tenant-Id", "acme");
        let identified = manager.identify_tenant(&request);

        assert!(identified.is_some());
        assert_eq!(identified.unwrap().as_str(), "acme");
    }

    /// With the default configuration an unregistered tenant is rejected.
    #[test]
    fn test_unknown_tenant_rejected() {
        let manager = TenantManager::new();
        let request = RequestContext::new().with_header("X-Tenant-Id", "unknown");

        assert!(manager.identify_tenant(&request).is_none());
    }

    /// Unknown tenants are registered on first sight when auto-creation is on.
    #[test]
    fn test_auto_create_tenants() {
        let manager = TenantManagerBuilder::new()
            .header_identification("X-Tenant-Id")
            .allow_unknown_tenants()
            .auto_create_tenants()
            .build();

        let request = RequestContext::new().with_header("X-Tenant-Id", "new_tenant");
        let identified = manager.identify_tenant(&request);

        assert!(identified.is_some());
        assert!(manager.has_tenant(&TenantId::new("new_tenant")));
    }

    /// Schema isolation routes to the shared database with the tenant schema.
    #[test]
    fn test_routing_decision() {
        let manager = TenantManager::new();

        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme")
                .schema_isolation("shared_db", "acme_schema")
                .build(),
        );

        let decision = manager.get_routing(&acme()).unwrap();
        assert_eq!(decision.database, Some("shared_db".to_string()));
        assert_eq!(decision.search_path, Some("acme_schema".to_string()));
    }

    /// Row isolation injects the tenant filter into queries on known tables.
    #[test]
    fn test_query_transformation() {
        let mut manager = TenantManager::new();
        manager.query_transformer =
            TenantQueryTransformer::new().register_table("users", "tenant_id");

        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme")
                .row_isolation("shared_db", "tenant_id")
                .build(),
        );

        let outcome = manager.transform_query("SELECT * FROM users", &acme());

        assert!(outcome.transformed);
        assert!(outcome.query.contains("tenant_id = 'acme'"));
    }

    /// Recorded queries show up in the tenant's metrics snapshot.
    #[test]
    fn test_metrics_recording() {
        let manager = TenantManager::new();
        register_acme_db(&manager);

        let tenant = acme();
        manager.record_query(&tenant, Duration::from_millis(10), 100, 1024, 512, true);
        manager.record_query(&tenant, Duration::from_millis(20), 200, 2048, 1024, false);

        let snapshot = manager.tenant_metrics(&tenant).unwrap();
        assert_eq!(snapshot.queries, 2);
        assert_eq!(snapshot.errors, 1);
        assert_eq!(snapshot.rows_processed, 300);
    }

    /// The enabled flag can be toggled off and back on.
    #[test]
    fn test_enable_disable_tenant() {
        let manager = TenantManager::new();
        register_acme_db(&manager);
        let tenant = acme();

        assert!(manager.is_tenant_enabled(&tenant));

        manager.disable_tenant(&tenant);
        assert!(!manager.is_tenant_enabled(&tenant));

        manager.enable_tenant(&tenant);
        assert!(manager.is_tenant_enabled(&tenant));
    }

    /// A manager produced by the builder is enabled.
    #[test]
    fn test_tenant_manager_builder() {
        let template = TenantConfig::builder()
            .id("default")
            .name("Default")
            .schema_isolation("shared", "default")
            .max_connections(10)
            .build();

        let manager = TenantManagerBuilder::new()
            .header_identification("X-Org-Id")
            .allow_unknown_tenants()
            .auto_create_tenants()
            .default_tenant_config(template)
            .build();

        assert!(manager.is_enabled());
    }

    /// Unregistering returns the configuration and forgets the tenant.
    #[test]
    fn test_unregister_tenant() {
        let manager = TenantManager::new();
        register_acme_db(&manager);
        let tenant = acme();
        assert!(manager.has_tenant(&tenant));

        let removed = manager.unregister_tenant(&tenant);
        assert!(removed.is_some());
        assert!(!manager.has_tenant(&tenant));
    }
}