1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use crate::event::Event;
6use crate::streaming::Projection;
7use crate::error::{EventualiError, Result};
8use super::tenant::TenantId;
9use super::isolation::{TenantIsolation, TenantOperation};
10use super::quota::{TenantQuota, ResourceType};
11
12pub struct TenantScopedProjection {
14 tenant_id: TenantId,
15 projection_name: String,
16 inner_projection: Arc<dyn Projection + Send + Sync>,
17 isolation: Arc<TenantIsolation>,
18 quota: Arc<TenantQuota>,
19 metrics: Arc<RwLock<TenantProjectionMetrics>>,
20}
21
22impl TenantScopedProjection {
23 pub fn new(
24 tenant_id: TenantId,
25 projection_name: String,
26 inner_projection: Arc<dyn Projection + Send + Sync>,
27 isolation: Arc<TenantIsolation>,
28 quota: Arc<TenantQuota>,
29 ) -> Self {
30 Self {
31 tenant_id,
32 projection_name,
33 inner_projection,
34 isolation,
35 quota,
36 metrics: Arc::new(RwLock::new(TenantProjectionMetrics::new())),
37 }
38 }
39
40 pub fn scoped_name(&self) -> String {
42 format!("{}:{}", self.tenant_id.db_prefix(), self.projection_name)
43 }
44
45 fn validate_event_belongs_to_tenant(&self, event: &Event) -> Result<()> {
47 let expected_prefix = format!("{}:", self.tenant_id.db_prefix());
48
49 if !event.aggregate_id.starts_with(&expected_prefix) {
50 return Err(EventualiError::Tenant(format!(
51 "Event aggregate_id '{}' does not belong to tenant '{}'",
52 event.aggregate_id,
53 self.tenant_id.as_str()
54 )));
55 }
56
57 Ok(())
58 }
59
60 fn unscoped_event(&self, mut event: Event) -> Event {
62 let prefix = format!("{}:", self.tenant_id.db_prefix());
63 if event.aggregate_id.starts_with(&prefix) {
64 event.aggregate_id = event.aggregate_id[prefix.len()..].to_string();
65 }
66 event
67 }
68
69 pub fn get_metrics(&self) -> TenantProjectionMetrics {
70 self.metrics.read().unwrap().clone()
71 }
72}
73
74#[async_trait]
75impl Projection for TenantScopedProjection {
76 async fn handle_event(&self, event: &Event) -> Result<()> {
77 let start_time = std::time::Instant::now();
78
79 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::CreateProjection {
81 name: self.projection_name.clone()
82 })?;
83
84 self.validate_event_belongs_to_tenant(event)?;
86
87 self.quota.check_quota(ResourceType::Projections, 1)?;
89
90 let unscoped_event = self.unscoped_event(event.clone());
92
93 let result = self.inner_projection.handle_event(&unscoped_event).await;
95
96 let duration = start_time.elapsed();
98 let mut metrics = self.metrics.write().unwrap();
99 metrics.record_event_processing(duration, result.is_ok());
100
101 if result.is_ok() {
102 self.quota.record_usage(ResourceType::Projections, 1);
103 }
104
105 result
106 }
107
108 async fn reset(&self) -> Result<()> {
109 let result = self.inner_projection.reset().await;
110
111 if result.is_ok() {
112 let mut metrics = self.metrics.write().unwrap();
113 metrics.reset_counters();
114 }
115
116 result
117 }
118
119 async fn get_last_processed_position(&self) -> Result<Option<u64>> {
120 self.inner_projection.get_last_processed_position().await
121 }
122
123 async fn set_last_processed_position(&self, position: u64) -> Result<()> {
124 self.inner_projection.set_last_processed_position(position).await
125 }
126}
127
128pub struct TenantProjectionManager {
130 tenant_id: TenantId,
131 projections: Arc<RwLock<HashMap<String, Arc<TenantScopedProjection>>>>,
132 isolation: Arc<TenantIsolation>,
133 quota: Arc<TenantQuota>,
134 registry: Arc<RwLock<TenantProjectionRegistry>>,
135}
136
137impl TenantProjectionManager {
138 pub fn new(
139 tenant_id: TenantId,
140 isolation: Arc<TenantIsolation>,
141 quota: Arc<TenantQuota>,
142 ) -> Self {
143 Self {
144 tenant_id,
145 projections: Arc::new(RwLock::new(HashMap::new())),
146 isolation,
147 quota,
148 registry: Arc::new(RwLock::new(TenantProjectionRegistry::new())),
149 }
150 }
151
152 pub fn register_projection(
154 &self,
155 name: String,
156 projection: Arc<dyn Projection + Send + Sync>,
157 ) -> Result<Arc<TenantScopedProjection>> {
158 self.quota.check_quota(ResourceType::Projections, 1)?;
160
161 let tenant_projection = Arc::new(TenantScopedProjection::new(
162 self.tenant_id.clone(),
163 name.clone(),
164 projection,
165 self.isolation.clone(),
166 self.quota.clone(),
167 ));
168
169 {
171 let mut projections = self.projections.write().unwrap();
172 if projections.contains_key(&name) {
173 return Err(EventualiError::Tenant(format!(
174 "Projection '{}' already exists for tenant '{}'",
175 name,
176 self.tenant_id.as_str()
177 )));
178 }
179 projections.insert(name.clone(), tenant_projection.clone());
180 }
181
182 {
184 let mut registry = self.registry.write().unwrap();
185 registry.register_projection(name, self.tenant_id.clone())?;
186 }
187
188 self.quota.record_usage(ResourceType::Projections, 1);
190
191 Ok(tenant_projection)
192 }
193
194 pub fn get_projection(&self, name: &str) -> Option<Arc<TenantScopedProjection>> {
196 let projections = self.projections.read().unwrap();
197 projections.get(name).cloned()
198 }
199
200 pub fn list_projections(&self) -> Vec<String> {
202 let projections = self.projections.read().unwrap();
203 projections.keys().cloned().collect()
204 }
205
206 pub fn remove_projection(&self, name: &str) -> Result<()> {
208 let mut projections = self.projections.write().unwrap();
209
210 if projections.remove(name).is_some() {
211 let mut registry = self.registry.write().unwrap();
212 registry.unregister_projection(name);
213 Ok(())
214 } else {
215 Err(EventualiError::Tenant(format!(
216 "Projection '{}' not found for tenant '{}'",
217 name,
218 self.tenant_id.as_str()
219 )))
220 }
221 }
222
223 pub fn get_aggregated_metrics(&self) -> TenantProjectionMetrics {
225 let projections = self.projections.read().unwrap();
226 let mut aggregated = TenantProjectionMetrics::new();
227
228 for projection in projections.values() {
229 let metrics = projection.get_metrics();
230 aggregated.aggregate_with(&metrics);
231 }
232
233 aggregated
234 }
235
236 pub async fn process_event(&self, event: Event) -> Result<()> {
238 let projections = {
239 let projections_guard = self.projections.read().unwrap();
240 projections_guard.values().cloned().collect::<Vec<_>>()
241 };
242
243 let mut results = Vec::new();
244
245 for projection in projections {
246 let result = projection.handle_event(&event).await;
247 results.push(result);
248 }
249
250 for result in results {
252 result?;
253 }
254
255 Ok(())
256 }
257
258 pub fn get_registry(&self) -> TenantProjectionRegistry {
259 self.registry.read().unwrap().clone()
260 }
261}
262
263#[derive(Debug, Clone)]
265pub struct TenantProjectionRegistry {
266 projections: HashMap<String, ProjectionRegistration>,
267 #[allow(dead_code)] created_at: DateTime<Utc>,
269}
270
271impl Default for TenantProjectionRegistry {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
277impl TenantProjectionRegistry {
278 pub fn new() -> Self {
279 Self {
280 projections: HashMap::new(),
281 created_at: Utc::now(),
282 }
283 }
284
285 pub fn register_projection(&mut self, name: String, tenant_id: TenantId) -> Result<()> {
286 let registration = ProjectionRegistration {
287 name: name.clone(),
288 tenant_id,
289 registered_at: Utc::now(),
290 last_processed: None,
291 event_count: 0,
292 status: ProjectionStatus::Active,
293 };
294
295 self.projections.insert(name, registration);
296 Ok(())
297 }
298
299 pub fn unregister_projection(&mut self, name: &str) {
300 self.projections.remove(name);
301 }
302
303 pub fn get_projection_count(&self) -> usize {
304 self.projections.len()
305 }
306
307 pub fn get_active_projections(&self) -> Vec<String> {
308 self.projections
309 .values()
310 .filter(|reg| matches!(reg.status, ProjectionStatus::Active))
311 .map(|reg| reg.name.clone())
312 .collect()
313 }
314}
315
316#[derive(Debug, Clone)]
317struct ProjectionRegistration {
318 #[allow(dead_code)] name: String,
320 #[allow(dead_code)] tenant_id: TenantId,
322 #[allow(dead_code)] registered_at: DateTime<Utc>,
324 #[allow(dead_code)] last_processed: Option<DateTime<Utc>>,
326 #[allow(dead_code)] event_count: u64,
328 status: ProjectionStatus,
329}
330
331#[derive(Debug, Clone)]
332enum ProjectionStatus {
333 Active,
334 #[allow(dead_code)] Paused,
336 #[allow(dead_code)] Error,
338}
339
340#[derive(Debug, Clone)]
342pub struct TenantProjectionMetrics {
343 pub events_processed: u64,
344 pub successful_events: u64,
345 pub failed_events: u64,
346 pub total_processing_time_ms: f64,
347 pub average_processing_time_ms: f64,
348 pub max_processing_time_ms: f64,
349 pub rebuilds_performed: u64,
350 pub successful_rebuilds: u64,
351 pub last_processed: Option<DateTime<Utc>>,
352 pub last_rebuild: Option<DateTime<Utc>>,
353}
354
355impl Default for TenantProjectionMetrics {
356 fn default() -> Self {
357 Self::new()
358 }
359}
360
361impl TenantProjectionMetrics {
362 pub fn new() -> Self {
363 Self {
364 events_processed: 0,
365 successful_events: 0,
366 failed_events: 0,
367 total_processing_time_ms: 0.0,
368 average_processing_time_ms: 0.0,
369 max_processing_time_ms: 0.0,
370 rebuilds_performed: 0,
371 successful_rebuilds: 0,
372 last_processed: None,
373 last_rebuild: None,
374 }
375 }
376
377 pub fn record_event_processing(&mut self, duration: std::time::Duration, success: bool) {
378 self.events_processed += 1;
379
380 if success {
381 self.successful_events += 1;
382 } else {
383 self.failed_events += 1;
384 }
385
386 let duration_ms = duration.as_millis() as f64;
387 self.total_processing_time_ms += duration_ms;
388 self.average_processing_time_ms =
389 self.total_processing_time_ms / self.events_processed as f64;
390
391 if duration_ms > self.max_processing_time_ms {
392 self.max_processing_time_ms = duration_ms;
393 }
394
395 self.last_processed = Some(Utc::now());
396 }
397
398 pub fn record_rebuild(&mut self, _duration: std::time::Duration, success: bool) {
399 self.rebuilds_performed += 1;
400
401 if success {
402 self.successful_rebuilds += 1;
403 }
404
405 self.last_rebuild = Some(Utc::now());
406 }
407
408 pub fn reset_counters(&mut self) {
409 self.events_processed = 0;
410 self.successful_events = 0;
411 self.failed_events = 0;
412 self.total_processing_time_ms = 0.0;
413 self.average_processing_time_ms = 0.0;
414 self.max_processing_time_ms = 0.0;
415 }
416
417 pub fn success_rate(&self) -> f64 {
418 if self.events_processed == 0 {
419 return 100.0;
420 }
421 (self.successful_events as f64 / self.events_processed as f64) * 100.0
422 }
423
424 pub fn rebuild_success_rate(&self) -> f64 {
425 if self.rebuilds_performed == 0 {
426 return 100.0;
427 }
428 (self.successful_rebuilds as f64 / self.rebuilds_performed as f64) * 100.0
429 }
430
431 pub fn is_performance_target_met(&self) -> bool {
432 self.average_processing_time_ms < 10.0
434 }
435
436 pub fn aggregate_with(&mut self, other: &TenantProjectionMetrics) {
437 self.events_processed += other.events_processed;
438 self.successful_events += other.successful_events;
439 self.failed_events += other.failed_events;
440 self.total_processing_time_ms += other.total_processing_time_ms;
441 self.rebuilds_performed += other.rebuilds_performed;
442 self.successful_rebuilds += other.successful_rebuilds;
443
444 if self.events_processed > 0 {
445 self.average_processing_time_ms =
446 self.total_processing_time_ms / self.events_processed as f64;
447 }
448
449 if other.max_processing_time_ms > self.max_processing_time_ms {
450 self.max_processing_time_ms = other.max_processing_time_ms;
451 }
452
453 if other.last_processed.is_some() &&
455 (self.last_processed.is_none() || other.last_processed > self.last_processed) {
456 self.last_processed = other.last_processed;
457 }
458
459 if other.last_rebuild.is_some() &&
460 (self.last_rebuild.is_none() || other.last_rebuild > self.last_rebuild) {
461 self.last_rebuild = other.last_rebuild;
462 }
463 }
464}
465
466pub mod sample_projections {
468 use super::*;
469 use serde_json::Value;
470
471 pub struct EventAnalyticsProjection {
473 #[allow(dead_code)] name: String,
475 data: Arc<RwLock<HashMap<String, EventTypeCount>>>,
476 }
477
478 impl EventAnalyticsProjection {
479 pub fn new(name: String) -> Self {
480 Self {
481 name,
482 data: Arc::new(RwLock::new(HashMap::new())),
483 }
484 }
485
486 pub fn get_counts(&self) -> HashMap<String, EventTypeCount> {
487 self.data.read().unwrap().clone()
488 }
489 }
490
491 #[async_trait]
492 impl Projection for EventAnalyticsProjection {
493 async fn handle_event(&self, event: &Event) -> Result<()> {
494 let mut data = self.data.write().unwrap();
495 let count = data.entry(event.event_type.clone()).or_default();
496
497 count.total_count += 1;
498 count.last_seen = Some(Utc::now());
499
500 if let crate::event::EventData::Json(json_data) = &event.data {
502 count.sample_data = Some(json_data.clone());
503 }
504
505 Ok(())
506 }
507
508 async fn reset(&self) -> Result<()> {
509 let mut data = self.data.write().unwrap();
511 data.clear();
512 Ok(())
513 }
514
515 async fn get_last_processed_position(&self) -> Result<Option<u64>> {
516 Ok(None)
518 }
519
520 async fn set_last_processed_position(&self, _position: u64) -> Result<()> {
521 Ok(())
523 }
524 }
525
526 #[derive(Debug, Clone)]
527 pub struct EventTypeCount {
528 pub total_count: u64,
529 pub last_seen: Option<DateTime<Utc>>,
530 pub sample_data: Option<Value>,
531 }
532
533 impl Default for EventTypeCount {
534 fn default() -> Self {
535 Self::new()
536 }
537 }
538
539 impl EventTypeCount {
540 pub fn new() -> Self {
541 Self {
542 total_count: 0,
543 last_seen: None,
544 sample_data: None,
545 }
546 }
547 }
548
549 pub struct UserActivityProjection {
551 #[allow(dead_code)] name: String,
553 data: Arc<RwLock<HashMap<String, UserActivity>>>,
554 }
555
556 impl UserActivityProjection {
557 pub fn new(name: String) -> Self {
558 Self {
559 name,
560 data: Arc::new(RwLock::new(HashMap::new())),
561 }
562 }
563
564 pub fn get_user_activity(&self, user_id: &str) -> Option<UserActivity> {
565 self.data.read().unwrap().get(user_id).cloned()
566 }
567
568 pub fn get_all_activities(&self) -> HashMap<String, UserActivity> {
569 self.data.read().unwrap().clone()
570 }
571 }
572
573 #[async_trait]
574 impl Projection for UserActivityProjection {
575 async fn handle_event(&self, event: &Event) -> Result<()> {
576 if let Some(user_id) = event.metadata.headers.get("user_id") {
578 let mut data = self.data.write().unwrap();
579 let activity = data.entry(user_id.clone()).or_insert(UserActivity::new(user_id.clone()));
580
581 activity.total_events += 1;
582 activity.last_activity = Some(Utc::now());
583 activity.event_types.insert(event.event_type.clone());
584 }
585
586 Ok(())
587 }
588
589 async fn reset(&self) -> Result<()> {
590 let mut data = self.data.write().unwrap();
591 data.clear();
592 Ok(())
593 }
594
595 async fn get_last_processed_position(&self) -> Result<Option<u64>> {
596 Ok(None)
598 }
599
600 async fn set_last_processed_position(&self, _position: u64) -> Result<()> {
601 Ok(())
603 }
604 }
605
606 #[derive(Debug, Clone)]
607 pub struct UserActivity {
608 pub user_id: String,
609 pub total_events: u64,
610 pub last_activity: Option<DateTime<Utc>>,
611 pub event_types: std::collections::HashSet<String>,
612 }
613
614 impl UserActivity {
615 pub fn new(user_id: String) -> Self {
616 Self {
617 user_id,
618 total_events: 0,
619 last_activity: None,
620 event_types: std::collections::HashSet::new(),
621 }
622 }
623 }
624}
625
626#[cfg(test)]
627mod tests {
628 use super::*;
629 use super::sample_projections::*;
630 use crate::tenancy::isolation::{TenantIsolation, IsolationPolicy};
631 use crate::tenancy::quota::TenantQuota;
632 use crate::tenancy::tenant::{TenantConfig, ResourceLimits};
633 use crate::event::{Event, EventData, EventMetadata};
634 use std::collections::HashMap;
635 use chrono::Utc;
636 use uuid::Uuid;
637
638 #[tokio::test]
639 async fn test_tenant_scoped_projection() {
640 let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();
641
642 let isolation = Arc::new(TenantIsolation::new());
644 isolation.register_tenant(tenant_id.clone(), IsolationPolicy::strict()).unwrap();
645
646 let limits = ResourceLimits::default();
647 let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));
648
649 let analytics_projection = Arc::new(EventAnalyticsProjection::new("analytics".to_string()));
651
652 let tenant_projection = TenantScopedProjection::new(
654 tenant_id.clone(),
655 "test-analytics".to_string(),
656 analytics_projection.clone(),
657 isolation,
658 quota,
659 );
660
661 let test_event = Event::new(
663 format!("{}:test-aggregate", tenant_id.db_prefix()),
664 "TestAggregate".to_string(),
665 "TestEvent".to_string(),
666 1,
667 1,
668 EventData::Json(serde_json::json!({"test": "data"}))
669 );
670
671 tenant_projection.handle_event(&test_event).await.unwrap();
673
674 let metrics = tenant_projection.get_metrics();
676 assert_eq!(metrics.events_processed, 1);
677 assert_eq!(metrics.successful_events, 1);
678 assert!(metrics.is_performance_target_met());
679 }
680
681 #[tokio::test]
682 async fn test_tenant_projection_manager() {
683 let tenant_id = TenantId::new("manager-test".to_string()).unwrap();
684
685 let isolation = Arc::new(TenantIsolation::new());
686 isolation.register_tenant(tenant_id.clone(), IsolationPolicy::strict()).unwrap();
687
688 let limits = ResourceLimits::default();
689 let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));
690
691 let manager = TenantProjectionManager::new(tenant_id.clone(), isolation, quota);
692
693 let analytics = Arc::new(EventAnalyticsProjection::new("analytics".to_string()));
695 let user_activity = Arc::new(UserActivityProjection::new("user-activity".to_string()));
696
697 manager.register_projection("analytics".to_string(), analytics).unwrap();
698 manager.register_projection("user-activity".to_string(), user_activity).unwrap();
699
700 assert_eq!(manager.list_projections().len(), 2);
702 assert!(manager.get_projection("analytics").is_some());
703 assert!(manager.get_projection("user-activity").is_some());
704
705 let registry = manager.get_registry();
707 assert_eq!(registry.get_projection_count(), 2);
708 assert_eq!(registry.get_active_projections().len(), 2);
709 }
710}