Skip to main content

heliosdb_proxy/multi_tenancy/
mod.rs

1//! Multi-Tenancy Support for HeliosProxy
2//!
3//! This module provides comprehensive multi-tenant isolation for database proxying.
4//!
5//! # Features
6//!
7//! - **Multiple Isolation Strategies**: Database, Schema, Row-level, or Branch isolation
8//! - **Flexible Tenant Identification**: Header, username prefix, JWT, or database name
9//! - **Per-Tenant Connection Pools**: Dedicated or shared pools with configurable limits
10//! - **Query Transformation**: Automatic tenant filtering for row-level isolation
11//! - **Comprehensive Metrics**: Per-tenant query stats, latencies, and costs
12//!
13//! # Example
14//!
15//! ```rust
16//! use heliosdb::proxy::multi_tenancy::{
17//!     TenantManager, TenantConfig, IsolationStrategy,
18//!     IdentificationMethod, RequestContext,
19//! };
20//!
21//! // Create tenant manager
22//! let manager = TenantManager::new();
23//!
24//! // Register a tenant with schema isolation
25//! let config = TenantConfig::builder()
26//!     .id("acme")
27//!     .name("Acme Corp")
28//!     .schema_isolation("shared_db", "acme")
29//!     .max_connections(50)
30//!     .qps_limit(1000)
31//!     .build();
32//!
33//! manager.register_tenant(config);
34//!
35//! // Identify tenant from request
36//! let ctx = RequestContext::new()
37//!     .with_header("X-Tenant-Id", "acme");
38//!
39//! let tenant = manager.identify_tenant(&ctx);
40//! ```
41//!
42//! # Architecture
43//!
44//! ```text
45//! ┌─────────────────────────────────────────────────┐
46//! │              MULTI-TENANT PROXY                  │
47//! │                                                  │
48//! │  ┌──────────────────────────────────────────┐   │
49//! │  │ Tenant Identification                    │   │
50//! │  │ - Header (X-Tenant-Id)                   │   │
51//! │  │ - Username prefix (tenant.user)          │   │
52//! │  │ - JWT claim                              │   │
53//! │  └──────────────────────────────────────────┘   │
54//! │                    │                             │
55//! │                    ▼                             │
56//! │  ┌──────────────────────────────────────────┐   │
57//! │  │ Isolation Strategy                       │   │
58//! │  │ Database | Schema | Row | Branch         │   │
59//! │  └──────────────────────────────────────────┘   │
60//! │                    │                             │
61//! │                    ▼                             │
62//! │  ┌──────────────────────────────────────────┐   │
63//! │  │ Per-Tenant Resources                     │   │
64//! │  │ - Connection pools                       │   │
65//! │  │ - Rate limits                            │   │
66//! │  │ - Metrics                                │   │
67//! │  └──────────────────────────────────────────┘   │
68//! └─────────────────────────────────────────────────┘
69//! ```
70
71pub mod config;
72pub mod identifier;
73pub mod isolation;
74pub mod metrics;
75pub mod pool;
76pub mod transformer;
77
78use std::sync::Arc;
79
80use dashmap::DashMap;
81
82pub use config::{
83    IdentificationMethod, IsolationStrategy, MultiTenancyConfig, TenantAiConfig,
84    TenantConfig, TenantConfigBuilder, TenantId, TenantPermissions, TenantPoolConfig,
85    TenantRateLimits,
86};
87pub use identifier::{
88    create_identifier, CompositeIdentifier, DatabaseNameIdentifier, HeaderTenantIdentifier,
89    JwtClaimIdentifier, RequestContext, SqlContextIdentifier, TenantIdentifier,
90    UsernamePrefixIdentifier,
91};
92pub use isolation::{
93    create_handler, BranchIsolationHandler, DatabaseIsolationHandler, IsolationHandler,
94    IsolationRouter, RowIsolationHandler, RoutingDecision, SchemaIsolationHandler,
95    TenantProvisioner,
96};
97pub use metrics::{
98    AggregateMetricsSnapshot, TenantCostEntry, TenantCostReport, TenantCostTracker,
99    TenantMetrics, TenantMetricsSnapshot, TenantStats,
100};
101pub use pool::{
102    AcquireResult, AggregatePoolStats, ConnectionState, PooledConnection, TenantConnectionLease,
103    TenantConnectionPool, TenantPool, TenantPoolStats,
104};
105pub use transformer::{validate_query, QueryValidation, TenantQueryTransformer, TransformResult};
106
107/// Central tenant manager
108///
109/// Coordinates all multi-tenancy functionality including identification,
110/// isolation, connection pooling, and metrics.
111pub struct TenantManager {
112    /// Global configuration
113    config: MultiTenancyConfig,
114
115    /// Tenant configurations
116    tenants: DashMap<TenantId, TenantConfig>,
117
118    /// Tenant identifier
119    identifier: Arc<dyn TenantIdentifier>,
120
121    /// Isolation router
122    isolation_router: IsolationRouter,
123
124    /// Connection pool manager
125    pool_manager: TenantConnectionPool,
126
127    /// Query transformer
128    query_transformer: TenantQueryTransformer,
129
130    /// Metrics collector
131    metrics: TenantMetrics,
132
133    /// Cost tracker
134    cost_tracker: TenantCostTracker,
135
136    /// Tenant provisioner
137    provisioner: TenantProvisioner,
138}
139
140impl TenantManager {
141    /// Create a new tenant manager with default configuration
142    pub fn new() -> Self {
143        Self::with_config(MultiTenancyConfig::default())
144    }
145
146    /// Create a tenant manager with custom configuration
147    pub fn with_config(config: MultiTenancyConfig) -> Self {
148        let identifier = create_identifier(&config.identification);
149
150        Self {
151            config: config.clone(),
152            tenants: DashMap::new(),
153            identifier: Arc::from(identifier),
154            isolation_router: IsolationRouter::new(),
155            pool_manager: TenantConnectionPool::new(TenantPoolConfig::default()),
156            query_transformer: TenantQueryTransformer::new(),
157            metrics: TenantMetrics::new(),
158            cost_tracker: TenantCostTracker::new(),
159            provisioner: TenantProvisioner::new(),
160        }
161    }
162
163    /// Check if multi-tenancy is enabled
164    pub fn is_enabled(&self) -> bool {
165        self.config.enabled
166    }
167
168    /// Register a tenant
169    pub fn register_tenant(&self, config: TenantConfig) {
170        let tenant_id = config.id.clone();
171
172        // Register isolation handler
173        self.isolation_router
174            .register_from_config(&config);
175
176        // Create connection pool
177        self.pool_manager
178            .create_tenant_pool(&tenant_id, config.pool.clone());
179
180        // Store config
181        self.tenants.insert(tenant_id, config);
182    }
183
184    /// Unregister a tenant
185    pub fn unregister_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
186        self.pool_manager.remove_tenant_pool(tenant);
187        self.tenants.remove(tenant).map(|(_, c)| c)
188    }
189
190    /// Get tenant configuration
191    pub fn get_tenant(&self, tenant: &TenantId) -> Option<TenantConfig> {
192        self.tenants.get(tenant).map(|e| e.clone())
193    }
194
195    /// Check if tenant exists
196    pub fn has_tenant(&self, tenant: &TenantId) -> bool {
197        self.tenants.contains_key(tenant)
198    }
199
200    /// Get all tenant IDs
201    pub fn tenant_ids(&self) -> Vec<TenantId> {
202        self.tenants.iter().map(|e| e.key().clone()).collect()
203    }
204
205    /// Get tenant count
206    pub fn tenant_count(&self) -> usize {
207        self.tenants.len()
208    }
209
210    /// Identify tenant from request context
211    pub fn identify_tenant(&self, request: &RequestContext) -> Option<TenantId> {
212        let tenant_id = self.identifier.identify(request)?;
213
214        // Check if tenant exists or if unknown tenants are allowed
215        if self.has_tenant(&tenant_id) {
216            Some(tenant_id)
217        } else if self.config.allow_unknown_tenants {
218            // Auto-create if enabled
219            if self.config.auto_create_tenants {
220                let config = self.create_default_tenant_config(&tenant_id);
221                self.register_tenant(config);
222            }
223            Some(tenant_id)
224        } else {
225            None
226        }
227    }
228
229    /// Create default configuration for a new tenant
230    fn create_default_tenant_config(&self, tenant: &TenantId) -> TenantConfig {
231        let isolation = self.provisioner.generate_isolation(
232            tenant,
233            self.config.default_config.isolation.strategy_name(),
234            self.config.default_config.isolation.database_name(),
235        );
236
237        TenantConfig::builder()
238            .id(tenant.clone())
239            .name(tenant.0.clone())
240            .isolation(isolation)
241            .rate_limits(self.config.default_config.rate_limits.clone())
242            .pool(self.config.default_config.pool.clone())
243            .build()
244    }
245
246    /// Get routing decision for a tenant
247    pub fn get_routing(&self, tenant: &TenantId) -> Option<RoutingDecision> {
248        let config = self.get_tenant(tenant)?;
249        Some(self.isolation_router.get_routing(tenant, &config))
250    }
251
252    /// Transform a query for tenant isolation
253    pub fn transform_query(&self, query: &str, tenant: &TenantId) -> TransformResult {
254        if let Some(config) = self.get_tenant(tenant) {
255            self.query_transformer.transform(query, tenant, &config)
256        } else {
257            TransformResult::passthrough(query)
258        }
259    }
260
261    /// Validate a query for a tenant
262    pub fn validate_query(&self, query: &str, tenant: &TenantId) -> QueryValidation {
263        if let Some(config) = self.get_tenant(tenant) {
264            validate_query(query, tenant, &config)
265        } else {
266            QueryValidation {
267                valid: false,
268                violations: vec!["Unknown tenant".to_string()],
269            }
270        }
271    }
272
273    /// Get connection pool for a tenant
274    pub fn get_pool(&self, tenant: &TenantId) -> Option<Arc<TenantPool>> {
275        let config = self.get_tenant(tenant)?;
276        Some(self.pool_manager.get_pool(tenant, &config))
277    }
278
279    /// Record a query execution
280    pub fn record_query(
281        &self,
282        tenant: &TenantId,
283        duration: std::time::Duration,
284        rows: u64,
285        bytes_read: u64,
286        bytes_written: u64,
287        success: bool,
288    ) {
289        self.metrics.record_query(tenant, duration, rows, success);
290        self.metrics.record_bytes(tenant, bytes_read, bytes_written);
291        self.cost_tracker
292            .record_query_cost(tenant, rows, bytes_read, bytes_written);
293    }
294
295    /// Get metrics for a tenant
296    pub fn tenant_metrics(&self, tenant: &TenantId) -> Option<TenantMetricsSnapshot> {
297        self.metrics.snapshot(tenant)
298    }
299
300    /// Get aggregate metrics
301    pub fn aggregate_metrics(&self) -> AggregateMetricsSnapshot {
302        self.metrics.aggregate_snapshot()
303    }
304
305    /// Get top tenants by queries
306    pub fn top_tenants_by_queries(&self, limit: usize) -> Vec<TenantMetricsSnapshot> {
307        self.metrics.top_by_queries(limit)
308    }
309
310    /// Get cost for a tenant
311    pub fn tenant_cost(&self, tenant: &TenantId) -> Option<f64> {
312        self.cost_tracker.get_cost(tenant)
313    }
314
315    /// Get cost report
316    pub fn cost_report(&self) -> TenantCostReport {
317        self.cost_tracker.cost_report()
318    }
319
320    /// Get pool statistics for all tenants
321    pub fn pool_stats(&self) -> Vec<TenantPoolStats> {
322        self.pool_manager.all_stats()
323    }
324
325    /// Get aggregate pool statistics
326    pub fn aggregate_pool_stats(&self) -> AggregatePoolStats {
327        self.pool_manager.aggregate_stats()
328    }
329
330    /// Get the tenant provisioner
331    pub fn provisioner(&self) -> &TenantProvisioner {
332        &self.provisioner
333    }
334
335    /// Get the query transformer
336    pub fn query_transformer(&self) -> &TenantQueryTransformer {
337        &self.query_transformer
338    }
339
340    /// Get the metrics collector
341    pub fn metrics(&self) -> &TenantMetrics {
342        &self.metrics
343    }
344
345    /// Check if request is from admin user
346    pub fn is_admin_request(&self, request: &RequestContext) -> bool {
347        if let Some(pattern) = &self.config.admin_user_pattern {
348            if let Some(username) = &request.username {
349                // Simple pattern matching (in production, use regex)
350                return username.starts_with(pattern) || username == pattern;
351            }
352        }
353        false
354    }
355
356    /// Update tenant configuration
357    pub fn update_tenant(&self, tenant: &TenantId, config: TenantConfig) -> bool {
358        if self.tenants.contains_key(tenant) {
359            self.isolation_router.register_from_config(&config);
360            self.pool_manager
361                .create_tenant_pool(tenant, config.pool.clone());
362            self.tenants.insert(tenant.clone(), config);
363            true
364        } else {
365            false
366        }
367    }
368
369    /// Enable a tenant
370    pub fn enable_tenant(&self, tenant: &TenantId) -> bool {
371        if let Some(mut entry) = self.tenants.get_mut(tenant) {
372            entry.enabled = true;
373            true
374        } else {
375            false
376        }
377    }
378
379    /// Disable a tenant
380    pub fn disable_tenant(&self, tenant: &TenantId) -> bool {
381        if let Some(mut entry) = self.tenants.get_mut(tenant) {
382            entry.enabled = false;
383            true
384        } else {
385            false
386        }
387    }
388
389    /// Check if tenant is enabled
390    pub fn is_tenant_enabled(&self, tenant: &TenantId) -> bool {
391        self.tenants
392            .get(tenant)
393            .map(|c| c.enabled)
394            .unwrap_or(false)
395    }
396}
397
398impl Default for TenantManager {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404/// Builder for TenantManager
405pub struct TenantManagerBuilder {
406    config: MultiTenancyConfig,
407    identifier: Option<Arc<dyn TenantIdentifier>>,
408    query_transformer: Option<TenantQueryTransformer>,
409    provisioner: Option<TenantProvisioner>,
410}
411
412impl TenantManagerBuilder {
413    /// Create a new builder
414    pub fn new() -> Self {
415        Self {
416            config: MultiTenancyConfig::enabled(),
417            identifier: None,
418            query_transformer: None,
419            provisioner: None,
420        }
421    }
422
423    /// Set configuration
424    pub fn config(mut self, config: MultiTenancyConfig) -> Self {
425        self.config = config;
426        self
427    }
428
429    /// Set custom identifier
430    pub fn identifier(mut self, identifier: Arc<dyn TenantIdentifier>) -> Self {
431        self.identifier = Some(identifier);
432        self
433    }
434
435    /// Use header identification
436    pub fn header_identification(mut self, header: impl Into<String>) -> Self {
437        self.config.identification = IdentificationMethod::header(header);
438        self
439    }
440
441    /// Use username prefix identification
442    pub fn username_prefix_identification(mut self, separator: char) -> Self {
443        self.config.identification = IdentificationMethod::username_prefix(separator);
444        self
445    }
446
447    /// Set custom query transformer
448    pub fn query_transformer(mut self, transformer: TenantQueryTransformer) -> Self {
449        self.query_transformer = Some(transformer);
450        self
451    }
452
453    /// Set custom provisioner
454    pub fn provisioner(mut self, provisioner: TenantProvisioner) -> Self {
455        self.provisioner = Some(provisioner);
456        self
457    }
458
459    /// Allow unknown tenants
460    pub fn allow_unknown_tenants(mut self) -> Self {
461        self.config.allow_unknown_tenants = true;
462        self
463    }
464
465    /// Auto-create tenants
466    pub fn auto_create_tenants(mut self) -> Self {
467        self.config.auto_create_tenants = true;
468        self
469    }
470
471    /// Set default tenant config
472    pub fn default_tenant_config(mut self, config: TenantConfig) -> Self {
473        self.config.default_config = config;
474        self
475    }
476
477    /// Build the TenantManager
478    pub fn build(self) -> TenantManager {
479        let mut manager = TenantManager::with_config(self.config);
480
481        if let Some(identifier) = self.identifier {
482            manager.identifier = identifier;
483        }
484
485        if let Some(transformer) = self.query_transformer {
486            manager.query_transformer = transformer;
487        }
488
489        if let Some(provisioner) = self.provisioner {
490            manager.provisioner = provisioner;
491        }
492
493        manager
494    }
495}
496
497impl Default for TenantManagerBuilder {
498    fn default() -> Self {
499        Self::new()
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use std::time::Duration;
507
508    #[test]
509    fn test_tenant_manager_creation() {
510        let manager = TenantManager::new();
511        assert_eq!(manager.tenant_count(), 0);
512    }
513
514    #[test]
515    fn test_register_and_get_tenant() {
516        let manager = TenantManager::new();
517
518        let config = TenantConfig::builder()
519            .id("acme")
520            .name("Acme Corp")
521            .schema_isolation("shared", "acme")
522            .build();
523
524        manager.register_tenant(config.clone());
525
526        assert!(manager.has_tenant(&TenantId::new("acme")));
527        assert_eq!(manager.tenant_count(), 1);
528
529        let retrieved = manager.get_tenant(&TenantId::new("acme")).unwrap();
530        assert_eq!(retrieved.name, "Acme Corp");
531    }
532
533    #[test]
534    fn test_identify_tenant() {
535        let manager = TenantManagerBuilder::new()
536            .header_identification("X-Tenant-Id")
537            .build();
538
539        let config = TenantConfig::builder()
540            .id("acme")
541            .name("Acme")
542            .database_isolation("acme_db")
543            .build();
544
545        manager.register_tenant(config);
546
547        let ctx = RequestContext::new().with_header("X-Tenant-Id", "acme");
548        let tenant = manager.identify_tenant(&ctx);
549
550        assert!(tenant.is_some());
551        assert_eq!(tenant.unwrap().as_str(), "acme");
552    }
553
554    #[test]
555    fn test_unknown_tenant_rejected() {
556        let manager = TenantManager::new();
557
558        let ctx = RequestContext::new().with_header("X-Tenant-Id", "unknown");
559        let tenant = manager.identify_tenant(&ctx);
560
561        assert!(tenant.is_none());
562    }
563
564    #[test]
565    fn test_auto_create_tenants() {
566        let manager = TenantManagerBuilder::new()
567            .header_identification("X-Tenant-Id")
568            .allow_unknown_tenants()
569            .auto_create_tenants()
570            .build();
571
572        let ctx = RequestContext::new().with_header("X-Tenant-Id", "new_tenant");
573        let tenant = manager.identify_tenant(&ctx);
574
575        assert!(tenant.is_some());
576        assert!(manager.has_tenant(&TenantId::new("new_tenant")));
577    }
578
579    #[test]
580    fn test_routing_decision() {
581        let manager = TenantManager::new();
582
583        let config = TenantConfig::builder()
584            .id("acme")
585            .name("Acme")
586            .schema_isolation("shared_db", "acme_schema")
587            .build();
588
589        manager.register_tenant(config);
590
591        let routing = manager.get_routing(&TenantId::new("acme")).unwrap();
592        assert_eq!(routing.database, Some("shared_db".to_string()));
593        assert_eq!(routing.search_path, Some("acme_schema".to_string()));
594    }
595
596    #[test]
597    fn test_query_transformation() {
598        let transformer = TenantQueryTransformer::new()
599            .register_table("users", "tenant_id");
600
601        let mut manager = TenantManager::new();
602        manager.query_transformer = transformer;
603
604        let config = TenantConfig::builder()
605            .id("acme")
606            .name("Acme")
607            .row_isolation("shared_db", "tenant_id")
608            .build();
609
610        manager.register_tenant(config);
611
612        let result = manager.transform_query(
613            "SELECT * FROM users",
614            &TenantId::new("acme"),
615        );
616
617        assert!(result.transformed);
618        assert!(result.query.contains("tenant_id = 'acme'"));
619    }
620
621    #[test]
622    fn test_metrics_recording() {
623        let manager = TenantManager::new();
624
625        let config = TenantConfig::builder()
626            .id("acme")
627            .name("Acme")
628            .database_isolation("acme_db")
629            .build();
630
631        manager.register_tenant(config);
632
633        let tenant = TenantId::new("acme");
634        manager.record_query(&tenant, Duration::from_millis(10), 100, 1024, 512, true);
635        manager.record_query(&tenant, Duration::from_millis(20), 200, 2048, 1024, false);
636
637        let snapshot = manager.tenant_metrics(&tenant).unwrap();
638        assert_eq!(snapshot.queries, 2);
639        assert_eq!(snapshot.errors, 1);
640        assert_eq!(snapshot.rows_processed, 300);
641    }
642
643    #[test]
644    fn test_enable_disable_tenant() {
645        let manager = TenantManager::new();
646
647        let config = TenantConfig::builder()
648            .id("acme")
649            .name("Acme")
650            .database_isolation("acme_db")
651            .build();
652
653        manager.register_tenant(config);
654
655        assert!(manager.is_tenant_enabled(&TenantId::new("acme")));
656
657        manager.disable_tenant(&TenantId::new("acme"));
658        assert!(!manager.is_tenant_enabled(&TenantId::new("acme")));
659
660        manager.enable_tenant(&TenantId::new("acme"));
661        assert!(manager.is_tenant_enabled(&TenantId::new("acme")));
662    }
663
664    #[test]
665    fn test_tenant_manager_builder() {
666        let default_config = TenantConfig::builder()
667            .id("default")
668            .name("Default")
669            .schema_isolation("shared", "default")
670            .max_connections(10)
671            .build();
672
673        let manager = TenantManagerBuilder::new()
674            .header_identification("X-Org-Id")
675            .allow_unknown_tenants()
676            .auto_create_tenants()
677            .default_tenant_config(default_config)
678            .build();
679
680        assert!(manager.is_enabled());
681    }
682
683    #[test]
684    fn test_unregister_tenant() {
685        let manager = TenantManager::new();
686
687        let config = TenantConfig::builder()
688            .id("acme")
689            .name("Acme")
690            .database_isolation("acme_db")
691            .build();
692
693        manager.register_tenant(config);
694        assert!(manager.has_tenant(&TenantId::new("acme")));
695
696        let removed = manager.unregister_tenant(&TenantId::new("acme"));
697        assert!(removed.is_some());
698        assert!(!manager.has_tenant(&TenantId::new("acme")));
699    }
700}