1use super::{health::ServiceHealth, logging::LogEntry, metrics::Metric, profiling::ProfileData};
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Top-level configuration for the data retention subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Master switch: when false, `DataRetention::start` spawns no cleanup loop.
    pub enabled: bool,
    /// Default retention window (days) for data without a matching policy.
    pub retention_days: u32,
    /// Whether expired data should be archived rather than dropped outright.
    pub archive_enabled: bool,
    /// Whether archives are written compressed (see `ArchiveManager`).
    pub archive_compression: bool,
    /// Interval between automatic cleanup passes, in hours.
    pub cleanup_interval_hours: u32,
    /// Per-data-type retention policies installed at startup.
    pub policies: Vec<RetentionPolicyConfig>,
    /// Enables trend analysis over historical data.
    pub historical_analysis_enabled: bool,
    /// Window (days) used by the historical trend analyzer.
    pub trend_analysis_days: u32,
}
26
27impl Default for RetentionConfig {
28 fn default() -> Self {
29 Self {
30 enabled: true,
31 retention_days: 90,
32 archive_enabled: true,
33 archive_compression: true,
34 cleanup_interval_hours: 24,
35 policies: vec![
36 RetentionPolicyConfig::default_metrics(),
37 RetentionPolicyConfig::default_logs(),
38 RetentionPolicyConfig::default_health(),
39 RetentionPolicyConfig::default_profiles(),
40 ],
41 historical_analysis_enabled: true,
42 trend_analysis_days: 30,
43 }
44 }
45}
46
/// Configuration for a single retention policy targeting one data type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicyConfig {
    /// Unique policy name; also the key in the policy registry.
    pub name: String,
    /// Category of data this policy governs.
    pub data_type: DataType,
    /// Default number of days to keep matching data.
    pub retention_days: u32,
    /// Age (days) after which items are archived; `None` disables archiving.
    pub archive_after_days: Option<u32>,
    /// Whether archived items should be compressed.
    pub compression_enabled: bool,
    /// Relative importance of this policy.
    pub priority: RetentionPriority,
    /// Per-item conditions that override `retention_days` when they match.
    pub conditions: Vec<RetentionCondition>,
}
58
59impl RetentionPolicyConfig {
60 pub fn default_metrics() -> Self {
61 Self {
62 name: "metrics_policy".to_string(),
63 data_type: DataType::Metrics,
64 retention_days: 90,
65 archive_after_days: Some(30),
66 compression_enabled: true,
67 priority: RetentionPriority::Medium,
68 conditions: vec![],
69 }
70 }
71
72 pub fn default_logs() -> Self {
73 Self {
74 name: "logs_policy".to_string(),
75 data_type: DataType::Logs,
76 retention_days: 30,
77 archive_after_days: Some(7),
78 compression_enabled: true,
79 priority: RetentionPriority::High,
80 conditions: vec![
81 RetentionCondition::SeverityLevel("ERROR".to_string(), 60),
82 RetentionCondition::SeverityLevel("WARN".to_string(), 30),
83 ],
84 }
85 }
86
87 pub fn default_health() -> Self {
88 Self {
89 name: "health_policy".to_string(),
90 data_type: DataType::HealthChecks,
91 retention_days: 180,
92 archive_after_days: Some(60),
93 compression_enabled: true,
94 priority: RetentionPriority::High,
95 conditions: vec![],
96 }
97 }
98
99 pub fn default_profiles() -> Self {
100 Self {
101 name: "profiles_policy".to_string(),
102 data_type: DataType::Profiles,
103 retention_days: 60,
104 archive_after_days: Some(14),
105 compression_enabled: true,
106 priority: RetentionPriority::Medium,
107 conditions: vec![],
108 }
109 }
110}
111
/// Categories of observability data that retention policies can target.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DataType {
    /// Time-series metric samples.
    Metrics,
    /// Structured log entries.
    Logs,
    /// Service health-check results.
    HealthChecks,
    /// Performance profiling records.
    Profiles,
    /// Alerting events.
    Alerts,
    /// User activity records.
    UserActivity,
    /// Internal system events.
    SystemEvents,
}
122
/// Relative importance of a retention policy.
// NOTE(review): not consulted by the cleanup logic visible in this file —
// confirm intended use (e.g. ordering of cleanup passes) before relying on it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RetentionPriority {
    Low,
    Medium,
    High,
    Critical,
}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub enum RetentionCondition {
134 SeverityLevel(String, u32), ComponentName(String, u32), UserDefined(String, String, u32), DataSize(u64, u32), }
139
/// A retention policy plus its runtime bookkeeping.
pub struct RetentionPolicy {
    /// The static configuration this policy enforces.
    config: RetentionPolicyConfig,
    /// When the cleanup loop last processed this policy, if ever.
    last_cleanup: Option<DateTime<Utc>>,
    /// Total items examined under this policy.
    items_processed: u64,
    /// Total items moved to archival storage.
    items_archived: u64,
    /// Total items removed.
    items_deleted: u64,
}
148
149impl RetentionPolicy {
150 pub fn new(config: RetentionPolicyConfig) -> Self {
151 Self {
152 config,
153 last_cleanup: None,
154 items_processed: 0,
155 items_archived: 0,
156 items_deleted: 0,
157 }
158 }
159
160 pub fn should_retain(&self, timestamp: DateTime<Utc>) -> RetentionAction {
161 let age = Utc::now() - timestamp;
162 let age_days = age.num_days() as u32;
163
164 if age_days > self.config.retention_days {
166 return RetentionAction::Delete;
167 }
168
169 if let Some(archive_days) = self.config.archive_after_days {
171 if age_days > archive_days {
172 return RetentionAction::Archive;
173 }
174 }
175
176 RetentionAction::Keep
177 }
178
179 pub fn apply_conditions(&self, data: &dyn RetentionData) -> Option<u32> {
180 for condition in &self.config.conditions {
181 match condition {
182 RetentionCondition::SeverityLevel(level, days) => {
183 if let Some(severity) = data.severity_level() {
184 if severity == *level {
185 return Some(*days);
186 }
187 }
188 }
189 RetentionCondition::ComponentName(component, days) => {
190 if let Some(comp) = data.component_name() {
191 if comp == *component {
192 return Some(*days);
193 }
194 }
195 }
196 RetentionCondition::UserDefined(field, value, days) => {
197 if let Some(field_value) = data.custom_field(field) {
198 if field_value == *value {
199 return Some(*days);
200 }
201 }
202 }
203 RetentionCondition::DataSize(max_size, days) => {
204 if data.data_size() > *max_size {
205 return Some(*days);
206 }
207 }
208 }
209 }
210 None
211 }
212
213 pub fn stats(&self) -> RetentionPolicyStats {
214 RetentionPolicyStats {
215 name: self.config.name.clone(),
216 data_type: self.config.data_type.clone(),
217 retention_days: self.config.retention_days,
218 last_cleanup: self.last_cleanup,
219 items_processed: self.items_processed,
220 items_archived: self.items_archived,
221 items_deleted: self.items_deleted,
222 }
223 }
224}
225
/// Serializable snapshot of a policy's configuration and counters,
/// produced by `RetentionPolicy::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicyStats {
    /// Policy name.
    pub name: String,
    /// Data category the policy governs.
    pub data_type: DataType,
    /// Configured default retention window in days.
    pub retention_days: u32,
    /// Last time the cleanup loop touched this policy, if ever.
    pub last_cleanup: Option<DateTime<Utc>>,
    /// Total items examined.
    pub items_processed: u64,
    /// Total items archived.
    pub items_archived: u64,
    /// Total items deleted.
    pub items_deleted: u64,
}
236
/// Outcome of evaluating one item against a retention policy.
#[derive(Debug, Clone, PartialEq)]
pub enum RetentionAction {
    /// Within the retention window; leave the item in place.
    Keep,
    /// Past the archive threshold; move to archival storage.
    Archive,
    /// Past the retention window; remove the item.
    Delete,
}
243
/// Abstraction over any observability record that retention policies can
/// process (metrics, logs, health checks, profiles, ...).
pub trait RetentionData {
    /// When the record was produced; ages are measured from this instant.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Approximate size in bytes, used by `RetentionCondition::DataSize`.
    fn data_size(&self) -> u64;
    /// Severity label (e.g. "ERROR"), if the record carries one.
    fn severity_level(&self) -> Option<String> {
        None
    }
    /// Originating component name, if known.
    fn component_name(&self) -> Option<String> {
        None
    }
    /// Looks up an arbitrary named field for `UserDefined` conditions.
    fn custom_field(&self, _field: &str) -> Option<String> {
        None
    }
    /// The retention category this record belongs to.
    fn data_type(&self) -> DataType;
}
259
260impl RetentionData for Metric {
261 fn timestamp(&self) -> DateTime<Utc> {
262 self.timestamp
263 }
264
265 fn data_size(&self) -> u64 {
266 std::mem::size_of::<Metric>() as u64
268 + self.name.len() as u64
269 + self
270 .labels
271 .iter()
272 .map(|(k, v)| k.len() + v.len())
273 .sum::<usize>() as u64
274 }
275
276 fn component_name(&self) -> Option<String> {
277 self.labels.get("component").cloned()
278 }
279
280 fn custom_field(&self, field: &str) -> Option<String> {
281 self.labels.get(field).cloned()
282 }
283
284 fn data_type(&self) -> DataType {
285 DataType::Metrics
286 }
287}
288
289impl RetentionData for LogEntry {
290 fn timestamp(&self) -> DateTime<Utc> {
291 self.timestamp
292 }
293
294 fn data_size(&self) -> u64 {
295 std::mem::size_of::<LogEntry>() as u64
296 + self.message.len() as u64
297 + self.component.len() as u64
298 + self
299 .fields
300 .iter()
301 .map(|(k, v)| k.len() + v.to_string().len())
302 .sum::<usize>() as u64
303 }
304
305 fn severity_level(&self) -> Option<String> {
306 Some(self.level.to_string())
307 }
308
309 fn component_name(&self) -> Option<String> {
310 Some(self.component.clone())
311 }
312
313 fn custom_field(&self, field: &str) -> Option<String> {
314 self.fields
315 .get(field)
316 .and_then(|v| v.as_str().map(|s| s.to_string()))
317 }
318
319 fn data_type(&self) -> DataType {
320 DataType::Logs
321 }
322}
323
324impl RetentionData for ServiceHealth {
325 fn timestamp(&self) -> DateTime<Utc> {
326 self.last_check
327 }
328
329 fn data_size(&self) -> u64 {
330 std::mem::size_of::<ServiceHealth>() as u64
331 + self.component_name.len() as u64
332 + self.error_message.as_ref().map(|s| s.len()).unwrap_or(0) as u64
333 }
334
335 fn component_name(&self) -> Option<String> {
336 Some(self.component_name.clone())
337 }
338
339 fn severity_level(&self) -> Option<String> {
340 Some(self.status.to_string())
341 }
342
343 fn data_type(&self) -> DataType {
344 DataType::HealthChecks
345 }
346}
347
348impl RetentionData for ProfileData {
349 fn timestamp(&self) -> DateTime<Utc> {
350 self.timestamp
351 }
352
353 fn data_size(&self) -> u64 {
354 std::mem::size_of::<ProfileData>() as u64
355 + self.operation.len() as u64
356 + self.component.len() as u64
357 + self
358 .custom_metrics
359 .iter()
360 .map(|(k, _)| k.len())
361 .sum::<usize>() as u64
362 }
363
364 fn component_name(&self) -> Option<String> {
365 Some(self.component.clone())
366 }
367
368 fn custom_field(&self, field: &str) -> Option<String> {
369 self.tags.get(field).cloned()
370 }
371
372 fn data_type(&self) -> DataType {
373 DataType::Profiles
374 }
375}
376
/// Writes, restores, lists and deletes archived data files on local disk.
pub struct ArchiveManager {
    /// Root directory under which all archives are written.
    archive_path: String,
    /// Whether archived payloads should be compressed.
    compression_enabled: bool,
    /// Size cap for archives, in MiB.
    // NOTE(review): not enforced anywhere in this impl — confirm whether a
    // size check was intended before `archive_data` writes.
    max_archive_size_mb: u64,
}
383
384impl ArchiveManager {
385 pub fn new(archive_path: impl Into<String>, compression_enabled: bool) -> Self {
386 Self {
387 archive_path: archive_path.into(),
388 compression_enabled,
389 max_archive_size_mb: 1024, }
391 }
392
393 pub async fn archive_data(&self, data: &[u8], filename: &str) -> RragResult<ArchiveResult> {
394 let full_path = format!("{}/{}", self.archive_path, filename);
395
396 if let Some(parent) = std::path::Path::new(&full_path).parent() {
398 tokio::fs::create_dir_all(parent)
399 .await
400 .map_err(|e| RragError::storage("create_archive_directory", e))?;
401 }
402
403 let final_data = if self.compression_enabled {
404 let compressed_data = data.to_vec(); compressed_data
408 } else {
409 data.to_vec()
410 };
411
412 tokio::fs::write(&full_path, &final_data)
414 .await
415 .map_err(|e| RragError::storage("write_archive", e))?;
416
417 Ok(ArchiveResult {
418 archived_at: Utc::now(),
419 file_path: full_path,
420 original_size: data.len() as u64,
421 archived_size: final_data.len() as u64,
422 compression_ratio: if data.len() > 0 {
423 final_data.len() as f64 / data.len() as f64
424 } else {
425 1.0
426 },
427 })
428 }
429
430 pub async fn restore_data(&self, filename: &str) -> RragResult<Vec<u8>> {
431 let full_path = format!("{}/{}", self.archive_path, filename);
432
433 let archived_data = tokio::fs::read(&full_path)
434 .await
435 .map_err(|e| RragError::storage("read_archive", e))?;
436
437 Ok(archived_data)
440 }
441
442 pub async fn delete_archive(&self, filename: &str) -> RragResult<()> {
443 let full_path = format!("{}/{}", self.archive_path, filename);
444 tokio::fs::remove_file(&full_path)
445 .await
446 .map_err(|e| RragError::storage("delete_archive", e))
447 }
448
449 pub async fn list_archives(&self) -> RragResult<Vec<ArchiveInfo>> {
450 let mut archives = Vec::new();
451 let mut dir = tokio::fs::read_dir(&self.archive_path)
452 .await
453 .map_err(|e| RragError::storage("read_archive_directory", e))?;
454
455 while let Some(entry) = dir
456 .next_entry()
457 .await
458 .map_err(|e| RragError::storage("read_directory_entry", e))?
459 {
460 if let Ok(metadata) = entry.metadata().await {
461 if metadata.is_file() {
462 archives.push(ArchiveInfo {
463 filename: entry.file_name().to_string_lossy().to_string(),
464 size_bytes: metadata.len(),
465 created_at: metadata
466 .created()
467 .ok()
468 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
469 .map(|d| Utc::now() - Duration::seconds(d.as_secs() as i64))
470 .unwrap_or(Utc::now()),
471 });
472 }
473 }
474 }
475
476 Ok(archives)
477 }
478}
479
/// Outcome of a successful `ArchiveManager::archive_data` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveResult {
    /// When the archive was written.
    pub archived_at: DateTime<Utc>,
    /// Full path of the written archive file.
    pub file_path: String,
    /// Payload size before compression, in bytes.
    pub original_size: u64,
    /// Bytes actually written to disk.
    pub archived_size: u64,
    /// `archived_size / original_size` (1.0 for an empty payload).
    pub compression_ratio: f64,
}
488
/// A single entry returned by `ArchiveManager::list_archives`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveInfo {
    /// File name relative to the archive root.
    pub filename: String,
    /// File size on disk, in bytes.
    pub size_bytes: u64,
    /// Filesystem creation time (UTC); best-effort on some platforms.
    pub created_at: DateTime<Utc>,
}
495
/// Computes volume/component/severity trends over retained data.
pub struct HistoricalAnalyzer {
    /// Only data newer than this many days is included in an analysis.
    analysis_window_days: u32,
}
500
impl HistoricalAnalyzer {
    /// Creates an analyzer that looks back `analysis_window_days` days.
    pub fn new(analysis_window_days: u32) -> Self {
        Self {
            analysis_window_days,
        }
    }

    /// Full trend analysis over `data`: filters to the analysis window,
    /// then computes volume, per-component and per-severity trends plus
    /// recommendations. Returns an all-zero analysis if nothing is in window.
    pub async fn analyze_trends<T: RetentionData>(&self, data: &[T]) -> TrendAnalysis {
        let cutoff_time = Utc::now() - Duration::days(self.analysis_window_days as i64);
        let recent_data: Vec<_> = data
            .iter()
            .filter(|item| item.timestamp() >= cutoff_time)
            .collect();

        if recent_data.is_empty() {
            return TrendAnalysis::empty();
        }

        let volume_trend = self.analyze_volume_trend(&recent_data).await;

        let component_trends = self.analyze_component_trends(&recent_data).await;

        let severity_trends = self.analyze_severity_trends(&recent_data).await;

        TrendAnalysis {
            analysis_period_days: self.analysis_window_days,
            generated_at: Utc::now(),
            total_data_points: recent_data.len(),
            // Cloned because the originals feed generate_recommendations below.
            volume_trend: volume_trend.clone(),
            component_trends: component_trends.clone(),
            severity_trends,
            recommendations: self
                .generate_recommendations(&volume_trend, &component_trends)
                .await,
        }
    }

    /// Buckets items into per-day counts/sizes (index 0 = oldest day,
    /// last index = today, capped at 30 buckets) and compares the older and
    /// newer halves of the window to derive a trend direction.
    async fn analyze_volume_trend<T: RetentionData>(&self, data: &[&T]) -> VolumeTrend {
        let days = self.analysis_window_days.min(30) as usize;
        let mut daily_counts = vec![0; days];
        let mut daily_sizes = vec![0u64; days];

        for item in data {
            // max(0) guards against future timestamps producing negative ages.
            let days_ago = (Utc::now() - item.timestamp()).num_days().max(0) as usize;
            if days_ago < days {
                // Reverse so chronological order runs oldest -> newest.
                let index = days - 1 - days_ago;
                daily_counts[index] += 1;
                daily_sizes[index] += item.data_size();
            }
        }

        let avg_daily_count = if days > 0 {
            daily_counts.iter().sum::<usize>() as f64 / days as f64
        } else {
            0.0
        };

        let avg_daily_size = if days > 0 {
            daily_sizes.iter().sum::<u64>() as f64 / days as f64
        } else {
            0.0
        };

        // Split the window: later indices are the more recent half.
        let recent_half = &daily_counts[days / 2..];
        let older_half = &daily_counts[..days / 2];

        let recent_avg = if recent_half.len() > 0 {
            recent_half.iter().sum::<usize>() as f64 / recent_half.len() as f64
        } else {
            0.0
        };

        let older_avg = if older_half.len() > 0 {
            older_half.iter().sum::<usize>() as f64 / older_half.len() as f64
        } else {
            0.0
        };

        // +-10% band around the older average counts as "stable".
        let trend_direction = if recent_avg > older_avg * 1.1 {
            TrendDirection::Increasing
        } else if recent_avg < older_avg * 0.9 {
            TrendDirection::Decreasing
        } else {
            TrendDirection::Stable
        };

        VolumeTrend {
            daily_counts,
            daily_sizes,
            average_daily_count: avg_daily_count,
            average_daily_size_bytes: avg_daily_size,
            trend_direction,
            growth_rate_percent: if older_avg > 0.0 {
                ((recent_avg - older_avg) / older_avg) * 100.0
            } else {
                // No baseline to compare against.
                0.0
            },
        }
    }

    /// Groups items by component name (items without one are skipped) and
    /// computes a volume trend for each group.
    async fn analyze_component_trends<T: RetentionData>(
        &self,
        data: &[&T],
    ) -> HashMap<String, ComponentTrend> {
        let mut component_data: HashMap<String, Vec<&T>> = HashMap::new();

        for item in data {
            if let Some(component) = item.component_name() {
                component_data.entry(component).or_default().push(*item);
            }
        }

        let mut trends = HashMap::new();
        for (component, items) in component_data {
            let volume_trend = self.analyze_volume_trend(&items).await;
            trends.insert(
                component.clone(),
                ComponentTrend {
                    component_name: component,
                    data_count: items.len(),
                    volume_trend,
                },
            );
        }

        trends
    }

    /// Groups items by severity label (items without one are skipped) and
    /// computes a volume trend for each group.
    async fn analyze_severity_trends<T: RetentionData>(
        &self,
        data: &[&T],
    ) -> HashMap<String, SeverityTrend> {
        let mut severity_data: HashMap<String, Vec<&T>> = HashMap::new();

        for item in data {
            if let Some(severity) = item.severity_level() {
                severity_data.entry(severity).or_default().push(*item);
            }
        }

        let mut trends = HashMap::new();
        for (severity, items) in severity_data {
            let volume_trend = self.analyze_volume_trend(&items).await;
            trends.insert(
                severity.clone(),
                SeverityTrend {
                    severity_level: severity,
                    occurrence_count: items.len(),
                    volume_trend,
                },
            );
        }

        trends
    }

    /// Heuristic recommendations: overall growth > 50% suggests tighter
    /// retention; > ~1 GB/day suggests compression; any component growing
    /// > 100% gets a component-specific note. Savings figures are estimates.
    async fn generate_recommendations(
        &self,
        volume_trend: &VolumeTrend,
        component_trends: &HashMap<String, ComponentTrend>,
    ) -> Vec<RetentionRecommendation> {
        let mut recommendations = Vec::new();

        if volume_trend.growth_rate_percent > 50.0 {
            recommendations.push(RetentionRecommendation {
                category: RecommendationCategory::Storage,
                priority: RecommendationPriority::High,
                message: "Data volume is growing rapidly. Consider reducing retention periods or implementing more aggressive archiving.".to_string(),
                estimated_savings_percent: 30.0,
            });
        }

        if volume_trend.average_daily_size_bytes > 1_000_000_000.0 {
            recommendations.push(RetentionRecommendation {
                category: RecommendationCategory::Compression,
                priority: RecommendationPriority::Medium,
                message:
                    "Large daily data volume detected. Enable compression to reduce storage costs."
                        .to_string(),
                estimated_savings_percent: 60.0,
            });
        }

        for (component, trend) in component_trends {
            if trend.volume_trend.growth_rate_percent > 100.0 {
                recommendations.push(RetentionRecommendation {
                    category: RecommendationCategory::ComponentSpecific,
                    priority: RecommendationPriority::High,
                    message: format!("Component '{}' is producing data at an increasing rate. Review logging levels or implement component-specific retention policies.", component),
                    estimated_savings_percent: 25.0,
                });
            }
        }

        recommendations
    }
}
705
/// Result of `HistoricalAnalyzer::analyze_trends`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Length of the analysis window, in days (0 for an empty analysis).
    pub analysis_period_days: u32,
    /// When the analysis was produced.
    pub generated_at: DateTime<Utc>,
    /// Number of data points that fell inside the window.
    pub total_data_points: usize,
    /// Overall volume trend across all data.
    pub volume_trend: VolumeTrend,
    /// Volume trends keyed by component name.
    pub component_trends: HashMap<String, ComponentTrend>,
    /// Volume trends keyed by severity label.
    pub severity_trends: HashMap<String, SeverityTrend>,
    /// Heuristic cost/retention recommendations.
    pub recommendations: Vec<RetentionRecommendation>,
}
716
717impl TrendAnalysis {
718 fn empty() -> Self {
719 Self {
720 analysis_period_days: 0,
721 generated_at: Utc::now(),
722 total_data_points: 0,
723 volume_trend: VolumeTrend {
724 daily_counts: Vec::new(),
725 daily_sizes: Vec::new(),
726 average_daily_count: 0.0,
727 average_daily_size_bytes: 0.0,
728 trend_direction: TrendDirection::Stable,
729 growth_rate_percent: 0.0,
730 },
731 component_trends: HashMap::new(),
732 severity_trends: HashMap::new(),
733 recommendations: Vec::new(),
734 }
735 }
736}
737
/// Per-day volume statistics over the analysis window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolumeTrend {
    /// Item counts per day; index 0 = oldest day, last index = today.
    pub daily_counts: Vec<usize>,
    /// Total bytes per day, aligned with `daily_counts`.
    pub daily_sizes: Vec<u64>,
    /// Mean items per day over the window.
    pub average_daily_count: f64,
    /// Mean bytes per day over the window.
    pub average_daily_size_bytes: f64,
    /// Direction derived from comparing older vs. newer half of the window.
    pub trend_direction: TrendDirection,
    /// Percentage change of the newer half relative to the older half.
    pub growth_rate_percent: f64,
}
747
/// Volume trend restricted to a single component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentTrend {
    /// The component this trend describes.
    pub component_name: String,
    /// Number of in-window data points from this component.
    pub data_count: usize,
    /// The component's volume trend.
    pub volume_trend: VolumeTrend,
}
754
/// Volume trend restricted to a single severity level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SeverityTrend {
    /// The severity label this trend describes (e.g. "ERROR").
    pub severity_level: String,
    /// Number of in-window data points at this severity.
    pub occurrence_count: usize,
    /// The severity level's volume trend.
    pub volume_trend: VolumeTrend,
}
761
/// Coarse direction of a volume trend (stable = within +-10% of baseline).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
}
768
/// A heuristic suggestion produced by trend analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionRecommendation {
    /// Which aspect of retention the suggestion targets.
    pub category: RecommendationCategory,
    /// How urgently the suggestion should be acted on.
    pub priority: RecommendationPriority,
    /// Human-readable description of the suggestion.
    pub message: String,
    /// Rough storage savings estimate if applied (hard-coded heuristic).
    pub estimated_savings_percent: f64,
}
776
/// Aspect of the retention setup a recommendation targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationCategory {
    Storage,
    Compression,
    RetentionPeriod,
    ComponentSpecific,
    ArchivingStrategy,
}
785
/// Urgency of acting on a recommendation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationPriority {
    Low,
    Medium,
    High,
    Critical,
}
793
/// Facade tying together retention policies, archiving, historical analysis
/// and the background cleanup loop.
pub struct DataRetention {
    /// System-wide configuration captured at construction.
    config: RetentionConfig,
    /// Registered policies, keyed by policy name.
    policies: Arc<RwLock<HashMap<String, RetentionPolicy>>>,
    /// Writes/reads archive files on disk.
    archive_manager: Arc<ArchiveManager>,
    /// Trend analysis over retained data.
    historical_analyzer: Arc<HistoricalAnalyzer>,
    /// Handle of the spawned cleanup task, present while running.
    cleanup_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Whether `start()` has been called without a matching `stop()`.
    is_running: Arc<RwLock<bool>>,
}
803
804impl DataRetention {
805 pub async fn new(config: RetentionConfig) -> RragResult<Self> {
806 let policies = Arc::new(RwLock::new(HashMap::new()));
807
808 {
810 let mut policy_map = policies.write().await;
811 for policy_config in &config.policies {
812 let policy = RetentionPolicy::new(policy_config.clone());
813 policy_map.insert(policy_config.name.clone(), policy);
814 }
815 }
816
817 let archive_manager = Arc::new(ArchiveManager::new(
818 "./archives",
819 config.archive_compression,
820 ));
821
822 let historical_analyzer = Arc::new(HistoricalAnalyzer::new(config.trend_analysis_days));
823
824 Ok(Self {
825 config,
826 policies,
827 archive_manager,
828 historical_analyzer,
829 cleanup_handle: Arc::new(RwLock::new(None)),
830 is_running: Arc::new(RwLock::new(false)),
831 })
832 }
833
834 pub async fn start(&self) -> RragResult<()> {
835 let mut running = self.is_running.write().await;
836 if *running {
837 return Err(RragError::config(
838 "data_retention",
839 "stopped",
840 "already running",
841 ));
842 }
843
844 if self.config.enabled {
845 let handle = self.start_cleanup_loop().await?;
846 let mut handle_guard = self.cleanup_handle.write().await;
847 *handle_guard = Some(handle);
848 }
849
850 *running = true;
851 tracing::info!("Data retention system started");
852 Ok(())
853 }
854
855 pub async fn stop(&self) -> RragResult<()> {
856 let mut running = self.is_running.write().await;
857 if !*running {
858 return Ok(());
859 }
860
861 {
862 let mut handle_guard = self.cleanup_handle.write().await;
863 if let Some(handle) = handle_guard.take() {
864 handle.abort();
865 }
866 }
867
868 *running = false;
869 tracing::info!("Data retention system stopped");
870 Ok(())
871 }
872
873 pub async fn is_healthy(&self) -> bool {
874 *self.is_running.read().await
875 }
876
877 async fn start_cleanup_loop(&self) -> RragResult<tokio::task::JoinHandle<()>> {
878 let config = self.config.clone();
879 let policies = self.policies.clone();
880 let is_running = self.is_running.clone();
881
882 let handle = tokio::spawn(async move {
883 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
884 config.cleanup_interval_hours as u64 * 3600,
885 ));
886
887 while *is_running.read().await {
888 interval.tick().await;
889
890 tracing::info!("Running data retention cleanup");
891
892 let mut policy_map = policies.write().await;
899 for (name, policy) in policy_map.iter_mut() {
900 tracing::debug!("Processing retention policy: {}", name);
901 policy.items_processed += 10;
903 policy.last_cleanup = Some(Utc::now());
904 }
905 }
906 });
907
908 Ok(handle)
909 }
910
911 pub async fn apply_retention_policy<T: RetentionData + Clone>(
912 &self,
913 data: Vec<T>,
914 data_type: DataType,
915 ) -> RragResult<RetentionResult> {
916 let policies = self.policies.read().await;
917
918 let policy = policies
920 .values()
921 .find(|p| p.config.data_type == data_type)
922 .ok_or_else(|| {
923 RragError::config("retention_policy", "exists", &format!("{:?}", data_type))
924 })?;
925
926 let mut result = RetentionResult {
927 processed_count: 0,
928 kept_count: 0,
929 archived_count: 0,
930 deleted_count: 0,
931 errors: Vec::new(),
932 };
933
934 for item in data {
935 result.processed_count += 1;
936
937 let retention_days = policy
939 .apply_conditions(&item)
940 .unwrap_or(policy.config.retention_days);
941
942 let age = Utc::now() - item.timestamp();
943 let age_days = age.num_days() as u32;
944
945 if age_days > retention_days {
946 result.deleted_count += 1;
948 } else if let Some(archive_days) = policy.config.archive_after_days {
949 if age_days > archive_days {
950 match self.archive_item(&item).await {
952 Ok(_) => result.archived_count += 1,
953 Err(e) => result.errors.push(e.to_string()),
954 }
955 } else {
956 result.kept_count += 1;
957 }
958 } else {
959 result.kept_count += 1;
960 }
961 }
962
963 Ok(result)
964 }
965
966 async fn archive_item<T: RetentionData>(&self, item: &T) -> RragResult<ArchiveResult> {
967 let serialized_data = serde_json::to_vec(&serde_json::json!({
969 "timestamp": item.timestamp(),
970 "data_type": item.data_type(),
971 "size": item.data_size(),
972 "component": item.component_name(),
973 "severity": item.severity_level(),
974 }))
975 .map_err(|e| RragError::agent("serialization", e.to_string()))?;
976
977 let filename = format!(
978 "{}_{}.json",
979 format!("{:?}", item.data_type()).to_lowercase(),
980 item.timestamp().format("%Y%m%d_%H%M%S")
981 );
982
983 self.archive_manager
984 .archive_data(&serialized_data, &filename)
985 .await
986 }
987
988 pub async fn analyze_historical_data<T: RetentionData>(&self, data: &[T]) -> TrendAnalysis {
989 self.historical_analyzer.analyze_trends(data).await
990 }
991
992 pub async fn get_retention_stats(&self) -> Vec<RetentionPolicyStats> {
993 let policies = self.policies.read().await;
994 policies.values().map(|p| p.stats()).collect()
995 }
996
997 pub async fn add_policy(&self, policy_config: RetentionPolicyConfig) -> RragResult<()> {
998 let mut policies = self.policies.write().await;
999 let policy = RetentionPolicy::new(policy_config.clone());
1000 policies.insert(policy_config.name.clone(), policy);
1001 Ok(())
1002 }
1003
1004 pub async fn remove_policy(&self, policy_name: &str) -> RragResult<()> {
1005 let mut policies = self.policies.write().await;
1006 policies.remove(policy_name);
1007 Ok(())
1008 }
1009
1010 pub async fn update_policy(&self, policy_config: RetentionPolicyConfig) -> RragResult<()> {
1011 let mut policies = self.policies.write().await;
1012 let policy = RetentionPolicy::new(policy_config.clone());
1013 policies.insert(policy_config.name.clone(), policy);
1014 Ok(())
1015 }
1016
1017 pub async fn get_archive_info(&self) -> RragResult<Vec<ArchiveInfo>> {
1018 self.archive_manager.list_archives().await
1019 }
1020
1021 pub async fn restore_from_archive(&self, filename: &str) -> RragResult<Vec<u8>> {
1022 self.archive_manager.restore_data(filename).await
1023 }
1024
1025 pub async fn delete_archive(&self, filename: &str) -> RragResult<()> {
1026 self.archive_manager.delete_archive(filename).await
1027 }
1028}
1029
/// Per-pass counters returned by `DataRetention::apply_retention_policy`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionResult {
    /// Total items examined.
    pub processed_count: usize,
    /// Items still inside their retention window.
    pub kept_count: usize,
    /// Items successfully written to the archive.
    pub archived_count: usize,
    /// Items past retention (counted, not physically removed here).
    pub deleted_count: usize,
    /// Stringified errors from failed archive attempts.
    pub errors: Vec<String>,
}
1038
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::{logging::LogLevel, metrics::MetricType};

    #[test]
    fn test_retention_policy_creation() {
        let config = RetentionPolicyConfig::default_logs();
        let policy = RetentionPolicy::new(config);

        assert_eq!(policy.config.data_type, DataType::Logs);
        assert_eq!(policy.config.retention_days, 30);
        assert!(policy.config.archive_after_days.is_some());
        assert_eq!(policy.config.archive_after_days.unwrap(), 7);
    }

    #[test]
    fn test_retention_action() {
        let config = RetentionPolicyConfig {
            name: "test_policy".to_string(),
            data_type: DataType::Metrics,
            retention_days: 30,
            archive_after_days: Some(7),
            compression_enabled: true,
            priority: RetentionPriority::Medium,
            conditions: vec![],
        };

        let policy = RetentionPolicy::new(config);

        // Inside both thresholds -> kept.
        let recent_time = Utc::now() - Duration::days(5);
        assert_eq!(policy.should_retain(recent_time), RetentionAction::Keep);

        // Past the archive threshold but inside retention -> archived.
        let archive_time = Utc::now() - Duration::days(10);
        assert_eq!(policy.should_retain(archive_time), RetentionAction::Archive);

        // Past retention -> deleted.
        let delete_time = Utc::now() - Duration::days(40);
        assert_eq!(policy.should_retain(delete_time), RetentionAction::Delete);
    }

    #[tokio::test]
    async fn test_archive_manager() {
        let temp_dir = tempfile::tempdir().unwrap();
        let archive_manager =
            ArchiveManager::new(temp_dir.path().to_string_lossy().to_string(), true);

        let test_data = b"test archive data";
        let filename = "test_archive.json";

        let result = archive_manager
            .archive_data(test_data, filename)
            .await
            .unwrap();
        assert_eq!(result.original_size, test_data.len() as u64);
        assert!(!result.file_path.is_empty());

        let restored_data = archive_manager.restore_data(filename).await.unwrap();
        assert_eq!(restored_data, test_data);

        let archives = archive_manager.list_archives().await.unwrap();
        assert_eq!(archives.len(), 1);
        assert_eq!(archives[0].filename, filename);

        archive_manager.delete_archive(filename).await.unwrap();
        let archives_after_delete = archive_manager.list_archives().await.unwrap();
        assert_eq!(archives_after_delete.len(), 0);
    }

    #[tokio::test]
    async fn test_historical_analyzer() {
        let analyzer = HistoricalAnalyzer::new(7);

        // Offset each timestamp by an extra 12 hours so no data point lands
        // exactly on the 7-day cutoff. The previous version asserted 8
        // in-window points, which only held when the analyzer's `Utc::now()`
        // returned the same instant as the one used here (a coarse-clock
        // accident); on a precise clock the boundary item loses the tie and
        // only 7 items satisfy `timestamp >= cutoff`.
        let mut test_metrics = Vec::new();
        for i in 0..10 {
            let timestamp = Utc::now() - Duration::days(i) - Duration::hours(12);
            let mut metric = Metric::counter("test_counter", (i * 10) as u64)
                .with_label("component", "test_component");
            metric.timestamp = timestamp;
            test_metrics.push(metric);
        }

        let analysis = analyzer.analyze_trends(&test_metrics).await;
        // Items aged 0.5..=6.5 days fall inside the 7-day window: 7 items.
        assert_eq!(analysis.total_data_points, 7);
        assert!(!analysis.volume_trend.daily_counts.is_empty());
        assert!(!analysis.component_trends.is_empty());
        assert!(analysis.component_trends.contains_key("test_component"));
    }

    #[tokio::test]
    async fn test_data_retention_system() {
        let config = RetentionConfig::default();
        // All DataRetention methods take &self, so no `mut` binding is needed.
        let retention = DataRetention::new(config).await.unwrap();

        assert!(!retention.is_healthy().await);

        retention.start().await.unwrap();
        assert!(retention.is_healthy().await);

        let custom_policy = RetentionPolicyConfig {
            name: "custom_test_policy".to_string(),
            data_type: DataType::Metrics,
            retention_days: 60,
            archive_after_days: Some(14),
            compression_enabled: true,
            priority: RetentionPriority::High,
            conditions: vec![],
        };

        retention.add_policy(custom_policy).await.unwrap();

        let stats = retention.get_retention_stats().await;
        assert!(stats.iter().any(|s| s.name == "custom_test_policy"));

        retention.stop().await.unwrap();
        assert!(!retention.is_healthy().await);
    }

    #[tokio::test]
    async fn test_retention_data_trait() {
        let metric = Metric::counter("test_metric", 100).with_label("component", "test_component");

        assert_eq!(metric.data_type(), DataType::Metrics);
        assert!(metric.data_size() > 0);
        assert_eq!(metric.component_name(), Some("test_component".to_string()));

        let log_entry =
            super::super::logging::LogEntry::new(LogLevel::Error, "Test error", "test_component");
        assert_eq!(log_entry.data_type(), DataType::Logs);
        assert_eq!(log_entry.severity_level(), Some("ERROR".to_string()));
        assert_eq!(
            log_entry.component_name(),
            Some("test_component".to_string())
        );
    }

    #[test]
    fn test_retention_conditions() {
        let config = RetentionPolicyConfig {
            name: "conditional_policy".to_string(),
            data_type: DataType::Logs,
            retention_days: 30,
            archive_after_days: None,
            compression_enabled: false,
            priority: RetentionPriority::Medium,
            conditions: vec![
                RetentionCondition::SeverityLevel("ERROR".to_string(), 90),
                RetentionCondition::ComponentName("critical_component".to_string(), 180),
            ],
        };

        let policy = RetentionPolicy::new(config);

        // Severity condition matches first.
        let error_log = super::super::logging::LogEntry::new(
            LogLevel::Error,
            "Error message",
            "normal_component",
        );
        assert_eq!(policy.apply_conditions(&error_log), Some(90));

        // Component condition matches when severity does not.
        let critical_log = super::super::logging::LogEntry::new(
            LogLevel::Info,
            "Info message",
            "critical_component",
        );
        assert_eq!(policy.apply_conditions(&critical_log), Some(180));

        // No condition matches -> fall back to the policy default.
        let normal_log = super::super::logging::LogEntry::new(
            LogLevel::Info,
            "Info message",
            "normal_component",
        );
        assert_eq!(policy.apply_conditions(&normal_log), None);
    }
}