eventuali_core/tenancy/
isolation.rs

1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5
6use crate::event::Event;
7use crate::aggregate::{AggregateId, AggregateVersion};
8use crate::store::EventStore;
9use crate::error::{EventualiError, Result};
10use super::tenant::{TenantId, TenantError};
11
12/// Tenant isolation enforcement mechanism
13pub struct TenantIsolation {
14    isolation_policies: Arc<RwLock<HashMap<TenantId, IsolationPolicy>>>,
15    performance_monitor: Arc<RwLock<IsolationMetrics>>,
16}
17
18impl Default for TenantIsolation {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl TenantIsolation {
25    pub fn new() -> Self {
26        Self {
27            isolation_policies: Arc::new(RwLock::new(HashMap::new())),
28            performance_monitor: Arc::new(RwLock::new(IsolationMetrics::new())),
29        }
30    }
31    
32    /// Register a new tenant with isolation policy
33    pub fn register_tenant(&self, tenant_id: TenantId, policy: IsolationPolicy) -> Result<()> {
34        let mut policies = self.isolation_policies.write().unwrap();
35        policies.insert(tenant_id, policy);
36        Ok(())
37    }
38    
39    /// Validate that an operation is allowed for the tenant
40    pub fn validate_operation(&self, tenant_id: &TenantId, operation: &TenantOperation) -> Result<()> {
41        let start_time = std::time::Instant::now();
42        
43        let policies = self.isolation_policies.read().unwrap();
44        let policy = policies.get(tenant_id)
45            .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
46        
47        let result = policy.validate_operation(operation);
48        
49        // Track performance
50        let duration = start_time.elapsed();
51        if duration.as_millis() > 10 {
52            // Log if we exceed 10ms target
53            eprintln!("Warning: Tenant isolation check took {}ms", duration.as_millis());
54        }
55        
56        let mut metrics = self.performance_monitor.write().unwrap();
57        metrics.record_validation(duration, result.is_ok());
58        
59        result
60    }
61    
62    /// Get isolation metrics for monitoring
63    pub fn get_metrics(&self) -> IsolationMetrics {
64        self.performance_monitor.read().unwrap().clone()
65    }
66}
67
68/// Tenant operation types for validation
69#[derive(Debug, Clone)]
70pub enum TenantOperation {
71    CreateEvent { aggregate_id: AggregateId },
72    ReadEvents { aggregate_id: AggregateId },
73    CreateProjection { name: String },
74    StreamEvents { from_timestamp: Option<DateTime<Utc>> },
75}
76
77/// Isolation policy for a tenant
78#[derive(Debug, Clone)]
79pub struct IsolationPolicy {
80    pub enforce_namespace: bool,
81    pub validate_access_patterns: bool,
82    pub audit_all_operations: bool,
83    pub max_cross_tenant_references: Option<u32>,
84}
85
86impl IsolationPolicy {
87    pub fn strict() -> Self {
88        Self {
89            enforce_namespace: true,
90            validate_access_patterns: true,
91            audit_all_operations: true,
92            max_cross_tenant_references: Some(0),
93        }
94    }
95    
96    pub fn relaxed() -> Self {
97        Self {
98            enforce_namespace: true,
99            validate_access_patterns: false,
100            audit_all_operations: false,
101            max_cross_tenant_references: Some(10),
102        }
103    }
104    
105    fn validate_operation(&self, operation: &TenantOperation) -> Result<()> {
106        // Implement validation logic
107        match operation {
108            TenantOperation::CreateEvent { aggregate_id } => {
109                if self.enforce_namespace && !self.validate_aggregate_namespace(aggregate_id) {
110                    return Err(EventualiError::from(TenantError::IsolationViolation(
111                        "Aggregate ID does not match tenant namespace".to_string()
112                    )));
113                }
114            },
115            TenantOperation::ReadEvents { aggregate_id } => {
116                if self.enforce_namespace && !self.validate_aggregate_namespace(aggregate_id) {
117                    return Err(EventualiError::from(TenantError::IsolationViolation(
118                        "Aggregate ID does not match tenant namespace".to_string()
119                    )));
120                }
121            },
122            TenantOperation::CreateProjection { .. } => {
123                // Additional validation for projections
124            },
125            TenantOperation::StreamEvents { .. } => {
126                // Validate streaming permissions
127            },
128        }
129        Ok(())
130    }
131    
132    fn validate_aggregate_namespace(&self, _aggregate_id: &AggregateId) -> bool {
133        // Simple validation - in real implementation this would check namespace prefixes
134        true
135    }
136}
137
138/// Performance metrics for tenant isolation
139#[derive(Debug, Clone)]
140pub struct IsolationMetrics {
141    pub total_validations: u64,
142    pub successful_validations: u64,
143    pub average_validation_time_ms: f64,
144    pub max_validation_time_ms: f64,
145    pub violations_detected: u64,
146    pub last_updated: DateTime<Utc>,
147}
148
149impl Default for IsolationMetrics {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155impl IsolationMetrics {
156    pub fn new() -> Self {
157        Self {
158            total_validations: 0,
159            successful_validations: 0,
160            average_validation_time_ms: 0.0,
161            max_validation_time_ms: 0.0,
162            violations_detected: 0,
163            last_updated: Utc::now(),
164        }
165    }
166    
167    pub fn record_validation(&mut self, duration: std::time::Duration, success: bool) {
168        self.total_validations += 1;
169        if success {
170            self.successful_validations += 1;
171        } else {
172            self.violations_detected += 1;
173        }
174        
175        let duration_ms = duration.as_millis() as f64;
176        self.average_validation_time_ms = 
177            (self.average_validation_time_ms * (self.total_validations - 1) as f64 + duration_ms) 
178            / self.total_validations as f64;
179        
180        if duration_ms > self.max_validation_time_ms {
181            self.max_validation_time_ms = duration_ms;
182        }
183        
184        self.last_updated = Utc::now();
185    }
186    
187    /// Check if we're meeting the <10ms performance target
188    pub fn is_performance_target_met(&self) -> bool {
189        self.average_validation_time_ms < 10.0 && self.max_validation_time_ms < 50.0
190    }
191    
192    /// Calculate isolation success rate (target: 99.9%)
193    pub fn isolation_success_rate(&self) -> f64 {
194        if self.total_validations == 0 {
195            return 100.0;
196        }
197        (self.successful_validations as f64 / self.total_validations as f64) * 100.0
198    }
199}
200
201/// Tenant-scoped event store wrapper
202pub struct IsolatedEventStore {
203    tenant_id: TenantId,
204    inner_store: Arc<dyn EventStore + Send + Sync>,
205    isolation: Arc<TenantIsolation>,
206}
207
208impl IsolatedEventStore {
209    pub fn new(
210        tenant_id: TenantId, 
211        inner_store: Arc<dyn EventStore + Send + Sync>,
212        isolation: Arc<TenantIsolation>
213    ) -> Self {
214        Self {
215            tenant_id,
216            inner_store,
217            isolation,
218        }
219    }
220    
221    /// Get the tenant ID this store is scoped to
222    pub fn tenant_id(&self) -> &TenantId {
223        &self.tenant_id
224    }
225    
226    /// Transform aggregate ID to include tenant namespace
227    fn tenant_scoped_aggregate_id(&self, aggregate_id: &AggregateId) -> AggregateId {
228        format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id)
229    }
230}
231
232#[async_trait]
233impl EventStore for IsolatedEventStore {
234    async fn save_events(&self, events: Vec<Event>) -> Result<()> {
235        // For each event, validate operation and transform aggregate IDs
236        let mut scoped_events = Vec::new();
237        
238        for mut event in events {
239            // Validate operation
240            self.isolation.validate_operation(&self.tenant_id, &TenantOperation::CreateEvent { 
241                aggregate_id: event.aggregate_id.clone() 
242            })?;
243            
244            // Transform aggregate ID to include tenant namespace
245            event.aggregate_id = self.tenant_scoped_aggregate_id(&event.aggregate_id);
246            scoped_events.push(event);
247        }
248        
249        // Delegate to inner store
250        self.inner_store.save_events(scoped_events).await
251    }
252    
253    async fn load_events(&self, aggregate_id: &AggregateId, from_version: Option<AggregateVersion>) -> Result<Vec<Event>> {
254        // Validate operation
255        self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents { 
256            aggregate_id: aggregate_id.clone() 
257        })?;
258        
259        // Transform aggregate ID to include tenant namespace
260        let scoped_aggregate_id = self.tenant_scoped_aggregate_id(aggregate_id);
261        
262        // Delegate to inner store
263        let mut events = self.inner_store.load_events(&scoped_aggregate_id, from_version).await?;
264        
265        // Transform aggregate IDs back to unscoped versions for the caller
266        for event in &mut events {
267            event.aggregate_id = aggregate_id.clone();
268        }
269        
270        Ok(events)
271    }
272    
273    async fn load_events_by_type(&self, aggregate_type: &str, from_version: Option<AggregateVersion>) -> Result<Vec<Event>> {
274        // Create a tenant-scoped aggregate type
275        let scoped_aggregate_type = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_type);
276        
277        // Delegate to inner store
278        let mut events = self.inner_store.load_events_by_type(&scoped_aggregate_type, from_version).await?;
279        
280        // Transform aggregate IDs back to unscoped versions for the caller
281        for event in &mut events {
282            // Remove tenant prefix from aggregate ID
283            if let Some(unscoped) = event.aggregate_id.strip_prefix(&format!("{}:", self.tenant_id.db_prefix())) {
284                event.aggregate_id = unscoped.to_string();
285            }
286        }
287        
288        Ok(events)
289    }
290    
291    async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
292        // Validate operation (as read)
293        self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents { 
294            aggregate_id: aggregate_id.clone() 
295        })?;
296        
297        // Transform aggregate ID to include tenant namespace
298        let scoped_aggregate_id = self.tenant_scoped_aggregate_id(aggregate_id);
299        
300        // Delegate to inner store
301        self.inner_store.get_aggregate_version(&scoped_aggregate_id).await
302    }
303    
304    fn set_event_streamer(&mut self, _streamer: Arc<dyn crate::streaming::EventStreamer + Send + Sync>) {
305        // This would need to be handled differently as we have a reference to the inner store
306        // For now, we'll need to assume the inner store is mutable or use interior mutability
307        // This is a design limitation that would need to be addressed in a real implementation
308    }
309}
310
311/// Tenant scope utility for ensuring operations stay within tenant boundaries
312pub struct TenantScope {
313    pub tenant_id: TenantId,
314    pub context: TenantContext,
315}
316
317impl TenantScope {
318    pub fn new(tenant_id: TenantId) -> Self {
319        Self {
320            tenant_id,
321            context: TenantContext::new(),
322        }
323    }
324    
325    /// Execute a closure within this tenant scope
326    pub fn execute<T, F>(&self, f: F) -> Result<T>
327    where
328        F: FnOnce(&TenantScope) -> Result<T>,
329    {
330        f(self)
331    }
332}
333
334/// Context information for tenant operations
335#[derive(Debug, Clone)]
336pub struct TenantContext {
337    pub operation_id: String,
338    pub started_at: DateTime<Utc>,
339    pub metadata: HashMap<String, String>,
340}
341
342impl Default for TenantContext {
343    fn default() -> Self {
344        Self::new()
345    }
346}
347
348impl TenantContext {
349    pub fn new() -> Self {
350        Self {
351            operation_id: uuid::Uuid::new_v4().to_string(),
352            started_at: Utc::now(),
353            metadata: HashMap::new(),
354        }
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361    
362    #[test]
363    fn test_tenant_isolation_creation() {
364        let isolation = TenantIsolation::new();
365        let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();
366        
367        let policy = IsolationPolicy::strict();
368        assert!(isolation.register_tenant(tenant_id, policy).is_ok());
369    }
370    
371    #[test]
372    fn test_isolation_metrics_performance_target() {
373        let mut metrics = IsolationMetrics::new();
374        
375        // Record some fast validations
376        metrics.record_validation(std::time::Duration::from_millis(5), true);
377        metrics.record_validation(std::time::Duration::from_millis(8), true);
378        
379        assert!(metrics.is_performance_target_met());
380        assert!(metrics.isolation_success_rate() > 99.0);
381    }
382}