eventuali_core/tenancy/
projections.rs

1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use crate::event::Event;
6use crate::streaming::Projection;
7use crate::error::{EventualiError, Result};
8use super::tenant::TenantId;
9use super::isolation::{TenantIsolation, TenantOperation};
10use super::quota::{TenantQuota, ResourceType};
11
12/// Tenant-scoped projection that maintains read models isolated per tenant
13pub struct TenantScopedProjection {
14    tenant_id: TenantId,
15    projection_name: String,
16    inner_projection: Arc<dyn Projection + Send + Sync>,
17    isolation: Arc<TenantIsolation>,
18    quota: Arc<TenantQuota>,
19    metrics: Arc<RwLock<TenantProjectionMetrics>>,
20}
21
22impl TenantScopedProjection {
23    pub fn new(
24        tenant_id: TenantId,
25        projection_name: String,
26        inner_projection: Arc<dyn Projection + Send + Sync>,
27        isolation: Arc<TenantIsolation>,
28        quota: Arc<TenantQuota>,
29    ) -> Self {
30        Self {
31            tenant_id,
32            projection_name,
33            inner_projection,
34            isolation,
35            quota,
36            metrics: Arc::new(RwLock::new(TenantProjectionMetrics::new())),
37        }
38    }
39    
40    /// Get tenant-scoped projection name
41    pub fn scoped_name(&self) -> String {
42        format!("{}:{}", self.tenant_id.db_prefix(), self.projection_name)
43    }
44    
45    /// Validate that the event belongs to this tenant's namespace
46    fn validate_event_belongs_to_tenant(&self, event: &Event) -> Result<()> {
47        let expected_prefix = format!("{}:", self.tenant_id.db_prefix());
48        
49        if !event.aggregate_id.starts_with(&expected_prefix) {
50            return Err(EventualiError::Tenant(format!(
51                "Event aggregate_id '{}' does not belong to tenant '{}'",
52                event.aggregate_id,
53                self.tenant_id.as_str()
54            )));
55        }
56        
57        Ok(())
58    }
59    
60    /// Transform event to remove tenant namespace for projection processing
61    fn unscoped_event(&self, mut event: Event) -> Event {
62        let prefix = format!("{}:", self.tenant_id.db_prefix());
63        if event.aggregate_id.starts_with(&prefix) {
64            event.aggregate_id = event.aggregate_id[prefix.len()..].to_string();
65        }
66        event
67    }
68    
69    pub fn get_metrics(&self) -> TenantProjectionMetrics {
70        self.metrics.read().unwrap().clone()
71    }
72}
73
74#[async_trait]
75impl Projection for TenantScopedProjection {
76    async fn handle_event(&self, event: &Event) -> Result<()> {
77        let start_time = std::time::Instant::now();
78        
79        // Validate tenant isolation
80        self.isolation.validate_operation(&self.tenant_id, &TenantOperation::CreateProjection {
81            name: self.projection_name.clone()
82        })?;
83        
84        // Validate event belongs to tenant
85        self.validate_event_belongs_to_tenant(event)?;
86        
87        // Check quotas
88        self.quota.check_quota(ResourceType::Projections, 1)?;
89        
90        // Transform event to remove tenant scoping for inner projection
91        let unscoped_event = self.unscoped_event(event.clone());
92        
93        // Delegate to inner projection
94        let result = self.inner_projection.handle_event(&unscoped_event).await;
95        
96        // Record metrics
97        let duration = start_time.elapsed();
98        let mut metrics = self.metrics.write().unwrap();
99        metrics.record_event_processing(duration, result.is_ok());
100        
101        if result.is_ok() {
102            self.quota.record_usage(ResourceType::Projections, 1);
103        }
104        
105        result
106    }
107    
108    async fn reset(&self) -> Result<()> {
109        let result = self.inner_projection.reset().await;
110        
111        if result.is_ok() {
112            let mut metrics = self.metrics.write().unwrap();
113            metrics.reset_counters();
114        }
115        
116        result
117    }
118    
119    async fn get_last_processed_position(&self) -> Result<Option<u64>> {
120        self.inner_projection.get_last_processed_position().await
121    }
122    
123    async fn set_last_processed_position(&self, position: u64) -> Result<()> {
124        self.inner_projection.set_last_processed_position(position).await
125    }
126}
127
128/// Manager for tenant-scoped projections
129pub struct TenantProjectionManager {
130    tenant_id: TenantId,
131    projections: Arc<RwLock<HashMap<String, Arc<TenantScopedProjection>>>>,
132    isolation: Arc<TenantIsolation>,
133    quota: Arc<TenantQuota>,
134    registry: Arc<RwLock<TenantProjectionRegistry>>,
135}
136
137impl TenantProjectionManager {
138    pub fn new(
139        tenant_id: TenantId,
140        isolation: Arc<TenantIsolation>,
141        quota: Arc<TenantQuota>,
142    ) -> Self {
143        Self {
144            tenant_id,
145            projections: Arc::new(RwLock::new(HashMap::new())),
146            isolation,
147            quota,
148            registry: Arc::new(RwLock::new(TenantProjectionRegistry::new())),
149        }
150    }
151    
152    /// Register a new projection for this tenant
153    pub fn register_projection(
154        &self,
155        name: String,
156        projection: Arc<dyn Projection + Send + Sync>,
157    ) -> Result<Arc<TenantScopedProjection>> {
158        // Check if we can add more projections
159        self.quota.check_quota(ResourceType::Projections, 1)?;
160        
161        let tenant_projection = Arc::new(TenantScopedProjection::new(
162            self.tenant_id.clone(),
163            name.clone(),
164            projection,
165            self.isolation.clone(),
166            self.quota.clone(),
167        ));
168        
169        // Register the projection
170        {
171            let mut projections = self.projections.write().unwrap();
172            if projections.contains_key(&name) {
173                return Err(EventualiError::Tenant(format!(
174                    "Projection '{}' already exists for tenant '{}'",
175                    name,
176                    self.tenant_id.as_str()
177                )));
178            }
179            projections.insert(name.clone(), tenant_projection.clone());
180        }
181        
182        // Update registry
183        {
184            let mut registry = self.registry.write().unwrap();
185            registry.register_projection(name, self.tenant_id.clone())?;
186        }
187        
188        // Record usage
189        self.quota.record_usage(ResourceType::Projections, 1);
190        
191        Ok(tenant_projection)
192    }
193    
194    /// Get a projection by name
195    pub fn get_projection(&self, name: &str) -> Option<Arc<TenantScopedProjection>> {
196        let projections = self.projections.read().unwrap();
197        projections.get(name).cloned()
198    }
199    
200    /// List all projections for this tenant
201    pub fn list_projections(&self) -> Vec<String> {
202        let projections = self.projections.read().unwrap();
203        projections.keys().cloned().collect()
204    }
205    
206    /// Remove a projection
207    pub fn remove_projection(&self, name: &str) -> Result<()> {
208        let mut projections = self.projections.write().unwrap();
209        
210        if projections.remove(name).is_some() {
211            let mut registry = self.registry.write().unwrap();
212            registry.unregister_projection(name);
213            Ok(())
214        } else {
215            Err(EventualiError::Tenant(format!(
216                "Projection '{}' not found for tenant '{}'",
217                name,
218                self.tenant_id.as_str()
219            )))
220        }
221    }
222    
223    /// Get aggregated metrics for all projections
224    pub fn get_aggregated_metrics(&self) -> TenantProjectionMetrics {
225        let projections = self.projections.read().unwrap();
226        let mut aggregated = TenantProjectionMetrics::new();
227        
228        for projection in projections.values() {
229            let metrics = projection.get_metrics();
230            aggregated.aggregate_with(&metrics);
231        }
232        
233        aggregated
234    }
235    
236    /// Process an event through all registered projections
237    pub async fn process_event(&self, event: Event) -> Result<()> {
238        let projections = {
239            let projections_guard = self.projections.read().unwrap();
240            projections_guard.values().cloned().collect::<Vec<_>>()
241        };
242        
243        let mut results = Vec::new();
244        
245        for projection in projections {
246            let result = projection.handle_event(&event).await;
247            results.push(result);
248        }
249        
250        // Check if all projections succeeded
251        for result in results {
252            result?;
253        }
254        
255        Ok(())
256    }
257    
258    pub fn get_registry(&self) -> TenantProjectionRegistry {
259        self.registry.read().unwrap().clone()
260    }
261}
262
263/// Registry for tracking tenant projections
264#[derive(Debug, Clone)]
265pub struct TenantProjectionRegistry {
266    projections: HashMap<String, ProjectionRegistration>,
267    #[allow(dead_code)] // Registry creation timestamp for auditing (stored but not currently accessed)
268    created_at: DateTime<Utc>,
269}
270
271impl Default for TenantProjectionRegistry {
272    fn default() -> Self {
273        Self::new()
274    }
275}
276
277impl TenantProjectionRegistry {
278    pub fn new() -> Self {
279        Self {
280            projections: HashMap::new(),
281            created_at: Utc::now(),
282        }
283    }
284    
285    pub fn register_projection(&mut self, name: String, tenant_id: TenantId) -> Result<()> {
286        let registration = ProjectionRegistration {
287            name: name.clone(),
288            tenant_id,
289            registered_at: Utc::now(),
290            last_processed: None,
291            event_count: 0,
292            status: ProjectionStatus::Active,
293        };
294        
295        self.projections.insert(name, registration);
296        Ok(())
297    }
298    
299    pub fn unregister_projection(&mut self, name: &str) {
300        self.projections.remove(name);
301    }
302    
303    pub fn get_projection_count(&self) -> usize {
304        self.projections.len()
305    }
306    
307    pub fn get_active_projections(&self) -> Vec<String> {
308        self.projections
309            .values()
310            .filter(|reg| matches!(reg.status, ProjectionStatus::Active))
311            .map(|reg| reg.name.clone())
312            .collect()
313    }
314}
315
316#[derive(Debug, Clone)]
317struct ProjectionRegistration {
318    #[allow(dead_code)] // Name stored for projection identification (used in registry operations but not directly accessed)
319    name: String,
320    #[allow(dead_code)] // Tenant ID for isolation tracking (stored but not currently queried in registration info)
321    tenant_id: TenantId,
322    #[allow(dead_code)] // Registration timestamp for audit purposes (stored but not currently accessed)
323    registered_at: DateTime<Utc>,
324    #[allow(dead_code)] // Last processing time for monitoring (stored but not actively used)
325    last_processed: Option<DateTime<Utc>>,
326    #[allow(dead_code)] // Event count tracking for analytics (stored but not currently queried)
327    event_count: u64,
328    status: ProjectionStatus,
329}
330
/// Lifecycle status of a registered projection.
#[derive(Debug, Clone)]
enum ProjectionStatus {
    /// Status given to every new registration.
    Active,
    /// Reserved for future projection management features.
    #[allow(dead_code)] // not constructed yet
    Paused,
    /// Reserved for future error handling and recovery.
    #[allow(dead_code)] // not constructed yet
    Error,
}
339
340/// Performance and usage metrics for tenant projections
341#[derive(Debug, Clone)]
342pub struct TenantProjectionMetrics {
343    pub events_processed: u64,
344    pub successful_events: u64,
345    pub failed_events: u64,
346    pub total_processing_time_ms: f64,
347    pub average_processing_time_ms: f64,
348    pub max_processing_time_ms: f64,
349    pub rebuilds_performed: u64,
350    pub successful_rebuilds: u64,
351    pub last_processed: Option<DateTime<Utc>>,
352    pub last_rebuild: Option<DateTime<Utc>>,
353}
354
355impl Default for TenantProjectionMetrics {
356    fn default() -> Self {
357        Self::new()
358    }
359}
360
361impl TenantProjectionMetrics {
362    pub fn new() -> Self {
363        Self {
364            events_processed: 0,
365            successful_events: 0,
366            failed_events: 0,
367            total_processing_time_ms: 0.0,
368            average_processing_time_ms: 0.0,
369            max_processing_time_ms: 0.0,
370            rebuilds_performed: 0,
371            successful_rebuilds: 0,
372            last_processed: None,
373            last_rebuild: None,
374        }
375    }
376    
377    pub fn record_event_processing(&mut self, duration: std::time::Duration, success: bool) {
378        self.events_processed += 1;
379        
380        if success {
381            self.successful_events += 1;
382        } else {
383            self.failed_events += 1;
384        }
385        
386        let duration_ms = duration.as_millis() as f64;
387        self.total_processing_time_ms += duration_ms;
388        self.average_processing_time_ms = 
389            self.total_processing_time_ms / self.events_processed as f64;
390        
391        if duration_ms > self.max_processing_time_ms {
392            self.max_processing_time_ms = duration_ms;
393        }
394        
395        self.last_processed = Some(Utc::now());
396    }
397    
398    pub fn record_rebuild(&mut self, _duration: std::time::Duration, success: bool) {
399        self.rebuilds_performed += 1;
400        
401        if success {
402            self.successful_rebuilds += 1;
403        }
404        
405        self.last_rebuild = Some(Utc::now());
406    }
407    
408    pub fn reset_counters(&mut self) {
409        self.events_processed = 0;
410        self.successful_events = 0;
411        self.failed_events = 0;
412        self.total_processing_time_ms = 0.0;
413        self.average_processing_time_ms = 0.0;
414        self.max_processing_time_ms = 0.0;
415    }
416    
417    pub fn success_rate(&self) -> f64 {
418        if self.events_processed == 0 {
419            return 100.0;
420        }
421        (self.successful_events as f64 / self.events_processed as f64) * 100.0
422    }
423    
424    pub fn rebuild_success_rate(&self) -> f64 {
425        if self.rebuilds_performed == 0 {
426            return 100.0;
427        }
428        (self.successful_rebuilds as f64 / self.rebuilds_performed as f64) * 100.0
429    }
430    
431    pub fn is_performance_target_met(&self) -> bool {
432        // Target: average processing time < 10ms
433        self.average_processing_time_ms < 10.0
434    }
435    
436    pub fn aggregate_with(&mut self, other: &TenantProjectionMetrics) {
437        self.events_processed += other.events_processed;
438        self.successful_events += other.successful_events;
439        self.failed_events += other.failed_events;
440        self.total_processing_time_ms += other.total_processing_time_ms;
441        self.rebuilds_performed += other.rebuilds_performed;
442        self.successful_rebuilds += other.successful_rebuilds;
443        
444        if self.events_processed > 0 {
445            self.average_processing_time_ms = 
446                self.total_processing_time_ms / self.events_processed as f64;
447        }
448        
449        if other.max_processing_time_ms > self.max_processing_time_ms {
450            self.max_processing_time_ms = other.max_processing_time_ms;
451        }
452        
453        // Update timestamps to most recent
454        if other.last_processed.is_some() && 
455           (self.last_processed.is_none() || other.last_processed > self.last_processed) {
456            self.last_processed = other.last_processed;
457        }
458        
459        if other.last_rebuild.is_some() && 
460           (self.last_rebuild.is_none() || other.last_rebuild > self.last_rebuild) {
461            self.last_rebuild = other.last_rebuild;
462        }
463    }
464}
465
466/// Sample projection implementations for testing
467pub mod sample_projections {
468    use super::*;
469    use serde_json::Value;
470    
471    /// A simple analytics projection that counts events by type
472    pub struct EventAnalyticsProjection {
473        #[allow(dead_code)] // Projection name for identification (stored but not currently accessed)
474        name: String,
475        data: Arc<RwLock<HashMap<String, EventTypeCount>>>,
476    }
477    
478    impl EventAnalyticsProjection {
479        pub fn new(name: String) -> Self {
480            Self {
481                name,
482                data: Arc::new(RwLock::new(HashMap::new())),
483            }
484        }
485        
486        pub fn get_counts(&self) -> HashMap<String, EventTypeCount> {
487            self.data.read().unwrap().clone()
488        }
489    }
490    
491    #[async_trait]
492    impl Projection for EventAnalyticsProjection {
493        async fn handle_event(&self, event: &Event) -> Result<()> {
494            let mut data = self.data.write().unwrap();
495            let count = data.entry(event.event_type.clone()).or_default();
496            
497            count.total_count += 1;
498            count.last_seen = Some(Utc::now());
499            
500            // Try to parse the event data for additional analytics
501            if let crate::event::EventData::Json(json_data) = &event.data {
502                count.sample_data = Some(json_data.clone());
503            }
504            
505            Ok(())
506        }
507        
508        async fn reset(&self) -> Result<()> {
509            // Clear all data for reset
510            let mut data = self.data.write().unwrap();
511            data.clear();
512            Ok(())
513        }
514        
515        async fn get_last_processed_position(&self) -> Result<Option<u64>> {
516            // Simple implementation - doesn't track position
517            Ok(None)
518        }
519        
520        async fn set_last_processed_position(&self, _position: u64) -> Result<()> {
521            // Simple implementation - doesn't track position
522            Ok(())
523        }
524    }
525    
526    #[derive(Debug, Clone)]
527    pub struct EventTypeCount {
528        pub total_count: u64,
529        pub last_seen: Option<DateTime<Utc>>,
530        pub sample_data: Option<Value>,
531    }
532    
533    impl Default for EventTypeCount {
534        fn default() -> Self {
535            Self::new()
536        }
537    }
538
539    impl EventTypeCount {
540        pub fn new() -> Self {
541            Self {
542                total_count: 0,
543                last_seen: None,
544                sample_data: None,
545            }
546        }
547    }
548    
549    /// A user activity projection that tracks user actions
550    pub struct UserActivityProjection {
551        #[allow(dead_code)] // Projection name for identification (stored but not currently accessed)
552        name: String,
553        data: Arc<RwLock<HashMap<String, UserActivity>>>,
554    }
555    
556    impl UserActivityProjection {
557        pub fn new(name: String) -> Self {
558            Self {
559                name,
560                data: Arc::new(RwLock::new(HashMap::new())),
561            }
562        }
563        
564        pub fn get_user_activity(&self, user_id: &str) -> Option<UserActivity> {
565            self.data.read().unwrap().get(user_id).cloned()
566        }
567        
568        pub fn get_all_activities(&self) -> HashMap<String, UserActivity> {
569            self.data.read().unwrap().clone()
570        }
571    }
572    
573    #[async_trait]
574    impl Projection for UserActivityProjection {
575        async fn handle_event(&self, event: &Event) -> Result<()> {
576            // Extract user_id from event metadata or data
577            if let Some(user_id) = event.metadata.headers.get("user_id") {
578                let mut data = self.data.write().unwrap();
579                let activity = data.entry(user_id.clone()).or_insert(UserActivity::new(user_id.clone()));
580                
581                activity.total_events += 1;
582                activity.last_activity = Some(Utc::now());
583                activity.event_types.insert(event.event_type.clone());
584            }
585            
586            Ok(())
587        }
588        
589        async fn reset(&self) -> Result<()> {
590            let mut data = self.data.write().unwrap();
591            data.clear();
592            Ok(())
593        }
594        
595        async fn get_last_processed_position(&self) -> Result<Option<u64>> {
596            // Simple implementation - doesn't track position
597            Ok(None)
598        }
599        
600        async fn set_last_processed_position(&self, _position: u64) -> Result<()> {
601            // Simple implementation - doesn't track position
602            Ok(())
603        }
604    }
605    
606    #[derive(Debug, Clone)]
607    pub struct UserActivity {
608        pub user_id: String,
609        pub total_events: u64,
610        pub last_activity: Option<DateTime<Utc>>,
611        pub event_types: std::collections::HashSet<String>,
612    }
613    
614    impl UserActivity {
615        pub fn new(user_id: String) -> Self {
616            Self {
617                user_id,
618                total_events: 0,
619                last_activity: None,
620                event_types: std::collections::HashSet::new(),
621            }
622        }
623    }
624}
625
#[cfg(test)]
mod tests {
    use super::sample_projections::*;
    use super::*;
    use crate::event::{Event, EventData};
    use crate::tenancy::isolation::{IsolationPolicy, TenantIsolation};
    use crate::tenancy::quota::TenantQuota;
    use crate::tenancy::tenant::ResourceLimits;

    /// A correctly namespaced event is delegated to the inner projection and
    /// shows up as one successful event in the wrapper's metrics.
    #[tokio::test]
    async fn test_tenant_scoped_projection() {
        let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();

        // Set up isolation and quota
        let isolation = Arc::new(TenantIsolation::new());
        isolation
            .register_tenant(tenant_id.clone(), IsolationPolicy::strict())
            .unwrap();

        let limits = ResourceLimits::default();
        let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));

        // Create sample projection
        let analytics_projection =
            Arc::new(EventAnalyticsProjection::new("analytics".to_string()));

        // Create tenant-scoped projection
        let tenant_projection = TenantScopedProjection::new(
            tenant_id.clone(),
            "test-analytics".to_string(),
            analytics_projection.clone(),
            isolation,
            quota,
        );

        // Create test event with proper tenant scoping
        let test_event = Event::new(
            format!("{}:test-aggregate", tenant_id.db_prefix()),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            1,
            1,
            EventData::Json(serde_json::json!({"test": "data"})),
        );

        // Process event
        tenant_projection.handle_event(&test_event).await.unwrap();

        // Verify metrics
        let metrics = tenant_projection.get_metrics();
        assert_eq!(metrics.events_processed, 1);
        assert_eq!(metrics.successful_events, 1);
        assert_eq!(metrics.failed_events, 0);
        assert!(metrics.is_performance_target_met());

        // The inner projection must have seen exactly one event.
        let counts = analytics_projection.get_counts();
        let delegated: u64 = counts.values().map(|count| count.total_count).sum();
        assert_eq!(delegated, 1);
    }

    /// Projections can be registered, looked up, listed and removed, and the
    /// registry stays in step with the manager.
    #[tokio::test]
    async fn test_tenant_projection_manager() {
        let tenant_id = TenantId::new("manager-test".to_string()).unwrap();

        let isolation = Arc::new(TenantIsolation::new());
        isolation
            .register_tenant(tenant_id.clone(), IsolationPolicy::strict())
            .unwrap();

        let limits = ResourceLimits::default();
        let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));

        let manager = TenantProjectionManager::new(tenant_id.clone(), isolation, quota);

        // Register projections
        let analytics = Arc::new(EventAnalyticsProjection::new("analytics".to_string()));
        let user_activity = Arc::new(UserActivityProjection::new("user-activity".to_string()));

        manager
            .register_projection("analytics".to_string(), analytics)
            .unwrap();
        manager
            .register_projection("user-activity".to_string(), user_activity)
            .unwrap();

        // Verify projections are registered
        assert_eq!(manager.list_projections().len(), 2);
        assert!(manager.get_projection("analytics").is_some());
        assert!(manager.get_projection("user-activity").is_some());

        // Test registry
        let registry = manager.get_registry();
        assert_eq!(registry.get_projection_count(), 2);
        assert_eq!(registry.get_active_projections().len(), 2);

        // Removing a projection updates both the manager and the registry,
        // and removing it a second time is reported as an error.
        manager.remove_projection("analytics").unwrap();
        assert!(manager.get_projection("analytics").is_none());
        assert_eq!(manager.list_projections().len(), 1);
        assert_eq!(manager.get_registry().get_projection_count(), 1);
        assert!(manager.remove_projection("analytics").is_err());
    }
}