Skip to main content

heliosdb_proxy/multi_tenancy/
mod.rs

1//! Multi-Tenancy Support for HeliosProxy
2//!
3//! This module provides comprehensive multi-tenant isolation for database proxying.
4//!
5//! # Features
6//!
7//! - **Multiple Isolation Strategies**: Database, Schema, Row-level, or Branch isolation
8//! - **Flexible Tenant Identification**: Header, username prefix, JWT, or database name
9//! - **Per-Tenant Connection Pools**: Dedicated or shared pools with configurable limits
10//! - **Query Transformation**: Automatic tenant filtering for row-level isolation
11//! - **Comprehensive Metrics**: Per-tenant query stats, latencies, and costs
12//!
13//! # Example
14//!
15//! ```rust,ignore
16//! use heliosdb::proxy::multi_tenancy::{
17//!     TenantManager, TenantConfig, IsolationStrategy,
18//!     IdentificationMethod, RequestContext,
19//! };
20//!
21//! // Create tenant manager
22//! let manager = TenantManager::new();
23//!
24//! // Register a tenant with schema isolation
25//! let config = TenantConfig::builder()
26//!     .id("acme")
27//!     .name("Acme Corp")
28//!     .schema_isolation("shared_db", "acme")
29//!     .max_connections(50)
30//!     .qps_limit(1000)
31//!     .build();
32//!
33//! manager.register_tenant(config);
34//!
35//! // Identify tenant from request
36//! let ctx = RequestContext::new()
37//!     .with_header("X-Tenant-Id", "acme");
38//!
39//! let tenant = manager.identify_tenant(&ctx);
40//! ```
41//!
42//! # Architecture
43//!
44//! ```text
45//! ┌─────────────────────────────────────────────────┐
46//! │              MULTI-TENANT PROXY                  │
47//! │                                                  │
48//! │  ┌──────────────────────────────────────────┐   │
49//! │  │ Tenant Identification                    │   │
50//! │  │ - Header (X-Tenant-Id)                   │   │
51//! │  │ - Username prefix (tenant.user)          │   │
//! │  │ - JWT claim                              │   │
//! │  │ - Database name                          │   │
53//! │  └──────────────────────────────────────────┘   │
54//! │                    │                             │
55//! │                    ▼                             │
56//! │  ┌──────────────────────────────────────────┐   │
57//! │  │ Isolation Strategy                       │   │
58//! │  │ Database | Schema | Row | Branch         │   │
59//! │  └──────────────────────────────────────────┘   │
60//! │                    │                             │
61//! │                    ▼                             │
62//! │  ┌──────────────────────────────────────────┐   │
63//! │  │ Per-Tenant Resources                     │   │
64//! │  │ - Connection pools                       │   │
65//! │  │ - Rate limits                            │   │
66//! │  │ - Metrics                                │   │
67//! │  └──────────────────────────────────────────┘   │
68//! └─────────────────────────────────────────────────┘
69//! ```
70
71pub mod config;
72pub mod identifier;
73pub mod isolation;
74pub mod metrics;
75pub mod pool;
76pub mod transformer;
77
78use std::sync::Arc;
79
80use dashmap::DashMap;
81
82pub use config::{
83    IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantAiConfig, TenantConfig,
84    TenantConfigBuilder, TenantId, TenantPermissions, TenantPoolConfig, TenantRateLimits,
85};
86pub use identifier::{
87    create_identifier, CompositeIdentifier, DatabaseNameIdentifier, HeaderTenantIdentifier,
88    JwtClaimIdentifier, RequestContext, SqlContextIdentifier, TenantIdentifier,
89    UsernamePrefixIdentifier,
90};
91pub use isolation::{
92    create_handler, BranchIsolationHandler, DatabaseIsolationHandler, IsolationHandler,
93    IsolationRouter, RoutingDecision, RowIsolationHandler, SchemaIsolationHandler,
94    TenantProvisioner,
95};
96pub use metrics::{
97    AggregateMetricsSnapshot, TenantCostEntry, TenantCostReport, TenantCostTracker, TenantMetrics,
98    TenantMetricsSnapshot, TenantStats,
99};
100pub use pool::{
101    AcquireResult, AggregatePoolStats, ConnectionState, PooledConnection, TenantConnectionLease,
102    TenantConnectionPool, TenantPool, TenantPoolStats,
103};
104pub use transformer::{validate_query, QueryValidation, TenantQueryTransformer, TransformResult};
105
106/// Central tenant manager
107///
108/// Coordinates all multi-tenancy functionality including identification,
109/// isolation, connection pooling, and metrics.
110pub struct TenantManager {
111    /// Global configuration
112    config: MultiTenancyConfig,
113
114    /// Tenant configurations
115    tenants: DashMap<TenantId, TenantConfig>,
116
117    /// Tenant identifier
118    identifier: Arc<dyn TenantIdentifier>,
119
120    /// Isolation router
121    isolation_router: IsolationRouter,
122
123    /// Connection pool manager
124    pool_manager: TenantConnectionPool,
125
126    /// Query transformer
127    query_transformer: TenantQueryTransformer,
128
129    /// Metrics collector
130    metrics: TenantMetrics,
131
132    /// Cost tracker
133    cost_tracker: TenantCostTracker,
134
135    /// Tenant provisioner
136    provisioner: TenantProvisioner,
137}
138
139impl TenantManager {
140    /// Create a new tenant manager with default configuration
141    pub fn new() -> Self {
142        Self::with_config(MultiTenancyConfig::default())
143    }
144
145    /// Create a tenant manager with custom configuration
146    pub fn with_config(config: MultiTenancyConfig) -> Self {
147        let identifier = create_identifier(&config.identification);
148
149        Self {
150            config: config.clone(),
151            tenants: DashMap::new(),
152            identifier: Arc::from(identifier),
153            isolation_router: IsolationRouter::new(),
154            pool_manager: TenantConnectionPool::new(TenantPoolConfig::default()),
155            query_transformer: TenantQueryTransformer::new(),
156            metrics: TenantMetrics::new(),
157            cost_tracker: TenantCostTracker::new(),
158            provisioner: TenantProvisioner::new(),
159        }
160    }
161
162    /// Check if multi-tenancy is enabled
163    pub fn is_enabled(&self) -> bool {
164        self.config.enabled
165    }
166
167    /// Register a tenant
168    pub fn register_tenant(&self, config: TenantConfig) {
169        let tenant_id = config.id.clone();
170
171        // Register isolation handler
172        self.isolation_router.register_from_config(&config);
173
174        // Create connection pool
175        self.pool_manager
176            .create_tenant_pool(&tenant_id, config.pool.clone());
177
178        // Store config
179        self.tenants.insert(tenant_id, config);
180    }
181
182    /// Unregister a tenant
183    pub fn unregister_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
184        self.pool_manager.remove_tenant_pool(tenant);
185        self.tenants.remove(tenant).map(|(_, c)| c)
186    }
187
188    /// Get tenant configuration
189    pub fn get_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
190        self.tenants.get(tenant).map(|e| e.clone())
191    }
192
193    /// Check if tenant exists
194    pub fn has_tenant(&self, tenant: &TenantId) -> bool {
195        self.tenants.contains_key(tenant)
196    }
197
198    /// Get all tenant IDs
199    pub fn tenant_ids(&self) -> Vec<TenantId> {
200        self.tenants.iter().map(|e| e.key().clone()).collect()
201    }
202
203    /// Get tenant count
204    pub fn tenant_count(&self) -> usize {
205        self.tenants.len()
206    }
207
208    /// Identify tenant from request context
209    pub fn identify_tenant(&self, request: &RequestContext) -> Option<TenantId> {
210        let tenant_id = self.identifier.identify(request)?;
211
212        // Check if tenant exists or if unknown tenants are allowed
213        if self.has_tenant(&tenant_id) {
214            Some(tenant_id)
215        } else if self.config.allow_unknown_tenants {
216            // Auto-create if enabled
217            if self.config.auto_create_tenants {
218                let config = self.create_default_tenant_config(&tenant_id);
219                self.register_tenant(config);
220            }
221            Some(tenant_id)
222        } else {
223            None
224        }
225    }
226
227    /// Create default configuration for a new tenant
228    fn create_default_tenant_config(&self, tenant: &TenantId) -> TenantConfig {
229        let isolation = self.provisioner.generate_isolation(
230            tenant,
231            self.config.default_config.isolation.strategy_name(),
232            self.config.default_config.isolation.database_name(),
233        );
234
235        TenantConfig::builder()
236            .id(tenant.clone())
237            .name(tenant.0.clone())
238            .isolation(isolation)
239            .rate_limits(self.config.default_config.rate_limits.clone())
240            .pool(self.config.default_config.pool.clone())
241            .build()
242    }
243
244    /// Get routing decision for a tenant
245    pub fn get_routing(&self, tenant: &TenantId) -> Option<RoutingDecision> {
246        let config = self.get_tenant(tenant)?;
247        Some(self.isolation_router.get_routing(tenant, &config))
248    }
249
250    /// Transform a query for tenant isolation
251    pub fn transform_query(&self, query: &str, tenant: &TenantId) -> TransformResult {
252        if let Some(config) = self.get_tenant(tenant) {
253            self.query_transformer.transform(query, tenant, &config)
254        } else {
255            TransformResult::passthrough(query)
256        }
257    }
258
259    /// Validate a query for a tenant
260    pub fn validate_query(&self, query: &str, tenant: &TenantId) -> QueryValidation {
261        if let Some(config) = self.get_tenant(tenant) {
262            validate_query(query, tenant, &config)
263        } else {
264            QueryValidation {
265                valid: false,
266                violations: vec!["Unknown tenant".to_string()],
267            }
268        }
269    }
270
271    /// Get connection pool for a tenant
272    pub fn get_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
273        let config = self.get_tenant(tenant)?;
274        Some(self.pool_manager.get_pool(tenant, &config))
275    }
276
277    /// Record a query execution
278    pub fn record_query(
279        &self,
280        tenant: &TenantId,
281        duration: std::time::Duration,
282        rows: u64,
283        bytes_read: u64,
284        bytes_written: u64,
285        success: bool,
286    ) {
287        self.metrics.record_query(tenant, duration, rows, success);
288        self.metrics.record_bytes(tenant, bytes_read, bytes_written);
289        self.cost_tracker
290            .record_query_cost(tenant, rows, bytes_read, bytes_written);
291    }
292
293    /// Get metrics for a tenant
294    pub fn tenant_metrics(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
295        self.metrics.snapshot(tenant)
296    }
297
298    /// Get aggregate metrics
299    pub fn aggregate_metrics(&self) -> AggregateMetricsSnapshot {
300        self.metrics.aggregate_snapshot()
301    }
302
303    /// Get top tenants by queries
304    pub fn top_tenants_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
305        self.metrics.top_by_queries(limit)
306    }
307
308    /// Get cost for a tenant
309    pub fn tenant_cost(&self, tenant: &TenantId) -> Option<f64> {
310        self.cost_tracker.get_cost(tenant)
311    }
312
313    /// Get cost report
314    pub fn cost_report(&self) -> TenantCostReport {
315        self.cost_tracker.cost_report()
316    }
317
318    /// Get pool statistics for all tenants
319    pub fn pool_stats(&self) -> Vec<TenantPoolStats> {
320        self.pool_manager.all_stats()
321    }
322
323    /// Get aggregate pool statistics
324    pub fn aggregate_pool_stats(&self) -> AggregatePoolStats {
325        self.pool_manager.aggregate_stats()
326    }
327
328    /// Get the tenant provisioner
329    pub fn provisioner(&self) -> &TenantProvisioner {
330        &self.provisioner
331    }
332
333    /// Get the query transformer
334    pub fn query_transformer(&self) -> &TenantQueryTransformer {
335        &self.query_transformer
336    }
337
338    /// Get the metrics collector
339    pub fn metrics(&self) -> &TenantMetrics {
340        &self.metrics
341    }
342
343    /// Check if request is from admin user
344    pub fn is_admin_request(&self, request: &RequestContext) -> bool {
345        if let Some(pattern) = &self.config.admin_user_pattern {
346            if let Some(username) = &request.username {
347                // Simple pattern matching (in production, use regex)
348                return username.starts_with(pattern) || username == pattern;
349            }
350        }
351        false
352    }
353
354    /// Update tenant configuration
355    pub fn update_tenant(&self, tenant: &TenantId, config: TenantConfig) -> bool {
356        if self.tenants.contains_key(tenant) {
357            self.isolation_router.register_from_config(&config);
358            self.pool_manager
359                .create_tenant_pool(tenant, config.pool.clone());
360            self.tenants.insert(tenant.clone(), config);
361            true
362        } else {
363            false
364        }
365    }
366
367    /// Enable a tenant
368    pub fn enable_tenant(&self, tenant: &TenantId) -> bool {
369        if let Some(mut entry) = self.tenants.get_mut(tenant) {
370            entry.enabled = true;
371            true
372        } else {
373            false
374        }
375    }
376
377    /// Disable a tenant
378    pub fn disable_tenant(&self, tenant: &TenantId) -> bool {
379        if let Some(mut entry) = self.tenants.get_mut(tenant) {
380            entry.enabled = false;
381            true
382        } else {
383            false
384        }
385    }
386
387    /// Check if tenant is enabled
388    pub fn is_tenant_enabled(&self, tenant: &TenantId) -> bool {
389        self.tenants.get(tenant).map(|c| c.enabled).unwrap_or(false)
390    }
391}
392
393impl Default for TenantManager {
394    fn default() -> Self {
395        Self::new()
396    }
397}
398
399/// Builder for TenantManager
400pub struct TenantManagerBuilder {
401    config: MultiTenancyConfig,
402    identifier: Option<Arc<dyn TenantIdentifier>>,
403    query_transformer: Option<TenantQueryTransformer>,
404    provisioner: Option<TenantProvisioner>,
405}
406
407impl TenantManagerBuilder {
408    /// Create a new builder
409    pub fn new() -> Self {
410        Self {
411            config: MultiTenancyConfig::enabled(),
412            identifier: None,
413            query_transformer: None,
414            provisioner: None,
415        }
416    }
417
418    /// Set configuration
419    pub fn config(mut self, config: MultiTenancyConfig) -> Self {
420        self.config = config;
421        self
422    }
423
424    /// Set custom identifier
425    pub fn identifier(mut self, identifier: Arc<dyn TenantIdentifier>) -> Self {
426        self.identifier = Some(identifier);
427        self
428    }
429
430    /// Use header identification
431    pub fn header_identification(mut self, header: impl Into<String>) -> Self {
432        self.config.identification = IdentificationMethod::header(header);
433        self
434    }
435
436    /// Use username prefix identification
437    pub fn username_prefix_identification(mut self, separator: char) -> Self {
438        self.config.identification = IdentificationMethod::username_prefix(separator);
439        self
440    }
441
442    /// Set custom query transformer
443    pub fn query_transformer(mut self, transformer: TenantQueryTransformer) -> Self {
444        self.query_transformer = Some(transformer);
445        self
446    }
447
448    /// Set custom provisioner
449    pub fn provisioner(mut self, provisioner: TenantProvisioner) -> Self {
450        self.provisioner = Some(provisioner);
451        self
452    }
453
454    /// Allow unknown tenants
455    pub fn allow_unknown_tenants(mut self) -> Self {
456        self.config.allow_unknown_tenants = true;
457        self
458    }
459
460    /// Auto-create tenants
461    pub fn auto_create_tenants(mut self) -> Self {
462        self.config.auto_create_tenants = true;
463        self
464    }
465
466    /// Set default tenant config
467    pub fn default_tenant_config(mut self, config: TenantConfig) -> Self {
468        self.config.default_config = config;
469        self
470    }
471
472    /// Build the TenantManager
473    pub fn build(self) -> TenantManager {
474        let mut manager = TenantManager::with_config(self.config);
475
476        if let Some(identifier) = self.identifier {
477            manager.identifier = identifier;
478        }
479
480        if let Some(transformer) = self.query_transformer {
481            manager.query_transformer = transformer;
482        }
483
484        if let Some(provisioner) = self.provisioner {
485            manager.provisioner = provisioner;
486        }
487
488        manager
489    }
490}
491
492impl Default for TenantManagerBuilder {
493    fn default() -> Self {
494        Self::new()
495    }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Id of the tenant used by most tests.
    fn acme_id() -> TenantId {
        TenantId::new("acme")
    }

    /// Tenant "acme" with database isolation on `acme_db`.
    fn acme_database_tenant() -> TenantConfig {
        TenantConfig::builder()
            .id("acme")
            .name("Acme")
            .database_isolation("acme_db")
            .build()
    }

    /// Request carrying `value` in the `X-Tenant-Id` header.
    fn request_for(value: &str) -> RequestContext {
        RequestContext::new().with_header("X-Tenant-Id", value)
    }

    #[test]
    fn test_tenant_manager_creation() {
        // A fresh manager starts with an empty registry.
        assert_eq!(TenantManager::new().tenant_count(), 0);
    }

    #[test]
    fn test_register_and_get_tenant() {
        let manager = TenantManager::new();

        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme Corp")
                .schema_isolation("shared", "acme")
                .build(),
        );

        assert!(manager.has_tenant(&acme_id()));
        assert_eq!(manager.tenant_count(), 1);

        let stored = manager.get_tenant(&acme_id()).unwrap();
        assert_eq!(stored.name, "Acme Corp");
    }

    #[test]
    fn test_identify_tenant() {
        let manager = TenantManagerBuilder::new()
            .header_identification("X-Tenant-Id")
            .build();
        manager.register_tenant(acme_database_tenant());

        let identified = manager.identify_tenant(&request_for("acme"));

        assert!(identified.is_some());
        assert_eq!(identified.unwrap().as_str(), "acme");
    }

    #[test]
    fn test_unknown_tenant_rejected() {
        let manager = TenantManager::new();

        let identified = manager.identify_tenant(&request_for("unknown"));

        assert!(identified.is_none());
    }

    #[test]
    fn test_auto_create_tenants() {
        let manager = TenantManagerBuilder::new()
            .header_identification("X-Tenant-Id")
            .allow_unknown_tenants()
            .auto_create_tenants()
            .build();

        let identified = manager.identify_tenant(&request_for("new_tenant"));

        // The unknown tenant is accepted and registered on first sight.
        assert!(identified.is_some());
        assert!(manager.has_tenant(&TenantId::new("new_tenant")));
    }

    #[test]
    fn test_routing_decision() {
        let manager = TenantManager::new();
        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme")
                .schema_isolation("shared_db", "acme_schema")
                .build(),
        );

        let decision = manager.get_routing(&acme_id()).unwrap();

        assert_eq!(decision.database, Some("shared_db".to_string()));
        assert_eq!(decision.search_path, Some("acme_schema".to_string()));
    }

    #[test]
    fn test_query_transformation() {
        let mut manager = TenantManager::new();
        manager.query_transformer =
            TenantQueryTransformer::new().register_table("users", "tenant_id");

        manager.register_tenant(
            TenantConfig::builder()
                .id("acme")
                .name("Acme")
                .row_isolation("shared_db", "tenant_id")
                .build(),
        );

        let rewritten = manager.transform_query("SELECT * FROM users", &acme_id());

        assert!(rewritten.transformed);
        assert!(rewritten.query.contains("tenant_id = 'acme'"));
    }

    #[test]
    fn test_metrics_recording() {
        let manager = TenantManager::new();
        manager.register_tenant(acme_database_tenant());

        let acme = acme_id();
        // One successful and one failed query.
        manager.record_query(&acme, Duration::from_millis(10), 100, 1024, 512, true);
        manager.record_query(&acme, Duration::from_millis(20), 200, 2048, 1024, false);

        let stats = manager.tenant_metrics(&acme).unwrap();
        assert_eq!(stats.queries, 2);
        assert_eq!(stats.errors, 1);
        assert_eq!(stats.rows_processed, 300);
    }

    #[test]
    fn test_enable_disable_tenant() {
        let manager = TenantManager::new();
        manager.register_tenant(acme_database_tenant());
        let acme = acme_id();

        assert!(manager.is_tenant_enabled(&acme));

        manager.disable_tenant(&acme);
        assert!(!manager.is_tenant_enabled(&acme));

        manager.enable_tenant(&acme);
        assert!(manager.is_tenant_enabled(&acme));
    }

    #[test]
    fn test_tenant_manager_builder() {
        let template = TenantConfig::builder()
            .id("default")
            .name("Default")
            .schema_isolation("shared", "default")
            .max_connections(10)
            .build();

        let manager = TenantManagerBuilder::new()
            .header_identification("X-Org-Id")
            .allow_unknown_tenants()
            .auto_create_tenants()
            .default_tenant_config(template)
            .build();

        assert!(manager.is_enabled());
    }

    #[test]
    fn test_unregister_tenant() {
        let manager = TenantManager::new();
        let acme = acme_id();

        manager.register_tenant(acme_database_tenant());
        assert!(manager.has_tenant(&acme));

        let removed = manager.unregister_tenant(&acme);
        assert!(removed.is_some());
        assert!(!manager.has_tenant(&acme));
    }
}