rexis_rag/incremental/
integrity.rs

1//! # Integrity Checker
2//!
3//! Comprehensive integrity checking and validation for incremental indexing systems.
4//! Ensures consistency, detects corruption, and provides health monitoring.
5
6use crate::RragResult;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13/// Integrity checker configuration
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct IntegrityConfig {
16    /// Enable automatic integrity checks
17    pub enable_auto_checks: bool,
18
19    /// Check interval in seconds
20    pub check_interval_secs: u64,
21
22    /// Comprehensive check interval in seconds
23    pub comprehensive_check_interval_secs: u64,
24
25    /// Enable consistency validation
26    pub enable_consistency_checks: bool,
27
28    /// Enable corruption detection
29    pub enable_corruption_detection: bool,
30
31    /// Enable performance monitoring
32    pub enable_performance_monitoring: bool,
33
34    /// Maximum repair attempts
35    pub max_repair_attempts: u32,
36
37    /// Enable automatic repairs
38    pub enable_auto_repair: bool,
39
40    /// Health check thresholds
41    pub health_thresholds: HealthThresholds,
42}
43
44/// Health check thresholds
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct HealthThresholds {
47    /// Maximum allowed error rate (0.0 to 1.0)
48    pub max_error_rate: f64,
49
50    /// Maximum response time in milliseconds
51    pub max_response_time_ms: u64,
52
53    /// Minimum success rate (0.0 to 1.0)
54    pub min_success_rate: f64,
55
56    /// Maximum queue depth
57    pub max_queue_depth: usize,
58
59    /// Maximum memory usage in MB
60    pub max_memory_usage_mb: f64,
61
62    /// Maximum storage usage percentage
63    pub max_storage_usage_percent: f64,
64}
65
66impl Default for IntegrityConfig {
67    fn default() -> Self {
68        Self {
69            enable_auto_checks: true,
70            check_interval_secs: 300,                // 5 minutes
71            comprehensive_check_interval_secs: 3600, // 1 hour
72            enable_consistency_checks: true,
73            enable_corruption_detection: true,
74            enable_performance_monitoring: true,
75            max_repair_attempts: 3,
76            enable_auto_repair: true,
77            health_thresholds: HealthThresholds::default(),
78        }
79    }
80}
81
82impl Default for HealthThresholds {
83    fn default() -> Self {
84        Self {
85            max_error_rate: 0.05,        // 5%
86            max_response_time_ms: 10000, // 10 seconds
87            min_success_rate: 0.95,      // 95%
88            max_queue_depth: 1000,
89            max_memory_usage_mb: 1024.0,     // 1GB
90            max_storage_usage_percent: 80.0, // 80%
91        }
92    }
93}
94
95/// Types of integrity errors
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
97pub enum IntegrityError {
98    /// Hash mismatch detected
99    HashMismatch {
100        expected: String,
101        actual: String,
102        entity_id: String,
103    },
104
105    /// Missing reference
106    MissingReference {
107        reference_id: String,
108        referenced_by: String,
109    },
110
111    /// Orphaned data
112    OrphanedData {
113        entity_id: String,
114        entity_type: String,
115    },
116
117    /// Version inconsistency
118    VersionInconsistency {
119        entity_id: String,
120        expected_version: u64,
121        actual_version: u64,
122    },
123
124    /// Index corruption
125    IndexCorruption {
126        index_name: String,
127        corruption_type: String,
128        details: String,
129    },
130
131    /// Data size mismatch
132    SizeMismatch {
133        entity_id: String,
134        expected_size: u64,
135        actual_size: u64,
136    },
137
138    /// Timestamp inconsistency
139    TimestampInconsistency { entity_id: String, issue: String },
140
141    /// Duplicate entries
142    DuplicateEntries {
143        entity_ids: Vec<String>,
144        duplicate_field: String,
145    },
146}
147
148/// Consistency report
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct ConsistencyReport {
151    /// Report ID
152    pub report_id: String,
153
154    /// Report generation timestamp
155    pub generated_at: chrono::DateTime<chrono::Utc>,
156
157    /// Report type
158    pub report_type: ReportType,
159
160    /// Overall health status
161    pub overall_health: HealthStatus,
162
163    /// Detected integrity errors
164    pub integrity_errors: Vec<IntegrityError>,
165
166    /// Performance metrics
167    pub performance_metrics: PerformanceMetrics,
168
169    /// System statistics
170    pub system_stats: SystemStats,
171
172    /// Recommendations
173    pub recommendations: Vec<Recommendation>,
174
175    /// Check duration in milliseconds
176    pub check_duration_ms: u64,
177
178    /// Entities checked
179    pub entities_checked: usize,
180
181    /// Repair actions taken
182    pub repair_actions: Vec<RepairAction>,
183}
184
185/// Report types
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub enum ReportType {
188    /// Quick health check
189    Quick,
190    /// Comprehensive integrity check
191    Comprehensive,
192    /// Targeted check for specific issues
193    Targeted(String),
194    /// Emergency repair validation
195    Emergency,
196}
197
198/// Health status levels
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
200pub enum HealthStatus {
201    /// All systems healthy
202    Healthy,
203    /// Minor issues detected
204    Warning,
205    /// Significant issues requiring attention
206    Critical,
207    /// System compromised, immediate action required
208    Emergency,
209}
210
211/// Performance metrics
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct PerformanceMetrics {
214    /// Average response time in milliseconds
215    pub avg_response_time_ms: f64,
216
217    /// 95th percentile response time
218    pub p95_response_time_ms: f64,
219
220    /// 99th percentile response time
221    pub p99_response_time_ms: f64,
222
223    /// Operations per second
224    pub operations_per_second: f64,
225
226    /// Error rate (0.0 to 1.0)
227    pub error_rate: f64,
228
229    /// Success rate (0.0 to 1.0)
230    pub success_rate: f64,
231
232    /// Memory usage in MB
233    pub memory_usage_mb: f64,
234
235    /// CPU usage percentage
236    pub cpu_usage_percent: f64,
237
238    /// Storage usage in bytes
239    pub storage_usage_bytes: u64,
240}
241
242/// System statistics
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct SystemStats {
245    /// Total documents indexed
246    pub total_documents: usize,
247
248    /// Total chunks processed
249    pub total_chunks: usize,
250
251    /// Total embeddings stored
252    pub total_embeddings: usize,
253
254    /// Index counts by type
255    pub index_counts: HashMap<String, usize>,
256
257    /// Storage distribution
258    pub storage_distribution: HashMap<String, u64>,
259
260    /// System uptime in seconds
261    pub uptime_seconds: u64,
262
263    /// Last maintenance timestamp
264    pub last_maintenance_at: Option<chrono::DateTime<chrono::Utc>>,
265}
266
267/// System recommendations
268#[derive(Debug, Clone, Serialize, Deserialize)]
269pub struct Recommendation {
270    /// Recommendation ID
271    pub recommendation_id: String,
272
273    /// Recommendation type
274    pub recommendation_type: RecommendationType,
275
276    /// Priority level
277    pub priority: RecommendationPriority,
278
279    /// Description
280    pub description: String,
281
282    /// Suggested actions
283    pub suggested_actions: Vec<String>,
284
285    /// Expected impact
286    pub expected_impact: String,
287
288    /// Estimated effort
289    pub estimated_effort: String,
290}
291
292/// Types of recommendations
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub enum RecommendationType {
295    /// Performance optimization
296    Performance,
297    /// Storage optimization
298    Storage,
299    /// Security improvement
300    Security,
301    /// Maintenance task
302    Maintenance,
303    /// Capacity planning
304    Capacity,
305    /// Configuration adjustment
306    Configuration,
307}
308
309/// Recommendation priorities
310#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, PartialEq)]
311pub enum RecommendationPriority {
312    Low = 1,
313    Medium = 2,
314    High = 3,
315    Critical = 4,
316}
317
318/// Repair actions
319#[derive(Debug, Clone, Serialize, Deserialize)]
320pub struct RepairAction {
321    /// Action ID
322    pub action_id: String,
323
324    /// Action type
325    pub action_type: RepairActionType,
326
327    /// Target entity
328    pub target_entity: String,
329
330    /// Action description
331    pub description: String,
332
333    /// Action timestamp
334    pub executed_at: chrono::DateTime<chrono::Utc>,
335
336    /// Action result
337    pub result: RepairResult,
338
339    /// Details about the repair
340    pub details: HashMap<String, serde_json::Value>,
341}
342
343/// Types of repair actions
344#[derive(Debug, Clone, Serialize, Deserialize)]
345pub enum RepairActionType {
346    /// Rebuild index
347    RebuildIndex,
348    /// Fix hash mismatch
349    FixHashMismatch,
350    /// Remove orphaned data
351    RemoveOrphanedData,
352    /// Update version
353    UpdateVersion,
354    /// Repair corruption
355    RepairCorruption,
356    /// Clean duplicates
357    CleanDuplicates,
358    /// Restore from backup
359    RestoreFromBackup,
360}
361
362/// Repair operation results
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub enum RepairResult {
365    /// Repair successful
366    Success,
367    /// Repair failed
368    Failed(String),
369    /// Repair partially successful
370    Partial(String),
371    /// Repair skipped
372    Skipped(String),
373}
374
375/// Validation result for specific checks
376#[derive(Debug, Clone, Serialize, Deserialize)]
377pub struct ValidationResult {
378    /// Validation check name
379    pub check_name: String,
380
381    /// Whether validation passed
382    pub passed: bool,
383
384    /// Validation details
385    pub details: String,
386
387    /// Entities validated
388    pub entities_validated: usize,
389
390    /// Validation duration
391    pub validation_duration_ms: u64,
392
393    /// Issues found
394    pub issues_found: Vec<IntegrityError>,
395}
396
397/// Health metrics for monitoring
398#[derive(Debug, Clone, Serialize, Deserialize)]
399pub struct HealthMetrics {
400    /// Current health status
401    pub current_health: HealthStatus,
402
403    /// Health history over time
404    pub health_history: Vec<HealthDataPoint>,
405
406    /// Alert conditions met
407    pub active_alerts: Vec<AlertCondition>,
408
409    /// System vitals
410    pub vitals: SystemVitals,
411
412    /// Last health check timestamp
413    pub last_check_at: chrono::DateTime<chrono::Utc>,
414}
415
416/// Health data point for trending
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct HealthDataPoint {
419    /// Timestamp
420    pub timestamp: chrono::DateTime<chrono::Utc>,
421
422    /// Health status at this time
423    pub health_status: HealthStatus,
424
425    /// Key metrics at this time
426    pub metrics: HashMap<String, f64>,
427}
428
429/// Alert conditions
430#[derive(Debug, Clone, Serialize, Deserialize)]
431pub struct AlertCondition {
432    /// Alert ID
433    pub alert_id: String,
434
435    /// Alert type
436    pub alert_type: String,
437
438    /// Alert severity
439    pub severity: HealthStatus,
440
441    /// Alert description
442    pub description: String,
443
444    /// When alert was triggered
445    pub triggered_at: chrono::DateTime<chrono::Utc>,
446
447    /// Alert metadata
448    pub metadata: HashMap<String, serde_json::Value>,
449}
450
451/// System vitals
452#[derive(Debug, Clone, Serialize, Deserialize)]
453pub struct SystemVitals {
454    /// Memory usage
455    pub memory_usage_percent: f64,
456
457    /// Storage usage
458    pub storage_usage_percent: f64,
459
460    /// CPU usage
461    pub cpu_usage_percent: f64,
462
463    /// Network usage
464    pub network_usage_bytes_per_second: f64,
465
466    /// Queue depths
467    pub queue_depths: HashMap<String, usize>,
468
469    /// Connection counts
470    pub active_connections: usize,
471}
472
473/// Main integrity checker
474pub struct IntegrityChecker {
475    /// Configuration
476    config: IntegrityConfig,
477
478    /// Check history
479    check_history: Arc<RwLock<Vec<ConsistencyReport>>>,
480
481    /// Health metrics
482    health_metrics: Arc<RwLock<HealthMetrics>>,
483
484    /// Active repair operations
485    active_repairs: Arc<RwLock<HashMap<String, RepairAction>>>,
486
487    /// Statistics
488    stats: Arc<RwLock<IntegrityStats>>,
489
490    /// Background task handles
491    task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
492}
493
494/// Integrity checker statistics
495#[derive(Debug, Clone, Serialize, Deserialize)]
496pub struct IntegrityStats {
497    /// Total checks performed
498    pub total_checks: u64,
499
500    /// Quick checks performed
501    pub quick_checks: u64,
502
503    /// Comprehensive checks performed
504    pub comprehensive_checks: u64,
505
506    /// Total integrity errors found
507    pub total_errors_found: u64,
508
509    /// Errors by type
510    pub errors_by_type: HashMap<String, u64>,
511
512    /// Total repairs attempted
513    pub total_repairs_attempted: u64,
514
515    /// Successful repairs
516    pub successful_repairs: u64,
517
518    /// Failed repairs
519    pub failed_repairs: u64,
520
521    /// Average check duration
522    pub avg_check_duration_ms: f64,
523
524    /// System availability
525    pub system_availability_percent: f64,
526
527    /// Last updated
528    pub last_updated: chrono::DateTime<chrono::Utc>,
529}
530
531impl IntegrityChecker {
532    /// Create new integrity checker
533    pub async fn new(config: IntegrityConfig) -> RragResult<Self> {
534        let checker = Self {
535            config: config.clone(),
536            check_history: Arc::new(RwLock::new(Vec::new())),
537            health_metrics: Arc::new(RwLock::new(HealthMetrics {
538                current_health: HealthStatus::Healthy,
539                health_history: Vec::new(),
540                active_alerts: Vec::new(),
541                vitals: SystemVitals {
542                    memory_usage_percent: 0.0,
543                    storage_usage_percent: 0.0,
544                    cpu_usage_percent: 0.0,
545                    network_usage_bytes_per_second: 0.0,
546                    queue_depths: HashMap::new(),
547                    active_connections: 0,
548                },
549                last_check_at: chrono::Utc::now(),
550            })),
551            active_repairs: Arc::new(RwLock::new(HashMap::new())),
552            stats: Arc::new(RwLock::new(IntegrityStats {
553                total_checks: 0,
554                quick_checks: 0,
555                comprehensive_checks: 0,
556                total_errors_found: 0,
557                errors_by_type: HashMap::new(),
558                total_repairs_attempted: 0,
559                successful_repairs: 0,
560                failed_repairs: 0,
561                avg_check_duration_ms: 0.0,
562                system_availability_percent: 100.0,
563                last_updated: chrono::Utc::now(),
564            })),
565            task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
566        };
567
568        if config.enable_auto_checks {
569            checker.start_background_tasks().await?;
570        }
571
572        Ok(checker)
573    }
574
575    /// Perform quick integrity check
576    pub async fn quick_check(&self) -> RragResult<ConsistencyReport> {
577        let start_time = std::time::Instant::now();
578        let report_id = Uuid::new_v4().to_string();
579
580        // Perform basic integrity checks
581        let mut integrity_errors = Vec::new();
582        let mut repair_actions = Vec::new();
583
584        // Check for basic inconsistencies
585        let basic_errors = self.check_basic_consistency().await?;
586        integrity_errors.extend(basic_errors);
587
588        // Check performance metrics
589        let performance_metrics = self.collect_performance_metrics().await?;
590        let system_stats = self.collect_system_stats().await?;
591
592        // Determine overall health
593        let overall_health = self
594            .determine_health_status(&integrity_errors, &performance_metrics)
595            .await?;
596
597        // Generate recommendations
598        let recommendations = self
599            .generate_recommendations(&integrity_errors, &performance_metrics, &overall_health)
600            .await?;
601
602        // Perform automatic repairs if enabled
603        if self.config.enable_auto_repair && !integrity_errors.is_empty() {
604            repair_actions = self.perform_auto_repairs(&integrity_errors).await?;
605        }
606
607        let check_duration = start_time.elapsed().as_millis() as u64;
608
609        let report = ConsistencyReport {
610            report_id,
611            generated_at: chrono::Utc::now(),
612            report_type: ReportType::Quick,
613            overall_health,
614            integrity_errors,
615            performance_metrics,
616            system_stats,
617            recommendations,
618            check_duration_ms: check_duration,
619            entities_checked: 100, // Would be actual count
620            repair_actions,
621        };
622
623        // Update statistics and history
624        self.update_check_statistics(&report).await?;
625        self.add_to_history(report.clone()).await?;
626
627        Ok(report)
628    }
629
630    /// Perform comprehensive integrity check
631    pub async fn comprehensive_check(&self) -> RragResult<ConsistencyReport> {
632        let start_time = std::time::Instant::now();
633        let report_id = Uuid::new_v4().to_string();
634
635        let mut integrity_errors = Vec::new();
636        let mut repair_actions = Vec::new();
637
638        // Comprehensive checks
639        let basic_errors = self.check_basic_consistency().await?;
640        let hash_errors = self.check_hash_integrity().await?;
641        let reference_errors = self.check_reference_integrity().await?;
642        let version_errors = self.check_version_consistency().await?;
643        let index_errors = self.check_index_integrity().await?;
644
645        integrity_errors.extend(basic_errors);
646        integrity_errors.extend(hash_errors);
647        integrity_errors.extend(reference_errors);
648        integrity_errors.extend(version_errors);
649        integrity_errors.extend(index_errors);
650
651        let performance_metrics = self.collect_performance_metrics().await?;
652        let system_stats = self.collect_system_stats().await?;
653        let overall_health = self
654            .determine_health_status(&integrity_errors, &performance_metrics)
655            .await?;
656        let recommendations = self
657            .generate_recommendations(&integrity_errors, &performance_metrics, &overall_health)
658            .await?;
659
660        if self.config.enable_auto_repair && !integrity_errors.is_empty() {
661            repair_actions = self.perform_auto_repairs(&integrity_errors).await?;
662        }
663
664        let check_duration = start_time.elapsed().as_millis() as u64;
665
666        let report = ConsistencyReport {
667            report_id,
668            generated_at: chrono::Utc::now(),
669            report_type: ReportType::Comprehensive,
670            overall_health,
671            integrity_errors,
672            performance_metrics,
673            system_stats,
674            recommendations,
675            check_duration_ms: check_duration,
676            entities_checked: 1000, // Would be actual count
677            repair_actions,
678        };
679
680        self.update_check_statistics(&report).await?;
681        self.add_to_history(report.clone()).await?;
682
683        Ok(report)
684    }
685
686    /// Get current health metrics
687    pub async fn get_health_metrics(&self) -> HealthMetrics {
688        self.health_metrics.read().await.clone()
689    }
690
691    /// Get integrity statistics
692    pub async fn get_stats(&self) -> IntegrityStats {
693        self.stats.read().await.clone()
694    }
695
696    /// Get check history
697    pub async fn get_check_history(
698        &self,
699        limit: Option<usize>,
700    ) -> RragResult<Vec<ConsistencyReport>> {
701        let history = self.check_history.read().await;
702        let limit = limit.unwrap_or(history.len());
703        Ok(history.iter().rev().take(limit).cloned().collect())
704    }
705
706    /// Health check
707    pub async fn health_check(&self) -> RragResult<bool> {
708        let handles = self.task_handles.lock().await;
709        let all_running = handles.iter().all(|handle| !handle.is_finished());
710
711        let metrics = self.get_health_metrics().await;
712        let healthy_status = matches!(
713            metrics.current_health,
714            HealthStatus::Healthy | HealthStatus::Warning
715        );
716
717        Ok(all_running && healthy_status)
718    }
719
720    /// Start background monitoring tasks
721    async fn start_background_tasks(&self) -> RragResult<()> {
722        let mut handles = self.task_handles.lock().await;
723
724        // Quick check task
725        handles.push(self.start_quick_check_task().await);
726
727        // Comprehensive check task
728        handles.push(self.start_comprehensive_check_task().await);
729
730        // Health monitoring task
731        if self.config.enable_performance_monitoring {
732            handles.push(self.start_health_monitoring_task().await);
733        }
734
735        Ok(())
736    }
737
738    /// Start quick check background task
739    async fn start_quick_check_task(&self) -> tokio::task::JoinHandle<()> {
740        let checker = self.clone_for_task();
741        let interval = self.config.check_interval_secs;
742
743        tokio::spawn(async move {
744            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
745
746            loop {
747                timer.tick().await;
748
749                if let Err(e) = checker.quick_check().await {
750                    tracing::debug!("Quick integrity check failed: {}", e);
751                }
752            }
753        })
754    }
755
756    /// Start comprehensive check background task
757    async fn start_comprehensive_check_task(&self) -> tokio::task::JoinHandle<()> {
758        let checker = self.clone_for_task();
759        let interval = self.config.comprehensive_check_interval_secs;
760
761        tokio::spawn(async move {
762            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
763
764            loop {
765                timer.tick().await;
766
767                if let Err(e) = checker.comprehensive_check().await {
768                    tracing::debug!("Comprehensive integrity check failed: {}", e);
769                }
770            }
771        })
772    }
773
774    /// Start health monitoring task
775    async fn start_health_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
776        let health_metrics = Arc::clone(&self.health_metrics);
777
778        tokio::spawn(async move {
779            let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(30));
780
781            loop {
782                timer.tick().await;
783
784                // Update health metrics
785                let mut metrics = health_metrics.write().await;
786
787                // Collect current vitals (placeholder implementation)
788                metrics.vitals = SystemVitals {
789                    memory_usage_percent: 45.0, // Would be actual measurement
790                    storage_usage_percent: 60.0,
791                    cpu_usage_percent: 25.0,
792                    network_usage_bytes_per_second: 1024.0,
793                    queue_depths: HashMap::new(),
794                    active_connections: 10,
795                };
796
797                metrics.last_check_at = chrono::Utc::now();
798
799                // Add data point to history
800                let data_point = HealthDataPoint {
801                    timestamp: chrono::Utc::now(),
802                    health_status: metrics.current_health.clone(),
803                    metrics: HashMap::new(), // Would include actual metrics
804                };
805
806                metrics.health_history.push(data_point);
807
808                // Limit history size
809                if metrics.health_history.len() > 1000 {
810                    metrics.health_history.remove(0);
811                }
812            }
813        })
814    }
815
816    /// Clone checker for background tasks (simplified)
817    fn clone_for_task(&self) -> Self {
818        // In a real implementation, this would properly clone or use Arc references
819        Self {
820            config: self.config.clone(),
821            check_history: Arc::clone(&self.check_history),
822            health_metrics: Arc::clone(&self.health_metrics),
823            active_repairs: Arc::clone(&self.active_repairs),
824            stats: Arc::clone(&self.stats),
825            task_handles: Arc::clone(&self.task_handles),
826        }
827    }
828
829    // Check implementations (simplified placeholders)
830    async fn check_basic_consistency(&self) -> RragResult<Vec<IntegrityError>> {
831        // Would perform actual basic consistency checks
832        Ok(Vec::new())
833    }
834
835    async fn check_hash_integrity(&self) -> RragResult<Vec<IntegrityError>> {
836        // Would verify hash integrity across documents and chunks
837        Ok(Vec::new())
838    }
839
840    async fn check_reference_integrity(&self) -> RragResult<Vec<IntegrityError>> {
841        // Would check referential integrity between entities
842        Ok(Vec::new())
843    }
844
845    async fn check_version_consistency(&self) -> RragResult<Vec<IntegrityError>> {
846        // Would verify version consistency
847        Ok(Vec::new())
848    }
849
850    async fn check_index_integrity(&self) -> RragResult<Vec<IntegrityError>> {
851        // Would check index structures for corruption
852        Ok(Vec::new())
853    }
854
855    async fn collect_performance_metrics(&self) -> RragResult<PerformanceMetrics> {
856        Ok(PerformanceMetrics {
857            avg_response_time_ms: 150.0,
858            p95_response_time_ms: 500.0,
859            p99_response_time_ms: 1000.0,
860            operations_per_second: 100.0,
861            error_rate: 0.01,
862            success_rate: 0.99,
863            memory_usage_mb: 512.0,
864            cpu_usage_percent: 45.0,
865            storage_usage_bytes: 1024 * 1024 * 500, // 500MB
866        })
867    }
868
869    async fn collect_system_stats(&self) -> RragResult<SystemStats> {
870        Ok(SystemStats {
871            total_documents: 1000,
872            total_chunks: 5000,
873            total_embeddings: 5000,
874            index_counts: HashMap::new(),
875            storage_distribution: HashMap::new(),
876            uptime_seconds: 86400, // 1 day
877            last_maintenance_at: Some(chrono::Utc::now() - chrono::Duration::hours(12)),
878        })
879    }
880
881    async fn determine_health_status(
882        &self,
883        errors: &[IntegrityError],
884        metrics: &PerformanceMetrics,
885    ) -> RragResult<HealthStatus> {
886        if !errors.is_empty() {
887            return Ok(HealthStatus::Critical);
888        }
889
890        if metrics.error_rate > self.config.health_thresholds.max_error_rate {
891            return Ok(HealthStatus::Warning);
892        }
893
894        if metrics.success_rate < self.config.health_thresholds.min_success_rate {
895            return Ok(HealthStatus::Warning);
896        }
897
898        Ok(HealthStatus::Healthy)
899    }
900
901    async fn generate_recommendations(
902        &self,
903        errors: &[IntegrityError],
904        metrics: &PerformanceMetrics,
905        _health: &HealthStatus,
906    ) -> RragResult<Vec<Recommendation>> {
907        let mut recommendations = Vec::new();
908
909        if !errors.is_empty() {
910            recommendations.push(Recommendation {
911                recommendation_id: Uuid::new_v4().to_string(),
912                recommendation_type: RecommendationType::Maintenance,
913                priority: RecommendationPriority::High,
914                description: "Integrity errors detected - immediate attention required".to_string(),
915                suggested_actions: vec!["Run comprehensive integrity check".to_string()],
916                expected_impact: "Improved system reliability".to_string(),
917                estimated_effort: "Medium".to_string(),
918            });
919        }
920
921        if metrics.avg_response_time_ms > 1000.0 {
922            recommendations.push(Recommendation {
923                recommendation_id: Uuid::new_v4().to_string(),
924                recommendation_type: RecommendationType::Performance,
925                priority: RecommendationPriority::Medium,
926                description: "Response times are elevated".to_string(),
927                suggested_actions: vec![
928                    "Optimize queries".to_string(),
929                    "Scale resources".to_string(),
930                ],
931                expected_impact: "Faster response times".to_string(),
932                estimated_effort: "Low".to_string(),
933            });
934        }
935
936        Ok(recommendations)
937    }
938
939    async fn perform_auto_repairs(
940        &self,
941        errors: &[IntegrityError],
942    ) -> RragResult<Vec<RepairAction>> {
943        let mut repairs = Vec::new();
944
945        for error in errors {
946            if let Some(repair) = self.attempt_repair(error).await? {
947                repairs.push(repair);
948            }
949        }
950
951        Ok(repairs)
952    }
953
954    async fn attempt_repair(&self, error: &IntegrityError) -> RragResult<Option<RepairAction>> {
955        match error {
956            IntegrityError::OrphanedData { entity_id, .. } => Some(RepairAction {
957                action_id: Uuid::new_v4().to_string(),
958                action_type: RepairActionType::RemoveOrphanedData,
959                target_entity: entity_id.clone(),
960                description: "Removed orphaned data".to_string(),
961                executed_at: chrono::Utc::now(),
962                result: RepairResult::Success,
963                details: HashMap::new(),
964            }),
965            _ => None, // Other repairs would be implemented
966        }
967        .pipe(Ok)
968    }
969
970    async fn update_check_statistics(&self, report: &ConsistencyReport) -> RragResult<()> {
971        let mut stats = self.stats.write().await;
972
973        stats.total_checks += 1;
974        match report.report_type {
975            ReportType::Quick => stats.quick_checks += 1,
976            ReportType::Comprehensive => stats.comprehensive_checks += 1,
977            _ => {}
978        }
979
980        stats.total_errors_found += report.integrity_errors.len() as u64;
981
982        for error in &report.integrity_errors {
983            let error_type = format!("{:?}", error)
984                .split('{')
985                .next()
986                .unwrap_or("Unknown")
987                .to_string();
988            *stats.errors_by_type.entry(error_type).or_insert(0) += 1;
989        }
990
991        stats.avg_check_duration_ms =
992            (stats.avg_check_duration_ms + report.check_duration_ms as f64) / 2.0;
993
994        stats.last_updated = chrono::Utc::now();
995
996        Ok(())
997    }
998
999    async fn add_to_history(&self, report: ConsistencyReport) -> RragResult<()> {
1000        let mut history = self.check_history.write().await;
1001        history.push(report);
1002
1003        // Limit history size
1004        if history.len() > 100 {
1005            history.remove(0);
1006        }
1007
1008        Ok(())
1009    }
1010}
1011
/// Postfix function application: `value.pipe(func)` is exactly `func(value)`.
///
/// This lets a value produced at the end of an expression chain be handed to a
/// function (for example `.pipe(Ok)`) without wrapping the whole expression in a call.
// `dead_code` is allowed because this general-purpose helper can outlive its callers.
#[allow(dead_code)]
trait Pipe<T> {
    /// Consumes `self` and returns the result of `func(self)`.
    fn pipe<R, G: FnOnce(T) -> R>(self, func: G) -> R;
}

// Blanket impl: every type can be piped into a function that accepts it by value.
impl<T> Pipe<T> for T {
    fn pipe<R, G: FnOnce(T) -> R>(self, func: G) -> R {
        func(self)
    }
}
1027
#[cfg(test)]
mod tests {
    use super::*;

    /// Default configuration with background checks switched off, so the checks a
    /// test triggers explicitly are the only ones it observes.
    fn manual_config() -> IntegrityConfig {
        IntegrityConfig {
            enable_auto_checks: false,
            ..IntegrityConfig::default()
        }
    }

    #[tokio::test]
    async fn test_integrity_checker_creation() {
        let checker = IntegrityChecker::new(IntegrityConfig::default())
            .await
            .unwrap();
        let healthy = checker.health_check().await.unwrap();
        assert!(healthy);
    }

    #[tokio::test]
    async fn test_quick_check() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        let report = checker.quick_check().await.unwrap();

        assert_eq!(report.report_type, ReportType::Quick);
        // NOTE(review): assumes a quick check takes at least 1 ms; a sub-millisecond
        // run would make this assertion flaky.
        assert!(report.check_duration_ms > 0);
        // Placeholder entity count expected from a quick check.
        assert_eq!(report.entities_checked, 100);
    }

    #[tokio::test]
    async fn test_comprehensive_check() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        let report = checker.comprehensive_check().await.unwrap();

        assert_eq!(report.report_type, ReportType::Comprehensive);
        // NOTE(review): same sub-millisecond caveat as in `test_quick_check`.
        assert!(report.check_duration_ms > 0);
        // Placeholder entity count expected from a comprehensive check.
        assert_eq!(report.entities_checked, 1000);
    }

    #[tokio::test]
    async fn test_health_metrics() {
        let checker = IntegrityChecker::new(IntegrityConfig::default())
            .await
            .unwrap();
        let metrics = checker.get_health_metrics().await;

        assert_eq!(metrics.current_health, HealthStatus::Healthy);
        let now = chrono::Utc::now();
        assert!(metrics.last_check_at <= now);
    }

    #[tokio::test]
    async fn test_statistics() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();

        // Exactly one quick check should be recorded.
        checker.quick_check().await.unwrap();

        let stats = checker.get_stats().await;
        assert_eq!(stats.total_checks, 1);
        assert_eq!(stats.quick_checks, 1);
        assert_eq!(stats.comprehensive_checks, 0);
    }

    #[test]
    fn test_health_status_ordering() {
        // Each status must compare strictly below the next, more severe one.
        let ladder = [
            HealthStatus::Healthy,
            HealthStatus::Warning,
            HealthStatus::Critical,
            HealthStatus::Emergency,
        ];
        for pair in ladder.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn test_recommendation_priority_ordering() {
        // Each priority must compare strictly below the next, more urgent one.
        let ladder = [
            RecommendationPriority::Low,
            RecommendationPriority::Medium,
            RecommendationPriority::High,
            RecommendationPriority::Critical,
        ];
        for pair in ladder.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn test_integrity_error_types() {
        let samples = vec![
            IntegrityError::HashMismatch {
                expected: "hash1".to_string(),
                actual: "hash2".to_string(),
                entity_id: "doc1".to_string(),
            },
            IntegrityError::MissingReference {
                reference_id: "ref1".to_string(),
                referenced_by: "doc1".to_string(),
            },
            IntegrityError::OrphanedData {
                entity_id: "orphan1".to_string(),
                entity_type: "chunk".to_string(),
            },
        ];

        // Every sample must be a distinct variant. Inequality is symmetric, so each
        // unordered pair only needs to be compared once.
        let kinds: Vec<_> = samples.iter().map(std::mem::discriminant).collect();
        for (idx, first) in kinds.iter().enumerate() {
            for second in &kinds[idx + 1..] {
                assert_ne!(first, second);
            }
        }
    }
}