1use crate::{Event, EventualiError, Result};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use chrono::{DateTime, Duration, Utc};
5
/// Registry of named [`RetentionPolicy`] values plus the registry key of the fallback policy.
///
/// `RetentionPolicyManager::new` registers the GDPR default policy under the key `"default"`;
/// `add_policy` registers every further policy under the policy's own `name`.
pub struct RetentionPolicyManager {
    /// Registered policies, keyed by registry name.
    policies: HashMap<String, RetentionPolicy>,
    /// Registry key of the policy used when classification does not pick a category-specific one.
    default_policy: String,
}
11
/// A named rule describing how long data is kept and what happens once that time has passed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
    /// Identifier; `RetentionPolicyManager::add_policy` registers the policy under this name.
    pub name: String,
    /// Human-readable summary of the policy.
    pub description: String,
    /// How long data governed by this policy is retained.
    pub retention_period: RetentionPeriod,
    /// Action requested for an event once its retention has expired.
    pub deletion_method: DeletionMethod,
    /// Grace period attached to the policy.
    // NOTE(review): not consulted by RetentionPolicyManager's expiration or enforcement logic.
    pub grace_period: Duration,
    /// Whether the policy is flagged as exempt from legal holds.
    // NOTE(review): not consulted by RetentionPolicyManager's legal-hold check.
    pub legal_hold_exempt: bool,
    /// Data categories the policy covers (queried by `applies_to_category`).
    pub data_categories: Vec<DataCategory>,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Last-modification timestamp (refreshed by `touch`).
    pub updated_at: DateTime<Utc>,
}
25
/// How long data is retained before its retention expires.
///
/// Variant order is part of the serialized form for index-based serde formats; append new
/// variants at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetentionPeriod {
    /// A fixed number of days.
    Days(i64),
    /// A number of months.
    Months(i32),
    /// A number of years.
    Years(i32),
    /// Never expires: no expiration date is computed.
    Indefinite,
    /// Retained until the named event occurs. The expiration calculation does not interpret the
    /// payload and computes no expiration date for this variant.
    UntilEvent(String),
    /// Custom rule identifier. The rule text is not interpreted by the expiration calculation.
    CustomRule(String), }
36
/// Action requested for an event whose retention has expired.
///
/// `RetentionPolicyManager::enforce_retention` tallies the applied method into its result:
/// `SoftDelete` and `HardDelete` both count towards `events_deleted`; `Anonymize`, `Archive`
/// and `Encrypt` each have their own counter.
// NOTE(review): the storage operation behind each variant is not implemented in this module
// (`apply_deletion_method` performs no store mutation) — confirm semantics with the store layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeletionMethod {
    SoftDelete,
    HardDelete,
    Anonymize,
    Archive,
    Encrypt, }
46
/// Kind of data an event carries; drives retention-policy selection and legal-hold matching.
///
/// The keyword heuristic in `RetentionPolicyManager` can yield `PersonalData`,
/// `SensitivePersonalData`, `FinancialData`, `HealthData`, `CommunicationData`,
/// `BehavioralData`, `MarketingData`, and `OperationalData` (the fallback when nothing matches).
/// The remaining variants are only assigned explicitly, e.g. in policy or legal-hold definitions.
///
/// Variant order is part of the serialized form for index-based serde formats; append new
/// variants at the end.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DataCategory {
    PersonalData,
    SensitivePersonalData,
    FinancialData,
    HealthData,
    CommunicationData,
    BehavioralData,
    TechnicalData,
    MarketingData,
    OperationalData,
    LegalData,
    AuditData,
    BackupData,
}
63
/// Summary of one `RetentionPolicyManager::enforce_retention` run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionEnforcementResult {
    /// Label for the run; `enforce_retention` sets it to `"batch_enforcement"`.
    pub policy_name: String,
    /// Number of input events examined, including skipped ones.
    pub events_processed: usize,
    /// Events whose policy method was `SoftDelete` or `HardDelete`.
    pub events_deleted: usize,
    /// Events whose policy method was `Anonymize`.
    pub events_anonymized: usize,
    /// Events whose policy method was `Archive`.
    pub events_archived: usize,
    /// Events whose policy method was `Encrypt`.
    pub events_encrypted: usize,
    /// When the run took place.
    pub enforcement_timestamp: DateTime<Utc>,
    /// Suggested time of the next run (one day after the run).
    pub next_enforcement: DateTime<Utc>,
    /// Per-event failures collected instead of aborting the batch.
    pub errors: Vec<String>,
}
77
/// A legal hold that blocks retention enforcement for matching events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegalHold {
    /// Identifier of the hold.
    pub id: String,
    /// Why the hold was placed.
    pub reason: String,
    /// Who ordered the hold.
    pub authority: String,
    /// Optional case reference (`LegalHold::new` leaves it `None`).
    pub case_number: Option<String>,
    /// Events whose detected data categories intersect this list are held.
    pub data_categories: Vec<DataCategory>,
    /// Substrings matched against an event's `aggregate_id`; an empty string matches every
    /// aggregate.
    pub aggregate_patterns: Vec<String>,
    /// When the hold started (`LegalHold::new` uses the creation time).
    pub start_date: DateTime<Utc>,
    /// When the hold ends; `None` until `release` sets it.
    pub end_date: Option<DateTime<Utc>>,
    /// Who registered the hold.
    pub created_by: String,
    /// Lifecycle state of the hold.
    pub status: LegalHoldStatus,
}
92
/// Lifecycle state of a [`LegalHold`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum LegalHoldStatus {
    /// In force (an `end_date` in the past still makes `LegalHold::is_active` return false).
    Active,
    /// Lifted via `LegalHold::release`.
    Released,
    /// Lapsed.
    // NOTE(review): nothing in this module assigns this status.
    Expired,
}
100
/// Retention classification computed for a single event by `RetentionPolicyManager::classify_event`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventDataClassification {
    /// The event id rendered as a string; `enforce_retention` looks classifications up by it.
    pub event_id: String,
    /// Aggregate the event belongs to.
    pub aggregate_id: String,
    /// Data categories detected in the event payload.
    pub data_categories: Vec<DataCategory>,
    /// Registry key of the retention policy assigned to the event.
    pub retention_policy: String,
    /// When the classification was produced.
    pub classified_at: DateTime<Utc>,
    /// When retention expires; `None` means the event never becomes eligible for enforcement.
    pub expires_at: Option<DateTime<Utc>>,
    /// Legal-hold ids attached to the event; `classify_event` starts it empty.
    // NOTE(review): enforce_retention does not read this field; holds are matched independently.
    pub legal_holds: Vec<String>,
}
112
113impl RetentionPolicyManager {
114 pub fn new() -> Self {
116 let mut manager = Self {
117 policies: HashMap::new(),
118 default_policy: "default".to_string(),
119 };
120
121 let default_policy = RetentionPolicy::gdpr_default();
123 manager.policies.insert("default".to_string(), default_policy);
124
125 manager
126 }
127
128 pub fn add_policy(&mut self, policy: RetentionPolicy) -> Result<()> {
130 if policy.name.is_empty() {
131 return Err(EventualiError::Configuration(
132 "Policy name cannot be empty".to_string()
133 ));
134 }
135
136 self.policies.insert(policy.name.clone(), policy);
137 Ok(())
138 }
139
140 pub fn get_policy(&self, name: &str) -> Result<&RetentionPolicy> {
142 self.policies.get(name).ok_or_else(|| {
143 EventualiError::Configuration(format!("Retention policy not found: {name}"))
144 })
145 }
146
147 pub fn set_default_policy(&mut self, name: &str) -> Result<()> {
149 if !self.policies.contains_key(name) {
150 return Err(EventualiError::Configuration(
151 format!("Retention policy not found: {name}")
152 ));
153 }
154 self.default_policy = name.to_string();
155 Ok(())
156 }
157
158 pub fn classify_event(&self, event: &Event) -> Result<EventDataClassification> {
160 let data_categories = self.analyze_event_data(event)?;
161 let policy_name = self.select_retention_policy(&data_categories)?;
162 let policy = self.get_policy(&policy_name)?;
163
164 let expires_at = self.calculate_expiration_date(policy)?;
165
166 Ok(EventDataClassification {
167 event_id: event.id.to_string(),
168 aggregate_id: event.aggregate_id.clone(),
169 data_categories,
170 retention_policy: policy_name,
171 classified_at: Utc::now(),
172 expires_at,
173 legal_holds: Vec::new(),
174 })
175 }
176
177 pub async fn enforce_retention(
179 &self,
180 events: Vec<Event>,
181 classifications: HashMap<String, EventDataClassification>,
182 legal_holds: &[LegalHold],
183 ) -> Result<RetentionEnforcementResult> {
184 let mut result = RetentionEnforcementResult {
185 policy_name: "batch_enforcement".to_string(),
186 events_processed: 0,
187 events_deleted: 0,
188 events_anonymized: 0,
189 events_archived: 0,
190 events_encrypted: 0,
191 enforcement_timestamp: Utc::now(),
192 next_enforcement: Utc::now() + Duration::days(1),
193 errors: Vec::new(),
194 };
195
196 for event in events {
197 result.events_processed += 1;
198
199 if self.is_under_legal_hold(&event, legal_holds) {
201 continue; }
203
204 let classification = match classifications.get(&event.id.to_string()) {
206 Some(c) => c,
207 None => {
208 result.errors.push(format!(
209 "No classification found for event: {}", event.id
210 ));
211 continue;
212 }
213 };
214
215 if !self.is_retention_expired(classification)? {
217 continue; }
219
220 let policy = match self.get_policy(&classification.retention_policy) {
222 Ok(p) => p,
223 Err(e) => {
224 result.errors.push(format!(
225 "Failed to get policy for event {}: {}", event.id, e
226 ));
227 continue;
228 }
229 };
230
231 match self.apply_deletion_method(&event, &policy.deletion_method).await {
233 Ok(method) => match method {
234 DeletionMethod::SoftDelete | DeletionMethod::HardDelete => {
235 result.events_deleted += 1;
236 },
237 DeletionMethod::Anonymize => {
238 result.events_anonymized += 1;
239 },
240 DeletionMethod::Archive => {
241 result.events_archived += 1;
242 },
243 DeletionMethod::Encrypt => {
244 result.events_encrypted += 1;
245 },
246 },
247 Err(e) => {
248 result.errors.push(format!(
249 "Failed to apply retention to event {}: {}", event.id, e
250 ));
251 }
252 }
253 }
254
255 Ok(result)
256 }
257
258 fn is_under_legal_hold(&self, event: &Event, legal_holds: &[LegalHold]) -> bool {
260 for hold in legal_holds {
261 if hold.status != LegalHoldStatus::Active {
262 continue;
263 }
264
265 for pattern in &hold.aggregate_patterns {
267 if event.aggregate_id.contains(pattern) {
268 return true;
269 }
270 }
271
272 if let Ok(categories) = self.analyze_event_data(event) {
274 for category in &categories {
275 if hold.data_categories.contains(category) {
276 return true;
277 }
278 }
279 }
280 }
281 false
282 }
283
284 fn is_retention_expired(&self, classification: &EventDataClassification) -> Result<bool> {
286 match classification.expires_at {
287 Some(expires_at) => Ok(Utc::now() > expires_at),
288 None => Ok(false), }
290 }
291
292 async fn apply_deletion_method(
294 &self,
295 _event: &Event,
296 method: &DeletionMethod,
297 ) -> Result<DeletionMethod> {
298 match method {
305 DeletionMethod::SoftDelete => {
306 },
309 DeletionMethod::HardDelete => {
310 },
313 DeletionMethod::Anonymize => {
314 },
317 DeletionMethod::Archive => {
318 },
322 DeletionMethod::Encrypt => {
323 },
326 }
327
328 Ok(method.clone())
329 }
330
331 fn analyze_event_data(&self, event: &Event) -> Result<Vec<DataCategory>> {
333 let mut categories = Vec::new();
334
335 if let crate::EventData::Json(data) = &event.data {
337 let data_str = data.to_string().to_lowercase();
338
339 if data_str.contains("email") || data_str.contains("phone") ||
341 data_str.contains("address") || data_str.contains("name") {
342 categories.push(DataCategory::PersonalData);
343 }
344
345 if data_str.contains("ssn") || data_str.contains("passport") ||
347 data_str.contains("driver_license") || data_str.contains("medical") {
348 categories.push(DataCategory::SensitivePersonalData);
349 }
350
351 if data_str.contains("credit_card") || data_str.contains("bank_account") ||
353 data_str.contains("payment") || data_str.contains("transaction") {
354 categories.push(DataCategory::FinancialData);
355 }
356
357 if data_str.contains("medical") || data_str.contains("health") ||
359 data_str.contains("diagnosis") || data_str.contains("treatment") {
360 categories.push(DataCategory::HealthData);
361 }
362
363 if data_str.contains("message") || data_str.contains("communication") ||
365 data_str.contains("chat") || data_str.contains("email") {
366 categories.push(DataCategory::CommunicationData);
367 }
368
369 if data_str.contains("click") || data_str.contains("view") ||
371 data_str.contains("behavior") || data_str.contains("interaction") {
372 categories.push(DataCategory::BehavioralData);
373 }
374
375 if data_str.contains("campaign") || data_str.contains("marketing") ||
377 data_str.contains("advertisement") || data_str.contains("promotion") {
378 categories.push(DataCategory::MarketingData);
379 }
380 }
381
382 if categories.is_empty() {
384 categories.push(DataCategory::OperationalData);
385 }
386
387 Ok(categories)
388 }
389
390 fn select_retention_policy(&self, categories: &[DataCategory]) -> Result<String> {
392 let priority = [
394 DataCategory::HealthData,
395 DataCategory::FinancialData,
396 DataCategory::SensitivePersonalData,
397 DataCategory::PersonalData,
398 DataCategory::LegalData,
399 DataCategory::CommunicationData,
400 DataCategory::BehavioralData,
401 DataCategory::MarketingData,
402 DataCategory::TechnicalData,
403 DataCategory::OperationalData,
404 ];
405
406 for category in &priority {
408 if categories.contains(category) {
409 return Ok(match category {
411 DataCategory::HealthData => "health_data_7_years".to_string(),
412 DataCategory::FinancialData => "financial_data_10_years".to_string(),
413 DataCategory::SensitivePersonalData => "sensitive_pii_3_years".to_string(),
414 DataCategory::PersonalData => "personal_data_2_years".to_string(),
415 DataCategory::LegalData => "legal_data_indefinite".to_string(),
416 _ => self.default_policy.clone(),
417 });
418 }
419 }
420
421 Ok(self.default_policy.clone())
422 }
423
424 fn calculate_expiration_date(&self, policy: &RetentionPolicy) -> Result<Option<DateTime<Utc>>> {
426 let base_date = Utc::now();
427
428 match &policy.retention_period {
429 RetentionPeriod::Days(days) => {
430 Ok(Some(base_date + Duration::days(*days)))
431 },
432 RetentionPeriod::Months(months) => {
433 Ok(Some(base_date + Duration::days(*months as i64 * 30)))
434 },
435 RetentionPeriod::Years(years) => {
436 Ok(Some(base_date + Duration::days(*years as i64 * 365)))
437 },
438 RetentionPeriod::Indefinite => Ok(None),
439 RetentionPeriod::UntilEvent(_event_type) => {
440 Ok(None)
442 },
443 RetentionPeriod::CustomRule(_rule) => {
444 Ok(Some(base_date + Duration::days(365))) },
447 }
448 }
449
450 pub fn list_policies(&self) -> Vec<String> {
452 self.policies.keys().cloned().collect()
453 }
454
455 pub fn get_retention_stats(&self) -> HashMap<String, usize> {
457 let mut stats = HashMap::new();
458 stats.insert("total_policies".to_string(), self.policies.len());
459
460 for policy in self.policies.values() {
462 let period_type = match &policy.retention_period {
463 RetentionPeriod::Days(_) => "days_based",
464 RetentionPeriod::Months(_) => "months_based",
465 RetentionPeriod::Years(_) => "years_based",
466 RetentionPeriod::Indefinite => "indefinite",
467 RetentionPeriod::UntilEvent(_) => "event_based",
468 RetentionPeriod::CustomRule(_) => "custom_rule",
469 };
470
471 *stats.entry(period_type.to_string()).or_insert(0) += 1;
472 }
473
474 stats
475 }
476}
477
478impl RetentionPolicy {
479 pub fn gdpr_default() -> Self {
481 Self {
482 name: "gdpr_default".to_string(),
483 description: "GDPR-compliant default retention policy".to_string(),
484 retention_period: RetentionPeriod::Years(2),
485 deletion_method: DeletionMethod::Anonymize,
486 grace_period: Duration::days(30),
487 legal_hold_exempt: false,
488 data_categories: vec![
489 DataCategory::PersonalData,
490 DataCategory::BehavioralData,
491 DataCategory::TechnicalData,
492 ],
493 created_at: Utc::now(),
494 updated_at: Utc::now(),
495 }
496 }
497
498 pub fn financial_data_policy() -> Self {
500 Self {
501 name: "financial_data_10_years".to_string(),
502 description: "Financial data retention for regulatory compliance".to_string(),
503 retention_period: RetentionPeriod::Years(10),
504 deletion_method: DeletionMethod::Archive,
505 grace_period: Duration::days(90),
506 legal_hold_exempt: false,
507 data_categories: vec![
508 DataCategory::FinancialData,
509 DataCategory::AuditData,
510 ],
511 created_at: Utc::now(),
512 updated_at: Utc::now(),
513 }
514 }
515
516 pub fn health_data_policy() -> Self {
518 Self {
519 name: "health_data_7_years".to_string(),
520 description: "Health data retention for HIPAA compliance".to_string(),
521 retention_period: RetentionPeriod::Years(7),
522 deletion_method: DeletionMethod::Encrypt,
523 grace_period: Duration::days(60),
524 legal_hold_exempt: false,
525 data_categories: vec![
526 DataCategory::HealthData,
527 DataCategory::SensitivePersonalData,
528 ],
529 created_at: Utc::now(),
530 updated_at: Utc::now(),
531 }
532 }
533
534 pub fn marketing_data_policy() -> Self {
536 Self {
537 name: "marketing_data_1_year".to_string(),
538 description: "Marketing data retention policy".to_string(),
539 retention_period: RetentionPeriod::Years(1),
540 deletion_method: DeletionMethod::Anonymize,
541 grace_period: Duration::days(14),
542 legal_hold_exempt: true,
543 data_categories: vec![
544 DataCategory::MarketingData,
545 DataCategory::BehavioralData,
546 ],
547 created_at: Utc::now(),
548 updated_at: Utc::now(),
549 }
550 }
551
552 pub fn applies_to_category(&self, category: &DataCategory) -> bool {
554 self.data_categories.contains(category)
555 }
556
557 pub fn touch(&mut self) {
559 self.updated_at = Utc::now();
560 }
561}
562
563impl Default for RetentionPolicyManager {
564 fn default() -> Self {
565 Self::new()
566 }
567}
568
569impl LegalHold {
570 pub fn new(
572 id: String,
573 reason: String,
574 authority: String,
575 data_categories: Vec<DataCategory>,
576 aggregate_patterns: Vec<String>,
577 created_by: String,
578 ) -> Self {
579 Self {
580 id,
581 reason,
582 authority,
583 case_number: None,
584 data_categories,
585 aggregate_patterns,
586 start_date: Utc::now(),
587 end_date: None,
588 created_by,
589 status: LegalHoldStatus::Active,
590 }
591 }
592
593 pub fn release(&mut self) {
595 self.status = LegalHoldStatus::Released;
596 self.end_date = Some(Utc::now());
597 }
598
599 pub fn is_active(&self) -> bool {
601 matches!(self.status, LegalHoldStatus::Active) &&
602 (self.end_date.is_none() || self.end_date.unwrap() > Utc::now())
603 }
604}
605
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventData, EventMetadata};
    use uuid::Uuid;

    /// Builds a version-1 `TestEvent` on aggregate `"test-aggregate"` carrying `data` as JSON.
    fn create_test_event_with_data(data: serde_json::Value) -> Event {
        Event {
            id: Uuid::new_v4(),
            aggregate_id: "test-aggregate".to_string(),
            aggregate_type: "TestAggregate".to_string(),
            event_type: "TestEvent".to_string(),
            event_version: 1,
            aggregate_version: 1,
            data: EventData::Json(data),
            metadata: EventMetadata::default(),
            timestamp: Utc::now(),
        }
    }

    /// Builds a manager with an extra `"test_immediate"` policy (GDPR default, i.e. `Anonymize`,
    /// but with an already-elapsed period) plus one event classified under it as expired.
    fn expired_event_fixture() -> (
        RetentionPolicyManager,
        Event,
        HashMap<String, EventDataClassification>,
    ) {
        let mut manager = RetentionPolicyManager::new();

        let mut test_policy = RetentionPolicy::gdpr_default();
        test_policy.name = "test_immediate".to_string();
        test_policy.retention_period = RetentionPeriod::Days(-1);
        manager.add_policy(test_policy).unwrap();

        let event = create_test_event_with_data(serde_json::json!({"test": "data"}));

        let classification = EventDataClassification {
            event_id: event.id.to_string(),
            aggregate_id: event.aggregate_id.clone(),
            data_categories: vec![DataCategory::OperationalData],
            retention_policy: "test_immediate".to_string(),
            classified_at: Utc::now(),
            // Already in the past, so the event is due for enforcement.
            expires_at: Some(Utc::now() - Duration::days(1)),
            legal_holds: Vec::new(),
        };

        let mut classifications = HashMap::new();
        classifications.insert(event.id.to_string(), classification);

        (manager, event, classifications)
    }

    #[test]
    fn test_retention_policy_manager_creation() {
        let manager = RetentionPolicyManager::new();
        // Only the built-in GDPR policy is registered, under the "default" key.
        assert_eq!(manager.policies.len(), 1);
        assert!(manager.policies.contains_key("default"));
        assert_eq!(manager.list_policies(), vec!["default".to_string()]);
    }

    #[test]
    fn test_policy_addition() {
        let mut manager = RetentionPolicyManager::new();
        let policy = RetentionPolicy::financial_data_policy();

        assert!(manager.add_policy(policy.clone()).is_ok());
        assert!(manager.get_policy(&policy.name).is_ok());
        assert!(manager.set_default_policy(&policy.name).is_ok());
        assert!(manager.set_default_policy("missing_policy").is_err());
    }

    #[test]
    fn test_event_data_classification() {
        let manager = RetentionPolicyManager::new();

        let personal_data = serde_json::json!({
            "user_email": "test@example.com",
            "user_name": "John Doe"
        });
        let event = create_test_event_with_data(personal_data);

        let classification = manager.classify_event(&event).unwrap();
        assert!(classification.data_categories.contains(&DataCategory::PersonalData));

        let financial_data = serde_json::json!({
            "credit_card": "1234-5678-9012-3456",
            "transaction_amount": 100.00
        });
        let event = create_test_event_with_data(financial_data);

        let classification = manager.classify_event(&event).unwrap();
        assert!(classification.data_categories.contains(&DataCategory::FinancialData));
    }

    #[test]
    fn test_retention_period_calculation() {
        let policy = RetentionPolicy {
            name: "test".to_string(),
            description: "Test policy".to_string(),
            retention_period: RetentionPeriod::Days(30),
            deletion_method: DeletionMethod::SoftDelete,
            grace_period: Duration::days(7),
            legal_hold_exempt: false,
            data_categories: vec![DataCategory::OperationalData],
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };

        let manager = RetentionPolicyManager::new();
        let expires_at = manager.calculate_expiration_date(&policy).unwrap();

        assert!(expires_at.is_some());
        let expiry = expires_at.unwrap();
        let expected_expiry = Utc::now() + Duration::days(30);

        // Both instants come from separate clock reads; allow generous slack.
        let diff = (expiry - expected_expiry).num_seconds().abs();
        assert!(diff < 60);
    }

    #[test]
    fn test_legal_hold() {
        let mut hold = LegalHold::new(
            "hold-001".to_string(),
            "Investigation".to_string(),
            "Legal Department".to_string(),
            vec![DataCategory::PersonalData],
            vec!["user-123".to_string()],
            "legal@example.com".to_string(),
        );

        assert!(hold.is_active());

        hold.release();
        assert!(!hold.is_active());
        assert_eq!(hold.status, LegalHoldStatus::Released);
    }

    #[test]
    fn test_gdpr_default_policy() {
        let policy = RetentionPolicy::gdpr_default();

        assert_eq!(policy.name, "gdpr_default");
        assert!(matches!(policy.retention_period, RetentionPeriod::Years(2)));
        assert!(matches!(policy.deletion_method, DeletionMethod::Anonymize));
        assert!(!policy.legal_hold_exempt);
    }

    #[test]
    fn test_policy_category_matching() {
        let policy = RetentionPolicy::financial_data_policy();

        assert!(policy.applies_to_category(&DataCategory::FinancialData));
        assert!(!policy.applies_to_category(&DataCategory::MarketingData));
    }

    #[tokio::test]
    async fn test_retention_enforcement() {
        let (manager, event, classifications) = expired_event_fixture();

        let result = manager
            .enforce_retention(vec![event], classifications, &[])
            .await
            .unwrap();

        assert_eq!(result.events_processed, 1);
        // "test_immediate" inherits Anonymize from the GDPR default policy.
        assert_eq!(result.events_anonymized, 1);
        assert_eq!(result.events_deleted, 0);
        assert!(result.errors.is_empty(), "unexpected errors: {:?}", result.errors);
    }

    #[tokio::test]
    async fn test_legal_hold_blocks_enforcement() {
        let (manager, event, classifications) = expired_event_fixture();
        let hold = LegalHold::new(
            "hold-002".to_string(),
            "Litigation".to_string(),
            "Legal Department".to_string(),
            Vec::new(),
            vec!["test-aggregate".to_string()],
            "legal@example.com".to_string(),
        );

        let result = manager
            .enforce_retention(vec![event], classifications, &[hold])
            .await
            .unwrap();

        assert_eq!(result.events_processed, 1);
        assert_eq!(result.events_anonymized, 0);
        assert!(result.errors.is_empty());
    }
}