use crate::RragResult;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
/// Configuration for [`IntegrityChecker`]: background scheduling, feature switches,
/// and the thresholds used when deriving a [`HealthStatus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityConfig {
    /// Start the periodic background check tasks when the checker is created.
    pub enable_auto_checks: bool,
    /// Seconds between background quick checks.
    pub check_interval_secs: u64,
    /// Seconds between background comprehensive checks.
    pub comprehensive_check_interval_secs: u64,
    /// Switch for consistency checks.
    /// NOTE(review): not consulted by `IntegrityChecker` in this module.
    pub enable_consistency_checks: bool,
    /// Switch for corruption detection.
    /// NOTE(review): not consulted by `IntegrityChecker` in this module.
    pub enable_corruption_detection: bool,
    /// When auto checks are enabled, also start the periodic health-monitoring task.
    pub enable_performance_monitoring: bool,
    /// Upper bound on repair attempts.
    /// NOTE(review): not read by `IntegrityChecker` in this module.
    pub max_repair_attempts: u32,
    /// Attempt automatic repairs when a check reports integrity errors.
    pub enable_auto_repair: bool,
    /// Limits compared against collected metrics when deciding the health status.
    pub health_thresholds: HealthThresholds,
}
/// Limits separating a healthy system from a degraded one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthThresholds {
    /// Highest acceptable error rate (the default of 0.05 suggests a 0–1 fraction).
    pub max_error_rate: f64,
    /// Highest acceptable response time, in milliseconds.
    pub max_response_time_ms: u64,
    /// Lowest acceptable success rate (the default of 0.95 suggests a 0–1 fraction).
    pub min_success_rate: f64,
    /// Highest acceptable queue depth.
    pub max_queue_depth: usize,
    /// Highest acceptable memory usage, in megabytes.
    pub max_memory_usage_mb: f64,
    /// Highest acceptable storage usage, as a percentage.
    pub max_storage_usage_percent: f64,
}
impl Default for IntegrityConfig {
    /// Every feature switched on, quick checks every 5 minutes, comprehensive checks
    /// every hour, at most 3 repair attempts, and the default [`HealthThresholds`].
    fn default() -> Self {
        let health_thresholds = HealthThresholds::default();
        Self {
            enable_auto_checks: true,
            enable_consistency_checks: true,
            enable_corruption_detection: true,
            enable_performance_monitoring: true,
            enable_auto_repair: true,
            check_interval_secs: 5 * 60,
            comprehensive_check_interval_secs: 60 * 60,
            max_repair_attempts: 3,
            health_thresholds,
        }
    }
}
impl Default for HealthThresholds {
    /// 5% maximum error rate, 95% minimum success rate, 10 s maximum response time,
    /// a 1000-item queue limit, 1024 MB of memory and 80% storage usage.
    fn default() -> Self {
        Self {
            max_error_rate: 0.05,
            min_success_rate: 0.95,
            max_response_time_ms: 10_000,
            max_queue_depth: 1_000,
            max_memory_usage_mb: 1024.0,
            max_storage_usage_percent: 80.0,
        }
    }
}
/// A single integrity problem reported by a check.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum IntegrityError {
    /// A hash for `entity_id` differed from the expected value.
    HashMismatch {
        expected: String,
        actual: String,
        entity_id: String,
    },
    /// `referenced_by` points at `reference_id`, which presumably could not be resolved.
    MissingReference {
        reference_id: String,
        referenced_by: String,
    },
    /// Data (`entity_id`, of kind `entity_type`) that appears to be unreferenced.
    OrphanedData {
        entity_id: String,
        entity_type: String,
    },
    /// The version recorded for `entity_id` differs from the expected version.
    VersionInconsistency {
        entity_id: String,
        expected_version: u64,
        actual_version: u64,
    },
    /// The index `index_name` appears corrupted; `corruption_type` and `details` say how.
    IndexCorruption {
        index_name: String,
        corruption_type: String,
        details: String,
    },
    /// The size of `entity_id` differs from the expected size (units not fixed here).
    SizeMismatch {
        entity_id: String,
        expected_size: u64,
        actual_size: u64,
    },
    /// A timestamp problem on `entity_id`, described by `issue`.
    TimestampInconsistency { entity_id: String, issue: String },
    /// Several entities share the same value in `duplicate_field`.
    DuplicateEntries {
        entity_ids: Vec<String>,
        duplicate_field: String,
    },
}
/// Result of one integrity check run, as returned by `IntegrityChecker::quick_check`
/// and `IntegrityChecker::comprehensive_check` and kept in the checker's history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsistencyReport {
    /// Random (v4) UUID identifying this report.
    pub report_id: String,
    /// When the report was assembled (UTC).
    pub generated_at: chrono::DateTime<chrono::Utc>,
    /// Which kind of check produced the report.
    pub report_type: ReportType,
    /// Health status derived from the errors and performance metrics.
    pub overall_health: HealthStatus,
    /// Every integrity problem the check reported.
    pub integrity_errors: Vec<IntegrityError>,
    /// Performance figures collected during the check.
    pub performance_metrics: PerformanceMetrics,
    /// System-wide counts collected during the check.
    pub system_stats: SystemStats,
    /// Suggested follow-up actions.
    pub recommendations: Vec<Recommendation>,
    /// Elapsed time of the check, in milliseconds.
    pub check_duration_ms: u64,
    /// Number of entities the check covered.
    /// NOTE(review): currently a fixed value per check type, not a measured count.
    pub entities_checked: usize,
    /// Automatic repairs performed because of this check (empty if none were attempted).
    pub repair_actions: Vec<RepairAction>,
}
/// Kind of check that produced a [`ConsistencyReport`].
///
/// `PartialEq`/`Eq` are derived so reports can be compared by type
/// (e.g. `assert_eq!(report.report_type, ReportType::Quick)`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReportType {
    /// Lightweight check (`IntegrityChecker::quick_check`).
    Quick,
    /// Full check (`IntegrityChecker::comprehensive_check`).
    Comprehensive,
    /// Check aimed at a specific target; the string presumably names it.
    Targeted(String),
    /// Emergency check. NOTE(review): not produced anywhere in this module.
    Emergency,
}
/// Overall system health, from best (`Healthy`) to worst (`Emergency`).
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so
/// `Healthy < Warning < Critical < Emergency`; keep the variants in this order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatus {
    /// No problems detected.
    Healthy,
    /// Degraded; some metric crossed a threshold.
    Warning,
    /// Serious problems, such as integrity errors.
    Critical,
    /// Worst severity level.
    Emergency,
}
/// Performance figures gathered while a check runs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Mean response time, in milliseconds.
    pub avg_response_time_ms: f64,
    /// 95th-percentile response time, in milliseconds.
    pub p95_response_time_ms: f64,
    /// 99th-percentile response time, in milliseconds.
    pub p99_response_time_ms: f64,
    /// Throughput, in operations per second.
    pub operations_per_second: f64,
    /// Error rate, compared against `HealthThresholds::max_error_rate`.
    pub error_rate: f64,
    /// Success rate, compared against `HealthThresholds::min_success_rate`.
    pub success_rate: f64,
    /// Memory in use, in megabytes.
    pub memory_usage_mb: f64,
    /// CPU utilisation, as a percentage.
    pub cpu_usage_percent: f64,
    /// Storage in use, in bytes.
    pub storage_usage_bytes: u64,
}
/// System-wide counts captured alongside a check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemStats {
    /// Number of stored documents.
    pub total_documents: usize,
    /// Number of stored chunks.
    pub total_chunks: usize,
    /// Number of stored embeddings.
    pub total_embeddings: usize,
    /// Entry counts keyed by index name.
    pub index_counts: HashMap<String, usize>,
    /// Storage usage keyed by category (presumably in bytes — TODO confirm).
    pub storage_distribution: HashMap<String, u64>,
    /// Time since start-up, in seconds.
    pub uptime_seconds: u64,
    /// Time of the last maintenance run; presumably `None` if none has been recorded.
    pub last_maintenance_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// A suggested follow-up action attached to a [`ConsistencyReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    /// Random (v4) UUID identifying the recommendation.
    pub recommendation_id: String,
    /// Area the recommendation addresses.
    pub recommendation_type: RecommendationType,
    /// How urgent the recommendation is.
    pub priority: RecommendationPriority,
    /// Human-readable summary of the problem.
    pub description: String,
    /// Concrete steps that would address it.
    pub suggested_actions: Vec<String>,
    /// Expected benefit of following the recommendation.
    pub expected_impact: String,
    /// Free-form effort estimate such as "Low" or "Medium".
    pub estimated_effort: String,
}
/// Area of the system a [`Recommendation`] concerns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecommendationType {
    Performance,
    Storage,
    Security,
    Maintenance,
    Capacity,
    Configuration,
}
/// Urgency of a [`Recommendation`].
///
/// The derived `PartialOrd` orders variants by discriminant (1–4), so
/// `Low < Medium < High < Critical`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, PartialEq)]
pub enum RecommendationPriority {
    Low = 1,
    Medium = 2,
    High = 3,
    Critical = 4,
}
/// Record of one repair performed (or attempted) in response to an integrity error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairAction {
    /// Random (v4) UUID identifying the action.
    pub action_id: String,
    /// Kind of repair.
    pub action_type: RepairActionType,
    /// Identifier of the entity the repair targeted.
    pub target_entity: String,
    /// Human-readable description of what was done.
    pub description: String,
    /// When the repair was executed (UTC).
    pub executed_at: chrono::DateTime<chrono::Utc>,
    /// Outcome of the repair.
    pub result: RepairResult,
    /// Additional structured information about the repair.
    pub details: HashMap<String, serde_json::Value>,
}
/// Category of a [`RepairAction`].
///
/// NOTE(review): only `RemoveOrphanedData` is produced by `IntegrityChecker` in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairActionType {
    RebuildIndex,
    FixHashMismatch,
    RemoveOrphanedData,
    UpdateVersion,
    RepairCorruption,
    CleanDuplicates,
    RestoreFromBackup,
}
/// Outcome of a [`RepairAction`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RepairResult {
    /// The repair completed.
    Success,
    /// The repair failed; the string presumably explains why.
    Failed(String),
    /// The repair only partly completed; the string presumably describes what remains.
    Partial(String),
    /// The repair was not attempted; the string presumably gives the reason.
    Skipped(String),
}
/// Outcome of a single named validation step.
///
/// NOTE(review): not produced by `IntegrityChecker` in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationResult {
    /// Name of the validation that ran.
    pub check_name: String,
    /// Whether the validation passed.
    pub passed: bool,
    /// Human-readable details.
    pub details: String,
    /// Number of entities validated.
    pub entities_validated: usize,
    /// Elapsed time of the validation, in milliseconds.
    pub validation_duration_ms: u64,
    /// Problems the validation found.
    pub issues_found: Vec<IntegrityError>,
}
/// Live health state shared between an [`IntegrityChecker`] and its background tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Current overall health status.
    pub current_health: HealthStatus,
    /// Periodic snapshots appended by the health-monitoring task (bounded in length).
    pub health_history: Vec<HealthDataPoint>,
    /// Active alerts. NOTE(review): never populated in this module.
    pub active_alerts: Vec<AlertCondition>,
    /// Latest resource readings.
    pub vitals: SystemVitals,
    /// Time of the most recent health update (UTC).
    pub last_check_at: chrono::DateTime<chrono::Utc>,
}
/// One timestamped entry in [`HealthMetrics::health_history`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthDataPoint {
    /// When the snapshot was taken (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Health status at that moment.
    pub health_status: HealthStatus,
    /// Named numeric readings captured with the snapshot.
    pub metrics: HashMap<String, f64>,
}
/// An alert raised against the system's health.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertCondition {
    /// Identifier of the alert.
    pub alert_id: String,
    /// Free-form alert category.
    pub alert_type: String,
    /// Severity, expressed on the [`HealthStatus`] scale.
    pub severity: HealthStatus,
    /// Human-readable description.
    pub description: String,
    /// When the alert was raised (UTC).
    pub triggered_at: chrono::DateTime<chrono::Utc>,
    /// Additional structured information about the alert.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Point-in-time resource usage readings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemVitals {
    /// Memory in use, as a percentage.
    pub memory_usage_percent: f64,
    /// Storage in use, as a percentage.
    pub storage_usage_percent: f64,
    /// CPU utilisation, as a percentage.
    pub cpu_usage_percent: f64,
    /// Network throughput, in bytes per second.
    pub network_usage_bytes_per_second: f64,
    /// Queue depths keyed by queue name.
    pub queue_depths: HashMap<String, usize>,
    /// Number of open connections.
    pub active_connections: usize,
}
/// Runs integrity checks and keeps their history and statistics. It also tracks health.
///
/// All mutable state sits behind `Arc`-wrapped Tokio locks. Background tasks receive a
/// copy built by `clone_for_task`, so they share that state with the owning checker.
pub struct IntegrityChecker {
    /// Settings supplied at construction.
    config: IntegrityConfig,
    /// Past reports in insertion order (oldest first); trimmed by `add_to_history`.
    check_history: Arc<RwLock<Vec<ConsistencyReport>>>,
    /// Live health state, also written by the health-monitoring task.
    health_metrics: Arc<RwLock<HealthMetrics>>,
    /// In-flight repairs. NOTE(review): only initialised in this module, never read.
    active_repairs: Arc<RwLock<HashMap<String, RepairAction>>>,
    /// Cumulative counters updated after every check.
    stats: Arc<RwLock<IntegrityStats>>,
    /// Join handles of the spawned background tasks; inspected by `health_check`.
    task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
/// Cumulative counters maintained by [`IntegrityChecker`] across all checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityStats {
    /// Number of checks run, of any type.
    pub total_checks: u64,
    /// Number of quick checks run.
    pub quick_checks: u64,
    /// Number of comprehensive checks run.
    pub comprehensive_checks: u64,
    /// Total integrity errors reported across all checks.
    pub total_errors_found: u64,
    /// Error counts keyed by error-variant name.
    pub errors_by_type: HashMap<String, u64>,
    /// Number of repairs attempted.
    pub total_repairs_attempted: u64,
    /// Number of repairs that succeeded.
    pub successful_repairs: u64,
    /// Number of repairs that failed.
    pub failed_repairs: u64,
    /// Average check duration, in milliseconds.
    pub avg_check_duration_ms: f64,
    /// System availability as a percentage (initialised to 100.0).
    pub system_availability_percent: f64,
    /// When these statistics were last updated (UTC).
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
impl IntegrityChecker {
pub async fn new(config: IntegrityConfig) -> RragResult<Self> {
let checker = Self {
config: config.clone(),
check_history: Arc::new(RwLock::new(Vec::new())),
health_metrics: Arc::new(RwLock::new(HealthMetrics {
current_health: HealthStatus::Healthy,
health_history: Vec::new(),
active_alerts: Vec::new(),
vitals: SystemVitals {
memory_usage_percent: 0.0,
storage_usage_percent: 0.0,
cpu_usage_percent: 0.0,
network_usage_bytes_per_second: 0.0,
queue_depths: HashMap::new(),
active_connections: 0,
},
last_check_at: chrono::Utc::now(),
})),
active_repairs: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(IntegrityStats {
total_checks: 0,
quick_checks: 0,
comprehensive_checks: 0,
total_errors_found: 0,
errors_by_type: HashMap::new(),
total_repairs_attempted: 0,
successful_repairs: 0,
failed_repairs: 0,
avg_check_duration_ms: 0.0,
system_availability_percent: 100.0,
last_updated: chrono::Utc::now(),
})),
task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
};
if config.enable_auto_checks {
checker.start_background_tasks().await?;
}
Ok(checker)
}
pub async fn quick_check(&self) -> RragResult<ConsistencyReport> {
let start_time = std::time::Instant::now();
let report_id = Uuid::new_v4().to_string();
let mut integrity_errors = Vec::new();
let mut repair_actions = Vec::new();
let basic_errors = self.check_basic_consistency().await?;
integrity_errors.extend(basic_errors);
let performance_metrics = self.collect_performance_metrics().await?;
let system_stats = self.collect_system_stats().await?;
let overall_health = self
.determine_health_status(&integrity_errors, &performance_metrics)
.await?;
let recommendations = self
.generate_recommendations(&integrity_errors, &performance_metrics, &overall_health)
.await?;
if self.config.enable_auto_repair && !integrity_errors.is_empty() {
repair_actions = self.perform_auto_repairs(&integrity_errors).await?;
}
let check_duration = start_time.elapsed().as_millis() as u64;
let report = ConsistencyReport {
report_id,
generated_at: chrono::Utc::now(),
report_type: ReportType::Quick,
overall_health,
integrity_errors,
performance_metrics,
system_stats,
recommendations,
check_duration_ms: check_duration,
entities_checked: 100, repair_actions,
};
self.update_check_statistics(&report).await?;
self.add_to_history(report.clone()).await?;
Ok(report)
}
pub async fn comprehensive_check(&self) -> RragResult<ConsistencyReport> {
let start_time = std::time::Instant::now();
let report_id = Uuid::new_v4().to_string();
let mut integrity_errors = Vec::new();
let mut repair_actions = Vec::new();
let basic_errors = self.check_basic_consistency().await?;
let hash_errors = self.check_hash_integrity().await?;
let reference_errors = self.check_reference_integrity().await?;
let version_errors = self.check_version_consistency().await?;
let index_errors = self.check_index_integrity().await?;
integrity_errors.extend(basic_errors);
integrity_errors.extend(hash_errors);
integrity_errors.extend(reference_errors);
integrity_errors.extend(version_errors);
integrity_errors.extend(index_errors);
let performance_metrics = self.collect_performance_metrics().await?;
let system_stats = self.collect_system_stats().await?;
let overall_health = self
.determine_health_status(&integrity_errors, &performance_metrics)
.await?;
let recommendations = self
.generate_recommendations(&integrity_errors, &performance_metrics, &overall_health)
.await?;
if self.config.enable_auto_repair && !integrity_errors.is_empty() {
repair_actions = self.perform_auto_repairs(&integrity_errors).await?;
}
let check_duration = start_time.elapsed().as_millis() as u64;
let report = ConsistencyReport {
report_id,
generated_at: chrono::Utc::now(),
report_type: ReportType::Comprehensive,
overall_health,
integrity_errors,
performance_metrics,
system_stats,
recommendations,
check_duration_ms: check_duration,
entities_checked: 1000, repair_actions,
};
self.update_check_statistics(&report).await?;
self.add_to_history(report.clone()).await?;
Ok(report)
}
pub async fn get_health_metrics(&self) -> HealthMetrics {
self.health_metrics.read().await.clone()
}
pub async fn get_stats(&self) -> IntegrityStats {
self.stats.read().await.clone()
}
pub async fn get_check_history(
&self,
limit: Option<usize>,
) -> RragResult<Vec<ConsistencyReport>> {
let history = self.check_history.read().await;
let limit = limit.unwrap_or(history.len());
Ok(history.iter().rev().take(limit).cloned().collect())
}
pub async fn health_check(&self) -> RragResult<bool> {
let handles = self.task_handles.lock().await;
let all_running = handles.iter().all(|handle| !handle.is_finished());
let metrics = self.get_health_metrics().await;
let healthy_status = matches!(
metrics.current_health,
HealthStatus::Healthy | HealthStatus::Warning
);
Ok(all_running && healthy_status)
}
async fn start_background_tasks(&self) -> RragResult<()> {
let mut handles = self.task_handles.lock().await;
handles.push(self.start_quick_check_task().await);
handles.push(self.start_comprehensive_check_task().await);
if self.config.enable_performance_monitoring {
handles.push(self.start_health_monitoring_task().await);
}
Ok(())
}
async fn start_quick_check_task(&self) -> tokio::task::JoinHandle<()> {
let checker = self.clone_for_task();
let interval = self.config.check_interval_secs;
tokio::spawn(async move {
let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
loop {
timer.tick().await;
if let Err(e) = checker.quick_check().await {
eprintln!("Quick integrity check failed: {}", e);
}
}
})
}
async fn start_comprehensive_check_task(&self) -> tokio::task::JoinHandle<()> {
let checker = self.clone_for_task();
let interval = self.config.comprehensive_check_interval_secs;
tokio::spawn(async move {
let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
loop {
timer.tick().await;
if let Err(e) = checker.comprehensive_check().await {
eprintln!("Comprehensive integrity check failed: {}", e);
}
}
})
}
async fn start_health_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
let health_metrics = Arc::clone(&self.health_metrics);
tokio::spawn(async move {
let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(30));
loop {
timer.tick().await;
let mut metrics = health_metrics.write().await;
metrics.vitals = SystemVitals {
memory_usage_percent: 45.0, storage_usage_percent: 60.0,
cpu_usage_percent: 25.0,
network_usage_bytes_per_second: 1024.0,
queue_depths: HashMap::new(),
active_connections: 10,
};
metrics.last_check_at = chrono::Utc::now();
let data_point = HealthDataPoint {
timestamp: chrono::Utc::now(),
health_status: metrics.current_health.clone(),
metrics: HashMap::new(), };
metrics.health_history.push(data_point);
if metrics.health_history.len() > 1000 {
metrics.health_history.remove(0);
}
}
})
}
fn clone_for_task(&self) -> Self {
Self {
config: self.config.clone(),
check_history: Arc::clone(&self.check_history),
health_metrics: Arc::clone(&self.health_metrics),
active_repairs: Arc::clone(&self.active_repairs),
stats: Arc::clone(&self.stats),
task_handles: Arc::clone(&self.task_handles),
}
}
async fn check_basic_consistency(&self) -> RragResult<Vec<IntegrityError>> {
Ok(Vec::new())
}
async fn check_hash_integrity(&self) -> RragResult<Vec<IntegrityError>> {
Ok(Vec::new())
}
async fn check_reference_integrity(&self) -> RragResult<Vec<IntegrityError>> {
Ok(Vec::new())
}
async fn check_version_consistency(&self) -> RragResult<Vec<IntegrityError>> {
Ok(Vec::new())
}
async fn check_index_integrity(&self) -> RragResult<Vec<IntegrityError>> {
Ok(Vec::new())
}
async fn collect_performance_metrics(&self) -> RragResult<PerformanceMetrics> {
Ok(PerformanceMetrics {
avg_response_time_ms: 150.0,
p95_response_time_ms: 500.0,
p99_response_time_ms: 1000.0,
operations_per_second: 100.0,
error_rate: 0.01,
success_rate: 0.99,
memory_usage_mb: 512.0,
cpu_usage_percent: 45.0,
storage_usage_bytes: 1024 * 1024 * 500, })
}
async fn collect_system_stats(&self) -> RragResult<SystemStats> {
Ok(SystemStats {
total_documents: 1000,
total_chunks: 5000,
total_embeddings: 5000,
index_counts: HashMap::new(),
storage_distribution: HashMap::new(),
uptime_seconds: 86400, last_maintenance_at: Some(chrono::Utc::now() - chrono::Duration::hours(12)),
})
}
async fn determine_health_status(
&self,
errors: &[IntegrityError],
metrics: &PerformanceMetrics,
) -> RragResult<HealthStatus> {
if !errors.is_empty() {
return Ok(HealthStatus::Critical);
}
if metrics.error_rate > self.config.health_thresholds.max_error_rate {
return Ok(HealthStatus::Warning);
}
if metrics.success_rate < self.config.health_thresholds.min_success_rate {
return Ok(HealthStatus::Warning);
}
Ok(HealthStatus::Healthy)
}
async fn generate_recommendations(
&self,
errors: &[IntegrityError],
metrics: &PerformanceMetrics,
_health: &HealthStatus,
) -> RragResult<Vec<Recommendation>> {
let mut recommendations = Vec::new();
if !errors.is_empty() {
recommendations.push(Recommendation {
recommendation_id: Uuid::new_v4().to_string(),
recommendation_type: RecommendationType::Maintenance,
priority: RecommendationPriority::High,
description: "Integrity errors detected - immediate attention required".to_string(),
suggested_actions: vec!["Run comprehensive integrity check".to_string()],
expected_impact: "Improved system reliability".to_string(),
estimated_effort: "Medium".to_string(),
});
}
if metrics.avg_response_time_ms > 1000.0 {
recommendations.push(Recommendation {
recommendation_id: Uuid::new_v4().to_string(),
recommendation_type: RecommendationType::Performance,
priority: RecommendationPriority::Medium,
description: "Response times are elevated".to_string(),
suggested_actions: vec![
"Optimize queries".to_string(),
"Scale resources".to_string(),
],
expected_impact: "Faster response times".to_string(),
estimated_effort: "Low".to_string(),
});
}
Ok(recommendations)
}
async fn perform_auto_repairs(
&self,
errors: &[IntegrityError],
) -> RragResult<Vec<RepairAction>> {
let mut repairs = Vec::new();
for error in errors {
if let Some(repair) = self.attempt_repair(error).await? {
repairs.push(repair);
}
}
Ok(repairs)
}
async fn attempt_repair(&self, error: &IntegrityError) -> RragResult<Option<RepairAction>> {
match error {
IntegrityError::OrphanedData { entity_id, .. } => Some(RepairAction {
action_id: Uuid::new_v4().to_string(),
action_type: RepairActionType::RemoveOrphanedData,
target_entity: entity_id.clone(),
description: "Removed orphaned data".to_string(),
executed_at: chrono::Utc::now(),
result: RepairResult::Success,
details: HashMap::new(),
}),
_ => None, }
.pipe(Ok)
}
async fn update_check_statistics(&self, report: &ConsistencyReport) -> RragResult<()> {
let mut stats = self.stats.write().await;
stats.total_checks += 1;
match report.report_type {
ReportType::Quick => stats.quick_checks += 1,
ReportType::Comprehensive => stats.comprehensive_checks += 1,
_ => {}
}
stats.total_errors_found += report.integrity_errors.len() as u64;
for error in &report.integrity_errors {
let error_type = format!("{:?}", error)
.split('{')
.next()
.unwrap_or("Unknown")
.to_string();
*stats.errors_by_type.entry(error_type).or_insert(0) += 1;
}
stats.avg_check_duration_ms =
(stats.avg_check_duration_ms + report.check_duration_ms as f64) / 2.0;
stats.last_updated = chrono::Utc::now();
Ok(())
}
async fn add_to_history(&self, report: ConsistencyReport) -> RragResult<()> {
let mut history = self.check_history.write().await;
history.push(report);
if history.len() > 100 {
history.remove(0);
}
Ok(())
}
}
/// Postfix function application: `value.pipe(f)` is the same as `f(value)`.
trait Pipe<T> {
    /// Hands `self` to `f` and returns whatever `f` produces.
    fn pipe<R, G: FnOnce(T) -> R>(self, f: G) -> R;
}

/// Blanket implementation: any value can be piped into a function that takes it.
impl<T> Pipe<T> for T {
    fn pipe<R, G: FnOnce(T) -> R>(self, f: G) -> R {
        let apply = f;
        apply(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Default configuration with the background tasks disabled.
    ///
    /// Checks then run only when a test calls them.
    fn manual_config() -> IntegrityConfig {
        IntegrityConfig {
            enable_auto_checks: false,
            ..IntegrityConfig::default()
        }
    }

    #[tokio::test]
    async fn test_integrity_checker_creation() {
        let checker = IntegrityChecker::new(IntegrityConfig::default()).await.unwrap();
        assert!(checker.health_check().await.unwrap());
    }

    #[tokio::test]
    async fn test_quick_check() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        let report = checker.quick_check().await.unwrap();
        assert!(matches!(report.report_type, ReportType::Quick));
        // The stubbed checks finish in well under a millisecond, so
        // `check_duration_ms` may legitimately be 0 and is not asserted on.
        assert_eq!(report.entities_checked, 100);
        let history = checker.get_check_history(None).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].report_id, report.report_id);
    }

    #[tokio::test]
    async fn test_comprehensive_check() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        let report = checker.comprehensive_check().await.unwrap();
        assert!(matches!(report.report_type, ReportType::Comprehensive));
        assert_eq!(report.entities_checked, 1000);
        let history = checker.get_check_history(None).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].report_id, report.report_id);
    }

    #[tokio::test]
    async fn test_check_history_is_newest_first() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        checker.quick_check().await.unwrap();
        let latest = checker.comprehensive_check().await.unwrap();
        let limited = checker.get_check_history(Some(1)).await.unwrap();
        assert_eq!(limited.len(), 1);
        assert_eq!(limited[0].report_id, latest.report_id);
        assert_eq!(checker.get_check_history(None).await.unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_health_metrics() {
        let checker = IntegrityChecker::new(IntegrityConfig::default()).await.unwrap();
        let metrics = checker.get_health_metrics().await;
        assert_eq!(metrics.current_health, HealthStatus::Healthy);
        assert!(metrics.last_check_at <= chrono::Utc::now());
    }

    #[tokio::test]
    async fn test_statistics() {
        let checker = IntegrityChecker::new(manual_config()).await.unwrap();
        checker.quick_check().await.unwrap();
        let stats = checker.get_stats().await;
        assert_eq!(stats.total_checks, 1);
        assert_eq!(stats.quick_checks, 1);
        assert_eq!(stats.comprehensive_checks, 0);
        assert_eq!(stats.total_errors_found, 0);
    }

    #[test]
    fn test_health_status_ordering() {
        assert!(HealthStatus::Healthy < HealthStatus::Warning);
        assert!(HealthStatus::Warning < HealthStatus::Critical);
        assert!(HealthStatus::Critical < HealthStatus::Emergency);
    }

    #[test]
    fn test_recommendation_priority_ordering() {
        assert!(RecommendationPriority::Low < RecommendationPriority::Medium);
        assert!(RecommendationPriority::Medium < RecommendationPriority::High);
        assert!(RecommendationPriority::High < RecommendationPriority::Critical);
    }

    #[test]
    fn test_integrity_error_types() {
        let errors = vec![
            IntegrityError::HashMismatch {
                expected: "hash1".to_string(),
                actual: "hash2".to_string(),
                entity_id: "doc1".to_string(),
            },
            IntegrityError::MissingReference {
                reference_id: "ref1".to_string(),
                referenced_by: "doc1".to_string(),
            },
            IntegrityError::OrphanedData {
                entity_id: "orphan1".to_string(),
                entity_type: "chunk".to_string(),
            },
        ];
        for (i, error1) in errors.iter().enumerate() {
            for (j, error2) in errors.iter().enumerate() {
                if i != j {
                    assert_ne!(
                        std::mem::discriminant(error1),
                        std::mem::discriminant(error2)
                    );
                }
            }
        }
    }
}