eventuali_core/tenancy/
storage.rs

1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use crate::event::Event;
6use crate::aggregate::{AggregateId, AggregateVersion};
7use crate::store::{EventStore, EventStoreBackend};
8use crate::error::{EventualiError, Result};
9use super::tenant::TenantId;
10use super::isolation::{TenantIsolation, TenantOperation};
11use super::quota::{TenantQuota, ResourceType};
12
13/// Tenant-aware event storage that ensures complete isolation between tenants
14/// while providing high-performance event operations
15pub struct TenantAwareEventStorage {
16    tenant_id: TenantId,
17    backend: Arc<dyn EventStoreBackend + Send + Sync>,
18    isolation: Arc<TenantIsolation>,
19    quota: Arc<TenantQuota>,
20    metrics: Arc<RwLock<TenantStorageMetrics>>,
21}
22
23impl TenantAwareEventStorage {
24    pub fn new(
25        tenant_id: TenantId,
26        backend: Arc<dyn EventStoreBackend + Send + Sync>,
27        isolation: Arc<TenantIsolation>,
28        quota: Arc<TenantQuota>,
29    ) -> Self {
30        Self {
31            tenant_id,
32            backend,
33            isolation,
34            quota,
35            metrics: Arc::new(RwLock::new(TenantStorageMetrics::new())),
36        }
37    }
38    
39    /// Transform event to include tenant namespace
40    fn tenant_scoped_event(&self, mut event: Event) -> Event {
41        // Add tenant namespace to aggregate ID
42        event.aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), event.aggregate_id);
43        
44        // Add tenant context to metadata headers
45        event.metadata.headers.insert(
46            "tenant_id".to_string(),
47            self.tenant_id.as_str().to_string()
48        );
49        event.metadata.headers.insert(
50            "tenant_namespace".to_string(),
51            self.tenant_id.db_prefix()
52        );
53        
54        event
55    }
56    
57    /// Remove tenant namespace from event for external consumption
58    fn unscoped_event(&self, mut event: Event) -> Event {
59        // Remove tenant prefix from aggregate ID
60        let prefix = format!("{}:", self.tenant_id.db_prefix());
61        if event.aggregate_id.starts_with(&prefix) {
62            event.aggregate_id = event.aggregate_id[prefix.len()..].to_string();
63        }
64        
65        event
66    }
67    
68    /// Get tenant-specific table/collection name
69    #[allow(dead_code)] // Utility method for database table naming (available for backend implementations)
70    fn tenant_table_name(&self, base_name: &str) -> String {
71        format!("{}_{}", self.tenant_id.db_prefix(), base_name)
72    }
73    
74    /// Validate and record event storage operation
75    fn validate_and_record(&self, operation: TenantOperation, event_count: u64) -> Result<()> {
76        // Validate tenant isolation
77        self.isolation.validate_operation(&self.tenant_id, &operation)?;
78        
79        // Check quotas
80        self.quota.check_quota(ResourceType::Events, event_count)?;
81        
82        // Record usage
83        self.quota.record_usage(ResourceType::Events, event_count);
84        
85        // Update metrics
86        let mut metrics = self.metrics.write().unwrap();
87        metrics.record_operation(operation, event_count);
88        
89        Ok(())
90    }
91    
92    pub fn get_metrics(&self) -> TenantStorageMetrics {
93        self.metrics.read().unwrap().clone()
94    }
95}
96
97#[async_trait]
98impl EventStore for TenantAwareEventStorage {
99    async fn save_events(&self, events: Vec<Event>) -> Result<()> {
100        let start_time = std::time::Instant::now();
101        
102        // Validate operation for the first event's aggregate (assuming batch operations on same aggregate)
103        if let Some(first_event) = events.first() {
104            self.validate_and_record(
105                TenantOperation::CreateEvent {
106                    aggregate_id: first_event.aggregate_id.clone()
107                },
108                events.len() as u64
109            )?;
110        }
111        
112        // Transform events to include tenant scoping
113        let scoped_events: Vec<Event> = events
114            .into_iter()
115            .map(|event| self.tenant_scoped_event(event))
116            .collect();
117        
118        // Delegate to backend
119        let result = self.backend.save_events(scoped_events).await;
120        
121        // Record performance metrics
122        let duration = start_time.elapsed();
123        let mut metrics = self.metrics.write().unwrap();
124        metrics.record_save_operation(duration, result.is_ok());
125        
126        result
127    }
128    
129    async fn load_events(
130        &self,
131        aggregate_id: &AggregateId,
132        from_version: Option<AggregateVersion>,
133    ) -> Result<Vec<Event>> {
134        let start_time = std::time::Instant::now();
135        
136        // Validate operation
137        self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
138            aggregate_id: aggregate_id.clone()
139        })?;
140        
141        // Transform aggregate ID to include tenant namespace
142        let scoped_aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id);
143        
144        // Load events from backend
145        let result = self.backend.load_events(&scoped_aggregate_id, from_version).await;
146        
147        // Transform events back and record metrics
148        let final_result = match result {
149            Ok(events) => {
150                let unscoped_events = events
151                    .into_iter()
152                    .map(|event| self.unscoped_event(event))
153                    .collect::<Vec<Event>>();
154                
155                let mut metrics = self.metrics.write().unwrap();
156                metrics.record_load_operation(start_time.elapsed(), true, unscoped_events.len());
157                
158                Ok(unscoped_events)
159            }
160            Err(e) => {
161                let mut metrics = self.metrics.write().unwrap();
162                metrics.record_load_operation(start_time.elapsed(), false, 0);
163                Err(e)
164            }
165        };
166        
167        final_result
168    }
169    
170    async fn load_events_by_type(
171        &self,
172        aggregate_type: &str,
173        from_version: Option<AggregateVersion>,
174    ) -> Result<Vec<Event>> {
175        let start_time = std::time::Instant::now();
176        
177        // Create tenant-scoped aggregate type
178        let scoped_aggregate_type = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_type);
179        
180        // Load events from backend
181        let result = self.backend.load_events_by_type(&scoped_aggregate_type, from_version).await;
182        
183        // Transform events back and record metrics
184        match result {
185            Ok(events) => {
186                let unscoped_events = events
187                    .into_iter()
188                    .map(|event| self.unscoped_event(event))
189                    .collect::<Vec<Event>>();
190                
191                let mut metrics = self.metrics.write().unwrap();
192                metrics.record_load_operation(start_time.elapsed(), true, unscoped_events.len());
193                
194                Ok(unscoped_events)
195            }
196            Err(e) => {
197                let mut metrics = self.metrics.write().unwrap();
198                metrics.record_load_operation(start_time.elapsed(), false, 0);
199                Err(e)
200            }
201        }
202    }
203    
204    async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
205        // Validate operation
206        self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
207            aggregate_id: aggregate_id.clone()
208        })?;
209        
210        // Transform aggregate ID to include tenant namespace
211        let scoped_aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id);
212        
213        self.backend.get_aggregate_version(&scoped_aggregate_id).await
214    }
215    
216    fn set_event_streamer(&mut self, _streamer: Arc<dyn crate::streaming::EventStreamer + Send + Sync>) {
217        // For tenant-aware storage, streaming would need to be tenant-scoped as well
218        // This would be implemented in a production system
219    }
220}
221
222/// Performance and usage metrics for tenant event storage
223#[derive(Debug, Clone)]
224pub struct TenantStorageMetrics {
225    pub tenant_id: TenantId,
226    pub total_save_operations: u64,
227    pub total_load_operations: u64,
228    pub total_events_saved: u64,
229    pub total_events_loaded: u64,
230    pub successful_saves: u64,
231    pub successful_loads: u64,
232    pub average_save_time_ms: f64,
233    pub average_load_time_ms: f64,
234    pub max_save_time_ms: f64,
235    pub max_load_time_ms: f64,
236    pub last_operation: Option<DateTime<Utc>>,
237    pub operations_by_type: HashMap<String, u64>,
238}
239
240impl Default for TenantStorageMetrics {
241    fn default() -> Self {
242        Self::new()
243    }
244}
245
246impl TenantStorageMetrics {
247    pub fn new() -> Self {
248        Self {
249            tenant_id: TenantId::generate(), // Will be set properly when used
250            total_save_operations: 0,
251            total_load_operations: 0,
252            total_events_saved: 0,
253            total_events_loaded: 0,
254            successful_saves: 0,
255            successful_loads: 0,
256            average_save_time_ms: 0.0,
257            average_load_time_ms: 0.0,
258            max_save_time_ms: 0.0,
259            max_load_time_ms: 0.0,
260            last_operation: None,
261            operations_by_type: HashMap::new(),
262        }
263    }
264    
265    pub fn record_operation(&mut self, operation: TenantOperation, _event_count: u64) {
266        self.last_operation = Some(Utc::now());
267        
268        let operation_type = match operation {
269            TenantOperation::CreateEvent { .. } => "create_event",
270            TenantOperation::ReadEvents { .. } => "read_events",
271            TenantOperation::CreateProjection { .. } => "create_projection",
272            TenantOperation::StreamEvents { .. } => "stream_events",
273        };
274        
275        *self.operations_by_type.entry(operation_type.to_string()).or_insert(0) += 1;
276    }
277    
278    pub fn record_save_operation(&mut self, duration: std::time::Duration, success: bool) {
279        self.total_save_operations += 1;
280        if success {
281            self.successful_saves += 1;
282        }
283        
284        let duration_ms = duration.as_millis() as f64;
285        self.average_save_time_ms = 
286            (self.average_save_time_ms * (self.total_save_operations - 1) as f64 + duration_ms)
287            / self.total_save_operations as f64;
288        
289        if duration_ms > self.max_save_time_ms {
290            self.max_save_time_ms = duration_ms;
291        }
292        
293        self.last_operation = Some(Utc::now());
294    }
295    
296    pub fn record_load_operation(&mut self, duration: std::time::Duration, success: bool, event_count: usize) {
297        self.total_load_operations += 1;
298        if success {
299            self.successful_loads += 1;
300            self.total_events_loaded += event_count as u64;
301        }
302        
303        let duration_ms = duration.as_millis() as f64;
304        self.average_load_time_ms = 
305            (self.average_load_time_ms * (self.total_load_operations - 1) as f64 + duration_ms)
306            / self.total_load_operations as f64;
307        
308        if duration_ms > self.max_load_time_ms {
309            self.max_load_time_ms = duration_ms;
310        }
311        
312        self.last_operation = Some(Utc::now());
313    }
314    
315    /// Calculate success rates
316    pub fn save_success_rate(&self) -> f64 {
317        if self.total_save_operations == 0 {
318            return 100.0;
319        }
320        (self.successful_saves as f64 / self.total_save_operations as f64) * 100.0
321    }
322    
323    pub fn load_success_rate(&self) -> f64 {
324        if self.total_load_operations == 0 {
325            return 100.0;
326        }
327        (self.successful_loads as f64 / self.total_load_operations as f64) * 100.0
328    }
329    
330    /// Check if performance targets are met
331    pub fn is_performance_target_met(&self) -> bool {
332        self.average_save_time_ms < 50.0 && self.average_load_time_ms < 20.0
333    }
334}
335
336/// Batch operations for efficient tenant event storage
337pub struct TenantEventBatch {
338    #[allow(dead_code)] // Tenant ID for batch isolation (stored for validation but not actively used in current implementation)
339    tenant_id: TenantId,
340    events: Vec<Event>,
341    max_batch_size: usize,
342}
343
344impl TenantEventBatch {
345    pub fn new(tenant_id: TenantId, max_batch_size: Option<usize>) -> Self {
346        Self {
347            tenant_id,
348            events: Vec::new(),
349            max_batch_size: max_batch_size.unwrap_or(1000),
350        }
351    }
352    
353    pub fn add_event(&mut self, event: Event) -> Result<()> {
354        if self.events.len() >= self.max_batch_size {
355            return Err(EventualiError::Tenant(format!(
356                "Batch size limit exceeded: {} events", 
357                self.max_batch_size
358            )));
359        }
360        
361        self.events.push(event);
362        Ok(())
363    }
364    
365    pub fn size(&self) -> usize {
366        self.events.len()
367    }
368    
369    pub fn is_full(&self) -> bool {
370        self.events.len() >= self.max_batch_size
371    }
372    
373    pub fn clear(&mut self) {
374        self.events.clear();
375    }
376    
377    pub async fn flush(&mut self, storage: &TenantAwareEventStorage) -> Result<()> {
378        if self.events.is_empty() {
379            return Ok(());
380        }
381        
382        let events_to_save = std::mem::take(&mut self.events);
383        storage.save_events(events_to_save).await?;
384        
385        Ok(())
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    // `EventData` is not imported by the parent module, so `super::*` does not provide it.
    use crate::event::EventData;
    use crate::store::sqlite::SQLiteBackend;
    use crate::tenancy::isolation::{IsolationPolicy, TenantIsolation};
    use crate::tenancy::quota::TenantQuota;
    use crate::tenancy::tenant::ResourceLimits;

    /// Builds a `TestEvent` on a `TestAggregate` with the given ID, version and JSON payload.
    fn make_event(aggregate_id: &str, version: i64, payload: serde_json::Value) -> Event {
        Event::new(
            aggregate_id.to_string(),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            1,
            version,
            EventData::Json(payload),
        )
    }

    /// Round-trips one event through the tenant-aware storage and checks that
    /// callers see the unscoped aggregate ID and that the metrics were updated.
    #[tokio::test]
    async fn test_tenant_aware_storage_isolation() {
        // Create test tenant
        let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();

        // Create in-memory SQLite backend
        let mut backend = SQLiteBackend::new("sqlite://:memory:".to_string()).unwrap();
        backend.initialize().await.unwrap();

        // Set up isolation and quota
        let isolation = Arc::new(TenantIsolation::new());
        isolation
            .register_tenant(tenant_id.clone(), IsolationPolicy::strict())
            .unwrap();

        let limits = ResourceLimits::default();
        let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));

        // Create tenant-aware storage
        let storage = TenantAwareEventStorage::new(
            tenant_id.clone(),
            Arc::new(backend),
            isolation,
            quota,
        );

        // Save one event
        let test_event = make_event("test-aggregate", 1, serde_json::json!({"test": "data"}));
        storage.save_events(vec![test_event]).await.unwrap();

        // Load it back through the same tenant
        let loaded_events = storage
            .load_events(&"test-aggregate".to_string(), None)
            .await
            .unwrap();

        assert_eq!(loaded_events.len(), 1);
        assert_eq!(loaded_events[0].aggregate_id, "test-aggregate");
        assert_eq!(loaded_events[0].event_type, "TestEvent");

        // Verify metrics. Only deterministic counters are asserted; wall-clock
        // performance targets depend on machine load and would make the test flaky.
        let metrics = storage.get_metrics();
        assert_eq!(metrics.total_save_operations, 1);
        assert_eq!(metrics.successful_saves, 1);
        assert_eq!(metrics.total_load_operations, 1);
        assert_eq!(metrics.successful_loads, 1);
        assert_eq!(metrics.total_events_loaded, 1);
    }

    /// Fills a batch of capacity three and checks the size, full and overflow behavior.
    #[test]
    fn test_tenant_event_batch() {
        let tenant_id = TenantId::new("batch-test".to_string()).unwrap();
        let mut batch = TenantEventBatch::new(tenant_id, Some(3));

        // Add events to batch
        for i in 0..2 {
            let event = make_event(
                &format!("aggregate-{}", i),
                i as i64 + 1,
                serde_json::json!({"index": i}),
            );

            batch.add_event(event).unwrap();
        }

        assert_eq!(batch.size(), 2);
        assert!(!batch.is_full());

        // Add one more to reach limit
        let event = make_event("aggregate-3", 3, serde_json::json!({"index": 3}));
        batch.add_event(event).unwrap();
        assert!(batch.is_full());

        // Try to add one more (should fail)
        let overflow_event = make_event("aggregate-4", 4, serde_json::json!({"index": 4}));
        assert!(batch.add_event(overflow_event).is_err());
        assert_eq!(batch.size(), 3);
    }
}