1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use crate::event::Event;
6use crate::aggregate::{AggregateId, AggregateVersion};
7use crate::store::{EventStore, EventStoreBackend};
8use crate::error::{EventualiError, Result};
9use super::tenant::TenantId;
10use super::isolation::{TenantIsolation, TenantOperation};
11use super::quota::{TenantQuota, ResourceType};
12
13pub struct TenantAwareEventStorage {
16 tenant_id: TenantId,
17 backend: Arc<dyn EventStoreBackend + Send + Sync>,
18 isolation: Arc<TenantIsolation>,
19 quota: Arc<TenantQuota>,
20 metrics: Arc<RwLock<TenantStorageMetrics>>,
21}
22
23impl TenantAwareEventStorage {
24 pub fn new(
25 tenant_id: TenantId,
26 backend: Arc<dyn EventStoreBackend + Send + Sync>,
27 isolation: Arc<TenantIsolation>,
28 quota: Arc<TenantQuota>,
29 ) -> Self {
30 Self {
31 tenant_id,
32 backend,
33 isolation,
34 quota,
35 metrics: Arc::new(RwLock::new(TenantStorageMetrics::new())),
36 }
37 }
38
39 fn tenant_scoped_event(&self, mut event: Event) -> Event {
41 event.aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), event.aggregate_id);
43
44 event.metadata.headers.insert(
46 "tenant_id".to_string(),
47 self.tenant_id.as_str().to_string()
48 );
49 event.metadata.headers.insert(
50 "tenant_namespace".to_string(),
51 self.tenant_id.db_prefix()
52 );
53
54 event
55 }
56
57 fn unscoped_event(&self, mut event: Event) -> Event {
59 let prefix = format!("{}:", self.tenant_id.db_prefix());
61 if event.aggregate_id.starts_with(&prefix) {
62 event.aggregate_id = event.aggregate_id[prefix.len()..].to_string();
63 }
64
65 event
66 }
67
68 #[allow(dead_code)] fn tenant_table_name(&self, base_name: &str) -> String {
71 format!("{}_{}", self.tenant_id.db_prefix(), base_name)
72 }
73
74 fn validate_and_record(&self, operation: TenantOperation, event_count: u64) -> Result<()> {
76 self.isolation.validate_operation(&self.tenant_id, &operation)?;
78
79 self.quota.check_quota(ResourceType::Events, event_count)?;
81
82 self.quota.record_usage(ResourceType::Events, event_count);
84
85 let mut metrics = self.metrics.write().unwrap();
87 metrics.record_operation(operation, event_count);
88
89 Ok(())
90 }
91
92 pub fn get_metrics(&self) -> TenantStorageMetrics {
93 self.metrics.read().unwrap().clone()
94 }
95}
96
97#[async_trait]
98impl EventStore for TenantAwareEventStorage {
99 async fn save_events(&self, events: Vec<Event>) -> Result<()> {
100 let start_time = std::time::Instant::now();
101
102 if let Some(first_event) = events.first() {
104 self.validate_and_record(
105 TenantOperation::CreateEvent {
106 aggregate_id: first_event.aggregate_id.clone()
107 },
108 events.len() as u64
109 )?;
110 }
111
112 let scoped_events: Vec<Event> = events
114 .into_iter()
115 .map(|event| self.tenant_scoped_event(event))
116 .collect();
117
118 let result = self.backend.save_events(scoped_events).await;
120
121 let duration = start_time.elapsed();
123 let mut metrics = self.metrics.write().unwrap();
124 metrics.record_save_operation(duration, result.is_ok());
125
126 result
127 }
128
129 async fn load_events(
130 &self,
131 aggregate_id: &AggregateId,
132 from_version: Option<AggregateVersion>,
133 ) -> Result<Vec<Event>> {
134 let start_time = std::time::Instant::now();
135
136 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
138 aggregate_id: aggregate_id.clone()
139 })?;
140
141 let scoped_aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id);
143
144 let result = self.backend.load_events(&scoped_aggregate_id, from_version).await;
146
147 let final_result = match result {
149 Ok(events) => {
150 let unscoped_events = events
151 .into_iter()
152 .map(|event| self.unscoped_event(event))
153 .collect::<Vec<Event>>();
154
155 let mut metrics = self.metrics.write().unwrap();
156 metrics.record_load_operation(start_time.elapsed(), true, unscoped_events.len());
157
158 Ok(unscoped_events)
159 }
160 Err(e) => {
161 let mut metrics = self.metrics.write().unwrap();
162 metrics.record_load_operation(start_time.elapsed(), false, 0);
163 Err(e)
164 }
165 };
166
167 final_result
168 }
169
170 async fn load_events_by_type(
171 &self,
172 aggregate_type: &str,
173 from_version: Option<AggregateVersion>,
174 ) -> Result<Vec<Event>> {
175 let start_time = std::time::Instant::now();
176
177 let scoped_aggregate_type = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_type);
179
180 let result = self.backend.load_events_by_type(&scoped_aggregate_type, from_version).await;
182
183 match result {
185 Ok(events) => {
186 let unscoped_events = events
187 .into_iter()
188 .map(|event| self.unscoped_event(event))
189 .collect::<Vec<Event>>();
190
191 let mut metrics = self.metrics.write().unwrap();
192 metrics.record_load_operation(start_time.elapsed(), true, unscoped_events.len());
193
194 Ok(unscoped_events)
195 }
196 Err(e) => {
197 let mut metrics = self.metrics.write().unwrap();
198 metrics.record_load_operation(start_time.elapsed(), false, 0);
199 Err(e)
200 }
201 }
202 }
203
204 async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
205 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
207 aggregate_id: aggregate_id.clone()
208 })?;
209
210 let scoped_aggregate_id = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id);
212
213 self.backend.get_aggregate_version(&scoped_aggregate_id).await
214 }
215
216 fn set_event_streamer(&mut self, _streamer: Arc<dyn crate::streaming::EventStreamer + Send + Sync>) {
217 }
220}
221
222#[derive(Debug, Clone)]
224pub struct TenantStorageMetrics {
225 pub tenant_id: TenantId,
226 pub total_save_operations: u64,
227 pub total_load_operations: u64,
228 pub total_events_saved: u64,
229 pub total_events_loaded: u64,
230 pub successful_saves: u64,
231 pub successful_loads: u64,
232 pub average_save_time_ms: f64,
233 pub average_load_time_ms: f64,
234 pub max_save_time_ms: f64,
235 pub max_load_time_ms: f64,
236 pub last_operation: Option<DateTime<Utc>>,
237 pub operations_by_type: HashMap<String, u64>,
238}
239
240impl Default for TenantStorageMetrics {
241 fn default() -> Self {
242 Self::new()
243 }
244}
245
246impl TenantStorageMetrics {
247 pub fn new() -> Self {
248 Self {
249 tenant_id: TenantId::generate(), total_save_operations: 0,
251 total_load_operations: 0,
252 total_events_saved: 0,
253 total_events_loaded: 0,
254 successful_saves: 0,
255 successful_loads: 0,
256 average_save_time_ms: 0.0,
257 average_load_time_ms: 0.0,
258 max_save_time_ms: 0.0,
259 max_load_time_ms: 0.0,
260 last_operation: None,
261 operations_by_type: HashMap::new(),
262 }
263 }
264
265 pub fn record_operation(&mut self, operation: TenantOperation, _event_count: u64) {
266 self.last_operation = Some(Utc::now());
267
268 let operation_type = match operation {
269 TenantOperation::CreateEvent { .. } => "create_event",
270 TenantOperation::ReadEvents { .. } => "read_events",
271 TenantOperation::CreateProjection { .. } => "create_projection",
272 TenantOperation::StreamEvents { .. } => "stream_events",
273 };
274
275 *self.operations_by_type.entry(operation_type.to_string()).or_insert(0) += 1;
276 }
277
278 pub fn record_save_operation(&mut self, duration: std::time::Duration, success: bool) {
279 self.total_save_operations += 1;
280 if success {
281 self.successful_saves += 1;
282 }
283
284 let duration_ms = duration.as_millis() as f64;
285 self.average_save_time_ms =
286 (self.average_save_time_ms * (self.total_save_operations - 1) as f64 + duration_ms)
287 / self.total_save_operations as f64;
288
289 if duration_ms > self.max_save_time_ms {
290 self.max_save_time_ms = duration_ms;
291 }
292
293 self.last_operation = Some(Utc::now());
294 }
295
296 pub fn record_load_operation(&mut self, duration: std::time::Duration, success: bool, event_count: usize) {
297 self.total_load_operations += 1;
298 if success {
299 self.successful_loads += 1;
300 self.total_events_loaded += event_count as u64;
301 }
302
303 let duration_ms = duration.as_millis() as f64;
304 self.average_load_time_ms =
305 (self.average_load_time_ms * (self.total_load_operations - 1) as f64 + duration_ms)
306 / self.total_load_operations as f64;
307
308 if duration_ms > self.max_load_time_ms {
309 self.max_load_time_ms = duration_ms;
310 }
311
312 self.last_operation = Some(Utc::now());
313 }
314
315 pub fn save_success_rate(&self) -> f64 {
317 if self.total_save_operations == 0 {
318 return 100.0;
319 }
320 (self.successful_saves as f64 / self.total_save_operations as f64) * 100.0
321 }
322
323 pub fn load_success_rate(&self) -> f64 {
324 if self.total_load_operations == 0 {
325 return 100.0;
326 }
327 (self.successful_loads as f64 / self.total_load_operations as f64) * 100.0
328 }
329
330 pub fn is_performance_target_met(&self) -> bool {
332 self.average_save_time_ms < 50.0 && self.average_load_time_ms < 20.0
333 }
334}
335
336pub struct TenantEventBatch {
338 #[allow(dead_code)] tenant_id: TenantId,
340 events: Vec<Event>,
341 max_batch_size: usize,
342}
343
344impl TenantEventBatch {
345 pub fn new(tenant_id: TenantId, max_batch_size: Option<usize>) -> Self {
346 Self {
347 tenant_id,
348 events: Vec::new(),
349 max_batch_size: max_batch_size.unwrap_or(1000),
350 }
351 }
352
353 pub fn add_event(&mut self, event: Event) -> Result<()> {
354 if self.events.len() >= self.max_batch_size {
355 return Err(EventualiError::Tenant(format!(
356 "Batch size limit exceeded: {} events",
357 self.max_batch_size
358 )));
359 }
360
361 self.events.push(event);
362 Ok(())
363 }
364
365 pub fn size(&self) -> usize {
366 self.events.len()
367 }
368
369 pub fn is_full(&self) -> bool {
370 self.events.len() >= self.max_batch_size
371 }
372
373 pub fn clear(&mut self) {
374 self.events.clear();
375 }
376
377 pub async fn flush(&mut self, storage: &TenantAwareEventStorage) -> Result<()> {
378 if self.events.is_empty() {
379 return Ok(());
380 }
381
382 let events_to_save = std::mem::take(&mut self.events);
383 storage.save_events(events_to_save).await?;
384
385 Ok(())
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    // `super::*` only brings `Event` into scope; the payload type has to be
    // imported explicitly for the tests to compile.
    use crate::event::EventData;
    use crate::store::sqlite::SQLiteBackend;
    use crate::tenancy::isolation::{IsolationPolicy, TenantIsolation};
    use crate::tenancy::quota::TenantQuota;
    use crate::tenancy::tenant::ResourceLimits;

    /// Round-trips one event through the tenant-aware storage and checks that
    /// the caller sees the unprefixed aggregate id and updated metrics.
    #[tokio::test]
    async fn test_tenant_aware_storage_isolation() {
        let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();

        let mut backend = SQLiteBackend::new("sqlite://:memory:".to_string()).unwrap();
        backend.initialize().await.unwrap();

        let isolation = Arc::new(TenantIsolation::new());
        isolation
            .register_tenant(tenant_id.clone(), IsolationPolicy::strict())
            .unwrap();

        let limits = ResourceLimits::default();
        let quota = Arc::new(TenantQuota::new(tenant_id.clone(), limits));

        let storage = TenantAwareEventStorage::new(
            tenant_id.clone(),
            Arc::new(backend),
            isolation,
            quota,
        );

        let test_event = Event::new(
            "test-aggregate".to_string(),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            1,
            1,
            EventData::Json(serde_json::json!({"test": "data"})),
        );

        storage.save_events(vec![test_event.clone()]).await.unwrap();

        let loaded_events = storage
            .load_events(&"test-aggregate".to_string(), None)
            .await
            .unwrap();

        assert_eq!(loaded_events.len(), 1);
        assert_eq!(loaded_events[0].aggregate_id, "test-aggregate");
        assert_eq!(loaded_events[0].event_type, "TestEvent");

        let metrics = storage.get_metrics();
        assert_eq!(metrics.total_save_operations, 1);
        assert_eq!(metrics.total_load_operations, 1);
        // NOTE(review): depends on wall-clock timings of the in-memory backend.
        assert!(metrics.is_performance_target_met());
    }

    /// Fills a batch of capacity three and checks that a fourth event is
    /// rejected.
    #[test]
    fn test_tenant_event_batch() {
        let tenant_id = TenantId::new("batch-test".to_string()).unwrap();
        let mut batch = TenantEventBatch::new(tenant_id, Some(3));

        for i in 0..2 {
            let event = Event::new(
                format!("aggregate-{}", i),
                "TestAggregate".to_string(),
                "TestEvent".to_string(),
                1,
                i as i64 + 1,
                EventData::Json(serde_json::json!({"index": i})),
            );

            batch.add_event(event).unwrap();
        }

        assert_eq!(batch.size(), 2);
        assert!(!batch.is_full());

        let event = Event::new(
            "aggregate-3".to_string(),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            1,
            3,
            EventData::Json(serde_json::json!({"index": 3})),
        );

        batch.add_event(event).unwrap();
        assert!(batch.is_full());

        let overflow_event = Event::new(
            "aggregate-4".to_string(),
            "TestAggregate".to_string(),
            "TestEvent".to_string(),
            1,
            4,
            EventData::Json(serde_json::json!({"index": 4})),
        );

        assert!(batch.add_event(overflow_event).is_err());
    }
}