1use std::sync::{Arc, RwLock};
2use std::collections::HashMap;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5
6use crate::event::Event;
7use crate::aggregate::{AggregateId, AggregateVersion};
8use crate::store::EventStore;
9use crate::error::{EventualiError, Result};
10use super::tenant::{TenantId, TenantError};
11
12pub struct TenantIsolation {
14 isolation_policies: Arc<RwLock<HashMap<TenantId, IsolationPolicy>>>,
15 performance_monitor: Arc<RwLock<IsolationMetrics>>,
16}
17
18impl Default for TenantIsolation {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl TenantIsolation {
25 pub fn new() -> Self {
26 Self {
27 isolation_policies: Arc::new(RwLock::new(HashMap::new())),
28 performance_monitor: Arc::new(RwLock::new(IsolationMetrics::new())),
29 }
30 }
31
32 pub fn register_tenant(&self, tenant_id: TenantId, policy: IsolationPolicy) -> Result<()> {
34 let mut policies = self.isolation_policies.write().unwrap();
35 policies.insert(tenant_id, policy);
36 Ok(())
37 }
38
39 pub fn validate_operation(&self, tenant_id: &TenantId, operation: &TenantOperation) -> Result<()> {
41 let start_time = std::time::Instant::now();
42
43 let policies = self.isolation_policies.read().unwrap();
44 let policy = policies.get(tenant_id)
45 .ok_or_else(|| EventualiError::from(TenantError::TenantNotFound(tenant_id.clone())))?;
46
47 let result = policy.validate_operation(operation);
48
49 let duration = start_time.elapsed();
51 if duration.as_millis() > 10 {
52 eprintln!("Warning: Tenant isolation check took {}ms", duration.as_millis());
54 }
55
56 let mut metrics = self.performance_monitor.write().unwrap();
57 metrics.record_validation(duration, result.is_ok());
58
59 result
60 }
61
62 pub fn get_metrics(&self) -> IsolationMetrics {
64 self.performance_monitor.read().unwrap().clone()
65 }
66}
67
68#[derive(Debug, Clone)]
70pub enum TenantOperation {
71 CreateEvent { aggregate_id: AggregateId },
72 ReadEvents { aggregate_id: AggregateId },
73 CreateProjection { name: String },
74 StreamEvents { from_timestamp: Option<DateTime<Utc>> },
75}
76
77#[derive(Debug, Clone)]
79pub struct IsolationPolicy {
80 pub enforce_namespace: bool,
81 pub validate_access_patterns: bool,
82 pub audit_all_operations: bool,
83 pub max_cross_tenant_references: Option<u32>,
84}
85
86impl IsolationPolicy {
87 pub fn strict() -> Self {
88 Self {
89 enforce_namespace: true,
90 validate_access_patterns: true,
91 audit_all_operations: true,
92 max_cross_tenant_references: Some(0),
93 }
94 }
95
96 pub fn relaxed() -> Self {
97 Self {
98 enforce_namespace: true,
99 validate_access_patterns: false,
100 audit_all_operations: false,
101 max_cross_tenant_references: Some(10),
102 }
103 }
104
105 fn validate_operation(&self, operation: &TenantOperation) -> Result<()> {
106 match operation {
108 TenantOperation::CreateEvent { aggregate_id } => {
109 if self.enforce_namespace && !self.validate_aggregate_namespace(aggregate_id) {
110 return Err(EventualiError::from(TenantError::IsolationViolation(
111 "Aggregate ID does not match tenant namespace".to_string()
112 )));
113 }
114 },
115 TenantOperation::ReadEvents { aggregate_id } => {
116 if self.enforce_namespace && !self.validate_aggregate_namespace(aggregate_id) {
117 return Err(EventualiError::from(TenantError::IsolationViolation(
118 "Aggregate ID does not match tenant namespace".to_string()
119 )));
120 }
121 },
122 TenantOperation::CreateProjection { .. } => {
123 },
125 TenantOperation::StreamEvents { .. } => {
126 },
128 }
129 Ok(())
130 }
131
132 fn validate_aggregate_namespace(&self, _aggregate_id: &AggregateId) -> bool {
133 true
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct IsolationMetrics {
141 pub total_validations: u64,
142 pub successful_validations: u64,
143 pub average_validation_time_ms: f64,
144 pub max_validation_time_ms: f64,
145 pub violations_detected: u64,
146 pub last_updated: DateTime<Utc>,
147}
148
149impl Default for IsolationMetrics {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155impl IsolationMetrics {
156 pub fn new() -> Self {
157 Self {
158 total_validations: 0,
159 successful_validations: 0,
160 average_validation_time_ms: 0.0,
161 max_validation_time_ms: 0.0,
162 violations_detected: 0,
163 last_updated: Utc::now(),
164 }
165 }
166
167 pub fn record_validation(&mut self, duration: std::time::Duration, success: bool) {
168 self.total_validations += 1;
169 if success {
170 self.successful_validations += 1;
171 } else {
172 self.violations_detected += 1;
173 }
174
175 let duration_ms = duration.as_millis() as f64;
176 self.average_validation_time_ms =
177 (self.average_validation_time_ms * (self.total_validations - 1) as f64 + duration_ms)
178 / self.total_validations as f64;
179
180 if duration_ms > self.max_validation_time_ms {
181 self.max_validation_time_ms = duration_ms;
182 }
183
184 self.last_updated = Utc::now();
185 }
186
187 pub fn is_performance_target_met(&self) -> bool {
189 self.average_validation_time_ms < 10.0 && self.max_validation_time_ms < 50.0
190 }
191
192 pub fn isolation_success_rate(&self) -> f64 {
194 if self.total_validations == 0 {
195 return 100.0;
196 }
197 (self.successful_validations as f64 / self.total_validations as f64) * 100.0
198 }
199}
200
201pub struct IsolatedEventStore {
203 tenant_id: TenantId,
204 inner_store: Arc<dyn EventStore + Send + Sync>,
205 isolation: Arc<TenantIsolation>,
206}
207
208impl IsolatedEventStore {
209 pub fn new(
210 tenant_id: TenantId,
211 inner_store: Arc<dyn EventStore + Send + Sync>,
212 isolation: Arc<TenantIsolation>
213 ) -> Self {
214 Self {
215 tenant_id,
216 inner_store,
217 isolation,
218 }
219 }
220
221 pub fn tenant_id(&self) -> &TenantId {
223 &self.tenant_id
224 }
225
226 fn tenant_scoped_aggregate_id(&self, aggregate_id: &AggregateId) -> AggregateId {
228 format!("{}:{}", self.tenant_id.db_prefix(), aggregate_id)
229 }
230}
231
232#[async_trait]
233impl EventStore for IsolatedEventStore {
234 async fn save_events(&self, events: Vec<Event>) -> Result<()> {
235 let mut scoped_events = Vec::new();
237
238 for mut event in events {
239 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::CreateEvent {
241 aggregate_id: event.aggregate_id.clone()
242 })?;
243
244 event.aggregate_id = self.tenant_scoped_aggregate_id(&event.aggregate_id);
246 scoped_events.push(event);
247 }
248
249 self.inner_store.save_events(scoped_events).await
251 }
252
253 async fn load_events(&self, aggregate_id: &AggregateId, from_version: Option<AggregateVersion>) -> Result<Vec<Event>> {
254 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
256 aggregate_id: aggregate_id.clone()
257 })?;
258
259 let scoped_aggregate_id = self.tenant_scoped_aggregate_id(aggregate_id);
261
262 let mut events = self.inner_store.load_events(&scoped_aggregate_id, from_version).await?;
264
265 for event in &mut events {
267 event.aggregate_id = aggregate_id.clone();
268 }
269
270 Ok(events)
271 }
272
273 async fn load_events_by_type(&self, aggregate_type: &str, from_version: Option<AggregateVersion>) -> Result<Vec<Event>> {
274 let scoped_aggregate_type = format!("{}:{}", self.tenant_id.db_prefix(), aggregate_type);
276
277 let mut events = self.inner_store.load_events_by_type(&scoped_aggregate_type, from_version).await?;
279
280 for event in &mut events {
282 if let Some(unscoped) = event.aggregate_id.strip_prefix(&format!("{}:", self.tenant_id.db_prefix())) {
284 event.aggregate_id = unscoped.to_string();
285 }
286 }
287
288 Ok(events)
289 }
290
291 async fn get_aggregate_version(&self, aggregate_id: &AggregateId) -> Result<Option<AggregateVersion>> {
292 self.isolation.validate_operation(&self.tenant_id, &TenantOperation::ReadEvents {
294 aggregate_id: aggregate_id.clone()
295 })?;
296
297 let scoped_aggregate_id = self.tenant_scoped_aggregate_id(aggregate_id);
299
300 self.inner_store.get_aggregate_version(&scoped_aggregate_id).await
302 }
303
304 fn set_event_streamer(&mut self, _streamer: Arc<dyn crate::streaming::EventStreamer + Send + Sync>) {
305 }
309}
310
311pub struct TenantScope {
313 pub tenant_id: TenantId,
314 pub context: TenantContext,
315}
316
317impl TenantScope {
318 pub fn new(tenant_id: TenantId) -> Self {
319 Self {
320 tenant_id,
321 context: TenantContext::new(),
322 }
323 }
324
325 pub fn execute<T, F>(&self, f: F) -> Result<T>
327 where
328 F: FnOnce(&TenantScope) -> Result<T>,
329 {
330 f(self)
331 }
332}
333
334#[derive(Debug, Clone)]
336pub struct TenantContext {
337 pub operation_id: String,
338 pub started_at: DateTime<Utc>,
339 pub metadata: HashMap<String, String>,
340}
341
342impl Default for TenantContext {
343 fn default() -> Self {
344 Self::new()
345 }
346}
347
348impl TenantContext {
349 pub fn new() -> Self {
350 Self {
351 operation_id: uuid::Uuid::new_v4().to_string(),
352 started_at: Utc::now(),
353 metadata: HashMap::new(),
354 }
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn test_tenant_isolation_creation() {
364 let isolation = TenantIsolation::new();
365 let tenant_id = TenantId::new("test-tenant".to_string()).unwrap();
366
367 let policy = IsolationPolicy::strict();
368 assert!(isolation.register_tenant(tenant_id, policy).is_ok());
369 }
370
371 #[test]
372 fn test_isolation_metrics_performance_target() {
373 let mut metrics = IsolationMetrics::new();
374
375 metrics.record_validation(std::time::Duration::from_millis(5), true);
377 metrics.record_validation(std::time::Duration::from_millis(8), true);
378
379 assert!(metrics.is_performance_target_met());
380 assert!(metrics.isolation_success_rate() > 99.0);
381 }
382}