rexis_rag/observability/
retention.rs

1//! # Data Retention and Historical Analysis
2//!
3//! Automated data lifecycle management with configurable retention policies,
4//! historical analysis capabilities, and efficient archiving for RRAG observability data.
5
6use super::{health::ServiceHealth, logging::LogEntry, metrics::Metric, profiling::ProfileData};
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Data retention configuration
///
/// Top-level settings for the retention subsystem: the global retention
/// window, archiving behaviour, cleanup scheduling, the per-data-type
/// policies applied during cleanup, and historical trend analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    // Master switch; when false the cleanup loop is not started.
    pub enabled: bool,
    // Default number of days to keep data before deletion.
    pub retention_days: u32,
    // Enables archiving of aged data.
    pub archive_enabled: bool,
    // Whether archives are compressed (forwarded to `ArchiveManager`).
    pub archive_compression: bool,
    // Interval between cleanup passes, in hours.
    pub cleanup_interval_hours: u32,
    // Per-data-type retention policies applied during cleanup.
    pub policies: Vec<RetentionPolicyConfig>,
    // Whether historical trend analysis is enabled.
    pub historical_analysis_enabled: bool,
    // Window, in days, used by the historical analyzer.
    pub trend_analysis_days: u32,
}
26
27impl Default for RetentionConfig {
28    fn default() -> Self {
29        Self {
30            enabled: true,
31            retention_days: 90,
32            archive_enabled: true,
33            archive_compression: true,
34            cleanup_interval_hours: 24,
35            policies: vec![
36                RetentionPolicyConfig::default_metrics(),
37                RetentionPolicyConfig::default_logs(),
38                RetentionPolicyConfig::default_health(),
39                RetentionPolicyConfig::default_profiles(),
40            ],
41            historical_analysis_enabled: true,
42            trend_analysis_days: 30,
43        }
44    }
45}
46
/// Individual retention policy configuration
///
/// Describes how long one category of observability data is kept, when it
/// is archived, and any conditional overrides.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicyConfig {
    // Policy name; used as the key in the retention system's registry.
    pub name: String,
    // Category of data this policy governs.
    pub data_type: DataType,
    // Days to keep matching data before deletion.
    pub retention_days: u32,
    // Optional age, in days, after which data is archived instead of kept.
    pub archive_after_days: Option<u32>,
    // Whether archived data for this policy should be compressed.
    pub compression_enabled: bool,
    // Relative importance of this policy.
    pub priority: RetentionPriority,
    // Conditional retention overrides, checked before the default window.
    pub conditions: Vec<RetentionCondition>,
}
58
59impl RetentionPolicyConfig {
60    pub fn default_metrics() -> Self {
61        Self {
62            name: "metrics_policy".to_string(),
63            data_type: DataType::Metrics,
64            retention_days: 90,
65            archive_after_days: Some(30),
66            compression_enabled: true,
67            priority: RetentionPriority::Medium,
68            conditions: vec![],
69        }
70    }
71
72    pub fn default_logs() -> Self {
73        Self {
74            name: "logs_policy".to_string(),
75            data_type: DataType::Logs,
76            retention_days: 30,
77            archive_after_days: Some(7),
78            compression_enabled: true,
79            priority: RetentionPriority::High,
80            conditions: vec![
81                RetentionCondition::SeverityLevel("ERROR".to_string(), 60),
82                RetentionCondition::SeverityLevel("WARN".to_string(), 30),
83            ],
84        }
85    }
86
87    pub fn default_health() -> Self {
88        Self {
89            name: "health_policy".to_string(),
90            data_type: DataType::HealthChecks,
91            retention_days: 180,
92            archive_after_days: Some(60),
93            compression_enabled: true,
94            priority: RetentionPriority::High,
95            conditions: vec![],
96        }
97    }
98
99    pub fn default_profiles() -> Self {
100        Self {
101            name: "profiles_policy".to_string(),
102            data_type: DataType::Profiles,
103            retention_days: 60,
104            archive_after_days: Some(14),
105            compression_enabled: true,
106            priority: RetentionPriority::Medium,
107            conditions: vec![],
108        }
109    }
110}
111
/// Categories of observability data subject to retention policies.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DataType {
    Metrics,
    Logs,
    HealthChecks,
    Profiles,
    Alerts,
    UserActivity,
    SystemEvents,
}
122
/// Relative importance of a retention policy.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RetentionPriority {
    Low,
    Medium,
    High,
    Critical,
}
130
/// Retention condition for selective data retention
///
/// Each variant overrides the policy's default retention window for
/// matching items; the trailing `u32` is the override in days. Conditions
/// are evaluated in order and the first match wins.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetentionCondition {
    SeverityLevel(String, u32),       // level, retention_days
    ComponentName(String, u32),       // component, retention_days
    UserDefined(String, String, u32), // field, value, retention_days
    DataSize(u64, u32),               // max_size_bytes, retention_days
}
139
/// Retention policy for specific data types
///
/// Pairs a `RetentionPolicyConfig` with runtime bookkeeping about cleanup
/// activity.
pub struct RetentionPolicy {
    config: RetentionPolicyConfig,
    // Time of the most recent cleanup pass, if any.
    last_cleanup: Option<DateTime<Utc>>,
    // Cumulative counters maintained by the cleanup loop.
    items_processed: u64,
    items_archived: u64,
    items_deleted: u64,
}
148
149impl RetentionPolicy {
150    pub fn new(config: RetentionPolicyConfig) -> Self {
151        Self {
152            config,
153            last_cleanup: None,
154            items_processed: 0,
155            items_archived: 0,
156            items_deleted: 0,
157        }
158    }
159
160    pub fn should_retain(&self, timestamp: DateTime<Utc>) -> RetentionAction {
161        let age = Utc::now() - timestamp;
162        let age_days = age.num_days() as u32;
163
164        // Check if data should be deleted
165        if age_days > self.config.retention_days {
166            return RetentionAction::Delete;
167        }
168
169        // Check if data should be archived
170        if let Some(archive_days) = self.config.archive_after_days {
171            if age_days > archive_days {
172                return RetentionAction::Archive;
173            }
174        }
175
176        RetentionAction::Keep
177    }
178
179    pub fn apply_conditions(&self, data: &dyn RetentionData) -> Option<u32> {
180        for condition in &self.config.conditions {
181            match condition {
182                RetentionCondition::SeverityLevel(level, days) => {
183                    if let Some(severity) = data.severity_level() {
184                        if severity == *level {
185                            return Some(*days);
186                        }
187                    }
188                }
189                RetentionCondition::ComponentName(component, days) => {
190                    if let Some(comp) = data.component_name() {
191                        if comp == *component {
192                            return Some(*days);
193                        }
194                    }
195                }
196                RetentionCondition::UserDefined(field, value, days) => {
197                    if let Some(field_value) = data.custom_field(field) {
198                        if field_value == *value {
199                            return Some(*days);
200                        }
201                    }
202                }
203                RetentionCondition::DataSize(max_size, days) => {
204                    if data.data_size() > *max_size {
205                        return Some(*days);
206                    }
207                }
208            }
209        }
210        None
211    }
212
213    pub fn stats(&self) -> RetentionPolicyStats {
214        RetentionPolicyStats {
215            name: self.config.name.clone(),
216            data_type: self.config.data_type.clone(),
217            retention_days: self.config.retention_days,
218            last_cleanup: self.last_cleanup,
219            items_processed: self.items_processed,
220            items_archived: self.items_archived,
221            items_deleted: self.items_deleted,
222        }
223    }
224}
225
/// Point-in-time snapshot of one retention policy's settings and counters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicyStats {
    pub name: String,
    pub data_type: DataType,
    pub retention_days: u32,
    // `None` until the first cleanup pass has run.
    pub last_cleanup: Option<DateTime<Utc>>,
    pub items_processed: u64,
    pub items_archived: u64,
    pub items_deleted: u64,
}
236
/// Outcome of a retention decision for a single item.
#[derive(Debug, Clone, PartialEq)]
pub enum RetentionAction {
    Keep,
    Archive,
    Delete,
}
243
/// Trait for data that can be subject to retention policies
///
/// Implementors expose the minimal attributes retention decisions need:
/// age, approximate size, and optional severity/component/custom fields
/// (the optional accessors default to `None` when not applicable).
pub trait RetentionData {
    /// Creation/observation time used to compute the item's age.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Approximate size of the item in bytes.
    fn data_size(&self) -> u64;
    /// Severity label (e.g. a log level), when the data type has one.
    fn severity_level(&self) -> Option<String> {
        None
    }
    /// Originating component, when known.
    fn component_name(&self) -> Option<String> {
        None
    }
    /// Free-form field lookup used by `RetentionCondition::UserDefined`.
    fn custom_field(&self, _field: &str) -> Option<String> {
        None
    }
    /// Category of this item, used to select the applicable policy.
    fn data_type(&self) -> DataType;
}
259
260impl RetentionData for Metric {
261    fn timestamp(&self) -> DateTime<Utc> {
262        self.timestamp
263    }
264
265    fn data_size(&self) -> u64 {
266        // Rough estimate of metric size
267        std::mem::size_of::<Metric>() as u64
268            + self.name.len() as u64
269            + self
270                .labels
271                .iter()
272                .map(|(k, v)| k.len() + v.len())
273                .sum::<usize>() as u64
274    }
275
276    fn component_name(&self) -> Option<String> {
277        self.labels.get("component").cloned()
278    }
279
280    fn custom_field(&self, field: &str) -> Option<String> {
281        self.labels.get(field).cloned()
282    }
283
284    fn data_type(&self) -> DataType {
285        DataType::Metrics
286    }
287}
288
289impl RetentionData for LogEntry {
290    fn timestamp(&self) -> DateTime<Utc> {
291        self.timestamp
292    }
293
294    fn data_size(&self) -> u64 {
295        std::mem::size_of::<LogEntry>() as u64
296            + self.message.len() as u64
297            + self.component.len() as u64
298            + self
299                .fields
300                .iter()
301                .map(|(k, v)| k.len() + v.to_string().len())
302                .sum::<usize>() as u64
303    }
304
305    fn severity_level(&self) -> Option<String> {
306        Some(self.level.to_string())
307    }
308
309    fn component_name(&self) -> Option<String> {
310        Some(self.component.clone())
311    }
312
313    fn custom_field(&self, field: &str) -> Option<String> {
314        self.fields
315            .get(field)
316            .and_then(|v| v.as_str().map(|s| s.to_string()))
317    }
318
319    fn data_type(&self) -> DataType {
320        DataType::Logs
321    }
322}
323
324impl RetentionData for ServiceHealth {
325    fn timestamp(&self) -> DateTime<Utc> {
326        self.last_check
327    }
328
329    fn data_size(&self) -> u64 {
330        std::mem::size_of::<ServiceHealth>() as u64
331            + self.component_name.len() as u64
332            + self.error_message.as_ref().map(|s| s.len()).unwrap_or(0) as u64
333    }
334
335    fn component_name(&self) -> Option<String> {
336        Some(self.component_name.clone())
337    }
338
339    fn severity_level(&self) -> Option<String> {
340        Some(self.status.to_string())
341    }
342
343    fn data_type(&self) -> DataType {
344        DataType::HealthChecks
345    }
346}
347
348impl RetentionData for ProfileData {
349    fn timestamp(&self) -> DateTime<Utc> {
350        self.timestamp
351    }
352
353    fn data_size(&self) -> u64 {
354        std::mem::size_of::<ProfileData>() as u64
355            + self.operation.len() as u64
356            + self.component.len() as u64
357            + self
358                .custom_metrics
359                .iter()
360                .map(|(k, _)| k.len())
361                .sum::<usize>() as u64
362    }
363
364    fn component_name(&self) -> Option<String> {
365        Some(self.component.clone())
366    }
367
368    fn custom_field(&self, field: &str) -> Option<String> {
369        self.tags.get(field).cloned()
370    }
371
372    fn data_type(&self) -> DataType {
373        DataType::Profiles
374    }
375}
376
/// Archive manager for compressed storage
///
/// Writes aged observability data as files under `archive_path`.
/// NOTE(review): `max_archive_size_mb` is stored but not enforced by any
/// of the visible operations.
pub struct ArchiveManager {
    archive_path: String,
    // When true, data is compressed before writing (currently a mock
    // pass-through; see `archive_data`).
    compression_enabled: bool,
    max_archive_size_mb: u64,
}
383
384impl ArchiveManager {
385    pub fn new(archive_path: impl Into<String>, compression_enabled: bool) -> Self {
386        Self {
387            archive_path: archive_path.into(),
388            compression_enabled,
389            max_archive_size_mb: 1024, // 1 GB default
390        }
391    }
392
393    pub async fn archive_data(&self, data: &[u8], filename: &str) -> RragResult<ArchiveResult> {
394        let full_path = format!("{}/{}", self.archive_path, filename);
395
396        // Create directory if it doesn't exist
397        if let Some(parent) = std::path::Path::new(&full_path).parent() {
398            tokio::fs::create_dir_all(parent)
399                .await
400                .map_err(|e| RragError::storage("create_archive_directory", e))?;
401        }
402
403        let final_data = if self.compression_enabled {
404            // In a real implementation, this would use a compression library like flate2
405            // For now, we'll simulate compression
406            let compressed_data = data.to_vec(); // Mock: no actual compression
407            compressed_data
408        } else {
409            data.to_vec()
410        };
411
412        // Write archived data
413        tokio::fs::write(&full_path, &final_data)
414            .await
415            .map_err(|e| RragError::storage("write_archive", e))?;
416
417        Ok(ArchiveResult {
418            archived_at: Utc::now(),
419            file_path: full_path,
420            original_size: data.len() as u64,
421            archived_size: final_data.len() as u64,
422            compression_ratio: if data.len() > 0 {
423                final_data.len() as f64 / data.len() as f64
424            } else {
425                1.0
426            },
427        })
428    }
429
430    pub async fn restore_data(&self, filename: &str) -> RragResult<Vec<u8>> {
431        let full_path = format!("{}/{}", self.archive_path, filename);
432
433        let archived_data = tokio::fs::read(&full_path)
434            .await
435            .map_err(|e| RragError::storage("read_archive", e))?;
436
437        // If compression was enabled, decompress here
438        // For now, return as-is since we're not actually compressing
439        Ok(archived_data)
440    }
441
442    pub async fn delete_archive(&self, filename: &str) -> RragResult<()> {
443        let full_path = format!("{}/{}", self.archive_path, filename);
444        tokio::fs::remove_file(&full_path)
445            .await
446            .map_err(|e| RragError::storage("delete_archive", e))
447    }
448
449    pub async fn list_archives(&self) -> RragResult<Vec<ArchiveInfo>> {
450        let mut archives = Vec::new();
451        let mut dir = tokio::fs::read_dir(&self.archive_path)
452            .await
453            .map_err(|e| RragError::storage("read_archive_directory", e))?;
454
455        while let Some(entry) = dir
456            .next_entry()
457            .await
458            .map_err(|e| RragError::storage("read_directory_entry", e))?
459        {
460            if let Ok(metadata) = entry.metadata().await {
461                if metadata.is_file() {
462                    archives.push(ArchiveInfo {
463                        filename: entry.file_name().to_string_lossy().to_string(),
464                        size_bytes: metadata.len(),
465                        created_at: metadata
466                            .created()
467                            .ok()
468                            .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
469                            .map(|d| Utc::now() - Duration::seconds(d.as_secs() as i64))
470                            .unwrap_or(Utc::now()),
471                    });
472                }
473            }
474        }
475
476        Ok(archives)
477    }
478}
479
/// Result of a single archive write, including compression statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveResult {
    pub archived_at: DateTime<Utc>,
    pub file_path: String,
    pub original_size: u64,
    pub archived_size: u64,
    // archived_size / original_size; 1.0 for empty input.
    pub compression_ratio: f64,
}
488
/// Metadata describing one file in the archive directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveInfo {
    pub filename: String,
    pub size_bytes: u64,
    pub created_at: DateTime<Utc>,
}
495
/// Historical analyzer for trend analysis
///
/// Computes volume, component, and severity trends over a sliding window
/// of `analysis_window_days` days.
pub struct HistoricalAnalyzer {
    analysis_window_days: u32,
}
500
impl HistoricalAnalyzer {
    /// Creates an analyzer that considers data from the last
    /// `analysis_window_days` days.
    pub fn new(analysis_window_days: u32) -> Self {
        Self {
            analysis_window_days,
        }
    }

    /// Analyzes trends over `data`, considering only items whose timestamp
    /// falls within the configured analysis window.
    ///
    /// Returns an empty analysis when no items fall inside the window.
    pub async fn analyze_trends<T: RetentionData>(&self, data: &[T]) -> TrendAnalysis {
        let cutoff_time = Utc::now() - Duration::days(self.analysis_window_days as i64);
        let recent_data: Vec<_> = data
            .iter()
            .filter(|item| item.timestamp() >= cutoff_time)
            .collect();

        if recent_data.is_empty() {
            return TrendAnalysis::empty();
        }

        // Analyze data volume trends
        let volume_trend = self.analyze_volume_trend(&recent_data).await;

        // Analyze component trends
        let component_trends = self.analyze_component_trends(&recent_data).await;

        // Analyze severity trends (for logs/alerts)
        let severity_trends = self.analyze_severity_trends(&recent_data).await;

        TrendAnalysis {
            analysis_period_days: self.analysis_window_days,
            generated_at: Utc::now(),
            total_data_points: recent_data.len(),
            volume_trend: volume_trend.clone(),
            component_trends: component_trends.clone(),
            severity_trends,
            recommendations: self
                .generate_recommendations(&volume_trend, &component_trends)
                .await,
        }
    }

    /// Buckets items into per-day counts and byte sizes, then derives the
    /// overall direction and growth rate by comparing the recent half of
    /// the window against the older half.
    async fn analyze_volume_trend<T: RetentionData>(&self, data: &[&T]) -> VolumeTrend {
        let days = self.analysis_window_days.min(30) as usize; // Max 30 days for daily buckets
        let mut daily_counts = vec![0; days];
        let mut daily_sizes = vec![0u64; days];

        for item in data {
            let days_ago = (Utc::now() - item.timestamp()).num_days().max(0) as usize;
            if days_ago < days {
                // Oldest day lands at index 0, most recent at index days-1.
                let index = days - 1 - days_ago;
                daily_counts[index] += 1;
                daily_sizes[index] += item.data_size();
            }
        }

        let avg_daily_count = if days > 0 {
            daily_counts.iter().sum::<usize>() as f64 / days as f64
        } else {
            0.0
        };

        let avg_daily_size = if days > 0 {
            daily_sizes.iter().sum::<u64>() as f64 / days as f64
        } else {
            0.0
        };

        // Calculate trend direction: the upper half of the buckets is the
        // more recent half of the window (see the index mapping above).
        let recent_half = &daily_counts[days / 2..];
        let older_half = &daily_counts[..days / 2];

        let recent_avg = if recent_half.len() > 0 {
            recent_half.iter().sum::<usize>() as f64 / recent_half.len() as f64
        } else {
            0.0
        };

        let older_avg = if older_half.len() > 0 {
            older_half.iter().sum::<usize>() as f64 / older_half.len() as f64
        } else {
            0.0
        };

        // Changes within a +/-10% band count as stable.
        let trend_direction = if recent_avg > older_avg * 1.1 {
            TrendDirection::Increasing
        } else if recent_avg < older_avg * 0.9 {
            TrendDirection::Decreasing
        } else {
            TrendDirection::Stable
        };

        VolumeTrend {
            daily_counts,
            daily_sizes,
            average_daily_count: avg_daily_count,
            average_daily_size_bytes: avg_daily_size,
            trend_direction,
            growth_rate_percent: if older_avg > 0.0 {
                ((recent_avg - older_avg) / older_avg) * 100.0
            } else {
                0.0
            },
        }
    }

    /// Groups items by component name and computes a per-component volume
    /// trend; items without a component are ignored.
    async fn analyze_component_trends<T: RetentionData>(
        &self,
        data: &[&T],
    ) -> HashMap<String, ComponentTrend> {
        let mut component_data: HashMap<String, Vec<&T>> = HashMap::new();

        for item in data {
            if let Some(component) = item.component_name() {
                component_data.entry(component).or_default().push(*item);
            }
        }

        let mut trends = HashMap::new();
        for (component, items) in component_data {
            let volume_trend = self.analyze_volume_trend(&items).await;
            trends.insert(
                component.clone(),
                ComponentTrend {
                    component_name: component,
                    data_count: items.len(),
                    volume_trend,
                },
            );
        }

        trends
    }

    /// Groups items by severity label and computes a per-severity volume
    /// trend; items without a severity are ignored.
    async fn analyze_severity_trends<T: RetentionData>(
        &self,
        data: &[&T],
    ) -> HashMap<String, SeverityTrend> {
        let mut severity_data: HashMap<String, Vec<&T>> = HashMap::new();

        for item in data {
            if let Some(severity) = item.severity_level() {
                severity_data.entry(severity).or_default().push(*item);
            }
        }

        let mut trends = HashMap::new();
        for (severity, items) in severity_data {
            let volume_trend = self.analyze_volume_trend(&items).await;
            trends.insert(
                severity.clone(),
                SeverityTrend {
                    severity_level: severity,
                    occurrence_count: items.len(),
                    volume_trend,
                },
            );
        }

        trends
    }

    /// Produces retention recommendations from the computed trends.
    ///
    /// Thresholds: >50% overall growth suggests tighter retention, >1 GB/day
    /// average volume suggests compression, and >100% per-component growth
    /// suggests a component-specific policy.
    async fn generate_recommendations(
        &self,
        volume_trend: &VolumeTrend,
        component_trends: &HashMap<String, ComponentTrend>,
    ) -> Vec<RetentionRecommendation> {
        let mut recommendations = Vec::new();

        // Volume-based recommendations
        if volume_trend.growth_rate_percent > 50.0 {
            recommendations.push(RetentionRecommendation {
                category: RecommendationCategory::Storage,
                priority: RecommendationPriority::High,
                message: "Data volume is growing rapidly. Consider reducing retention periods or implementing more aggressive archiving.".to_string(),
                estimated_savings_percent: 30.0,
            });
        }

        if volume_trend.average_daily_size_bytes > 1_000_000_000.0 {
            // > 1GB daily
            recommendations.push(RetentionRecommendation {
                category: RecommendationCategory::Compression,
                priority: RecommendationPriority::Medium,
                message:
                    "Large daily data volume detected. Enable compression to reduce storage costs."
                        .to_string(),
                estimated_savings_percent: 60.0,
            });
        }

        // Component-based recommendations
        for (component, trend) in component_trends {
            if trend.volume_trend.growth_rate_percent > 100.0 {
                recommendations.push(RetentionRecommendation {
                    category: RecommendationCategory::ComponentSpecific,
                    priority: RecommendationPriority::High,
                    message: format!("Component '{}' is producing data at an increasing rate. Review logging levels or implement component-specific retention policies.", component),
                    estimated_savings_percent: 25.0,
                });
            }
        }

        recommendations
    }
}
705
/// Full output of a historical trend analysis pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    pub analysis_period_days: u32,
    pub generated_at: DateTime<Utc>,
    // Number of items that fell inside the analysis window.
    pub total_data_points: usize,
    pub volume_trend: VolumeTrend,
    // Keyed by component name.
    pub component_trends: HashMap<String, ComponentTrend>,
    // Keyed by severity label.
    pub severity_trends: HashMap<String, SeverityTrend>,
    pub recommendations: Vec<RetentionRecommendation>,
}
716
717impl TrendAnalysis {
718    fn empty() -> Self {
719        Self {
720            analysis_period_days: 0,
721            generated_at: Utc::now(),
722            total_data_points: 0,
723            volume_trend: VolumeTrend {
724                daily_counts: Vec::new(),
725                daily_sizes: Vec::new(),
726                average_daily_count: 0.0,
727                average_daily_size_bytes: 0.0,
728                trend_direction: TrendDirection::Stable,
729                growth_rate_percent: 0.0,
730            },
731            component_trends: HashMap::new(),
732            severity_trends: HashMap::new(),
733            recommendations: Vec::new(),
734        }
735    }
736}
737
/// Per-day data volume buckets and the derived trend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolumeTrend {
    // One bucket per day; index 0 is the oldest day in the window.
    pub daily_counts: Vec<usize>,
    pub daily_sizes: Vec<u64>,
    pub average_daily_count: f64,
    pub average_daily_size_bytes: f64,
    pub trend_direction: TrendDirection,
    // Percentage change of the recent half vs. the older half of the window.
    pub growth_rate_percent: f64,
}
747
/// Volume trend for the data produced by a single component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentTrend {
    pub component_name: String,
    pub data_count: usize,
    pub volume_trend: VolumeTrend,
}
754
/// Volume trend for items sharing a single severity level.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SeverityTrend {
    pub severity_level: String,
    pub occurrence_count: usize,
    pub volume_trend: VolumeTrend,
}
761
/// Direction of a volume trend (stable within a +/-10% band).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
}
768
/// Suggested retention-tuning action produced by trend analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionRecommendation {
    pub category: RecommendationCategory,
    pub priority: RecommendationPriority,
    pub message: String,
    // Heuristic estimate of storage savings if the recommendation is applied.
    pub estimated_savings_percent: f64,
}
776
/// Broad area a retention recommendation applies to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationCategory {
    Storage,
    Compression,
    RetentionPeriod,
    ComponentSpecific,
    ArchivingStrategy,
}
785
/// Urgency of a retention recommendation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationPriority {
    Low,
    Medium,
    High,
    Critical,
}
793
/// Main data retention system
///
/// Owns the policy registry, archive manager, and historical analyzer, and
/// runs a periodic background cleanup task while started.
pub struct DataRetention {
    config: RetentionConfig,
    // Policies keyed by their configured name; shared with the cleanup task.
    policies: Arc<RwLock<HashMap<String, RetentionPolicy>>>,
    archive_manager: Arc<ArchiveManager>,
    historical_analyzer: Arc<HistoricalAnalyzer>,
    // Handle of the background cleanup task, present while running.
    cleanup_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    // Lifecycle flag; also polled by the cleanup loop between passes.
    is_running: Arc<RwLock<bool>>,
}
803
804impl DataRetention {
805    pub async fn new(config: RetentionConfig) -> RragResult<Self> {
806        let policies = Arc::new(RwLock::new(HashMap::new()));
807
808        // Initialize policies from config
809        {
810            let mut policy_map = policies.write().await;
811            for policy_config in &config.policies {
812                let policy = RetentionPolicy::new(policy_config.clone());
813                policy_map.insert(policy_config.name.clone(), policy);
814            }
815        }
816
817        let archive_manager = Arc::new(ArchiveManager::new(
818            "./archives",
819            config.archive_compression,
820        ));
821
822        let historical_analyzer = Arc::new(HistoricalAnalyzer::new(config.trend_analysis_days));
823
824        Ok(Self {
825            config,
826            policies,
827            archive_manager,
828            historical_analyzer,
829            cleanup_handle: Arc::new(RwLock::new(None)),
830            is_running: Arc::new(RwLock::new(false)),
831        })
832    }
833
    /// Starts the retention system and, when `config.enabled`, the
    /// background cleanup loop.
    ///
    /// # Errors
    /// Returns a configuration error if the system is already running.
    pub async fn start(&self) -> RragResult<()> {
        // The write guard is held for the whole start sequence: the spawned
        // cleanup loop reads `is_running` on entry, so it blocks until the
        // flag is flipped to `true` below and this guard is released.
        let mut running = self.is_running.write().await;
        if *running {
            return Err(RragError::config(
                "data_retention",
                "stopped",
                "already running",
            ));
        }

        if self.config.enabled {
            let handle = self.start_cleanup_loop().await?;
            let mut handle_guard = self.cleanup_handle.write().await;
            *handle_guard = Some(handle);
        }

        *running = true;
        tracing::info!("Data retention system started");
        Ok(())
    }
854
855    pub async fn stop(&self) -> RragResult<()> {
856        let mut running = self.is_running.write().await;
857        if !*running {
858            return Ok(());
859        }
860
861        {
862            let mut handle_guard = self.cleanup_handle.write().await;
863            if let Some(handle) = handle_guard.take() {
864                handle.abort();
865            }
866        }
867
868        *running = false;
869        tracing::info!("Data retention system stopped");
870        Ok(())
871    }
872
873    pub async fn is_healthy(&self) -> bool {
874        *self.is_running.read().await
875    }
876
    /// Spawns the periodic cleanup task and returns its handle.
    ///
    /// The task ticks every `cleanup_interval_hours` (the first tick of a
    /// `tokio::time::interval` completes immediately) and re-checks
    /// `is_running` before each pass, so it also winds down once `stop()`
    /// flips the flag.
    async fn start_cleanup_loop(&self) -> RragResult<tokio::task::JoinHandle<()>> {
        let config = self.config.clone();
        let policies = self.policies.clone();
        let is_running = self.is_running.clone();

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
                config.cleanup_interval_hours as u64 * 3600,
            ));

            while *is_running.read().await {
                interval.tick().await;

                tracing::info!("Running data retention cleanup");

                // In a real implementation, this would:
                // 1. Query the data stores for items to process
                // 2. Apply retention policies
                // 3. Archive or delete data as needed
                // 4. Update policy statistics

                let mut policy_map = policies.write().await;
                for (name, policy) in policy_map.iter_mut() {
                    tracing::debug!("Processing retention policy: {}", name);
                    // Mock processing: counters are simulated until the data
                    // stores are wired in.
                    policy.items_processed += 10;
                    policy.last_cleanup = Some(Utc::now());
                }
            }
        });

        Ok(handle)
    }
910
911    pub async fn apply_retention_policy<T: RetentionData + Clone>(
912        &self,
913        data: Vec<T>,
914        data_type: DataType,
915    ) -> RragResult<RetentionResult> {
916        let policies = self.policies.read().await;
917
918        // Find applicable policy for this data type
919        let policy = policies
920            .values()
921            .find(|p| p.config.data_type == data_type)
922            .ok_or_else(|| {
923                RragError::config("retention_policy", "exists", &format!("{:?}", data_type))
924            })?;
925
926        let mut result = RetentionResult {
927            processed_count: 0,
928            kept_count: 0,
929            archived_count: 0,
930            deleted_count: 0,
931            errors: Vec::new(),
932        };
933
934        for item in data {
935            result.processed_count += 1;
936
937            // Check custom conditions first
938            let retention_days = policy
939                .apply_conditions(&item)
940                .unwrap_or(policy.config.retention_days);
941
942            let age = Utc::now() - item.timestamp();
943            let age_days = age.num_days() as u32;
944
945            if age_days > retention_days {
946                // Delete item
947                result.deleted_count += 1;
948            } else if let Some(archive_days) = policy.config.archive_after_days {
949                if age_days > archive_days {
950                    // Archive item
951                    match self.archive_item(&item).await {
952                        Ok(_) => result.archived_count += 1,
953                        Err(e) => result.errors.push(e.to_string()),
954                    }
955                } else {
956                    result.kept_count += 1;
957                }
958            } else {
959                result.kept_count += 1;
960            }
961        }
962
963        Ok(result)
964    }
965
966    async fn archive_item<T: RetentionData>(&self, item: &T) -> RragResult<ArchiveResult> {
967        // Serialize the item (this is a simplified version)
968        let serialized_data = serde_json::to_vec(&serde_json::json!({
969            "timestamp": item.timestamp(),
970            "data_type": item.data_type(),
971            "size": item.data_size(),
972            "component": item.component_name(),
973            "severity": item.severity_level(),
974        }))
975        .map_err(|e| RragError::agent("serialization", e.to_string()))?;
976
977        let filename = format!(
978            "{}_{}.json",
979            format!("{:?}", item.data_type()).to_lowercase(),
980            item.timestamp().format("%Y%m%d_%H%M%S")
981        );
982
983        self.archive_manager
984            .archive_data(&serialized_data, &filename)
985            .await
986    }
987
988    pub async fn analyze_historical_data<T: RetentionData>(&self, data: &[T]) -> TrendAnalysis {
989        self.historical_analyzer.analyze_trends(data).await
990    }
991
992    pub async fn get_retention_stats(&self) -> Vec<RetentionPolicyStats> {
993        let policies = self.policies.read().await;
994        policies.values().map(|p| p.stats()).collect()
995    }
996
997    pub async fn add_policy(&self, policy_config: RetentionPolicyConfig) -> RragResult<()> {
998        let mut policies = self.policies.write().await;
999        let policy = RetentionPolicy::new(policy_config.clone());
1000        policies.insert(policy_config.name.clone(), policy);
1001        Ok(())
1002    }
1003
1004    pub async fn remove_policy(&self, policy_name: &str) -> RragResult<()> {
1005        let mut policies = self.policies.write().await;
1006        policies.remove(policy_name);
1007        Ok(())
1008    }
1009
1010    pub async fn update_policy(&self, policy_config: RetentionPolicyConfig) -> RragResult<()> {
1011        let mut policies = self.policies.write().await;
1012        let policy = RetentionPolicy::new(policy_config.clone());
1013        policies.insert(policy_config.name.clone(), policy);
1014        Ok(())
1015    }
1016
1017    pub async fn get_archive_info(&self) -> RragResult<Vec<ArchiveInfo>> {
1018        self.archive_manager.list_archives().await
1019    }
1020
1021    pub async fn restore_from_archive(&self, filename: &str) -> RragResult<Vec<u8>> {
1022        self.archive_manager.restore_data(filename).await
1023    }
1024
1025    pub async fn delete_archive(&self, filename: &str) -> RragResult<()> {
1026        self.archive_manager.delete_archive(filename).await
1027    }
1028}
1029
1030#[derive(Debug, Clone, Serialize, Deserialize)]
1031pub struct RetentionResult {
1032    pub processed_count: usize,
1033    pub kept_count: usize,
1034    pub archived_count: usize,
1035    pub deleted_count: usize,
1036    pub errors: Vec<String>,
1037}
1038
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::{logging::LogLevel, metrics::MetricType};

    // Verifies the default log policy ships with the documented windows
    // (30-day retention, archive after 7 days).
    #[test]
    fn test_retention_policy_creation() {
        let config = RetentionPolicyConfig::default_logs();
        let policy = RetentionPolicy::new(config);

        assert_eq!(policy.config.data_type, DataType::Logs);
        assert_eq!(policy.config.retention_days, 30);
        assert!(policy.config.archive_after_days.is_some());
        assert_eq!(policy.config.archive_after_days.unwrap(), 7);
    }

    // Exercises the three age bands of should_retain: inside the archive
    // threshold (Keep), between archive and retention (Archive), and past
    // retention (Delete).
    #[test]
    fn test_retention_action() {
        let config = RetentionPolicyConfig {
            name: "test_policy".to_string(),
            data_type: DataType::Metrics,
            retention_days: 30,
            archive_after_days: Some(7),
            compression_enabled: true,
            priority: RetentionPriority::Medium,
            conditions: vec![],
        };

        let policy = RetentionPolicy::new(config);

        // Test recent data (should keep)
        let recent_time = Utc::now() - Duration::days(5);
        assert_eq!(policy.should_retain(recent_time), RetentionAction::Keep);

        // Test old data for archiving
        let archive_time = Utc::now() - Duration::days(10);
        assert_eq!(policy.should_retain(archive_time), RetentionAction::Archive);

        // Test very old data (should delete)
        let delete_time = Utc::now() - Duration::days(40);
        assert_eq!(policy.should_retain(delete_time), RetentionAction::Delete);
    }

    // Round-trips data through archive/restore/list/delete against a temp
    // directory, with compression enabled.
    #[tokio::test]
    async fn test_archive_manager() {
        let temp_dir = tempfile::tempdir().unwrap();
        let archive_manager =
            ArchiveManager::new(temp_dir.path().to_string_lossy().to_string(), true);

        let test_data = b"test archive data";
        let filename = "test_archive.json";

        let result = archive_manager
            .archive_data(test_data, filename)
            .await
            .unwrap();
        assert_eq!(result.original_size, test_data.len() as u64);
        assert!(!result.file_path.is_empty());

        // Restored bytes must match the original exactly despite compression.
        let restored_data = archive_manager.restore_data(filename).await.unwrap();
        assert_eq!(restored_data, test_data);

        let archives = archive_manager.list_archives().await.unwrap();
        assert_eq!(archives.len(), 1);
        assert_eq!(archives[0].filename, filename);

        archive_manager.delete_archive(filename).await.unwrap();
        let archives_after_delete = archive_manager.list_archives().await.unwrap();
        assert_eq!(archives_after_delete.len(), 0);
    }

    // Checks the analysis window filter: of 10 metrics spaced one day apart,
    // a 7-day window keeps today plus the last 7 days = 8 data points.
    #[tokio::test]
    async fn test_historical_analyzer() {
        let analyzer = HistoricalAnalyzer::new(7); // 7 days analysis window

        // Create test metrics with different timestamps
        let mut test_metrics = Vec::new();
        for i in 0..10 {
            let timestamp = Utc::now() - Duration::days(i);
            let metric = Metric::counter("test_counter", (i * 10) as u64)
                .with_label("component", "test_component");
            let mut metric = metric;
            // Backdate the metric; `timestamp` is set directly since the
            // builder API always stamps "now".
            metric.timestamp = timestamp;
            test_metrics.push(metric);
        }

        let analysis = analyzer.analyze_trends(&test_metrics).await;
        assert_eq!(analysis.total_data_points, 8); // Only last 7 days + today
        assert!(!analysis.volume_trend.daily_counts.is_empty());
        assert!(!analysis.component_trends.is_empty());
        assert!(analysis.component_trends.contains_key("test_component"));
    }

    // End-to-end lifecycle: health flips with start/stop, and a policy added
    // at runtime shows up in the stats snapshot.
    #[tokio::test]
    async fn test_data_retention_system() {
        let config = RetentionConfig::default();
        let mut retention = DataRetention::new(config).await.unwrap();

        // Not healthy until started.
        assert!(!retention.is_healthy().await);

        retention.start().await.unwrap();
        assert!(retention.is_healthy().await);

        // Test adding a custom policy
        let custom_policy = RetentionPolicyConfig {
            name: "custom_test_policy".to_string(),
            data_type: DataType::Metrics,
            retention_days: 60,
            archive_after_days: Some(14),
            compression_enabled: true,
            priority: RetentionPriority::High,
            conditions: vec![],
        };

        retention.add_policy(custom_policy).await.unwrap();

        let stats = retention.get_retention_stats().await;
        assert!(stats.iter().any(|s| s.name == "custom_test_policy"));

        retention.stop().await.unwrap();
        assert!(!retention.is_healthy().await);
    }

    // Spot-checks the RetentionData trait impls for Metric and LogEntry:
    // data type, size, component, and severity accessors.
    #[tokio::test]
    async fn test_retention_data_trait() {
        // Test Metric implementation
        let metric = Metric::counter("test_metric", 100).with_label("component", "test_component");

        assert_eq!(metric.data_type(), DataType::Metrics);
        assert!(metric.data_size() > 0);
        assert_eq!(metric.component_name(), Some("test_component".to_string()));

        // Test LogEntry implementation
        let log_entry =
            super::super::logging::LogEntry::new(LogLevel::Error, "Test error", "test_component");
        assert_eq!(log_entry.data_type(), DataType::Logs);
        assert_eq!(log_entry.severity_level(), Some("ERROR".to_string()));
        assert_eq!(
            log_entry.component_name(),
            Some("test_component".to_string())
        );
    }

    // Verifies condition overrides: severity and component conditions extend
    // the retention window, and items matching neither get None (default).
    #[test]
    fn test_retention_conditions() {
        let config = RetentionPolicyConfig {
            name: "conditional_policy".to_string(),
            data_type: DataType::Logs,
            retention_days: 30,
            archive_after_days: None,
            compression_enabled: false,
            priority: RetentionPriority::Medium,
            conditions: vec![
                RetentionCondition::SeverityLevel("ERROR".to_string(), 90),
                RetentionCondition::ComponentName("critical_component".to_string(), 180),
            ],
        };

        let policy = RetentionPolicy::new(config);

        // ERROR severity matches the first condition -> 90-day window.
        let error_log = super::super::logging::LogEntry::new(
            LogLevel::Error,
            "Error message",
            "normal_component",
        );
        assert_eq!(policy.apply_conditions(&error_log), Some(90));

        // Component name matches the second condition -> 180-day window.
        let critical_log = super::super::logging::LogEntry::new(
            LogLevel::Info,
            "Info message",
            "critical_component",
        );
        assert_eq!(policy.apply_conditions(&critical_log), Some(180));

        // Matches no condition -> fall back to the policy default.
        let normal_log = super::super::logging::LogEntry::new(
            LogLevel::Info,
            "Info message",
            "normal_component",
        );
        assert_eq!(policy.apply_conditions(&normal_log), None);
    }
}